[train] feat: Add AgentFramework base class and OpenAI example implementation #58
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Code Review
This pull request introduces a new trainer-driven agent framework stack, including the OpenAICompatibleAgentFramework implementation, integration adapters, multimodal postprocessing utilities, and comprehensive unit tests. The review feedback highlights several critical robustness and efficiency improvements: correctly handling BaseException (such as asyncio.CancelledError) in gathered tasks to prevent masked errors, awaiting ray.ObjectRef directly to avoid thread pool overhead, maintaining backward compatibility by removing Python 3.10-specific zip arguments, and ensuring consistent position ID initialization for padded tokens. Additionally, defensive checks are recommended for configuration parsing, empty trajectory fields, and reward worker responses.
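The point about `BaseException` in gathered tasks can be illustrated with a small standalone sketch. It assumes Python 3.8 or later, where `asyncio.CancelledError` derives from `BaseException` rather than `Exception`; the coroutine and helper names are hypothetical and are not taken from the PR.

```python
import asyncio


async def ok_session() -> str:
    # Hypothetical stand-in for a session that finishes normally.
    return "trajectory"


async def failing_session() -> str:
    # Hypothetical stand-in for a runner that raises an ordinary error.
    raise ValueError("runner failed")


def split_results(results):
    """Separate successes from failures, treating cancellations as failures."""
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return succeeded, failed


async def demo():
    slow = asyncio.create_task(asyncio.sleep(60))
    tasks = [
        asyncio.create_task(ok_session()),
        asyncio.create_task(failing_session()),
        slow,
    ]
    await asyncio.sleep(0)  # give every task a chance to start
    slow.cancel()           # this task will surface as CancelledError
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return split_results(results)


if __name__ == "__main__":
    succeeded, failed = asyncio.run(demo())
    print(succeeded, [type(e).__name__ for e in failed])
```

A check written as `isinstance(r, Exception)` would classify the cancelled task's result as a success, which is the kind of masked error the review describes.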
first_video = videos[0]
if isinstance(first_video, tuple) and len(first_video) == 2:
    split_videos, video_metadata = zip(*videos, strict=False)
The strict argument was introduced to zip in Python 3.10. If this codebase is run on Python 3.8 or 3.9, passing strict=False will raise a TypeError. Since strict=False is the default behavior of zip, specifying it is redundant and harms backward compatibility.
- split_videos, video_metadata = zip(*videos, strict=False)
+ split_videos, video_metadata = zip(*videos)
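As a minimal sketch of why the plain call is sufficient: `zip(*pairs)` already "unzips" a list of 2-tuples on every supported Python version. The sample data and the helper name below are hypothetical.

```python
def split_video_pairs(videos):
    """Split [(video, metadata), ...] into two parallel tuples.

    Plain zip(*videos) works on Python 3.8 and later alike; the
    strict= keyword only exists from Python 3.10 onward.
    """
    split_videos, video_metadata = zip(*videos)
    return split_videos, video_metadata


if __name__ == "__main__":
    # Hypothetical sample data standing in for decoded video inputs.
    sample = [("clip_a", {"fps": 2.0}), ("clip_b", {"fps": 4.0})]
    print(split_video_pairs(sample))
```

Note that unpacking into two names still requires at least one pair, which matches the `videos[0]` access in the reviewed code.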
vision_position_ids = vision_position_ids.transpose(0, 1)

valid_mask = attention_mask[0].bool()
text_position_ids = torch.ones((1, input_ids.shape[1]), dtype=torch.long, device=input_ids.device)
In compute_position_ids, text_position_ids is initialized with torch.ones. Consequently, any padded/invalid tokens (where valid_mask is False) will retain a position ID of 1. In contrast, the text-only path (using compute_position_id_with_mask) assigns 0 to padded tokens. To maintain consistency and avoid potential issues with position embeddings for padded tokens, initialize text_position_ids with torch.zeros instead.
- text_position_ids = torch.ones((1, input_ids.shape[1]), dtype=torch.long, device=input_ids.device)
+ text_position_ids = torch.zeros((1, input_ids.shape[1]), dtype=torch.long, device=input_ids.device)
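The effect of the initial fill value can be shown with a framework-free sketch. It assumes that valid tokens receive consecutive positions starting at 0 and that padded slots simply keep whatever the buffer was initialized with. The real `compute_position_ids` operates on torch tensors and also handles vision tokens, so this is only an illustration with hypothetical names.

```python
def text_position_ids(attention_mask, pad_value=0):
    """Assign 0, 1, 2, ... to valid tokens; padded slots keep pad_value."""
    position_ids = [pad_value] * len(attention_mask)
    next_position = 0
    for index, is_valid in enumerate(attention_mask):
        if is_valid:
            position_ids[index] = next_position
            next_position += 1
    return position_ids


if __name__ == "__main__":
    mask = [0, 0, 1, 1, 1]  # two left-padded tokens, three valid tokens
    print(text_position_ids(mask, pad_value=1))  # ones-style initialization
    print(text_position_ids(mask, pad_value=0))  # zeros-style initialization
```

With a fill value of 1, the padded slots carry a position that collides with the second valid token; with 0 they match the padding convention the review attributes to the text-only path.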
session_runtime = GatewayServingRuntime(
    llm_client=llm_client,
    gateway_count=int(af_cfg["gateway_count"]),
    gateway_actor_config=gateway_actor_config,
)
If gateway_count is missing from the configuration, af_cfg["gateway_count"] will raise a cryptic KeyError. It is safer to use .get("gateway_count") and raise a descriptive ValueError if it is not configured.
gateway_count = af_cfg.get("gateway_count")
if gateway_count is None:
    raise ValueError("agent_framework.gateway_count is required in the configuration")
session_runtime = GatewayServingRuntime(
    llm_client=llm_client,
    gateway_count=int(gateway_count),
    gateway_actor_config=gateway_actor_config,
)

def _list_of_tq_fields_to_tensordict(fields: list[dict[str, object]]) -> TensorDict:
    td = tu.list_of_dict_to_tensordict(fields)
    for key in _TQ_NESTED_SEQUENCE_FIELDS:
        if key not in fields[0]:
If fields is an empty list, accessing fields[0] will raise an IndexError. Although current callers filter out empty trajectories, adding a defensive guard at the beginning of _list_of_tq_fields_to_tensordict makes the utility function robust and prevents potential crashes if called with empty inputs in the future.
- def _list_of_tq_fields_to_tensordict(fields: list[dict[str, object]]) -> TensorDict:
-     td = tu.list_of_dict_to_tensordict(fields)
-     for key in _TQ_NESTED_SEQUENCE_FIELDS:
-         if key not in fields[0]:
+ def _list_of_tq_fields_to_tensordict(fields: list[dict[str, object]]) -> TensorDict:
+     if not fields:
+         return TensorDict({}, batch_size=[0])
+     td = tu.list_of_dict_to_tensordict(fields)
+     for key in _TQ_NESTED_SEQUENCE_FIELDS:
+         if key not in fields[0]:
44748af to d83de96
Test cleanup:
- Slim test_generate_sequences_on_cpu.py from 890 to 544 lines via:
  • merge function/class runner dispatch coverage into one registry test
  • replace repeated config/TQ setup with shared local fixtures and builder
  • drop policy-only legacy-config, unsettled concurrency, and duplicate failure tests
  • merge optional trainer-field zero-fill checks and move finish_reason into schema coverage
- Slim test_multi_modal_postprocess_on_cpu.py from 93 to 76 lines by merging overlapping text-path coverage and adding concise docstrings.

Lint: ruff/ruff-format/mypy/compileall on PR-scope files. Auto-fixes applied; no behavior change.

Regression: tests/uni_agent/framework/ + tests/uni_agent/gateway/ pass with 52 passed after the test cleanup.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Codex <noreply@openai.com>
4a69f4d to 33a8ab8
Wrap the framework in an AgentFrameworkWorker Ray actor so generate_sequences dispatches non-blocking. Build the gateway runtime driver-side and inject it so its actors stay driver-owned rather than subordinate to the framework worker; framework construction is synchronous (no async setup round-trip).

Drop the pre-V1 main_ppo_sync ReplayBuffer 'running' marker compatibility from the adapter; prompt-status registration moves to the V1 trainer path.

Co-authored-by: OpenAI Codex <noreply@openai.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
4c0b409 to f2dfe4b
…anager

Collect the session domain (session/codec/protocol/types) into a gateway/session subpackage so the top level holds only the HTTP gateway and its driver-side manager.

Merge GatewayServingRuntime into GatewayManager: the manager now owns actor spawn/shutdown plus session routing under one clear name (matching verl's AgentLoopManager), routed only through create/finalize/abort.

Drop accumulated dead surface: the unused GatewayManager.set_reward_info (reward already flows Agent -> HTTP -> actor -> session), the test-only from_actors constructor, and the single-implementation _GatewayManager Protocol in the framework.

Rename the framework's injected dependency session_runtime -> gateway_manager to name what it actually is.

Rewrite the routing tests onto the real spawn path: one ownership round-trip test (finalize returns each session's own trajectory) replaces the least-load policy lock, and the two manager test files are merged into one.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
GatewayManager.create_session selected the least-loaded gateway but only incremented active_sessions_per_gateway AFTER the create await. Sessions are created concurrently on one event loop, so a burst of coroutines all ran the selection before any increment landed, read the same stale all-zero counts, and min() funneled every session onto the lowest-index gateway -- observed in SWE coding runs where long-lived sessions make the imbalance persist.

Reserve the slot (route + count) synchronously before the await, rolling back if the remote create raises so failed sessions do not inflate the load estimate. Add a concurrent-creation balance test; prior routing tests created sessions sequentially, so each await (and its increment) completed before the next selection and the race never surfaced.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
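The race described in this commit message can be reproduced in a few lines of plain asyncio. The sketch below is not the PR's `GatewayManager`; the class, method names, and the `asyncio.sleep(0)` stand-in for the remote create call are all hypothetical, chosen only to contrast "increment after the await" with "reserve before the await".

```python
import asyncio


class LeastLoadedRouter:
    """Hypothetical router that tracks active sessions per gateway."""

    def __init__(self, gateway_count):
        self.active = [0] * gateway_count

    def _pick(self):
        # Lowest count wins; ties go to the lowest index.
        return min(range(len(self.active)), key=self.active.__getitem__)

    async def _remote_create(self, gateway_index):
        await asyncio.sleep(0)  # stand-in for the remote create call

    async def create_racy(self):
        index = self._pick()
        await self._remote_create(index)  # other coroutines select here
        self.active[index] += 1           # increment lands too late
        return index

    async def create_reserved(self):
        index = self._pick()
        self.active[index] += 1           # reserve before yielding control
        try:
            await self._remote_create(index)
        except BaseException:
            self.active[index] -= 1       # roll back so failures do not count
            raise
        return index


async def burst(method_name, gateway_count=4, sessions=8):
    router = LeastLoadedRouter(gateway_count)
    creators = (getattr(router, method_name)() for _ in range(sessions))
    await asyncio.gather(*creators)
    return router.active


if __name__ == "__main__":
    print("racy:    ", asyncio.run(burst("create_racy")))
    print("reserved:", asyncio.run(burst("create_reserved")))
```

In this toy setup the racy variant places all eight sessions on gateway 0, while the reserving variant spreads them two per gateway. Because selection and increment happen with no `await` between them, no other coroutine on the same event loop can observe the stale count.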
f2dfe4b to 5dfeb55
What does this PR do?
This PR adds uni_agent.framework, a trainer-facing OpenAI-compatible agent framework built on top of the gateway runtime from the previous stacked PR. It turns each rollout sample into one or more gateway sessions, lets user runners interact with those sessions through OpenAI-compatible HTTP, finalizes token-level trajectories in the parent process, scores them through RewardLoopWorker, and writes outputs back to the sync trainer's TransferQueue schema.

Specifically:
- AgentFrameworkRolloutAdapter: trainer-facing adapter for the agent_loop_manager_class extension point. Recipes can wire the same adapter in yaml without recipe-specific Python glue.
- build_agent_framework: shared factory that constructs GatewayServingRuntime, loads tokenizer/processor via HFModelConfig, builds GatewayActorConfig, and delegates framework-specific config to the selected framework class.
- OpenAICompatibleAgentFramework: reference implementation that owns batch/prompt/session orchestration, per-session gateway lifecycle, reward scoring, and TransferQueue writes.
- agent_runners mapping from runner name to runner_fqn, runner_kwargs, dispatch_mode, and per-runner max_concurrent_sessions.
- AgentRunner protocol. Function runners and class runners with async __call__ are both accepted; no ABC inheritance requirement.
- inline_async for in-process async runners and ray_task for per-session Ray task execution of blocking runners.
- SessionHandle.complete_url lets standalone runners mark completion over HTTP without taking a framework-owned session_runtime dependency.

The framework keeps RL correctness boundaries in the parent/gateway path: token-truth, commit-on-success session state, reward scoring, TransferQueue writes, finalize, and abort remain outside Ray runner workers. Ray workers only import and run the user runner, then return success or propagate exceptions back to the parent.
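One way to picture the per-runner max_concurrent_sessions cap mentioned above is one `asyncio.Semaphore` per runner name, so that saturating one runner never throttles another. This is a sketch under that assumption, not the framework's implementation; the class, the runner names `search` and `coder`, and the fake runner are hypothetical.

```python
import asyncio


class PerRunnerLimiter:
    """Hypothetical in-flight cap keyed by runner name (no shared global cap)."""

    def __init__(self, limits):
        # Build semaphores inside a running loop (see demo) for older Pythons.
        self._semaphores = {
            name: asyncio.Semaphore(limit) for name, limit in limits.items()
        }
        self.in_flight = dict.fromkeys(limits, 0)
        self.peak = dict.fromkeys(limits, 0)

    async def run(self, runner_name, runner):
        async with self._semaphores[runner_name]:
            self.in_flight[runner_name] += 1
            self.peak[runner_name] = max(
                self.peak[runner_name], self.in_flight[runner_name]
            )
            try:
                return await runner()
            finally:
                self.in_flight[runner_name] -= 1


async def fake_runner():
    await asyncio.sleep(0.01)  # stand-in for a user runner driving a session
    return "done"


async def demo():
    limiter = PerRunnerLimiter({"search": 2, "coder": 5})
    jobs = [limiter.run("search", fake_runner) for _ in range(6)]
    jobs += [limiter.run("coder", fake_runner) for _ in range(5)]
    await asyncio.gather(*jobs)
    return limiter.peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Recording the peak in-flight count per runner is also a convenient way to assert in a test that a cap applies only to the selected runner.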
PR scope
This is the second PR in the stacked series:
- uni_agent.gateway (already merged upstream)
- uni_agent.framework
- examples/agent_train/deepeyes_gateway/ (follow-up, stacked on framework)

Only the framework portion is reviewed here. The PR intentionally does not add recipe-specific runners or examples. DeepEyes and SWE integration live in follow-up recipe PRs.
Checklist Before Starting
[framework] feat: add OpenAI-compatible agent framework
Test
pytest tests/uni_agent/framework/test_generate_sequences_on_cpu.py \
    tests/uni_agent/framework/test_multi_modal_postprocess_on_cpu.py \
    tests/uni_agent/gateway/test_session_runtime_on_cpu.py \
    tests/uni_agent/gateway/test_gateway_actor_on_cpu.py -q

Critical regression gates included:
- test_generate_sequences_writes_tq_schema_for_each_session: writes prompt/session trajectories into the sync trainer TransferQueue schema.
- test_generate_sequences_keeps_successful_sessions_when_one_session_fails: aborts failed sessions without dropping successful sibling sessions.
- test_agent_runners_registry_selects_runner_by_agent_name: routes multi-runner batches by per-sample agent_name.
- test_per_runner_max_concurrent_sessions_caps_only_selected_runner: applies max_concurrent_sessions as a per-runner in-flight cap, not a shared global cap.
- test_ray_task_runner_reimports_from_fqn_and_finalizes_in_parent: Ray workers re-import runner FQNs while parent retains finalization and TQ writes.
- test_ray_task_exception_aborts_only_that_session: Ray runner exceptions propagate to parent abort handling.
- test_score_trajectories_dispatches_only_final_trajectory_and_broadcasts: matches AgentLoopWorkerTQ reward behavior: score final trajectory, broadcast to all session trajectories.
- SessionHandle.complete_url propagation.
API and Usage
Public API:
- uni_agent.framework: AgentFramework, AgentRunner, OpenAICompatibleAgentFramework
- uni_agent.framework.entry: build_agent_framework, AgentFrameworkRolloutAdapter
- uni_agent.gateway.types.SessionHandle: now carries complete_url in addition to base_url

Minimum yaml shape:
Runner callable shape:
agent_name is framework routing metadata. It selects a runner when multiple agent_runners are configured, but it is not forwarded to user runners. Runner-specific static config belongs in runner_kwargs; per-sample task config belongs in tools_kwargs.
Design & Code Changes
High-level structure:
- entry.py: framework factory and trainer adapter. This is transitional glue until the trainer can call build_agent_framework directly.
- framework.py: OpenAICompatibleAgentFramework, runner config parsing, dispatch backends, session orchestration, reward scoring, and TQ conversion.
- gateway/types.py / gateway tests: add SessionHandle.complete_url.

Session flow:
TensorDict batch -> sample_fields -> rollout.n gateway sessions -> runner dispatch -> wait/finalize in parent -> score final trajectory -> broadcast score -> TransferQueue write.

Dispatch flow:
- inline_async: materialize runner in-process at framework construction; run sessions directly on the parent event loop.
- ray_task: submit one Ray task per session; Ray worker imports the runner by FQN and runs only the user runner; parent awaits ray.get through an executor so the parent event loop does not block.

Key invariants:
- agent_runners is the only supported runner config shape; legacy single-runner config is intentionally rejected.
- dispatch_mode is either inline_async or ray_task; no process/subprocess backend is introduced in this PR.
- max_concurrent_sessions is per runner. There is no shared global cap.
- agent_name must be a string when multiple runners are configured.
WIP / Follow-up
- main_ppo_sync.py call build_agent_framework directly so AgentFrameworkRolloutAdapter can retire.
- ray_task runner.
Checklist Before Submitting
pre-commit install && pre-commit run --all-files