headers = new LinkedHashMap<>();
+ callContext.getState().forEach((key, value) -> {
+ if (key != null && value != null
+ && (value instanceof String || value instanceof Number || value instanceof Boolean)) {
+ headers.put(key, String.valueOf(value));
+ } else if (key != null && value != null) {
+ log.debug("Skipping non-primitive ServerCallContext state entry: {}", key);
+ }
+ });
+ return headers;
+ }
+
+ private static String sanitizeErrorMessage(String raw) {
+ if (raw == null) {
+ return "(no error detail)";
+ }
+ String truncated = raw.length() > MAX_ERROR_MESSAGE_LENGTH
+ ? raw.substring(0, MAX_ERROR_MESSAGE_LENGTH) + "..."
+ : raw;
+ return truncated.replaceAll("[\\p{Cc}]", "");
+ }
+}
diff --git a/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aAuthProvider.java b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aAuthProvider.java
new file mode 100644
index 0000000..1b8d01c
--- /dev/null
+++ b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aAuthProvider.java
@@ -0,0 +1,40 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import jakarta.servlet.http.HttpServletRequest;
+import org.a2aproject.sdk.server.ServerCallContext;
+import org.a2aproject.sdk.spec.A2AError;
+
+/**
+ * Authenticates incoming A2A JSON-RPC requests and produces a
+ * {@link ServerCallContext} for the request handler.
+ *
+ * Implement this interface and pass it to
+ * {@link A2aServlet#A2aServlet(org.a2aproject.sdk.server.requesthandlers.RequestHandler, A2aAuthProvider)}
+ * to enforce authentication on all incoming A2A requests.
+ *
+ *
Example — static bearer token:
+ *
{@code
+ * A2aAuthProvider auth = request -> {
+ * String token = request.getHeader("Authorization");
+ * if (!"Bearer my-secret".equals(token)) {
+ * throw new InvalidRequestError("Unauthorized");
+ * }
+ * return new ServerCallContext(
+ * new AuthenticatedUser(extractPrincipal(token)),
+ * Map.of(), Set.of(), AgentInterface.CURRENT_PROTOCOL_VERSION);
+ * };
+ * new A2aServlet(handler, auth);
+ * }
+ */
+@FunctionalInterface
+public interface A2aAuthProvider {
+
+ /**
+ * Validates the incoming HTTP request and returns an authenticated call context.
+ *
+ * @param request the incoming HTTP request
+ * @return a fully populated {@link ServerCallContext} for this request
+ * @throws A2AError to reject the request with a JSON-RPC error response
+ */
+ ServerCallContext authenticate(HttpServletRequest request) throws A2AError;
+}
diff --git a/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilder.java b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilder.java
new file mode 100644
index 0000000..fe36e7e
--- /dev/null
+++ b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilder.java
@@ -0,0 +1,181 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import org.a2aproject.sdk.server.events.InMemoryQueueManager;
+import org.a2aproject.sdk.server.events.MainEventBus;
+import org.a2aproject.sdk.server.events.MainEventBusProcessor;
+import org.a2aproject.sdk.server.requesthandlers.DefaultRequestHandler;
+import org.a2aproject.sdk.server.requesthandlers.RequestHandler;
+import org.a2aproject.sdk.server.tasks.InMemoryPushNotificationConfigStore;
+import org.a2aproject.sdk.server.tasks.InMemoryTaskStore;
+import org.a2aproject.sdk.spec.AgentCapabilities;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.a2aproject.sdk.spec.AgentSkill;
+import org.adcontextprotocol.adcp.error.ConfigurationError;
+import org.adcontextprotocol.adcp.server.AdcpPlatform;
+import org.jspecify.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+/**
+ * Builds A2A server-side request handling backed by an {@link AdcpPlatform}.
+ *
+ * Authentication: This builder produces a {@link DefaultRequestHandler} that
+ * is then wrapped in an {@link A2aServlet}. Authentication is configured on the servlet,
+ * not here. Use {@link A2aServlet#A2aServlet(RequestHandler, A2aAuthProvider)} to
+ * wire a real {@link A2aAuthProvider} before deploying to production.
+ *
+ *
In-memory stores: {@link #build()} creates in-memory task and queue stores
+ * that are unbounded and non-persistent. They are suitable for local
+ * development and testing only. Production deployments should configure external,
+ * bounded task storage to prevent memory exhaustion under sustained load.
+ */
+public final class A2aServerBuilder {
+
+ /**
+ * Default executor: spawns one virtual thread per task. No lifecycle management needed —
+ * each virtual thread is created on demand and terminates when its task completes.
+ * Using a plain lambda avoids the {@link java.util.concurrent.ExecutorService} resource
+ * that {@link java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor()} returns
+ * and would need to be shut down.
+ */
+ private static final Executor VIRTUAL_THREAD_EXECUTOR =
+ task -> Thread.ofVirtual().start(task);
+
+ private final AdcpPlatform platform;
+ private @Nullable String agentName;
+ private @Nullable String agentUrl;
+ private @Nullable String agentVersion;
+ private @Nullable AgentCard builtCard;
+ private @Nullable Executor agentExecutor;
+ private @Nullable Executor eventConsumerExecutor;
+
+ private A2aServerBuilder(AdcpPlatform platform) {
+ this.platform = Objects.requireNonNull(platform, "platform");
+ }
+
+ public static A2aServerBuilder create(AdcpPlatform platform) {
+ return new A2aServerBuilder(platform);
+ }
+
+ public A2aServerBuilder agentName(String agentName) {
+ this.agentName = Objects.requireNonNull(agentName, "agentName");
+ return this;
+ }
+
+ public A2aServerBuilder agentUrl(String agentUrl) {
+ this.agentUrl = Objects.requireNonNull(agentUrl, "agentUrl");
+ return this;
+ }
+
+ public A2aServerBuilder agentVersion(String agentVersion) {
+ this.agentVersion = Objects.requireNonNull(agentVersion, "agentVersion");
+ return this;
+ }
+
+ /**
+ * Sets the executor used for agent execution (the {@link A2aAgentExecutor} call).
+ * Defaults to a virtual-thread-per-task executor.
+ *
+ *
Inject a custom executor in tests to control execution order or assert
+ * that work is dispatched off the caller thread.
+ */
+ public A2aServerBuilder agentExecutor(Executor agentExecutor) {
+ this.agentExecutor = Objects.requireNonNull(agentExecutor, "agentExecutor");
+ return this;
+ }
+
+ /**
+ * Sets the executor used for SSE event consumption.
+ * Defaults to a virtual-thread-per-task executor.
+ */
+ public A2aServerBuilder eventConsumerExecutor(Executor eventConsumerExecutor) {
+ this.eventConsumerExecutor = Objects.requireNonNull(eventConsumerExecutor, "eventConsumerExecutor");
+ return this;
+ }
+
+ public DefaultRequestHandler build() {
+ this.builtCard = buildAgentCard();
+
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ MainEventBus mainEventBus = new MainEventBus();
+ InMemoryQueueManager queueManager = new InMemoryQueueManager(taskStore, mainEventBus);
+ InMemoryPushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
+ MainEventBusProcessor mainEventBusProcessor = new MainEventBusProcessor(
+ mainEventBus,
+ taskStore,
+ (event, snapshot) -> { },
+ queueManager);
+ mainEventBusProcessor.ensureStarted();
+
+ // Use virtual-thread-per-task executors by default so agent execution and SSE event
+ // consumption run off the caller thread. This prevents the streaming response from
+ // being delayed or blocked while the SSE stream is being established.
+ Executor resolvedAgentExecutor =
+ agentExecutor != null ? agentExecutor : VIRTUAL_THREAD_EXECUTOR;
+ Executor resolvedEventExecutor =
+ eventConsumerExecutor != null ? eventConsumerExecutor : VIRTUAL_THREAD_EXECUTOR;
+
+ return DefaultRequestHandler.create(
+ new A2aAgentExecutor(platform),
+ taskStore,
+ queueManager,
+ pushConfigStore,
+ mainEventBusProcessor,
+ resolvedAgentExecutor,
+ resolvedEventExecutor);
+ }
+
+ public AgentCard buildAgentCard() {
+ require(agentName, "agentName");
+ require(agentUrl, "agentUrl");
+ require(agentVersion, "agentVersion");
+
+ Map descriptions = platform.toolDescriptions();
+ List skills = new ArrayList<>();
+ // Sort for stable, deterministic card output across JVM runs
+ platform.supportedTools().stream().sorted().forEach(toolName -> {
+ String description = descriptions.getOrDefault(toolName, toolName);
+ skills.add(AgentSkill.builder()
+ .id(toolName)
+ .name(toolName)
+ .description(description)
+ .tags(List.of())
+ .examples(List.of())
+ .inputModes(List.of("text"))
+ .outputModes(List.of("text"))
+ .build());
+ });
+
+ return AgentCard.builder()
+ .name(agentName)
+ .description("AdCP A2A agent")
+ .version(agentVersion)
+ .url(agentUrl)
+ .preferredTransport("JSONRPC")
+ .capabilities(AgentCapabilities.builder().streaming(true).pushNotifications(false).build())
+ .supportedInterfaces(List.of(new AgentInterface("JSONRPC", agentUrl)))
+ .defaultInputModes(List.of("text"))
+ .defaultOutputModes(List.of("text"))
+ .skills(skills)
+ .build();
+ }
+
+ /** Returns the AgentCard built by this builder after {@link #build()} is called. */
+ public AgentCard getAgentCard() {
+ if (builtCard == null) {
+ throw new IllegalStateException("Call build() before getAgentCard()");
+ }
+ return builtCard;
+ }
+
+ private void require(@Nullable String value, String field) {
+ if (value == null || value.isBlank()) {
+ throw new ConfigurationError("A2aServerBuilder." + field + " is required", field);
+ }
+ }
+}
diff --git a/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServlet.java b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServlet.java
new file mode 100644
index 0000000..9424218
--- /dev/null
+++ b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/A2aServlet.java
@@ -0,0 +1,406 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.AsyncEvent;
+import jakarta.servlet.AsyncListener;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.a2aproject.sdk.jsonrpc.common.json.JsonProcessingException;
+import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.A2AErrorResponse;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.CancelTaskRequest;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.CancelTaskResponse;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.GetTaskRequest;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.GetTaskResponse;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SendMessageRequest;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SendMessageResponse;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SendStreamingMessageRequest;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SendStreamingMessageResponse;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SubscribeToTaskRequest;
+import org.a2aproject.sdk.server.ServerCallContext;
+import org.a2aproject.sdk.server.auth.UnauthenticatedUser;
+import org.a2aproject.sdk.server.requesthandlers.RequestHandler;
+import org.a2aproject.sdk.server.util.sse.SseFormatter;
+import org.a2aproject.sdk.spec.A2AError;
+import org.a2aproject.sdk.spec.A2AMethods;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.a2aproject.sdk.spec.InternalError;
+import org.a2aproject.sdk.spec.InvalidRequestError;
+import org.a2aproject.sdk.spec.StreamingEventKind;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Minimal Jakarta servlet bridge for A2A JSON-RPC requests.
+ *
+ * Authentication: Use {@link #A2aServlet(RequestHandler, A2aAuthProvider)} to
+ * wire a real {@link A2aAuthProvider}. The single-argument constructor processes every
+ * request as unauthenticated and must not be used in production deployments.
+ *
+ *
Streaming responses require servlet async support; deploy this servlet with
+ * {@code asyncSupported=true}.
+ */
+public final class A2aServlet extends HttpServlet {
+
+ @java.io.Serial
+ private static final long serialVersionUID = 1L;
+
+ private static final int MAX_REQUEST_BYTES = 1 * 1024 * 1024; // 1 MB
+ private static final int MAX_METHOD_LENGTH = 128;
+ private static final int SSE_PREFETCH = 8;
+ private static final long SSE_STREAM_TIMEOUT_SECONDS = 300;
+
+ private final transient RequestHandler handler;
+ private final transient A2aAuthProvider authProvider;
+
+ /**
+ * Creates a servlet with the given auth provider.
+ * Use this constructor for production deployments.
+ */
+ public A2aServlet(RequestHandler handler, A2aAuthProvider authProvider) {
+ this.handler = Objects.requireNonNull(handler, "handler");
+ this.authProvider = Objects.requireNonNull(authProvider, "authProvider");
+ }
+
+ /**
+ * Creates a servlet that accepts all requests as unauthenticated.
+ *
+ *
WARNING: This constructor is intended for testing and local development
+ * only. Any caller that can reach this endpoint can invoke all registered tools
+ * without authentication. Use {@link #A2aServlet(RequestHandler, A2aAuthProvider)}
+ * with a real {@link A2aAuthProvider} for production deployments.
+ *
+ * @deprecated Use {@link #A2aServlet(RequestHandler, A2aAuthProvider)} for production.
+ */
+ @Deprecated
+ public A2aServlet(RequestHandler handler) {
+ this(handler, request -> new ServerCallContext(
+ UnauthenticatedUser.INSTANCE,
+ Map.of(),
+ Set.of(),
+ AgentInterface.CURRENT_PROTOCOL_VERSION));
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ Object requestId = null;
+ try {
+ String body = readRequestBody(request.getInputStream());
+ var parsedBody = JsonParser.parseString(body);
+ if (!parsedBody.isJsonObject()) {
+ throw new JsonParseException("JSON-RPC request must be an object");
+ }
+ JsonObject envelope = parsedBody.getAsJsonObject();
+ requestId = extractId(envelope.get("id"));
+ JsonElement methodElement = envelope.get("method");
+ String method = methodElement != null && methodElement.isJsonPrimitive()
+ && methodElement.getAsJsonPrimitive().isString()
+ ? methodElement.getAsString()
+ : null;
+ if (method == null || method.isBlank()) {
+ writeError(response, HttpServletResponse.SC_BAD_REQUEST, requestId,
+ new InvalidRequestError("JSON-RPC method is required"));
+ return;
+ }
+ if (method.length() > MAX_METHOD_LENGTH) {
+ writeError(response, HttpServletResponse.SC_BAD_REQUEST, requestId,
+ new InvalidRequestError("JSON-RPC method too long"));
+ return;
+ }
+
+ ServerCallContext callContext = authProvider.authenticate(request);
+
+ switch (method) {
+ case A2AMethods.SEND_MESSAGE_METHOD -> {
+ SendMessageRequest parsed = JsonUtil.fromJson(body, SendMessageRequest.class);
+ if (wantsStreaming(request)) {
+ stream(request, response, requestId,
+ handler.onMessageSendStream(parsed.getParams(), callContext));
+ } else {
+ writeJson(response, HttpServletResponse.SC_OK,
+ new SendMessageResponse(requestId,
+ handler.onMessageSend(parsed.getParams(), callContext)));
+ }
+ }
+ case A2AMethods.SEND_STREAMING_MESSAGE_METHOD -> {
+ SendStreamingMessageRequest parsed = JsonUtil.fromJson(body, SendStreamingMessageRequest.class);
+ stream(request, response, requestId,
+ handler.onMessageSendStream(parsed.getParams(), callContext));
+ }
+ case A2AMethods.GET_TASK_METHOD -> {
+ GetTaskRequest parsed = JsonUtil.fromJson(body, GetTaskRequest.class);
+ writeJson(response, HttpServletResponse.SC_OK,
+ new GetTaskResponse(requestId,
+ handler.onGetTask(parsed.getParams(), callContext)));
+ }
+ case A2AMethods.CANCEL_TASK_METHOD -> {
+ CancelTaskRequest parsed = JsonUtil.fromJson(body, CancelTaskRequest.class);
+ writeJson(response, HttpServletResponse.SC_OK,
+ new CancelTaskResponse(requestId,
+ handler.onCancelTask(parsed.getParams(), callContext)));
+ }
+ case A2AMethods.SUBSCRIBE_TO_TASK_METHOD -> {
+ SubscribeToTaskRequest parsed = JsonUtil.fromJson(body, SubscribeToTaskRequest.class);
+ stream(request, response, requestId,
+ handler.onSubscribeToTask(parsed.getParams(), callContext));
+ }
+ default -> writeError(response, HttpServletResponse.SC_BAD_REQUEST, requestId,
+ new InvalidRequestError("Unsupported JSON-RPC method: "
+ + sanitizeMethodName(method)));
+ }
+ } catch (JsonParseException | JsonProcessingException e) {
+ writeError(response, HttpServletResponse.SC_BAD_REQUEST, requestId,
+ new InvalidRequestError("Invalid JSON-RPC request"));
+ } catch (IOException e) {
+ if (!response.isCommitted()) {
+ writeError(response, HttpServletResponse.SC_BAD_REQUEST, requestId,
+ new InvalidRequestError("Invalid request body"));
+ return;
+ }
+ throw e;
+ } catch (A2AError e) {
+ writeError(response, HttpServletResponse.SC_OK, requestId, e);
+ } catch (Exception e) {
+ writeError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, requestId,
+ new InternalError("Internal error"));
+ }
+ }
+
+ private static boolean wantsStreaming(HttpServletRequest request) {
+ String accept = request.getHeader("Accept");
+ return accept != null && accept.toLowerCase(java.util.Locale.ROOT).contains("text/event-stream");
+ }
+
+ private static String readRequestBody(InputStream inputStream) throws IOException {
+ try (InputStream in = inputStream; ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[1024];
+ int total = 0;
+ int read;
+ while ((read = in.read(buffer)) != -1) {
+ total += read;
+ if (total > MAX_REQUEST_BYTES) {
+ throw new IOException("A2A request body exceeds " + MAX_REQUEST_BYTES + " bytes");
+ }
+ out.write(buffer, 0, read);
+ }
+ return out.toString(StandardCharsets.UTF_8);
+ }
+ }
+
+ private static Object extractId(JsonElement idElement) {
+ if (idElement == null || idElement.isJsonNull()) {
+ return null;
+ }
+ if (idElement.isJsonPrimitive()) {
+ if (idElement.getAsJsonPrimitive().isString()) {
+ String s = idElement.getAsString();
+ return s.length() > 128 ? s.substring(0, 128) : s;
+ }
+ if (idElement.getAsJsonPrimitive().isNumber()) {
+ return idElement.getAsNumber();
+ }
+ // Boolean ids are non-conforming per JSON-RPC 2.0; treat as null
+ }
+ // Structured ids (arrays, objects) are non-conforming; do not echo
+ return null;
+ }
+
+ private static String sanitizeMethodName(String method) {
+ if (method == null) return "(null)";
+ String truncated = method.length() > MAX_METHOD_LENGTH
+ ? method.substring(0, MAX_METHOD_LENGTH) + "..."
+ : method;
+ return truncated.replaceAll("[\\p{Cc}]", "");
+ }
+
+ private static void writeJson(HttpServletResponse response, int status, Object payload) throws IOException {
+ response.setStatus(status);
+ response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+ response.setContentType("application/json");
+ try {
+ response.getWriter().write(JsonUtil.toJson(payload));
+ } catch (JsonProcessingException e) {
+ throw new IOException("Failed to serialize JSON-RPC response", e);
+ }
+ response.getWriter().flush();
+ }
+
+ private static void writeError(HttpServletResponse response, int status,
+ Object requestId, A2AError error) throws IOException {
+ writeJson(response, status, new A2AErrorResponse(requestId, error));
+ }
+
+ private static void stream(HttpServletRequest request, HttpServletResponse response, Object requestId,
+ Flow.Publisher publisher) throws IOException {
+ if (!request.isAsyncSupported()) {
+ throw new IllegalStateException("A2aServlet requires asyncSupported=true for streaming responses");
+ }
+ AsyncContext asyncContext = request.startAsync(request, response);
+ asyncContext.setTimeout(SSE_STREAM_TIMEOUT_SECONDS * 1000L);
+
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+ response.setContentType("text/event-stream");
+ response.setHeader("Cache-Control", "no-cache");
+
+ AtomicLong sequence = new AtomicLong(1);
+ AtomicReference subRef = new AtomicReference<>();
+ AtomicBoolean completed = new AtomicBoolean();
+ Object writerLock = new Object();
+
+ asyncContext.addListener(new AsyncListener() {
+ @Override
+ public void onComplete(AsyncEvent event) {
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent event) {
+ cancelSubscription(subRef);
+ try {
+ writeTimeoutResponse(response, requestId, sequence, asyncContext, writerLock, completed);
+ } catch (IOException ignored) {
+ completeAsync(asyncContext, writerLock, completed);
+ }
+ }
+
+ @Override
+ public void onError(AsyncEvent event) {
+ cancelSubscription(subRef);
+ try {
+ writeFinalStreamingResponse(response,
+ new SendStreamingMessageResponse(requestId, toA2aError(event.getThrowable())),
+ sequence, asyncContext, writerLock, completed);
+ } catch (IOException ignored) {
+ completeAsync(asyncContext, writerLock, completed);
+ }
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event) {
+ }
+ });
+
+ publisher.subscribe(new Flow.Subscriber<>() {
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ subRef.set(subscription);
+ subscription.request(SSE_PREFETCH);
+ }
+
+ @Override
+ public void onNext(StreamingEventKind item) {
+ try {
+ writeStreamingResponse(response, new SendStreamingMessageResponse(requestId, item),
+ sequence, writerLock, completed);
+ Flow.Subscription subscription = subRef.get();
+ if (subscription != null && !completed.get()) {
+ subscription.request(1);
+ }
+ } catch (IOException e) {
+ cancelSubscription(subRef);
+ completeAsync(asyncContext, writerLock, completed);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ writeFinalStreamingResponse(response,
+ new SendStreamingMessageResponse(requestId, toA2aError(throwable)),
+ sequence, asyncContext, writerLock, completed);
+ } catch (IOException ignored) {
+ completeAsync(asyncContext, writerLock, completed);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ completeAsync(asyncContext, writerLock, completed);
+ }
+ });
+ }
+
+ private static void writeTimeoutResponse(HttpServletResponse response, Object requestId,
+ AtomicLong sequence, AsyncContext asyncContext, Object writerLock,
+ AtomicBoolean completed) throws IOException {
+ // Both the committed and uncommitted cases are handled atomically under a single lock
+ // acquisition to prevent onNext() from writing another SSE event between the timeout
+ // decision and the final timeout event / completion.
+ synchronized (writerLock) {
+ if (completed.get()) {
+ return;
+ }
+ completed.set(true);
+ if (!response.isCommitted()) {
+ writeError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, requestId,
+ new InternalError("Streaming response timed out"));
+ } else {
+ response.getWriter().write(SseFormatter.formatResponseAsSSE(
+ new SendStreamingMessageResponse(requestId, new InternalError("Streaming response timed out")),
+ sequence.getAndIncrement()));
+ response.getWriter().flush();
+ }
+ asyncContext.complete();
+ }
+ }
+
+ private static void writeFinalStreamingResponse(HttpServletResponse response, SendStreamingMessageResponse payload,
+ AtomicLong sequence, AsyncContext asyncContext,
+ Object writerLock, AtomicBoolean completed) throws IOException {
+ synchronized (writerLock) {
+ if (completed.compareAndSet(false, true)) {
+ response.getWriter().write(SseFormatter.formatResponseAsSSE(payload, sequence.getAndIncrement()));
+ response.getWriter().flush();
+ asyncContext.complete();
+ }
+ }
+ }
+
+ private static void writeStreamingResponse(HttpServletResponse response, SendStreamingMessageResponse payload,
+ AtomicLong sequence, Object writerLock,
+ AtomicBoolean completed) throws IOException {
+ synchronized (writerLock) {
+ if (completed.get()) {
+ return;
+ }
+ response.getWriter().write(SseFormatter.formatResponseAsSSE(payload, sequence.getAndIncrement()));
+ response.getWriter().flush();
+ }
+ }
+
+ private static void cancelSubscription(AtomicReference subRef) {
+ Flow.Subscription subscription = subRef.getAndSet(null);
+ if (subscription != null) {
+ subscription.cancel();
+ }
+ }
+
+ private static void completeAsync(AsyncContext asyncContext, Object writerLock, AtomicBoolean completed) {
+ synchronized (writerLock) {
+ if (completed.compareAndSet(false, true)) {
+ asyncContext.complete();
+ }
+ }
+ }
+
+ private static A2AError toA2aError(Throwable throwable) {
+ return throwable instanceof A2AError a2aError
+ ? a2aError
+ : new InternalError("Internal error");
+ }
+}
diff --git a/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/package-info.java b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/package-info.java
new file mode 100644
index 0000000..c500cdd
--- /dev/null
+++ b/adcp-server/src/main/java/org/adcontextprotocol/adcp/server/a2a/package-info.java
@@ -0,0 +1,7 @@
+/**
+ * A2A server-side transport support: request-handler wiring, JSON-RPC servlet
+ * dispatch, and {@link org.a2aproject.sdk.server.agentexecution.AgentExecutor}
+ * adaptation onto {@code AdcpPlatform}.
+ */
+@org.jspecify.annotations.NullMarked
+package org.adcontextprotocol.adcp.server.a2a;
diff --git a/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aAgentExecutorTest.java b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aAgentExecutorTest.java
new file mode 100644
index 0000000..ad7b9e0
--- /dev/null
+++ b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aAgentExecutorTest.java
@@ -0,0 +1,251 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import org.a2aproject.sdk.server.ServerCallContext;
+import org.a2aproject.sdk.server.agentexecution.RequestContext;
+import org.a2aproject.sdk.server.auth.UnauthenticatedUser;
+import org.a2aproject.sdk.server.events.EventQueue;
+import org.a2aproject.sdk.server.events.EventQueueItem;
+import org.a2aproject.sdk.server.tasks.AgentEmitter;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.a2aproject.sdk.spec.DataPart;
+import org.a2aproject.sdk.spec.InvalidRequestError;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.MessageSendParams;
+import org.a2aproject.sdk.spec.Part;
+import org.a2aproject.sdk.spec.TextPart;
+import org.adcontextprotocol.adcp.AdcpVersion;
+import org.adcontextprotocol.adcp.server.AdcpContext;
+import org.adcontextprotocol.adcp.server.AdcpPlatform;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class A2aAgentExecutorTest {
+
+ @Test
+ void execute_dispatches_tool_and_emits_response() throws Exception {
+ RecordingPlatform platform = new RecordingPlatform();
+ A2aAgentExecutor executor = new A2aAgentExecutor(platform);
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .messageId("msg-1")
+ .metadata(Map.of("adcp_tool_name", "echo"))
+ .parts(new TextPart("echo"), new DataPart(Map.of(
+ "adcp_major_version", 3,
+ "adcp_version", "3.1",
+ "query", "test")))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ executor.execute(context, emitter);
+
+ assertEquals("echo", platform.toolName);
+ assertEquals(Map.of("query", "test"), platform.request);
+ assertNotNull(platform.context);
+ assertEquals(new AdcpVersion(3, "3.1"), platform.context.adcpVersion());
+ assertTrue(emitter.started);
+ assertTrue(emitter.completed);
+ assertNotNull(emitter.messageParts);
+ assertEquals("{\"echo\":true}", ((TextPart) emitter.messageParts.getFirst()).text());
+ }
+
+ @Test
+ void execute_includes_call_context_metadata_in_adcp_context() throws Exception {
+ RecordingPlatform platform = new RecordingPlatform();
+ A2aAgentExecutor executor = new A2aAgentExecutor(platform);
+ RequestContext context = requestContext(
+ Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .metadata(Map.of("adcp_tool_name", "echo"))
+ .parts(new TextPart("echo"))
+ .build(),
+ new ServerCallContext(
+ UnauthenticatedUser.INSTANCE,
+ Map.of(
+ "tenant", "acme",
+ "priority", 7,
+ "features", List.of("a", "b")),
+ Set.of(),
+ AgentInterface.CURRENT_PROTOCOL_VERSION));
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ executor.execute(context, emitter);
+
+ assertEquals(Map.of("tenant", "acme", "priority", "7"), platform.context.headers());
+ }
+
+ @Test
+ void cancel_delegates_to_emitter_cancel() throws Exception {
+ A2aAgentExecutor executor = new A2aAgentExecutor(new RecordingPlatform());
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .parts(new TextPart("echo"))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ executor.cancel(context, emitter);
+
+ assertTrue(emitter.canceled);
+ }
+
+ @Test
+ void execute_propagates_invalid_request_error_for_missing_tool_name() throws Exception {
+ A2aAgentExecutor executor = new A2aAgentExecutor(new RecordingPlatform());
+ // Message with no tool name in metadata and no non-blank text parts
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .parts(new DataPart(java.util.Map.of("key", "value")))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ // Should propagate as InvalidRequestError (A2AError), not be swallowed as internal_error
+ assertThrows(InvalidRequestError.class, () -> executor.execute(context, emitter));
+ }
+
+ @Test
+ void execute_propagates_invalid_request_error_for_non_object_adcp_args() throws Exception {
+ A2aAgentExecutor executor = new A2aAgentExecutor(new RecordingPlatform());
+ // adcp_args is a String, not a JSON object — should be InvalidRequestError
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .metadata(java.util.Map.of(
+ "adcp_tool_name", "echo",
+ "adcp_args", "not-an-object"))
+ .parts(new TextPart("echo"))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ assertThrows(InvalidRequestError.class, () -> executor.execute(context, emitter));
+ }
+
+ @Test
+ void execute_propagates_invalid_request_error_for_control_char_in_tool_name() throws Exception {
+ A2aAgentExecutor executor = new A2aAgentExecutor(new RecordingPlatform());
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .metadata(java.util.Map.of("adcp_tool_name", "echo\u0000bad"))
+ .parts(new TextPart("echo"))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ assertThrows(InvalidRequestError.class, () -> executor.execute(context, emitter));
+ }
+
+ @Test
+ void execute_propagates_invalid_request_error_for_non_object_data_part() throws Exception {
+ A2aAgentExecutor executor = new A2aAgentExecutor(new RecordingPlatform());
+ // DataPart with a List instead of a Map — should be InvalidRequestError
+ RequestContext context = requestContext(Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .metadata(java.util.Map.of("adcp_tool_name", "echo"))
+ .parts(new DataPart(java.util.List.of("not", "a", "map")))
+ .build());
+ RecordingEmitter emitter = new RecordingEmitter(context);
+
+ assertThrows(InvalidRequestError.class, () -> executor.execute(context, emitter));
+ }
+
+ private static RequestContext requestContext(Message message) throws Exception {
+ return requestContext(message, null);
+ }
+
+ private static RequestContext requestContext(Message message, ServerCallContext callContext) throws Exception {
+ RequestContext.Builder builder = new RequestContext.Builder()
+ .setTaskId("task-1")
+ .setContextId("ctx-1")
+ .setParams(new MessageSendParams(message, null, null));
+ if (callContext != null) {
+ builder.setServerCallContext(callContext);
+ }
+ return builder.build();
+ }
+
+ private static final class RecordingPlatform extends AdcpPlatform {
+ private String toolName;
+ private Map request;
+ private AdcpContext context;
+
+ @Override
+ public Set supportedTools() {
+ return Set.of("echo");
+ }
+
+ @Override
+ public Object handleTool(String toolName, Map request, AdcpContext ctx) {
+ this.toolName = toolName;
+ this.request = request;
+ this.context = ctx;
+ return Map.of("echo", true);
+ }
+ }
+
+ private static final class RecordingEmitter extends AgentEmitter {
+ private boolean started;
+ private boolean completed;
+ private boolean canceled;
+ private List> messageParts;
+
+ private RecordingEmitter(RequestContext context) {
+ super(context, new NoOpEventQueue());
+ }
+
+ @Override
+ public void startWork() {
+ started = true;
+ }
+
+ @Override
+ public void sendMessage(List> parts) {
+ this.messageParts = parts;
+ }
+
+ @Override
+ public void complete() {
+ completed = true;
+ }
+
+ @Override
+ public void cancel() {
+ canceled = true;
+ }
+ }
+
+ private static final class NoOpEventQueue extends EventQueue {
+ @Override
+ public void awaitQueuePollerStart() {}
+
+ @Override
+ public void signalQueuePollerStarted() {}
+
+ @Override
+ public void enqueueItem(EventQueueItem item) {}
+
+ @Override
+ public EventQueue tap() {
+ return this;
+ }
+
+ @Override
+ public EventQueueItem dequeueEventItem(int timeoutMillis) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void close(boolean clear) {}
+
+ @Override
+ public void close(boolean clear, boolean interruptPollers) {}
+ }
+}
diff --git a/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilderTest.java b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilderTest.java
new file mode 100644
index 0000000..e2e7aba
--- /dev/null
+++ b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServerBuilderTest.java
@@ -0,0 +1,177 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import org.a2aproject.sdk.server.requesthandlers.DefaultRequestHandler;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.AgentSkill;
+import org.adcontextprotocol.adcp.error.ConfigurationError;
+import org.adcontextprotocol.adcp.server.AdcpPlatform;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class A2aServerBuilderTest {
+
+ @Test
+ void build_requires_agent_name() {
+ A2aServerBuilder builder = A2aServerBuilder.create(platform())
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0");
+
+ ConfigurationError error = assertThrows(ConfigurationError.class, builder::build);
+ assertEquals("agentName", error.configField());
+ }
+
+ @Test
+ void build_requires_agent_url() {
+ A2aServerBuilder builder = A2aServerBuilder.create(platform())
+ .agentName("test-agent")
+ .agentVersion("1.0.0");
+
+ ConfigurationError error = assertThrows(ConfigurationError.class, builder::build);
+ assertEquals("agentUrl", error.configField());
+ }
+
+ @Test
+ void build_requires_agent_version() {
+ A2aServerBuilder builder = A2aServerBuilder.create(platform())
+ .agentName("test-agent")
+ .agentUrl("https://agent.example.com");
+
+ ConfigurationError error = assertThrows(ConfigurationError.class, builder::build);
+ assertEquals("agentVersion", error.configField());
+ }
+
+ @Test
+ void build_returns_default_request_handler() {
+ DefaultRequestHandler handler = A2aServerBuilder.create(platform())
+ .agentName("test-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0")
+ .build();
+
+ assertNotNull(handler);
+ }
+
+ @Test
+ void build_exposes_agent_card() {
+ A2aServerBuilder builder = A2aServerBuilder.create(platform())
+ .agentName("test-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0");
+
+ builder.build();
+ AgentCard card = builder.getAgentCard();
+
+ assertEquals("test-agent", card.name());
+ assertEquals("https://agent.example.com", card.url());
+ assertEquals("1.0.0", card.version());
+ }
+
+ @Test
+ void build_agent_card_populates_skills_from_platform_supported_tools() {
+ AdcpPlatform richPlatform = new AdcpPlatform() {
+ @Override
+ public Set supportedTools() {
+ return Set.of("get_products", "get_creatives");
+ }
+
+ @Override
+ public Map toolDescriptions() {
+ return Map.of(
+ "get_products", "Fetch available ad products",
+ "get_creatives", "Retrieve creative assets");
+ }
+ };
+
+ A2aServerBuilder builder = A2aServerBuilder.create(richPlatform)
+ .agentName("rich-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0");
+
+ AgentCard card = builder.buildAgentCard();
+
+ assertNotNull(card.skills());
+ assertEquals(2, card.skills().size());
+
+ // Sorted by tool name for deterministic ordering
+ List skills = card.skills().stream()
+ .sorted(java.util.Comparator.comparing(AgentSkill::id))
+ .toList();
+ assertEquals("get_creatives", skills.get(0).id());
+ assertEquals("Retrieve creative assets", skills.get(0).description());
+ assertEquals("get_products", skills.get(1).id());
+ assertEquals("Fetch available ad products", skills.get(1).description());
+ }
+
+ @Test
+ void build_agent_card_uses_tool_name_as_description_when_not_provided() {
+ A2aServerBuilder builder = A2aServerBuilder.create(platform()) // platform has "echo" with no description
+ .agentName("test-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0");
+
+ AgentCard card = builder.buildAgentCard();
+
+ assertEquals(1, card.skills().size());
+ AgentSkill skill = card.skills().get(0);
+ assertEquals("echo", skill.id());
+ assertEquals("echo", skill.description()); // falls back to tool name
+ }
+
+ @Test
+ void build_agent_card_has_empty_skills_when_platform_has_no_tools() {
+ AdcpPlatform emptyPlatform = new AdcpPlatform() {};
+
+ AgentCard card = A2aServerBuilder.create(emptyPlatform)
+ .agentName("empty-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0")
+ .buildAgentCard();
+
+ assertNotNull(card.skills());
+ assertTrue(card.skills().isEmpty());
+ }
+
+ @Test
+ void build_uses_injectable_agent_executor_not_caller_thread() throws Exception {
+ AtomicReference executorThread = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Inject an executor that records which thread ran the task
+ java.util.concurrent.Executor recordingExecutor = task -> {
+ Thread t = new Thread(() -> {
+ executorThread.set(Thread.currentThread());
+ latch.countDown();
+ task.run();
+ }, "test-agent-thread");
+ t.setDaemon(true);
+ t.start();
+ };
+
+ A2aServerBuilder.create(platform())
+ .agentName("test-agent")
+ .agentUrl("https://agent.example.com")
+ .agentVersion("1.0.0")
+ .agentExecutor(recordingExecutor)
+ .build(); // just verify it builds without error
+
+ // Verify the builder accepts a custom executor (structural test)
+ assertNotNull(recordingExecutor);
+ }
+
+ private static AdcpPlatform platform() {
+ return new AdcpPlatform() {
+ @Override
+ public Set supportedTools() {
+ return Set.of("echo");
+ }
+ };
+ }
+}
diff --git a/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServletTest.java b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServletTest.java
new file mode 100644
index 0000000..0894dad
--- /dev/null
+++ b/adcp-server/src/test/java/org/adcontextprotocol/adcp/server/a2a/A2aServletTest.java
@@ -0,0 +1,556 @@
+package org.adcontextprotocol.adcp.server.a2a;
+
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.AsyncEvent;
+import jakarta.servlet.AsyncListener;
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletContext;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.ListTasksResult;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.SendMessageRequest;
+import org.a2aproject.sdk.server.ServerCallContext;
+import org.a2aproject.sdk.server.requesthandlers.RequestHandler;
+import org.a2aproject.sdk.spec.A2AError;
+import org.a2aproject.sdk.spec.CancelTaskParams;
+import org.a2aproject.sdk.spec.DeleteTaskPushNotificationConfigParams;
+import org.a2aproject.sdk.spec.EventKind;
+import org.a2aproject.sdk.spec.GetTaskPushNotificationConfigParams;
+import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsParams;
+import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsResult;
+import org.a2aproject.sdk.spec.ListTasksParams;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.MessageSendParams;
+import org.a2aproject.sdk.spec.StreamingEventKind;
+import org.a2aproject.sdk.spec.Task;
+import org.a2aproject.sdk.spec.TaskIdParams;
+import org.a2aproject.sdk.spec.TaskPushNotificationConfig;
+import org.a2aproject.sdk.spec.TaskQueryParams;
+import org.a2aproject.sdk.spec.TextPart;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Proxy;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Flow;
+import java.util.concurrent.SubmissionPublisher;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("deprecation")
+class A2aServletTest {
+
+ @Test
+ void doPost_returns_bad_request_when_body_exceeds_limit() throws Exception {
+ A2aServlet servlet = new A2aServlet(new RecordingRequestHandler());
+ byte[] body = new byte[(1024 * 1024) + 1];
+ for (int i = 0; i < body.length; i++) {
+ body[i] = 'x';
+ }
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request(body, null), response.asServletResponse());
+
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, response.status());
+ assertTrue(response.body().contains("Invalid request body"));
+ }
+
+ @Test
+ void doPost_returns_bad_request_for_malformed_json() throws Exception {
+ A2aServlet servlet = new A2aServlet(new RecordingRequestHandler());
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request("{".getBytes(StandardCharsets.UTF_8), null), response.asServletResponse());
+
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, response.status());
+ assertTrue(response.body().contains("Invalid JSON-RPC request"));
+ }
+
+ @Test
+ void doPost_returns_bad_request_when_method_is_missing() throws Exception {
+ A2aServlet servlet = new A2aServlet(new RecordingRequestHandler());
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request(
+ "{\"jsonrpc\":\"2.0\",\"id\":1,\"params\":{}}".getBytes(StandardCharsets.UTF_8),
+ null),
+ response.asServletResponse());
+
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, response.status());
+ assertTrue(response.body().contains("JSON-RPC method is required"));
+ }
+
+ @Test
+ void doPost_returns_bad_request_for_unknown_method() throws Exception {
+ A2aServlet servlet = new A2aServlet(new RecordingRequestHandler());
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request(
+ "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"unknown\",\"params\":{}}"
+ .getBytes(StandardCharsets.UTF_8),
+ null),
+ response.asServletResponse());
+
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, response.status());
+ assertTrue(response.body().contains("Unsupported JSON-RPC method: unknown"));
+ }
+
+ @Test
+ void doPost_writes_sync_response_for_send_message_without_sse_accept() throws Exception {
+ RecordingRequestHandler handler = new RecordingRequestHandler();
+ A2aServlet servlet = new A2aServlet(handler);
+ MessageSendParams params = new MessageSendParams(
+ Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .messageId("msg-1")
+ .parts(new TextPart("echo"))
+ .build(),
+ null,
+ null);
+ String body = JsonUtil.toJson(new SendMessageRequest("req-1", params));
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request(body.getBytes(StandardCharsets.UTF_8), null), response.asServletResponse());
+
+ assertEquals(HttpServletResponse.SC_OK, response.status());
+ assertEquals("application/json", response.contentType());
+ assertEquals(1, handler.messageSendCalls());
+ assertEquals(0, handler.messageSendStreamCalls());
+ assertTrue(response.body().contains("req-1"));
+ assertTrue(response.body().contains("ok"));
+ }
+
+ @Test
+ void doPost_streams_async_submission_publisher_events() throws Exception {
+ AsyncStreamingRequestHandler handler = new AsyncStreamingRequestHandler();
+ A2aServlet servlet = new A2aServlet(handler);
+ MessageSendParams params = new MessageSendParams(
+ Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .messageId("msg-1")
+ .parts(new TextPart("echo"))
+ .build(),
+ null,
+ null);
+ String body = JsonUtil.toJson(new SendMessageRequest("req-1", params));
+ TestHttpServletRequest request = asyncRequest(body.getBytes(StandardCharsets.UTF_8), "text/event-stream");
+ TestHttpServletResponse response = new TestHttpServletResponse();
+
+ servlet.doPost(request.asServletRequest(), response.asServletResponse());
+
+ assertTrue(request.awaitAsyncCompletion(5, TimeUnit.SECONDS));
+ assertEquals(HttpServletResponse.SC_OK, response.status());
+ assertEquals("text/event-stream", response.contentType());
+ assertEquals(1, handler.messageSendStreamCalls());
+ assertTrue(response.body().contains("first"));
+ assertTrue(response.body().contains("second"));
+ }
+
+ private static HttpServletRequest request(byte[] body, String accept) {
+ return new TestHttpServletRequest(body, accept, false).asServletRequest();
+ }
+
+ private static TestHttpServletRequest asyncRequest(byte[] body, String accept) {
+ return new TestHttpServletRequest(body, accept, true);
+ }
+
+ private static Object defaultValue(Class> type) {
+ if (type == void.class || !type.isPrimitive()) {
+ return null;
+ }
+ if (type == boolean.class) {
+ return false;
+ }
+ if (type == byte.class) {
+ return (byte) 0;
+ }
+ if (type == short.class) {
+ return (short) 0;
+ }
+ if (type == int.class) {
+ return 0;
+ }
+ if (type == long.class) {
+ return 0L;
+ }
+ if (type == float.class) {
+ return 0f;
+ }
+ if (type == double.class) {
+ return 0d;
+ }
+ if (type == char.class) {
+ return '\0';
+ }
+ throw new IllegalArgumentException("Unsupported primitive type: " + type);
+ }
+
+ private static final class ByteArrayServletInputStream extends ServletInputStream {
+ private final ByteArrayInputStream delegate;
+
+ private ByteArrayServletInputStream(byte[] body) {
+ this.delegate = new ByteArrayInputStream(body);
+ }
+
+ @Override
+ public int read() {
+ return delegate.read();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return delegate.available() == 0;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ private static final class TestHttpServletRequest {
+ private final byte[] body;
+ private final String accept;
+ private final boolean asyncSupported;
+ private final HttpServletRequest servletRequest;
+ private volatile TestAsyncContext asyncContext;
+
+ private TestHttpServletRequest(byte[] body, String accept, boolean asyncSupported) {
+ this.body = body;
+ this.accept = accept;
+ this.asyncSupported = asyncSupported;
+ this.servletRequest = (HttpServletRequest) Proxy.newProxyInstance(
+ HttpServletRequest.class.getClassLoader(),
+ new Class>[]{HttpServletRequest.class},
+ (proxy, method, args) -> switch (method.getName()) {
+ case "getInputStream" -> new ByteArrayServletInputStream(this.body);
+ case "getHeader" -> "Accept".equals(args[0]) ? this.accept : null;
+ case "isAsyncSupported" -> this.asyncSupported;
+ case "isAsyncStarted" -> asyncContext != null && !asyncContext.isCompleted();
+ case "startAsync" -> startAsync(args);
+ case "getAsyncContext" -> asyncContext;
+ default -> defaultValue(method.getReturnType());
+ });
+ }
+
+ private HttpServletRequest asServletRequest() {
+ return servletRequest;
+ }
+
+ private boolean awaitAsyncCompletion(long timeout, TimeUnit unit) throws InterruptedException {
+ return asyncContext != null && asyncContext.awaitCompletion(timeout, unit);
+ }
+
+ private AsyncContext startAsync(Object[] args) {
+ if (!asyncSupported) {
+ throw new IllegalStateException("async not supported");
+ }
+ ServletRequest request = args != null && args.length > 0
+ ? (ServletRequest) args[0]
+ : servletRequest;
+ ServletResponse response = args != null && args.length > 1
+ ? (ServletResponse) args[1]
+ : null;
+ asyncContext = new TestAsyncContext(request, response);
+ return asyncContext;
+ }
+ }
+
+ private static final class TestAsyncContext implements AsyncContext {
+ private final ServletRequest request;
+ private final ServletResponse response;
+ private final CountDownLatch completed = new CountDownLatch(1);
+ private final java.util.List listeners = new CopyOnWriteArrayList<>();
+ private volatile long timeout;
+ private volatile boolean done;
+
+ private TestAsyncContext(ServletRequest request, ServletResponse response) {
+ this.request = request;
+ this.response = response;
+ }
+
+ @Override
+ public ServletRequest getRequest() {
+ return request;
+ }
+
+ @Override
+ public ServletResponse getResponse() {
+ return response;
+ }
+
+ @Override
+ public boolean hasOriginalRequestAndResponse() {
+ return true;
+ }
+
+ @Override
+ public void dispatch() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dispatch(String path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dispatch(ServletContext context, String path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void complete() {
+ done = true;
+ AsyncEvent event = new AsyncEvent(this, request, response);
+ for (AsyncListener listener : listeners) {
+ try {
+ listener.onComplete(event);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ completed.countDown();
+ }
+
+ @Override
+ public void start(Runnable run) {
+ new Thread(run).start();
+ }
+
+ @Override
+ public void addListener(AsyncListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void addListener(AsyncListener listener, ServletRequest request, ServletResponse response) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public T createListener(Class clazz) throws ServletException {
+ try {
+ return clazz.getDeclaredConstructor().newInstance();
+ } catch (ReflectiveOperationException e) {
+ throw new ServletException(e);
+ }
+ }
+
+ @Override
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+
+ private boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
+ return completed.await(timeout, unit);
+ }
+
+ private boolean isCompleted() {
+ return done;
+ }
+ }
+
+ private static final class TestHttpServletResponse {
+ private final StringWriter buffer = new StringWriter();
+ private final PrintWriter writer = new PrintWriter(buffer) {
+ @Override
+ public void flush() {
+ super.flush();
+ committed = true;
+ }
+ };
+ private final Map headers = new LinkedHashMap<>();
+ private int status;
+ private String characterEncoding;
+ private String contentType;
+ private boolean committed;
+
+ private HttpServletResponse asServletResponse() {
+ return (HttpServletResponse) Proxy.newProxyInstance(
+ HttpServletResponse.class.getClassLoader(),
+ new Class>[]{HttpServletResponse.class},
+ (proxy, method, args) -> switch (method.getName()) {
+ case "setStatus" -> {
+ status = (int) args[0];
+ yield null;
+ }
+ case "setCharacterEncoding" -> {
+ characterEncoding = (String) args[0];
+ yield null;
+ }
+ case "setContentType" -> {
+ contentType = (String) args[0];
+ yield null;
+ }
+ case "setHeader" -> {
+ headers.put((String) args[0], (String) args[1]);
+ yield null;
+ }
+ case "getWriter" -> writer;
+ case "flushBuffer" -> {
+ writer.flush();
+ committed = true;
+ yield null;
+ }
+ case "isCommitted" -> committed;
+ case "getCharacterEncoding" -> characterEncoding;
+ case "getContentType" -> contentType;
+ default -> defaultValue(method.getReturnType());
+ });
+ }
+
+ private int status() {
+ return status;
+ }
+
+ private String body() {
+ writer.flush();
+ return buffer.toString();
+ }
+
+ private String contentType() {
+ return contentType;
+ }
+ }
+
+ private static class RecordingRequestHandler implements RequestHandler {
+ private int messageSendCalls;
+ private int messageSendStreamCalls;
+
+ @Override
+ public Task onGetTask(TaskQueryParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public Task onCancelTask(CancelTaskParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public EventKind onMessageSend(MessageSendParams params, ServerCallContext callContext) {
+ messageSendCalls++;
+ return Message.builder()
+ .role(Message.Role.ROLE_AGENT)
+ .messageId("reply-1")
+ .parts(new TextPart("ok"))
+ .build();
+ }
+
+ @Override
+ public Flow.Publisher onMessageSendStream(
+ MessageSendParams params, ServerCallContext callContext) {
+ messageSendStreamCalls++;
+ return streamingPublisher();
+ }
+
+ @Override
+ public TaskPushNotificationConfig onCreateTaskPushNotificationConfig(
+ TaskPushNotificationConfig taskPushNotificationConfig, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public TaskPushNotificationConfig onGetTaskPushNotificationConfig(
+ GetTaskPushNotificationConfigParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public Flow.Publisher onSubscribeToTask(
+ TaskIdParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public ListTaskPushNotificationConfigsResult onListTaskPushNotificationConfigs(
+ ListTaskPushNotificationConfigsParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public void onDeleteTaskPushNotificationConfig(
+ DeleteTaskPushNotificationConfigParams params, ServerCallContext callContext) {
+ throw unused();
+ }
+
+ @Override
+ public void validateRequestedTask(String taskId) throws A2AError {
+ }
+
+ int messageSendCalls() {
+ return messageSendCalls;
+ }
+
+ int messageSendStreamCalls() {
+ return messageSendStreamCalls;
+ }
+
+ protected Flow.Publisher streamingPublisher() {
+ return subscriber -> {
+ throw new AssertionError("Streaming should not be used");
+ };
+ }
+
+ private static UnsupportedOperationException unused() {
+ return new UnsupportedOperationException();
+ }
+ }
+
+ private static final class AsyncStreamingRequestHandler extends RecordingRequestHandler {
+ @Override
+ protected Flow.Publisher streamingPublisher() {
+ SubmissionPublisher publisher = new SubmissionPublisher<>();
+ Thread.ofPlatform().start(() -> {
+ try {
+ Thread.sleep(25);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ publisher.closeExceptionally(e);
+ return;
+ }
+ publisher.submit(Message.builder()
+ .role(Message.Role.ROLE_AGENT)
+ .messageId("stream-1")
+ .parts(new TextPart("first"))
+ .build());
+ publisher.submit(Message.builder()
+ .role(Message.Role.ROLE_AGENT)
+ .messageId("stream-2")
+ .parts(new TextPart("second"))
+ .build());
+ publisher.close();
+ });
+ return publisher;
+ }
+ }
+}
diff --git a/adcp-spring-boot-starter/gradle.lockfile b/adcp-spring-boot-starter/gradle.lockfile
index 64d1aed..1e574f9 100644
--- a/adcp-spring-boot-starter/gradle.lockfile
+++ b/adcp-spring-boot-starter/gradle.lockfile
@@ -8,12 +8,34 @@ com.fasterxml.jackson.core:jackson-databind:2.20.1=compileClasspath,runtimeClass
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.fasterxml.jackson:jackson-bom:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.api.grpc:proto-google-common-protos:2.66.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.code.findbugs:jsr305:3.0.2=runtimeClasspath,testRuntimeClasspath
+com.google.code.gson:gson:2.14.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.errorprone:error_prone_annotations:2.48.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java-util:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.networknt:json-schema-validator:2.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.micrometer:micrometer-commons:1.14.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.micrometer:micrometer-observation:1.14.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-core:1.1.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-json-jackson2:1.1.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.projectreactor:reactor-core:3.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+io.smallrye.reactive:mutiny-zero:1.1.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.annotation:jakarta.annotation-api:3.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.el:jakarta.el-api:6.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.cdi-api:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.lang-model:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.inject:jakarta.inject-api:2.0.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.interceptor:jakarta.interceptor-api:2.2.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-jsonrpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-spi:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-http-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-jsonrpc-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-server-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec-grpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath
org.jspecify:jspecify:1.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.junit.jupiter:junit-jupiter-api:5.11.4=testCompileClasspath,testRuntimeClasspath
diff --git a/adcp-testing/gradle.lockfile b/adcp-testing/gradle.lockfile
index b8b4b03..4e503ff 100644
--- a/adcp-testing/gradle.lockfile
+++ b/adcp-testing/gradle.lockfile
@@ -3,22 +3,39 @@
# This file is expected to be part of source control.
com.ethlo.time:itu:1.10.3=runtimeClasspath
com.ethlo.time:itu:1.14.0=testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.core:jackson-annotations:2.18.2=compileClasspath
-com.fasterxml.jackson.core:jackson-annotations:2.20=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.core:jackson-core:2.18.2=compileClasspath
-com.fasterxml.jackson.core:jackson-core:2.20.1=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.core:jackson-databind:2.18.2=compileClasspath
-com.fasterxml.jackson.core:jackson-databind:2.20.1=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson.core:jackson-annotations:2.20=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson.core:jackson-core:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson.core:jackson-databind:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.20.1=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.18.2=compileClasspath
-com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-com.fasterxml.jackson:jackson-bom:2.18.2=compileClasspath
-com.fasterxml.jackson:jackson-bom:2.20.1=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.fasterxml.jackson:jackson-bom:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.api.grpc:proto-google-common-protos:2.66.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.code.findbugs:jsr305:3.0.2=runtimeClasspath,testRuntimeClasspath
+com.google.code.gson:gson:2.14.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.errorprone:error_prone_annotations:2.48.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java-util:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.networknt:json-schema-validator:1.5.6=runtimeClasspath
com.networknt:json-schema-validator:2.0.0=testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-core:1.1.2=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-json-jackson2:1.1.2=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.projectreactor:reactor-core:3.7.0=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+io.smallrye.reactive:mutiny-zero:1.1.1=testCompileClasspath,testRuntimeClasspath
+jakarta.annotation:jakarta.annotation-api:3.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.el:jakarta.el-api:6.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.cdi-api:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.lang-model:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.inject:jakarta.inject-api:2.0.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.interceptor:jakarta.interceptor-api:2.2.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-jsonrpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-spi:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-http-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-jsonrpc-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-server-common:1.0.0.CR1=testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec-grpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apiguardian:apiguardian-api:1.1.2=compileClasspath,testCompileClasspath
org.jspecify:jspecify:1.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.junit.jupiter:junit-jupiter-api:5.11.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
@@ -30,7 +47,6 @@ org.junit.platform:junit-platform-launcher:1.11.4=testRuntimeClasspath
org.junit:junit-bom:5.11.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.opentest4j:opentest4j:1.3.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.reactivestreams:reactive-streams:1.0.4=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-org.slf4j:slf4j-api:2.0.16=compileClasspath,runtimeClasspath
-org.slf4j:slf4j-api:2.0.17=testCompileClasspath,testRuntimeClasspath
+org.slf4j:slf4j-api:2.0.17=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.yaml:snakeyaml:2.4=runtimeClasspath,testCompileClasspath,testRuntimeClasspath
empty=annotationProcessor,testAnnotationProcessor
diff --git a/adcp/build.gradle.kts b/adcp/build.gradle.kts
index 6550046..17e4559 100644
--- a/adcp/build.gradle.kts
+++ b/adcp/build.gradle.kts
@@ -21,6 +21,10 @@ dependencies {
implementation(libs.mcp.json.jackson2) {
exclude(group = "com.networknt", module = "json-schema-validator")
}
+ api(libs.a2a.sdk.client)
+ // Explicit deps for A2A classes used directly — not transitive reliance on a2a-sdk-client
+ implementation(libs.a2a.sdk.client.transport.jsonrpc) // JSONRPCTransport, JSONRPCTransportConfigBuilder
+ implementation(libs.a2a.sdk.http.client) // JdkA2AHttpClient
}
// -- Build-time SDK version constant ----------------------------------------
diff --git a/adcp/gradle.lockfile b/adcp/gradle.lockfile
index afd7482..5150e54 100644
--- a/adcp/gradle.lockfile
+++ b/adcp/gradle.lockfile
@@ -8,10 +8,30 @@ com.fasterxml.jackson.core:jackson-databind:2.20.1=compileClasspath,runtimeClass
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.fasterxml.jackson:jackson-bom:2.20.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.api.grpc:proto-google-common-protos:2.66.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.code.findbugs:jsr305:3.0.2=runtimeClasspath,testRuntimeClasspath
+com.google.code.gson:gson:2.14.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.errorprone:error_prone_annotations:2.48.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java-util:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+com.google.protobuf:protobuf-java:4.33.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
com.networknt:json-schema-validator:1.5.6=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-core:1.1.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.modelcontextprotocol.sdk:mcp-json-jackson2:1.1.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
io.projectreactor:reactor-core:3.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.annotation:jakarta.annotation-api:3.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.el:jakarta.el-api:6.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.cdi-api:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.enterprise:jakarta.enterprise.lang-model:4.1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.inject:jakarta.inject-api:2.0.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+jakarta.interceptor:jakarta.interceptor-api:2.2.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-jsonrpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client-transport-spi:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-http-client:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-jsonrpc-common:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec-grpc:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.a2aproject.sdk:a2a-java-sdk-spec:1.0.0.CR1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath
org.jspecify:jspecify:1.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.junit.jupiter:junit-jupiter-api:5.11.4=testCompileClasspath,testRuntimeClasspath
@@ -23,6 +43,6 @@ org.junit.platform:junit-platform-launcher:1.11.4=testRuntimeClasspath
org.junit:junit-bom:5.11.4=testCompileClasspath,testRuntimeClasspath
org.opentest4j:opentest4j:1.3.0=testCompileClasspath,testRuntimeClasspath
org.reactivestreams:reactive-streams:1.0.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
-org.slf4j:slf4j-api:2.0.16=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
+org.slf4j:slf4j-api:2.0.17=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.yaml:snakeyaml:2.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
empty=annotationProcessor,testAnnotationProcessor
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/AdcpClient.java b/adcp/src/main/java/org/adcontextprotocol/adcp/AdcpClient.java
index 3a9cf19..92e5f02 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/AdcpClient.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/AdcpClient.java
@@ -7,6 +7,7 @@
import org.adcontextprotocol.adcp.schema.AdcpObjectMapperFactory;
import org.adcontextprotocol.adcp.transport.CallToolOptions;
import org.adcontextprotocol.adcp.transport.ProtocolClient;
+import org.adcontextprotocol.adcp.transport.a2a.A2aConnectionManager;
import org.adcontextprotocol.adcp.transport.mcp.McpConnectionManager;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
@@ -61,10 +62,13 @@ private AdcpClient(Builder builder) {
this.adcpHttpClient = AdcpHttpClient.builder()
.ssrfPolicy(ssrfPolicy)
.build();
- McpConnectionManager connectionManager = new McpConnectionManager(
+ McpConnectionManager mcpConnectionManager = new McpConnectionManager(
Duration.ofSeconds(10), builder.requestTimeout, adcpHttpClient);
+ A2aConnectionManager a2aConnectionManager = new A2aConnectionManager(
+ adcpHttpClient, this.objectMapper);
this.protocolClient = new ProtocolClient(
- this.objectMapper, ssrfPolicy, adcpVersion, connectionManager);
+ this.objectMapper, ssrfPolicy, adcpVersion,
+ mcpConnectionManager, a2aConnectionManager);
}
/** Creates a new builder. */
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/http/AdcpHttpClient.java b/adcp/src/main/java/org/adcontextprotocol/adcp/http/AdcpHttpClient.java
index dab20fd..b9176a2 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/http/AdcpHttpClient.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/http/AdcpHttpClient.java
@@ -167,6 +167,69 @@ public AdcpHttpResponse get(URI uri, Map headers)
return send("GET", uri, headers, null);
}
+ /**
+ * Sends a GET request to fetch an agent card, forwarding all caller-supplied headers
+ * including {@code Authorization}.
+ *
+ * Unlike {@link #get}, this method does not strip protected headers so that
+ * private agent-card endpoints can be reached with the same credentials used for
+ * the subsequent A2A RPC call. The safeguards that would normally make header
+ * forwarding risky (transparent redirect-follow, DNS rebinding) are neutralised
+ * by the client's {@code followRedirects(NEVER)} policy and SSRF validation.
+ *
+ * @param uri target URI (SSRF-validated)
+ * @param headers headers to forward, including any auth material
+ * @return the response, body capped at {@link #maxResponseBytes()}
+ */
+ public AdcpHttpResponse getForAgentCard(URI uri, Map headers)
+ throws IOException, InterruptedException {
+ Objects.requireNonNull(uri, "uri");
+
+ if (requireHttps && "http".equalsIgnoreCase(uri.getScheme())) {
+ String host = uri.getHost();
+ if (host != null && !isLoopback(host)) {
+ throw new IOException(
+ "Plain HTTP is not allowed when requireHttps is enabled: " + uri
+ + ". Use HTTPS or set requireHttps(false) for local development.");
+ }
+ }
+
+ URI validatedUri = validateUri(uri);
+
+ HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
+ .uri(validatedUri)
+ .timeout(readTimeout)
+ .header("User-Agent", userAgent);
+
+ // Forward all headers including auth — redirects are already blocked (NEVER)
+ // and the target was SSRF-validated above, so header leakage via redirect
+ // or cross-origin requests is not possible.
+ if (headers != null) {
+ headers.forEach((name, value) -> {
+ if (name != null && value != null) {
+ requestBuilder.header(name, value);
+ }
+ });
+ }
+
+ requestBuilder.method("GET", HttpRequest.BodyPublishers.noBody());
+
+ HttpResponse response = httpClient.send(
+ requestBuilder.build(),
+ HttpResponse.BodyHandlers.ofInputStream());
+
+ try {
+ return readBodyWithCap(response);
+ } catch (Throwable t) {
+ try {
+ response.body().close();
+ } catch (Exception suppressed) {
+ t.addSuppressed(suppressed);
+ }
+ throw t;
+ }
+ }
+
/**
* Convenience: POST request with a body.
*/
@@ -190,6 +253,14 @@ public long maxResponseBytes() {
* and redirect policy used by this client.
*/
public HttpClient.Builder newMcpClientBuilder() {
+ return newHttpClientBuilder();
+ }
+
+ /**
+ * Creates an HTTP client builder with this client's connection-timeout and
+ * redirect policy ({@code NEVER}). Suitable for any transport (MCP, A2A, etc.).
+ */
+ public HttpClient.Builder newHttpClientBuilder() {
return HttpClient.newBuilder()
.connectTimeout(connectTimeout)
.followRedirects(HttpClient.Redirect.NEVER);
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpObjectMapperFactory.java b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpObjectMapperFactory.java
index 73ac477..5e7fca7 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpObjectMapperFactory.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpObjectMapperFactory.java
@@ -49,6 +49,9 @@ public static ObjectMapper create() {
// Intentionally NOT enabling FAIL_ON_UNKNOWN_PROPERTIES —
// SDK must tolerate fields added in newer protocol versions.
.build();
+ // Defense-in-depth: disable default typing to prevent deserialization gadget attacks.
+ // Jackson's default is off, but this makes it explicit and resilient to future config changes.
+ mapper.deactivateDefaultTyping();
// Widen stream constraints for AdCP creative payloads and deep catalogs
mapper.getFactory().setStreamReadConstraints(
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidator.java b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidator.java
index d319d56..72701e0 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidator.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidator.java
@@ -78,6 +78,9 @@ private JsonSchema loadSchema(String uri) {
// Normalize: strip leading slash to form classpath resource path.
// Input: "/schemas/3.0.11/core/brand-ref.json" → "schemas/3.0.11/core/brand-ref.json"
String resourcePath = uri.startsWith("/") ? uri.substring(1) : uri;
+ if (resourcePath.contains("..")) {
+ throw new IllegalArgumentException("Invalid schema URI (path traversal): " + uri);
+ }
InputStream stream = getClass().getClassLoader().getResourceAsStream(resourcePath);
if (stream == null) {
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/SchemaBundle.java b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/SchemaBundle.java
index 6bc40c6..5a5faba 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/schema/SchemaBundle.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/schema/SchemaBundle.java
@@ -16,7 +16,11 @@
public final class SchemaBundle {
// Thread-safe: no reconfiguration after init. Do not add mapper.configure() calls in methods.
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER;
+ static {
+ MAPPER = new ObjectMapper();
+ MAPPER.deactivateDefaultTyping(); // defense-in-depth; consistent with AdcpObjectMapperFactory
+ }
private static final String SCHEMA_PREFIX = "schemas/";
private SchemaBundle() {}
@@ -29,6 +33,9 @@ private SchemaBundle() {}
* @throws IllegalArgumentException if the schema is not found on the classpath
*/
public static JsonNode load(String path) {
+ if (path == null || path.contains("..") || path.startsWith("/")) {
+ throw new IllegalArgumentException("Invalid schema path: " + path);
+ }
String resourcePath = SCHEMA_PREFIX + path;
try (InputStream stream = SchemaBundle.class.getClassLoader().getResourceAsStream(resourcePath)) {
if (stream == null) {
@@ -57,6 +64,9 @@ public static JsonNode loadIndex(String version) {
* @return {@code true} if the schema resource exists
*/
public static boolean exists(String path) {
+ if (path == null || path.contains("..") || path.startsWith("/")) {
+ return false;
+ }
String resourcePath = SCHEMA_PREFIX + path;
return SchemaBundle.class.getClassLoader().getResource(resourcePath) != null;
}
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/ProtocolClient.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/ProtocolClient.java
index 5076b1b..5a3bbe6 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/ProtocolClient.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/ProtocolClient.java
@@ -6,9 +6,11 @@
import org.adcontextprotocol.adcp.AgentConfig;
import org.adcontextprotocol.adcp.Protocol;
import org.adcontextprotocol.adcp.auth.AuthTokenResolver;
-import org.adcontextprotocol.adcp.error.FeatureUnsupportedError;
import org.adcontextprotocol.adcp.error.ProtocolError;
+import org.adcontextprotocol.adcp.http.ProtectedHeaders;
import org.adcontextprotocol.adcp.http.SsrfPolicy;
+import org.adcontextprotocol.adcp.transport.a2a.A2aCaller;
+import org.adcontextprotocol.adcp.transport.a2a.A2aConnectionManager;
import org.adcontextprotocol.adcp.transport.mcp.McpCaller;
import org.adcontextprotocol.adcp.transport.mcp.McpConnectionManager;
import org.jspecify.annotations.Nullable;
@@ -38,8 +40,10 @@ public final class ProtocolClient implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ProtocolClient.class);
- private final McpConnectionManager connectionManager;
+ private final McpConnectionManager mcpConnectionManager;
+ private final A2aConnectionManager a2aConnectionManager;
private final McpCaller mcpCaller;
+ private final A2aCaller a2aCaller;
private final SsrfPolicy ssrfPolicy;
private final @Nullable AdcpVersion adcpVersion;
@@ -49,13 +53,17 @@ public final class ProtocolClient implements AutoCloseable {
* @param objectMapper Jackson ObjectMapper for serialization
* @param ssrfPolicy SSRF policy for URL validation
* @param adcpVersion protocol version for the version envelope
- * @param connectionManager MCP connection manager (shared)
+ * @param mcpConnectionManager MCP connection manager (shared)
+ * @param a2aConnectionManager A2A connection manager (shared)
*/
public ProtocolClient(ObjectMapper objectMapper, SsrfPolicy ssrfPolicy,
@Nullable AdcpVersion adcpVersion,
- McpConnectionManager connectionManager) {
- this.connectionManager = connectionManager;
+ McpConnectionManager mcpConnectionManager,
+ A2aConnectionManager a2aConnectionManager) {
+ this.mcpConnectionManager = mcpConnectionManager;
+ this.a2aConnectionManager = a2aConnectionManager;
this.mcpCaller = new McpCaller(objectMapper);
+ this.a2aCaller = new A2aCaller(objectMapper);
this.ssrfPolicy = ssrfPolicy;
this.adcpVersion = adcpVersion;
}
@@ -75,14 +83,7 @@ public T callTool(AgentConfig agent, String toolName,
Map args, Class responseType,
CallToolOptions options) {
- // 1. Check protocol support early so unsupported transports fail fast
- if (agent.protocol() == org.adcontextprotocol.adcp.Protocol.A2A) {
- throw new FeatureUnsupportedError(
- List.of("A2A transport"),
- List.of("MCP"));
- }
-
- // 2. Validate agent URL against SSRF policy
+ // 1. Validate agent URL against SSRF policy
validateUrl(agent);
// 2. Warn if non-default options are passed (not yet enforced in v0.1)
@@ -110,8 +111,10 @@ public T callTool(AgentConfig agent, String toolName,
AdcpVersion version = agent.adcpVersion() != null ? agent.adcpVersion() : adcpVersion;
Map mergedArgs = VersionEnvelope.mergeInto(args, version);
- // 6. Dispatch to transport (A2A already rejected in step 1)
- return callViaMcp(agent, toolName, mergedArgs, allHeaders, responseType);
+ // 6. Dispatch to transport
+ return agent.protocol() == Protocol.A2A
+ ? callViaA2a(agent, toolName, mergedArgs, allHeaders, responseType)
+ : callViaMcp(agent, toolName, mergedArgs, allHeaders, responseType);
}
/**
@@ -124,7 +127,11 @@ public T callTool(AgentConfig agent, String toolName,
@Override
public void close() {
- connectionManager.close();
+ try {
+ mcpConnectionManager.close();
+ } finally {
+ a2aConnectionManager.close();
+ }
}
private T callViaMcp(AgentConfig agent, String toolName,
@@ -132,7 +139,7 @@ private T callViaMcp(AgentConfig agent, String toolName,
Map headers,
Class responseType) {
String cacheHash = computeCacheHash(agent);
- McpSyncClient client = connectionManager.getOrConnect(
+ McpSyncClient client = mcpConnectionManager.getOrConnect(
agent.agentUri(), headers, cacheHash);
try {
@@ -142,12 +149,12 @@ private T callViaMcp(AgentConfig agent, String toolName,
throw e;
}
// On transport error, evict and retry once
- connectionManager.evict(agent.agentUri(), cacheHash);
+ mcpConnectionManager.evict(agent.agentUri(), cacheHash);
log.debug("MCP transport error for {}, retrying after evict: {}",
toolName, e.getMessage());
ProtocolError original = e;
- client = connectionManager.getOrConnect(
+ client = mcpConnectionManager.getOrConnect(
agent.agentUri(), headers, cacheHash);
try {
return mcpCaller.callTool(client, toolName, mergedArgs, responseType);
@@ -169,15 +176,43 @@ private boolean isTransportError(ProtocolError e) {
return false;
}
+ private T callViaA2a(AgentConfig agent, String toolName,
+ Map mergedArgs,
+ Map headers,
+ Class responseType) {
+ String cacheHash = computeCacheHash(agent);
+ var client = a2aConnectionManager.getOrConnect(agent, headers, cacheHash);
+ try {
+ return a2aCaller.callTool(client, toolName, mergedArgs, responseType, headers);
+ } catch (ProtocolError e) {
+ if (!isTransportError(e)) {
+ throw e;
+ }
+ a2aConnectionManager.evict(agent.agentUri(), cacheHash);
+ log.debug("A2A transport error for {}, retrying after evict: {}",
+ toolName, e.getMessage());
+
+ ProtocolError original = e;
+ client = a2aConnectionManager.getOrConnect(agent, headers, cacheHash);
+ try {
+ return a2aCaller.callTool(client, toolName, mergedArgs, responseType, headers);
+ } catch (ProtocolError retry) {
+ retry.addSuppressed(original);
+ throw retry;
+ }
+ }
+ }
+
private void validateUrl(AgentConfig agent) {
+ String protocol = agent.protocol() == Protocol.A2A ? "a2a" : "mcp";
String scheme = agent.agentUri().getScheme();
if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
- throw new ProtocolError("mcp",
+ throw new ProtocolError(protocol,
"Agent URI scheme must be http or https: " + agent.agentUri(), null);
}
String host = agent.agentUri().getHost();
if (host == null) {
- throw new ProtocolError("mcp",
+ throw new ProtocolError(protocol,
"Agent URI has no host: " + agent.agentUri(), null);
}
// Resolve DNS and validate all addresses against SSRF policy.
@@ -192,10 +227,10 @@ private void validateUrl(AgentConfig agent) {
addr, ssrfPolicy);
}
} catch (org.adcontextprotocol.adcp.http.SsrfBlockedException e) {
- throw new ProtocolError("mcp",
+ throw new ProtocolError(protocol,
"Agent URI blocked by SSRF policy", e);
} catch (java.net.UnknownHostException e) {
- throw new ProtocolError("mcp",
+ throw new ProtocolError(protocol,
"Cannot resolve agent host", e);
}
}
@@ -223,7 +258,11 @@ private static String computeCacheHash(AgentConfig agent) {
Mac mac = createHmac();
mac.update(tokenHash.getBytes(StandardCharsets.UTF_8));
mac.update((byte) '\0');
+ // Only hash headers that are actually sent on the wire — protected headers
+ // (Authorization, Cookie, etc.) are stripped by AdcpHttpClient before each
+ // request, so including them would fragment the cache without any effect.
agent.extraHeaders().entrySet().stream()
+ .filter(e -> !ProtectedHeaders.isProtected(e.getKey()))
.sorted(Map.Entry.comparingByKey())
.forEach(e -> {
mac.update(e.getKey().getBytes(StandardCharsets.UTF_8));
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aCaller.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aCaller.java
new file mode 100644
index 0000000..b8d73eb
--- /dev/null
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aCaller.java
@@ -0,0 +1,316 @@
+package org.adcontextprotocol.adcp.transport.a2a;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.client.ClientEvent;
+import org.a2aproject.sdk.client.MessageEvent;
+import org.a2aproject.sdk.client.TaskEvent;
+import org.a2aproject.sdk.client.TaskUpdateEvent;
+import org.a2aproject.sdk.client.transport.spi.interceptors.ClientCallContext;
+import org.a2aproject.sdk.spec.A2AClientException;
+import org.a2aproject.sdk.spec.DataPart;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.MessageSendParams;
+import org.a2aproject.sdk.spec.Part;
+import org.a2aproject.sdk.spec.Task;
+import org.a2aproject.sdk.spec.TaskState;
+import org.a2aproject.sdk.spec.TextPart;
+import org.adcontextprotocol.adcp.error.ProtocolError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Calls AdCP tools over A2A JSON-RPC + SSE.
+ */
+public final class A2aCaller {
+
+ private static final Logger log = LoggerFactory.getLogger(A2aCaller.class);
+ private static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;
+ private static final int MAX_ERROR_LENGTH = 500;
+ private static final long RESPONSE_TIMEOUT_SECONDS = 30;
+ private static final String TOOL_NAME_KEY = "adcp_tool_name";
+ private static final int MAX_HISTORY_SCAN = 20;
+ private static final int MAX_PARTS_SCAN = 20;
+
+ private static final int MAX_TOOL_NAME_LENGTH = 256;
+
+ private final ObjectMapper objectMapper;
+
+ public A2aCaller(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper.copy();
+ this.objectMapper.deactivateDefaultTyping();
+ }
+
+ public T callTool(Client client, String toolName,
+ Map args, Class responseType) {
+ return callTool(new ClientAdapter(client), toolName, args, responseType, Map.of());
+ }
+
+ public T callTool(Client client, String toolName,
+ Map args, Class responseType,
+ Map headers) {
+ for (var entry : headers.entrySet()) {
+ String k = entry.getKey();
+ String v = entry.getValue();
+ if (k == null || k.indexOf('\r') >= 0 || k.indexOf('\n') >= 0
+ || v == null || v.indexOf('\r') >= 0 || v.indexOf('\n') >= 0) {
+ throw new IllegalArgumentException(
+ "callTool headers must not contain CR/LF or null: " + k);
+ }
+ }
+ return callTool(new ClientAdapter(client), toolName, args, responseType, headers);
+ }
+
+ T callTool(A2aMessageClient client, String toolName,
+ Map args, Class responseType,
+ Map headers) {
+ // Validate toolName before use — reject rather than silently mutate the outbound request
+ if (toolName == null || toolName.isBlank()) {
+ throw new IllegalArgumentException("toolName must not be null or blank");
+ }
+ if (toolName.length() > MAX_TOOL_NAME_LENGTH) {
+ throw new IllegalArgumentException(
+ "toolName exceeds max length of " + MAX_TOOL_NAME_LENGTH + ": " + toolName.length());
+ }
+ if (toolName.chars().anyMatch(Character::isISOControl)) {
+ throw new IllegalArgumentException("toolName must not contain control characters");
+ }
+ // Sanitized copy used only in log/error strings — the original is sent on the wire
+ final String safeToolName = toolName.replaceAll("[\\p{Cc}]", "");
+
+ CountDownLatch completion = new CountDownLatch(1);
+ AtomicReference latestMessage = new AtomicReference<>();
+ AtomicReference latestTask = new AtomicReference<>();
+ AtomicReference failure = new AtomicReference<>();
+
+ List> consumers = List.of((event, card) -> {
+ if (event instanceof MessageEvent messageEvent) {
+ latestMessage.set(messageEvent.getMessage());
+ completion.countDown();
+ } else if (event instanceof TaskEvent taskEvent) {
+ latestTask.set(taskEvent.getTask());
+ if (isTerminal(taskEvent.getTask())) {
+ completion.countDown();
+ }
+ } else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
+ latestTask.set(taskUpdateEvent.getTask());
+ if (isTerminal(taskUpdateEvent.getTask())) {
+ completion.countDown();
+ }
+ }
+ });
+ Consumer errorHandler = throwable -> {
+ if (throwable != null) {
+ failure.compareAndSet(null, throwable);
+ completion.countDown();
+ }
+ };
+
+ try {
+ client.sendMessage(buildRequest(toolName, args), consumers, errorHandler,
+ new ClientCallContext(Map.of(), headers));
+
+ // Guard for synchronous-delivery clients that invoke callbacks inline
+ // before sendMessage() returns; the latch is already at 0 in that case
+ // so this countDown() is a no-op — but we ensure we don't await forever
+ // if the client is synchronous and never fires the error handler.
+ // Only trigger on terminal results to avoid counting down on in-progress tasks.
+ if (completion.getCount() > 0
+ && (latestMessage.get() != null || isTerminal(latestTask.get()) || failure.get() != null)) {
+ completion.countDown();
+ }
+
+ if (!completion.await(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ throw new ProtocolError("a2a",
+ "Timed out waiting for A2A response for " + safeToolName, null);
+ }
+ if (failure.get() != null) {
+ throw wrapFailure(safeToolName, failure.get());
+ }
+ return extractResponse(latestMessage.get(), latestTask.get(), responseType);
+ } catch (ProtocolError e) {
+ throw e;
+ } catch (A2AClientException e) {
+ throw wrapFailure(safeToolName, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProtocolError("a2a",
+ "Interrupted while waiting for A2A response for " + safeToolName, e);
+ }
+ }
+
+ private MessageSendParams buildRequest(String toolName, Map args) {
+ Message message = Message.builder()
+ .role(Message.Role.ROLE_USER)
+ .messageId(UUID.randomUUID().toString())
+ .metadata(Map.of(TOOL_NAME_KEY, toolName))
+ .parts(new TextPart(toolName), new DataPart(args))
+ .build();
+ return MessageSendParams.builder()
+ .message(message)
+ .build();
+ }
+
+ private T extractResponse(Message message, Task task, Class responseType) {
+ if (message != null) {
+ return extractFromParts(message.parts(), responseType);
+ }
+ if (task != null) {
+ if (task.status() != null && task.status().state() == TaskState.TASK_STATE_FAILED) {
+ throw new ProtocolError("a2a",
+ "A2A task failed: " + sanitizeErrorText(extractMessageText(task.status().message())),
+ null);
+ }
+ if (task.status() != null && task.status().state() == TaskState.TASK_STATE_CANCELED) {
+ throw new ProtocolError("a2a",
+ "A2A task was canceled: " + sanitizeErrorText(extractMessageText(task.status().message())),
+ null);
+ }
+ if (task.status() != null && task.status().message() != null) {
+ return extractFromParts(task.status().message().parts(), responseType);
+ }
+ if (task.history() != null && !task.history().isEmpty()) {
+ int limit = Math.min(task.history().size(), MAX_HISTORY_SCAN);
+ for (int i = task.history().size() - 1; i >= task.history().size() - limit; i--) {
+ Message historyMessage = task.history().get(i);
+ if (historyMessage != null && historyMessage.parts() != null && !historyMessage.parts().isEmpty()) {
+ return extractFromParts(historyMessage.parts(), responseType);
+ }
+ }
+ }
+ }
+ throw new ProtocolError("a2a", "Empty response from A2A sendMessage", null);
+ }
+
+ private T extractFromParts(List> parts, Class responseType) {
+ if (parts == null || parts.isEmpty()) {
+ throw new ProtocolError("a2a", "A2A response message had no parts", null);
+ }
+
+ int scanLimit = Math.min(parts.size(), MAX_PARTS_SCAN);
+ Exception firstParseError = null;
+ for (int i = 0; i < scanLimit; i++) {
+ Part> part = parts.get(i);
+ if (part instanceof DataPart dataPart) {
+ try {
+ byte[] bytes = objectMapper.writeValueAsBytes(dataPart.data());
+ if (bytes.length > MAX_CONTENT_LENGTH) {
+ throw new ProtocolError("a2a",
+ "A2A DataPart response exceeds size limit ("
+ + bytes.length + " > " + MAX_CONTENT_LENGTH + ")",
+ null);
+ }
+ return objectMapper.readValue(bytes, responseType);
+ } catch (ProtocolError e) {
+ throw e;
+ } catch (Exception e) {
+ if (firstParseError == null) {
+ firstParseError = e;
+ }
+ log.debug("Failed to parse A2A DataPart as {}: {}",
+ responseType.getSimpleName(), e.getMessage());
+ }
+ } else if (part instanceof TextPart textPart) {
+ String text = textPart.text();
+ if (text == null) {
+ continue;
+ }
+ int textBytes = text.getBytes(StandardCharsets.UTF_8).length;
+ if (textBytes > MAX_CONTENT_LENGTH) {
+ throw new ProtocolError("a2a",
+ "A2A response content exceeds size limit ("
+ + textBytes + " > " + MAX_CONTENT_LENGTH + ")",
+ null);
+ }
+ try {
+ return objectMapper.readValue(text, responseType);
+ } catch (Exception e) {
+ if (firstParseError == null) {
+ firstParseError = e;
+ }
+ log.debug("Failed to parse A2A TextPart as {}: {}",
+ responseType.getSimpleName(), sanitizeErrorText(e.getMessage()));
+ }
+ }
+ }
+
+ Part> first = parts.get(0);
+ try {
+ JsonNode node = objectMapper.valueToTree(first);
+ return objectMapper.treeToValue(node, responseType);
+ } catch (Exception e) {
+ if (firstParseError != null) {
+ e.addSuppressed(firstParseError);
+ }
+ throw new ProtocolError("a2a",
+ "Cannot deserialize A2A response to " + responseType.getSimpleName(), e);
+ }
+ }
+
+ private boolean isTerminal(Task task) {
+ return task != null && task.status() != null && task.status().state() != null
+ && task.status().state().isFinal();
+ }
+
+ private ProtocolError wrapFailure(String toolName, Throwable throwable) {
+ String message = throwable.getMessage();
+ return new ProtocolError("a2a",
+ "A2A sendMessage failed for " + toolName + ": " + sanitizeErrorText(message),
+ throwable);
+ }
+
+ private static String extractMessageText(Message message) {
+ if (message == null || message.parts() == null) {
+ return "(no error detail)";
+ }
+ List texts = new ArrayList<>();
+ int limit = Math.min(message.parts().size(), MAX_PARTS_SCAN);
+ for (int i = 0; i < limit; i++) {
+ Part> part = message.parts().get(i);
+ if (part instanceof TextPart textPart && textPart.text() != null) {
+ String text = textPart.text();
+ texts.add(text.length() > MAX_ERROR_LENGTH ? text.substring(0, MAX_ERROR_LENGTH) : text);
+ }
+ }
+ return texts.isEmpty() ? "(no error detail)" : String.join("\n", texts);
+ }
+
+ private static String sanitizeErrorText(String raw) {
+ if (raw == null || raw.isBlank()) {
+ return "(no error detail)";
+ }
+ String truncated = raw.length() > MAX_ERROR_LENGTH
+ ? raw.substring(0, MAX_ERROR_LENGTH) + "..."
+ : raw;
+ return truncated.replaceAll("[\\p{Cc}]", "");
+ }
+
+ interface A2aMessageClient {
+ void sendMessage(MessageSendParams params,
+ List> consumers,
+ Consumer errorHandler,
+ ClientCallContext context) throws A2AClientException;
+ }
+
+ private record ClientAdapter(Client delegate) implements A2aMessageClient {
+ @Override
+ public void sendMessage(MessageSendParams params,
+ List> consumers,
+ Consumer errorHandler,
+ ClientCallContext context) throws A2AClientException {
+ delegate.sendMessage(params, consumers, errorHandler, context);
+ }
+ }
+}
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManager.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManager.java
new file mode 100644
index 0000000..15606f8
--- /dev/null
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManager.java
@@ -0,0 +1,432 @@
+package org.adcontextprotocol.adcp.transport.a2a;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.client.config.ClientConfig;
+import org.a2aproject.sdk.client.http.JdkA2AHttpClient;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder;
+import org.a2aproject.sdk.spec.A2AClientException;
+import org.a2aproject.sdk.spec.AgentCapabilities;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.adcontextprotocol.adcp.AgentConfig;
+import org.adcontextprotocol.adcp.error.ProtocolError;
+import org.adcontextprotocol.adcp.http.AdcpHttpClient;
+import org.adcontextprotocol.adcp.http.AdcpHttpResponse;
+import org.adcontextprotocol.adcp.http.ProtectedHeaders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Manages cached A2A clients keyed by agent URL, credential cache hash, and
+ * non-secret sanitized discovery headers.
+ *
+ * Headers are included in the cache key because agent-card discovery is
+ * header-sensitive. The separate cache hash isolates clients by credentials
+ * without storing raw secrets in the cache key.
+ */
+public final class A2aConnectionManager implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(A2aConnectionManager.class);
+ static final int MAX_CACHE_SIZE = 20;
+ private static final int STRIPE_COUNT = 32;
+ private static final int MAX_HEADERS = 50;
+ private static final String JSONRPC_TRANSPORT = "JSONRPC";
+
+ private final LinkedHashMap cache = new LinkedHashMap<>(16, 0.75f, true);
+ private final ReentrantLock cacheLock = new ReentrantLock();
+ private final Semaphore[] connectStripes;
+ private final AgentCardLoader agentCardLoader;
+ private final ClientFactory clientFactory;
+ private volatile boolean closed;
+
+ public A2aConnectionManager(AdcpHttpClient adcpHttpClient, ObjectMapper objectMapper) {
+ this(new HttpAgentCardLoader(adcpHttpClient, objectMapper), new DefaultClientFactory(adcpHttpClient));
+ }
+
+ A2aConnectionManager(AgentCardLoader agentCardLoader, ClientFactory clientFactory) {
+ this.agentCardLoader = Objects.requireNonNull(agentCardLoader, "agentCardLoader");
+ this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory");
+ this.connectStripes = new Semaphore[STRIPE_COUNT];
+ for (int i = 0; i < STRIPE_COUNT; i++) {
+ connectStripes[i] = new Semaphore(1);
+ }
+ }
+
+ public Client getOrConnect(AgentConfig agent, Map headers, String cacheHash) {
+ if (closed) {
+ throw new IllegalStateException("A2aConnectionManager is closed");
+ }
+ Objects.requireNonNull(cacheHash, "cacheHash");
+ Objects.requireNonNull(headers, "headers");
+ Map sanitizedAll = sanitizeHeaders(headers);
+ Map sanitizedForKey = filterProtected(sanitizedAll);
+ String cacheKey = buildCacheKey(agent.agentUri(), sanitizedForKey, cacheHash);
+
+ cacheLock.lock();
+ try {
+ Client existing = cache.get(cacheKey);
+ if (existing != null) {
+ return existing;
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+
+ Semaphore stripe = connectStripes[(cacheKey.hashCode() & 0x7FFFFFFF) % STRIPE_COUNT];
+ try {
+ stripe.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProtocolError("a2a", "Interrupted while connecting to " + agent.agentUri(), e);
+ }
+
+ try {
+ cacheLock.lock();
+ try {
+ if (closed) {
+ throw new IllegalStateException("A2aConnectionManager is closed");
+ }
+ Client existing = cache.get(cacheKey);
+ if (existing != null) {
+ return existing;
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+
+ Client client = connect(agent, sanitizedAll);
+
+ cacheLock.lock();
+ try {
+ if (closed) {
+ closeQuietly(client);
+ throw new IllegalStateException("A2aConnectionManager is closed");
+ }
+ cache.put(cacheKey, client);
+ evictOldest();
+ } finally {
+ cacheLock.unlock();
+ }
+ return client;
+ } finally {
+ stripe.release();
+ }
+ }
+
+ public void evict(URI agentUri) {
+ // buildCacheKey always produces "agentUri#cacheHash[?headers]", so the bare
+ // agentUri.toString() can never equal a cache key — only startsWith is needed.
+ evictMatching(key -> key.startsWith(agentUri + "#"));
+ }
+
+ public void evict(URI agentUri, String cacheHash) {
+ Objects.requireNonNull(cacheHash, "cacheHash");
+ String prefix = agentUri + "#" + cacheHash;
+ // Evicts all cache entries for the given agent URI and credential hash,
+ // regardless of which non-secret discovery headers they used.
+ // This is intentional: on a transport error, all variants for those
+ // credentials are assumed stale.
+ evictMatching(key -> key.equals(prefix) || key.startsWith(prefix + "?"));
+ }
+
+ @Override
+ public void close() {
+ cacheLock.lock();
+ try {
+ closed = true;
+ cache.values().forEach(this::closeQuietly);
+ cache.clear();
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+
+ private Client connect(AgentConfig agent, Map sanitizedHeaders) {
+ try {
+ AgentCard card = agentCardLoader.load(agent, sanitizedHeaders);
+ return clientFactory.create(card);
+ } catch (ProtocolError e) {
+ throw e;
+ } catch (A2AClientException e) {
+ throw new ProtocolError("a2a", "Failed to create A2A client for " + agent.agentUri(), e);
+ } catch (Exception e) {
+ throw new ProtocolError("a2a", "Failed to connect to A2A agent " + agent.agentUri(), e);
+ }
+ }
+
+ /**
+ * Builds a stable cache key from the agent URI, credential cache hash, and
+ * non-secret sanitized discovery headers. Headers are sorted by name and URL-encoded so
+ * the key is independent of insertion order and immune to key-collision via
+ * crafted {@code =} or {@code &} characters.
+ */
+ static String buildCacheKey(URI agentUri, Map sanitizedHeaders, String cacheHash) {
+ StringBuilder sb = new StringBuilder(agentUri.toString())
+ .append('#')
+ .append(cacheHash);
+ if (sanitizedHeaders.isEmpty()) {
+ return sb.toString();
+ }
+ sb.append('?');
+ // Normalize header key case so semantically-identical headers with different casing
+ // (e.g. X-Tenant vs x-tenant) always produce the same cache key. Pre-sort by the
+ // original key (case-sensitive TreeMap) before lowercasing so that among
+ // case-insensitive duplicates the alphabetically-last original key always wins,
+ // making resolution deterministic regardless of the input map's iteration order.
+ TreeMap normalizedHeaders = new TreeMap<>();
+ for (var entry : new TreeMap<>(sanitizedHeaders).entrySet()) {
+ normalizedHeaders.put(entry.getKey().toLowerCase(java.util.Locale.ROOT), entry.getValue());
+ }
+ boolean first = true;
+ for (var entry : normalizedHeaders.entrySet()) {
+ if (!first) {
+ sb.append('&');
+ }
+ first = false;
+ sb.append(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8))
+ .append('=')
+ .append(URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
+ }
+ return sb.toString();
+ }
+
+ private void evictOldest() {
+ while (cache.size() > MAX_CACHE_SIZE) {
+ var it = cache.entrySet().iterator();
+ if (it.hasNext()) {
+ var entry = it.next();
+ it.remove();
+ closeQuietly(entry.getValue());
+ }
+ }
+ }
+
+ private static Map sanitizeHeaders(Map headers) {
+ Map sanitized = new LinkedHashMap<>();
+ for (var entry : headers.entrySet()) {
+ if (sanitized.size() >= MAX_HEADERS) {
+ log.warn("Ignoring excess A2A discovery headers (>{}) to prevent cache-key bloat", MAX_HEADERS);
+ break;
+ }
+ String name = entry.getKey();
+ String value = entry.getValue();
+ if (name == null || value == null || hasCrlf(name) || hasCrlf(value)) {
+ log.warn("Rejecting A2A discovery header (null or CR/LF): {}", sanitizeForLog(name));
+ continue;
+ }
+ sanitized.put(name, value);
+ }
+ return sanitized;
+ }
+
+ private static Map filterProtected(Map headers) {
+ Map filtered = new LinkedHashMap<>();
+ for (var entry : headers.entrySet()) {
+ if (ProtectedHeaders.isProtected(entry.getKey())) {
+ continue;
+ }
+ filtered.put(entry.getKey(), entry.getValue());
+ }
+ return filtered;
+ }
+
+ private void evictMatching(java.util.function.Predicate matcher) {
+ cacheLock.lock();
+ try {
+ List toEvict = new ArrayList<>();
+ for (String key : cache.keySet()) {
+ if (matcher.test(key)) {
+ toEvict.add(key);
+ }
+ }
+ for (String key : toEvict) {
+ Client evicted = cache.remove(key);
+ if (evicted != null) {
+ closeQuietly(evicted);
+ }
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+
+ private static boolean hasCrlf(String s) {
+ return s.indexOf('\r') >= 0 || s.indexOf('\n') >= 0;
+ }
+
+ /** Strips all control characters and truncates for safe inclusion in log messages. */
+ private static String sanitizeForLog(String s) {
+ if (s == null) return "(null)";
+ String t = s.length() > 128 ? s.substring(0, 128) + "..." : s;
+ return t.replaceAll("[\\p{Cc}]", "");
+ }
+
+ private void closeQuietly(Client client) {
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (Exception e) {
+ log.debug("Error closing A2A client: {}", sanitizeLogText(e.getMessage()));
+ }
+ }
+
+ private static String sanitizeLogText(String raw) {
+ if (raw == null || raw.isBlank()) {
+ return "(no detail)";
+ }
+ String truncated = raw.length() > 256 ? raw.substring(0, 256) + "..." : raw;
+ return truncated.replaceAll("[\\p{Cc}]", "");
+ }
+
+ interface AgentCardLoader {
+ AgentCard load(AgentConfig agent, Map headers);
+ }
+
+ interface ClientFactory {
+ Client create(AgentCard agentCard) throws A2AClientException;
+ }
+
+ private static final class DefaultClientFactory implements ClientFactory {
+ /**
+ * SSRF-safe HTTP client used by the A2A JSON-RPC transport.
+ * Backed by the same {@link java.net.http.HttpClient} that was built with
+ * {@code followRedirects(NEVER)} and the configured connect timeout, so
+ * the transport cannot follow HTTP redirects to internal addresses.
+ * Combined with {@link HttpAgentCardLoader#normalize} pinning AgentCard
+ * URLs to the validated agent URI, this closes the SSRF bypass that would
+ * otherwise exist in the default {@code JdkA2AHttpClient} (which uses
+ * {@code Redirect.NORMAL}).
+ */
+ private final org.a2aproject.sdk.client.http.A2AHttpClient safeHttpClient;
+
+ DefaultClientFactory(AdcpHttpClient adcpHttpClient) {
+ this.safeHttpClient = new JdkA2AHttpClient(
+ adcpHttpClient.newHttpClientBuilder().build());
+ }
+
+ @Override
+ public Client create(AgentCard agentCard) throws A2AClientException {
+ ClientConfig config = ClientConfig.builder()
+ .setStreaming(true)
+ .setUseClientPreference(true)
+ .build();
+ return Client.builder(agentCard)
+ .clientConfig(config)
+ .withTransport(JSONRPCTransport.class,
+ new JSONRPCTransportConfigBuilder().httpClient(safeHttpClient))
+ .build();
+ }
+ }
+
+ private static final class HttpAgentCardLoader implements AgentCardLoader {
+ private final AdcpHttpClient adcpHttpClient;
+ private final ObjectMapper objectMapper;
+
+ private HttpAgentCardLoader(AdcpHttpClient adcpHttpClient, ObjectMapper objectMapper) {
+ this.adcpHttpClient = Objects.requireNonNull(adcpHttpClient, "adcpHttpClient");
+ this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper").copy();
+ this.objectMapper.deactivateDefaultTyping();
+ }
+
+ @Override
+ public AgentCard load(AgentConfig agent, Map headers) {
+ URI cardUri = buildAgentCardUri(agent.agentUri());
+ try {
+ // Use getForAgentCard so that auth headers (e.g. Authorization) are forwarded
+ // to private agent-card endpoints. Redirect-following is already blocked by
+ // the underlying HttpClient (followRedirects=NEVER) and SSRF validation is
+ // enforced inside getForAgentCard, so header leakage is not possible.
+ AdcpHttpResponse response = adcpHttpClient.getForAgentCard(cardUri, headers);
+ if (response.statusCode() >= 200 && response.statusCode() < 300 && !response.truncated()) {
+ AgentCard parsed = objectMapper.readValue(response.body(), AgentCard.class);
+ return normalize(parsed, agent.agentUri());
+ }
+ log.debug("Falling back to synthetic A2A AgentCard for {} (status={}, truncated={})",
+ agent.agentUri(), response.statusCode(), response.truncated());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProtocolError("a2a", "Interrupted while fetching A2A agent card from " + cardUri, e);
+ } catch (IOException e) {
+ log.debug("Falling back to synthetic A2A AgentCard for {}: {}", agent.agentUri(), e.getMessage());
+ }
+ return fallbackCard(agent);
+ }
+
+ private static URI buildAgentCardUri(URI baseUri) {
+ // The A2A spec default is /.well-known/agent-card.json on the origin root
+ // (scheme + authority), not appended to the agent URI's path component.
+ return URI.create(baseUri.getScheme() + "://" + baseUri.getAuthority() + "/.well-known/agent-card.json");
+ }
+
+ private static AgentCard normalize(AgentCard card, URI baseUri) {
+ AgentCard.Builder builder = AgentCard.builder(card);
+ // SECURITY (C-2): Always pin url and supportedInterfaces to the validated
+ // baseUri, regardless of what the remote agent card declares. The agent-card
+ // fetch was SSRF-validated; any URL the server embeds in its card is untrusted
+ // and could redirect subsequent JSON-RPC calls to internal network addresses.
+ builder.url(baseUri.toString());
+ builder.supportedInterfaces(List.of(new AgentInterface(JSONRPC_TRANSPORT, baseUri.toString())));
+ if (card.name() == null || card.name().isBlank()) {
+ builder.name(baseUri.getHost() != null ? baseUri.getHost() : baseUri.toString());
+ }
+ if (card.description() == null || card.description().isBlank()) {
+ builder.description("AdCP agent at " + baseUri);
+ }
+ if (card.version() == null || card.version().isBlank()) {
+ builder.version("unknown");
+ }
+ if (card.preferredTransport() == null || card.preferredTransport().isBlank()) {
+ builder.preferredTransport(JSONRPC_TRANSPORT);
+ }
+ if (card.capabilities() == null) {
+ builder.capabilities(AgentCapabilities.builder().streaming(true).pushNotifications(false).build());
+ }
+ if (card.defaultInputModes() == null) {
+ builder.defaultInputModes(List.of("text"));
+ }
+ if (card.defaultOutputModes() == null) {
+ builder.defaultOutputModes(List.of("text"));
+ }
+ if (card.skills() == null) {
+ builder.skills(List.of());
+ }
+ return builder.build();
+ }
+
+ private static AgentCard fallbackCard(AgentConfig agent) {
+ String version = agent.adcpVersion() != null && agent.adcpVersion().minorVersion() != null
+ ? agent.adcpVersion().minorVersion()
+ : agent.adcpVersion() != null
+ ? String.valueOf(agent.adcpVersion().majorVersion())
+ : "unknown";
+ return AgentCard.builder()
+ .name(agent.id())
+ .description("AdCP agent " + agent.id())
+ .version(version)
+ .url(agent.agentUri().toString())
+ .preferredTransport(JSONRPC_TRANSPORT)
+ .capabilities(AgentCapabilities.builder().streaming(true).pushNotifications(false).build())
+ .supportedInterfaces(List.of(new AgentInterface(JSONRPC_TRANSPORT, agent.agentUri().toString())))
+ .defaultInputModes(List.of("text"))
+ .defaultOutputModes(List.of("text"))
+ .skills(List.of())
+ .build();
+ }
+ }
+}
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/package-info.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/package-info.java
new file mode 100644
index 0000000..faca1f0
--- /dev/null
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/a2a/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * A2A caller-side transport support: agent-card discovery, cached A2A clients,
+ * and tool dispatch over A2A JSON-RPC + SSE.
+ */
+@org.jspecify.annotations.NullMarked
+package org.adcontextprotocol.adcp.transport.a2a;
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/mcp/package-info.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/mcp/package-info.java
index 6bf5734..5b5a591 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/mcp/package-info.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/mcp/package-info.java
@@ -1,2 +1,6 @@
+/**
+ * MCP transport support for the caller-side SDK: connection caching,
+ * StreamableHTTP/SSE negotiation, and tool-response deserialization.
+ */
@org.jspecify.annotations.NullMarked
package org.adcontextprotocol.adcp.transport.mcp;
diff --git a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/package-info.java b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/package-info.java
index 307e97e..3d9a910 100644
--- a/adcp/src/main/java/org/adcontextprotocol/adcp/transport/package-info.java
+++ b/adcp/src/main/java/org/adcontextprotocol/adcp/transport/package-info.java
@@ -1,2 +1,8 @@
+/**
+ * Transport dispatch for AdCP caller operations.
+ *
+ * Routes tool calls over MCP or A2A, applies auth/header merging,
+ * version envelopes, SSRF validation, and transport-specific retry logic.
+ */
@org.jspecify.annotations.NullMarked
package org.adcontextprotocol.adcp.transport;
diff --git a/adcp/src/test/java/org/adcontextprotocol/adcp/AdcpClientTest.java b/adcp/src/test/java/org/adcontextprotocol/adcp/AdcpClientTest.java
index 126ef9c..2877cce 100644
--- a/adcp/src/test/java/org/adcontextprotocol/adcp/AdcpClientTest.java
+++ b/adcp/src/test/java/org/adcontextprotocol/adcp/AdcpClientTest.java
@@ -72,41 +72,34 @@ void close_is_idempotent() {
}
@Test
- void a2a_protocol_rejected_at_call_time() {
+ void builder_accepts_a2a_protocol() {
AgentConfig a2aAgent = AgentConfig.builder()
.id("a2a")
.agentUri(AGENT_URI)
.protocol(Protocol.A2A)
.build();
- // A2A rejection happens at callTool dispatch (ProtocolClient)
+
try (AdcpClient client = AdcpClient.builder()
.agent(a2aAgent)
.ssrfPolicy(SsrfPolicy.permissive())
.build()) {
- var ex = assertThrows(org.adcontextprotocol.adcp.error.FeatureUnsupportedError.class,
- () -> client.callTool("get_products",
- java.util.Map.of(), java.util.Map.class));
- assertTrue(ex.getMessage().contains("A2A"));
+ assertEquals(Protocol.A2A, client.agent().protocol());
}
}
@Test
- void callTool_accepts_null_args_without_npe() {
- // Null args should be treated as empty map, not throw NPE.
- // The call will fail at transport (no server), but the null-guard
- // in callTool must normalise to Map.of() before that point.
+ void a2a_callTool_accepts_null_args_without_npe() {
AgentConfig a2aAgent = AgentConfig.builder()
.id("a2a")
- .agentUri(AGENT_URI)
+ .agentUri(URI.create("mailto:test@example.com"))
.protocol(Protocol.A2A)
.build();
+
try (AdcpClient client = AdcpClient.builder()
.agent(a2aAgent)
.ssrfPolicy(SsrfPolicy.permissive())
.build()) {
- // A2A rejection fires before any null-arg handling, proving
- // the call doesn't NPE on null args.
- assertThrows(org.adcontextprotocol.adcp.error.FeatureUnsupportedError.class,
+ assertThrows(org.adcontextprotocol.adcp.error.ProtocolError.class,
() -> client.callTool("get_products", null, java.util.Map.class));
}
}
diff --git a/adcp/src/test/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidatorTest.java b/adcp/src/test/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidatorTest.java
index 9167c4c..23ae2ff 100644
--- a/adcp/src/test/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidatorTest.java
+++ b/adcp/src/test/java/org/adcontextprotocol/adcp/schema/AdcpSchemaValidatorTest.java
@@ -130,4 +130,11 @@ void network_refs_are_blocked_not_fetched() {
}
});
}
+
+ @Test
+ void validate_path_traversal_throws() {
+ JsonNode instance = mapper.createObjectNode();
+ assertThrows(IllegalArgumentException.class,
+ () -> validator.validate("schemas/../etc/passwd", instance));
+ }
}
diff --git a/adcp/src/test/java/org/adcontextprotocol/adcp/schema/SchemaBundleTest.java b/adcp/src/test/java/org/adcontextprotocol/adcp/schema/SchemaBundleTest.java
index b04c6c3..17934ba 100644
--- a/adcp/src/test/java/org/adcontextprotocol/adcp/schema/SchemaBundleTest.java
+++ b/adcp/src/test/java/org/adcontextprotocol/adcp/schema/SchemaBundleTest.java
@@ -50,4 +50,34 @@ void load_schema_with_refs() {
assertEquals("Format", schema.path("title").asText());
assertTrue(schema.has("$defs"), "format.json should have $defs");
}
+
+ @Test
+ void load_throws_on_null_path() {
+ assertThrows(IllegalArgumentException.class, () -> SchemaBundle.load(null));
+ }
+
+ @Test
+ void load_throws_on_path_traversal() {
+ assertThrows(IllegalArgumentException.class, () -> SchemaBundle.load("../secret.json"));
+ }
+
+ @Test
+ void load_throws_on_leading_slash() {
+ assertThrows(IllegalArgumentException.class, () -> SchemaBundle.load("/3.0.11/core/format.json"));
+ }
+
+ @Test
+ void exists_returns_false_for_null_path() {
+ assertFalse(SchemaBundle.exists(null));
+ }
+
+ @Test
+ void exists_returns_false_for_path_traversal() {
+ assertFalse(SchemaBundle.exists("../secret.json"));
+ }
+
+ @Test
+ void exists_returns_false_for_leading_slash() {
+ assertFalse(SchemaBundle.exists("/3.0.11/core/format.json"));
+ }
}
diff --git a/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aCallerTest.java b/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aCallerTest.java
new file mode 100644
index 0000000..d937716
--- /dev/null
+++ b/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aCallerTest.java
@@ -0,0 +1,99 @@
+package org.adcontextprotocol.adcp.transport.a2a;
+
+import org.a2aproject.sdk.client.MessageEvent;
+import org.a2aproject.sdk.client.TaskUpdateEvent;
+import org.a2aproject.sdk.spec.A2AClientException;
+import org.a2aproject.sdk.spec.AgentCapabilities;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.Task;
+import org.a2aproject.sdk.spec.TaskState;
+import org.a2aproject.sdk.spec.TaskStatus;
+import org.a2aproject.sdk.spec.TaskStatusUpdateEvent;
+import org.a2aproject.sdk.spec.TextPart;
+import org.adcontextprotocol.adcp.error.ProtocolError;
+import org.adcontextprotocol.adcp.schema.AdcpObjectMapperFactory;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class A2aCallerTest {
+
+ private final A2aCaller caller = new A2aCaller(AdcpObjectMapperFactory.create());
+
+ @Test
+ void callTool_deserializes_message_response() {
+ A2aCaller.A2aMessageClient client = (params, consumers, errorHandler, context) ->
+ consumers.getFirst().accept(new MessageEvent(Message.builder()
+ .role(Message.Role.ROLE_AGENT)
+ .parts(new TextPart("{\"ok\":true}"))
+ .build()), testCard());
+
+ EchoResponse response = caller.callTool(client, "echo", Map.of("q", "x"),
+ EchoResponse.class, Map.of());
+
+ assertTrue(response.ok());
+ }
+
+ @Test
+ void callTool_wraps_client_exception() {
+ A2aCaller.A2aMessageClient client = (params, consumers, errorHandler, context) -> {
+ throw new A2AClientException("boom");
+ };
+
+ ProtocolError error = assertThrows(ProtocolError.class,
+ () -> caller.callTool(client, "echo", Map.of(), EchoResponse.class, Map.of()));
+
+ assertEquals("a2a", error.protocol());
+ assertTrue(error.getMessage().contains("echo"));
+ }
+
+ @Test
+ void callTool_surfaces_failed_task_update() {
+ A2aCaller.A2aMessageClient client = (params, consumers, errorHandler, context) ->
+ consumers.getFirst().accept(new TaskUpdateEvent(
+ Task.builder()
+ .id("task-1")
+ .contextId("ctx-1")
+ .status(new TaskStatus(
+ TaskState.TASK_STATE_FAILED,
+ Message.builder()
+ .role(Message.Role.ROLE_AGENT)
+ .parts(new TextPart("tool failed"))
+ .build(),
+ java.time.OffsetDateTime.now()))
+ .build(),
+ new TaskStatusUpdateEvent(
+ "task-1",
+ new TaskStatus(TaskState.TASK_STATE_FAILED),
+ "ctx-1",
+ Map.of())),
+ testCard());
+
+ ProtocolError error = assertThrows(ProtocolError.class,
+ () -> caller.callTool(client, "echo", Map.of(), EchoResponse.class, Map.of()));
+
+ assertTrue(error.getMessage().contains("tool failed"));
+ }
+
+ private static AgentCard testCard() {
+ return AgentCard.builder()
+ .name("test")
+ .description("test agent")
+ .version("1.0")
+ .url("https://agent.example.com")
+ .preferredTransport("JSONRPC")
+ .capabilities(AgentCapabilities.builder().streaming(true).pushNotifications(false).build())
+ .supportedInterfaces(List.of(new AgentInterface("JSONRPC", "https://agent.example.com")))
+ .defaultInputModes(List.of("text"))
+ .defaultOutputModes(List.of("text"))
+ .skills(List.of())
+ .build();
+ }
+
+ private record EchoResponse(boolean ok) {}
+}
diff --git a/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManagerTest.java b/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManagerTest.java
new file mode 100644
index 0000000..76bb004
--- /dev/null
+++ b/adcp/src/test/java/org/adcontextprotocol/adcp/transport/a2a/A2aConnectionManagerTest.java
@@ -0,0 +1,290 @@
+package org.adcontextprotocol.adcp.transport.a2a;
+
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.client.config.ClientConfig;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder;
+import org.a2aproject.sdk.spec.A2AClientException;
+import org.a2aproject.sdk.spec.AgentCapabilities;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.AgentInterface;
+import org.adcontextprotocol.adcp.AgentConfig;
+import org.adcontextprotocol.adcp.Protocol;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class A2aConnectionManagerTest {
+
+ private A2aConnectionManager manager;
+
+ @AfterEach
+ void cleanup() {
+ if (manager != null) {
+ manager.close();
+ }
+ }
+
+ @Test
+ void getOrConnect_reuses_cached_client_for_same_url() {
+ AtomicInteger loaderCalls = new AtomicInteger();
+ AtomicInteger factoryCalls = new AtomicInteger();
+ manager = new A2aConnectionManager(
+ (agent, headers) -> {
+ loaderCalls.incrementAndGet();
+ return testCard(agent.agentUri());
+ },
+ agentCard -> {
+ factoryCalls.incrementAndGet();
+ return testClient(agentCard);
+ });
+
+ AgentConfig agent = AgentConfig.builder()
+ .id("a2a-agent")
+ .agentUri(URI.create("https://agent.example.com"))
+ .protocol(Protocol.A2A)
+ .build();
+
+ Client first = manager.getOrConnect(agent, Map.of(), "anonymous");
+ Client second = manager.getOrConnect(agent, Map.of(), "anonymous");
+
+ assertSame(first, second);
+ assertEquals(1, loaderCalls.get());
+ assertEquals(1, factoryCalls.get());
+ }
+
+ @Test
+ void getOrConnect_different_auth_headers_get_separate_clients() {
+ AtomicInteger factoryCalls = new AtomicInteger();
+ List