[AIG-639] A2A protocol cross-runtime agent interop (M2-11)#28
Conversation
|
Warning Review limit reached
More reviews will be available in 11 minutes and 30 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (13)
📝 WalkthroughWalkthroughThis PR implements the A2A (Agent-to-Agent) protocol v1, enabling peer-to-peer communication between aistack and other agent runtimes. It includes protocol schemas, HTTP transport, client/server implementations, CLI commands, configuration, comprehensive tests, and documentation. ChangesA2A Agent-to-Agent Protocol v1 Implementation
Sequence Diagram(s)sequenceDiagram
participant Client as A2A Client
participant Router as A2ARouter
participant Handler as handleMessage
participant Executor as Executor<br/>(runAgent)
Client->>Router: POST /v1/a2a/message<br/>(with Authorization header)
Router->>Handler: parse body & validate schema
Handler->>Handler: check bearer token<br/>(constant-time compare)
Handler->>Handler: check message replay cache<br/>(deduplicate by messageId)
Handler->>Handler: resolve skillId & check<br/>exposedAgents allowlist
Handler->>Executor: invoke with skillId & prompt
Executor->>Executor: execute agent logic
Executor-->>Handler: return result string
Handler->>Handler: cache 200 response<br/>(not 5xx errors)
Handler-->>Router: A2AResponse<br/>(status: completed/failed)
Router-->>Client: HTTP 200 JSON
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
blackms
left a comment
There was a problem hiding this comment.
/v1/a2a/message, bearer auth, schema validation, Zod round-trip), but several hardening gaps and a coordination issue with sibling PR #25/#26 need to be resolved before merge.
Blocking issues
1. Bearer-token comparison is timing-unsafe (src/a2a/server.ts:106)
if (presented !== a2a.bearerToken) {
return errorResponse(403, 'forbidden', 'Invalid bearer token');
}A plain !== short-circuits on the first differing byte, leaking token length/prefix bits to a network attacker over time. Use crypto.timingSafeEqual on equal-length Buffers (handle the length-mismatch case by comparing against a constant-time fallback). The endpoint is intended to be exposed publicly per the docs ("Public URL (e.g. behind reverse proxy)"), so this is exploitable.
2. Webhook stub will collide with AIG-636 and AIG-637 at merge time
src/transport/webhook.ts is shipped here as a 176-line stub, and the same path is shipped (with a different implementation) on origin/pm-agent/aig-637-issue-to-pr-github. AIG-636 will land the canonical version. The file header acknowledges this ("Once AIG-636 lands... this file should be deleted"), but there is no mechanism to ensure that happens, and merging #28 before #25 with the current contents will silently overwrite whatever AIG-636 ships.
Suggested resolutions (any one is fine):
- Mark #28 as
Depends on #25and rebase after AIG-636 merges, deleting the stub in the same PR. - Extract the stub into a clearly-named file (
src/transport/webhook.stub.ts) and gate the import via a TODO marker so a CI grep can fail the build if it survives the AIG-636 merge. - At minimum, add a
// FIXME(AIG-636): delete on mergecomment that CODEOWNERS will catch.
3. No request size cap on POST /v1/a2a/message
webhook.ts:readBody concatenates chunks unbounded into a Buffer[]. A malicious caller (or a confused remote agent) can POST gigabytes and OOM the process before Zod ever sees it. Add Content-Length check + a streaming size cap (e.g. 1 MiB by default, configurable). This is especially important because the endpoint accepts parts[].data as z.record(z.unknown()) — arbitrary nested JSON.
4. No replay protection and no messageId deduplication
The A2A docs section openly says "Replay protection: None at protocol layer" and defers it to upstream JWTs, but the server also does not deduplicate messageId over a short window. An attacker who captures one valid request can replay it to re-trigger agent runs (which actually spawn subprocesses via runAgent). At minimum: keep an LRU of seen messageId values for the last N minutes and return 409 on collision. Document the residual risk.
Should-fix (non-blocking)
5. Agent card leaks capabilities array verbatim from registry (agent-card.ts:48)
tags: def.capabilities exposes every internal capability string of every registered agent on an unauthenticated GET /.well-known/a2a-agent-card.json. If aistack agents are configured with anything sensitive (internal tool names, customer-specific verbs, etc.) it leaks to anyone who can reach the port. Recommend either (a) an explicit publicCapabilities allowlist on the agent definition, or (b) requiring an opt-in exposedTags field per skill. At minimum, document the privacy implication next to exposedAgents in docs/A2A.md.
6. CORS is completely absent
Neither the card nor the message endpoint sets Access-Control-Allow-Origin or handles OPTIONS. If anyone tries to call the agent from a browser-based A2A client (Mastra has one), they'll get opaque CORS failures. Either explicitly reject browser callers with a documented "server-to-server only" stance, or add a configurable CORS allowlist (default deny). Don't ship a permissive * default.
7. No rate limiting
Each accepted message spawns a Claude/OpenAI subprocess via runAgent (real cost). A token leak immediately becomes a billing incident with no in-process backpressure. Add at minimum a simple token-bucket per bearer token / per source IP, and document a recommended reverse-proxy rate-limit policy.
8. authentication.schemes advertises bearer even when no token is configured (server.ts:60-65)
When bearerToken is undefined the server logs a warning but the card still says "schemes": ["bearer"] by default — that's a lie to remote callers, who will helpfully attach a token that never gets validated. Either advertise ["none"] when auth is effectively disabled, or refuse to start the server without explicit --no-auth.
9. URL validation on agent card is too loose
AgentCardSchema.url = z.string().url() accepts javascript: and file: URIs in Zod's default mode. The card's url field is reflected back to callers; combined with #5 it's a small injection vector. Constrain to http(s):// via .refine().
10. Bearer-token presence is checked at the bytes-stripped level (server.ts:104)
authHeader.slice('Bearer '.length).trim() will accept multiple spaces ("Bearer tok"), which is fine, but combined with #1 makes timing analysis easier (length leak). Worth using a strict regex /^Bearer (\S+)$/ once #1 is fixed.
🧪 Test coverage
Good unit coverage on handleMessage (auth, validation, allowlist, executor failure) and a real bidirectional roundtrip integration test that spins up two ephemeral servers — that's the right shape and proves AC #4.
Gaps to add alongside the fixes above:
- Timing-attack assertion (token-length oracle) once #1 is fixed.
- Oversized-body 413 test once #3 is fixed.
- Replay-rejection test once #4 is fixed.
- CORS preflight test once #6 is decided.
- Card omission of sensitive tags once #5 is decided.
📝 Nits
client.ts:122-128— backoff cap of 4 s is fine butMath.min(1000 * 2 ** (attempt-1), 4000)produces1000, 2000, 4000— works as intended, just worth a one-line comment.client.tsretries on all errors with status>= 500including501 Not Implemented, which is permanent. Consider not retrying 501/505.server.ts:158— errorstatus: 'failed'in the response envelope is paired with HTTP 500; some A2A consumers expect HTTP 200 for in-band agent failures (status=failed) since the protocol call succeeded. Worth a thought; current behavior is defensible but not what e.g. CrewAI does.types.ts:14— link tohttps://a2a-protocol.orgis fine, but pinning to the exact spec version document URL would make future spec drift easier to track.docs/A2A.md— the "Security model" table honestly listing "Replay protection: None" is great; once #4 lands, update it.
Verdict
The protocol surface and overall code quality are good and the AC checklist is genuinely covered. Blocking on #1 (timing-safe compare), #2 (stub-collision strategy), #3 (body size cap), and #4 (replay/dedup) — all small, all worth catching before this hits a public reverse proxy. Items #5–#10 can land as follow-ups but please at least file them.
…work
Resolves the merge conflict between AIG-639's local stub
src/transport/webhook.ts and the real WebhookServer landed by AIG-636
on main. The two had incompatible public APIs (registerRoute returning
WebhookResponse objects vs. a fixed /v1/tasks handler that enqueues to
the daemon).
Resolution
- src/transport/webhook.ts: taken from main verbatim (--theirs); now
owns the daemon task-ingestion endpoint exclusively.
- src/transport/a2a-router.ts (new): the multi-route HTTP server
previously embedded in the stub, renamed to A2ARouter. Same value-
based handler shape (handlers return WebhookResponse) the A2A code
was already written against — only the class name changes.
- src/a2a/server.ts: registerA2ARoutes() now takes an A2ARouter; the
route registration call sites (.registerRoute('GET'/'POST', ...))
are unchanged.
- src/cli/commands/a2a.ts: `aistack a2a serve` instantiates A2ARouter
rather than the daemon WebhookServer.
- tests/unit/a2a/server.test.ts, tests/integration/a2a-roundtrip.test.ts:
swap WebhookServer -> A2ARouter at construction sites; both already
test against the registerRoute / WebhookRequest / WebhookResponse
surface so no behavioural assertions change.
- src/cli/commands/index.ts, src/cli/index.ts: trivial merge — kept
createA2aCommand alongside main's createDaemonCommand /
createWatchCommand / createRunCommand.
- src/types.ts, src/utils/config.ts: trivial merge — kept both
A2AConfig / A2AConfigSchema and main's DaemonConfig /
DaemonConfigSchema as sibling fields.
- docs/A2A.md: doc-only — describe A2ARouter and link to AIG-636's
task-ingestion WebhookServer + AIG-637's IntegrationRouter as
sibling listeners.
The A2A endpoints now run on their own port (default 8787) alongside
the daemon's task-ingestion webhook (also default 8787 in AIG-636's
configuration — operators should bind them to different ports if both
are enabled). No behavior change to bearer auth, schema validation,
skill resolution, or executor delegation.
Addresses three security findings from second-opinion review of PR #28: 1. Timing-safe bearer compare (src/a2a/server.ts) Replace the plain '!==' bearer comparison with a new helper timingSafeEqualString() that pads both operands to equal length and defers to crypto.timingSafeEqual. Mirrors the existing helper inside verifyHmacSignature (src/transport/webhook.ts) so token equality checks no longer leak length or prefix information via early-exit timing. 2. 1 MiB body cap (src/transport/a2a-router.ts) readBody now tracks accumulated byte size and aborts (req.destroy + reject with PayloadTooLargeError) once it exceeds maxBytes. The router catches that error and responds 413 payload_too_large with the limit echoed back. Default is DEFAULT_A2A_MAX_BYTES = 1 MiB, matching the AIG-636 WebhookServer cap; configurable via the new A2ARouterOptions.maxBytes option. 3. Replay/messageId dedup (src/a2a/server.ts) handleMessage now keeps a process-local Map<messageId, {response, expiresAt}> with LRU eviction (max 10,000 entries) and a 5-minute TTL. A second in-flight Map collapses concurrent retries onto a single executor invocation. Cached responses are returned verbatim on retry. Only successful 200s are cached — 5xx failures fall through so transient errors can recover. Test-only __resetReplayCacheForTests() lets unit tests isolate cases. Tests added (vitest): - timingSafeEqualString: equal/unequal/length-mismatch (no throw)/empty - replay dedup: sequential retries -> executor called once - replay dedup: 5 concurrent retries -> executor called once - replay dedup: 500 failures are NOT cached - a2a-router body cap: oversize -> 413 or socket reset (no handler call) - a2a-router body cap: under-limit -> 200 passthrough - a2a-router body cap: handler never invoked on overflow A beforeEach reset was added to the existing server unit suite so the 'm1' sentinel messageId reused across many cases does not cross- pollinate via the new cache.
There was a problem hiding this comment.
Actionable comments posted: 10
🧹 Nitpick comments (1)
tests/unit/transport/a2a-router.test.ts (1)
94-107: ⚡ Quick winAdd an exact-boundary case for
maxBytes.Line 94 says “at or below,” but this only validates a payload below the cap (512). Add an assertion for exactly 1024 bytes to catch off-by-one regressions.
Proposed test extension
it('accepts bodies at or below maxBytes', async () => { router = new A2ARouter({ port: 0, host: '127.0.0.1', maxBytes: 1024 }); @@ expect(parsed.size).toBe(512); + + const atLimit = Buffer.alloc(1024, 0x62); + const resAtLimit = await post(port, '/echo', atLimit); + expect(resAtLimit.status).toBe(200); + const parsedAtLimit = JSON.parse(resAtLimit.body) as { size: number }; + expect(parsedAtLimit.size).toBe(1024); });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/transport/a2a-router.test.ts` around lines 94 - 107, Extend the unit test for A2ARouter to include an exact-boundary payload: after the existing 512-byte assertion, send a Buffer.alloc(1024, 0x62) to the same POST /echo route (using the same router instance and post helper) and assert res.status === 200 and the parsed body size === 1024 so the exact maxBytes boundary on A2ARouter is validated; reference A2ARouter, router.start(), router.registerRoute('POST','/echo', ...), and the post(...) helper to locate where to add this new assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/A2A.md`:
- Around line 206-213: Update the "Replay protection" row in the security table
to reflect the actual server behavior: replace "None at protocol layer" with a
concise description that mentions process-local messageId replay deduplication
with TTL/LRU-scoped dedupe (i.e., in-memory, not cross-process or persistent,
limited lifetime and capacity), and add guidance about limitations and
mitigation (e.g., use short-lived JWTs or upstream nonce tracking for
multi-process/clustered guarantees). Reference the existing term messageId and
the deduplication semantics (TTL/LRU scoped) when editing the row.
- Around line 131-141: The example calling registerA2ARoutes is missing the
required executor argument used by the route wiring; update the snippet to pass
a message executor (e.g., an object/function named executor) into
registerA2ARoutes so the routes can dispatch messages correctly—locate the
registerA2ARoutes call and add the executor property inside the options object
(matching how your route wiring expects an executor parameter).
In `@src/a2a/types.ts`:
- Line 56: Change the AgentCardSchema's protocolVersion from a free string to a
literal constrained to the A2A v1 constant: replace protocolVersion:
z.string().default(A2A_PROTOCOL_VERSION) with protocolVersion:
z.literal(A2A_PROTOCOL_VERSION) (optionally .default(A2A_PROTOCOL_VERSION) if a
default is desired) so AgentCardSchema (used by fetchAgentCard) will reject
cards that do not match the A2A v1 value.
In `@src/cli/commands/a2a.ts`:
- Around line 39-40: The code currently sets host/publicUrl from
opts.host/opts.url defaults and later ignores the actual bound address returned
by server.start(), so update the logic around host and publicUrl (variables
host, publicUrl, and uses of opts.host/opts.url) to: prefer explicit opts.host
or opts.url if provided; otherwise, after calling server.start() read the
returned bound host/port and use those to construct publicUrl (avoid advertising
port 0 or the unbound 127.0.0.1 when the server bound to a different address);
apply the same fix to the other block covering lines 55–71 so all advertised
endpoints use the bound {host, port} from server.start() unless overridden by
opts.url or opts.host.
- Around line 95-106: The timeout value from CLI (opts.timeout) is converted
with Number(opts.timeout) without validation before calling a2aCall; parse it as
an integer in the .action handler (where opts is created), check for NaN or <= 0
and replace with a safe default (e.g., 30000), optionally clamp to a sensible
max; then pass the validated numeric timeoutMs to a2aCall (reference symbols:
opts.timeout, a2aCall, the .action handler and variable token/message).
- Line 24: The CLI uses '-h' for host which conflicts with Commander help;
change the host option in the serve command to a non-conflicting flag (e.g.,
'--host' with a different short like '-H' or no short) in
src/cli/commands/a2a.ts (look for the serve command definition that currently
uses .option('-h, --host <host>', ...)). Also update serve to use a2aConfig.host
and a2aConfig.publicUrl when present and, after starting the server, use the
actual bound host/port returned by A2ARouter.start() (the returned {port, host})
to construct advertised agent-card/public endpoint URLs and logs instead of
pre-bind values. Finally, in the call handling code where opts.timeout is
converted (see call and timeoutMs: Number(opts.timeout)), validate and coerce
timeout to a positive integer (reject or error on NaN/<=0, or clamp to a minimum
positive ms) so invalid values don't produce NaN/instant timeouts.
In `@src/transport/a2a-router.ts`:
- Around line 187-190: The 500 response currently exposes internal exception
text by using error instanceof Error ? error.message : String(error) in the
sendJson(...) call; change it to return a generic client-safe message (e.g.,
message: 'internal server error') while logging the actual error details
separately (use your existing logger or console.error) before calling sendJson;
update the sendJson invocation in the a2a router error handler (the
function/method that calls sendJson with status 500) to remove error.message
from the payload and ensure the real error is recorded only in logs.
- Around line 139-143: The current parsing loop in a2a-router.ts uses
decodeURIComponent on each key/value (rawQuery, query) which can throw on
malformed % sequences and bubble up as a 500; wrap the per-pair
decodeURIComponent calls in a try/catch and treat decoding failures as a client
error by returning/throwing a 400 Bad Request (with a brief message about
malformed query encoding) instead of letting it propagate to the global 500
handler; modify the loop that iterates rawQuery.split('&') (the code that
assigns query[decodeURIComponent(k)] = decodeURIComponent(v)) to
validate/try-catch both decoded key and value and map that failure to a 400
response.
In `@src/utils/config.ts`:
- Around line 383-390: The A2AConfigSchema currently allows an empty string for
bearerToken so empty values disable auth; update the A2AConfigSchema definition
(symbol: A2AConfigSchema) to validate a non-empty token by changing the
bearerToken schema from z.string().optional() to a trimmed non-empty string such
as z.string().trim().min(1).optional() (or z.string().nonempty().optional()) so
blank "" will fail validation rather than be treated as falsy at runtime.
In `@tests/integration/a2a-roundtrip.test.ts`:
- Around line 38-53: The freePort() helper can hang if s.listen fails because
its listen callback never runs; update the freePort() function to attach an
'error' listener on the server (the s object) that rejects the Promise with the
listen error and ensures the server is closed/cleaned up; keep the existing
success path (s.listen -> s.address -> s.close -> resolve(port)) and in the
error handler call reject(err) (optionally closing the server) so the Promise
never remains pending when listen fails.
---
Nitpick comments:
In `@tests/unit/transport/a2a-router.test.ts`:
- Around line 94-107: Extend the unit test for A2ARouter to include an
exact-boundary payload: after the existing 512-byte assertion, send a
Buffer.alloc(1024, 0x62) to the same POST /echo route (using the same router
instance and post helper) and assert res.status === 200 and the parsed body size
=== 1024 so the exact maxBytes boundary on A2ARouter is validated; reference
A2ARouter, router.start(), router.registerRoute('POST','/echo', ...), and the
post(...) helper to locate where to add this new assertion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5fa7bae6-b0e5-4838-94db-c280b117ff82
📒 Files selected for processing (16)
docs/A2A.mdsrc/a2a/agent-card.tssrc/a2a/client.tssrc/a2a/index.tssrc/a2a/server.tssrc/a2a/types.tssrc/cli/commands/a2a.tssrc/cli/commands/index.tssrc/cli/index.tssrc/transport/a2a-router.tssrc/types.tssrc/utils/config.tstests/integration/a2a-roundtrip.test.tstests/unit/a2a/client.test.tstests/unit/a2a/server.test.tstests/unit/transport/a2a-router.test.ts
blackms
left a comment
There was a problem hiding this comment.
Reviewed current head after rebasing onto main. Thread-aware review check shows 0 unresolved active threads; CI is green (build, lint, typecheck, tests) and CodeRabbit is green.
Implements AIG-639.
Milestone: M2 - Differentiation
Estimate: 5 pts
Summary
See commit message for full implementation details (schema, API, CLI, tests, docs).
Notes from PM agent
DEPENDS ON AIG-636 webhook framework (same stub pattern as AIG-637).
Auto-opened by aistack PM agent on 2026-05-28 10:22. Review with /review or human dispatch.
Summary by CodeRabbit
Release Notes
New Features
aistack a2aCLI commands: serve A2A endpoints, call remote agents, and fetch agent cardsDocumentation