diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapInstallTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapInstallTest.java
new file mode 100644
index 000000000000..a09bc5db0a22
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapInstallTest.java
@@ -0,0 +1,256 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.http;
+
+import com.azure.cosmos.Http2ConnectionConfig;
+import io.netty.channel.Channel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import reactor.netty.http.client.HttpClientState;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link ReactorNettyClient#installHttp2PingCloseRewrapHandlerIfNeeded} --
+ * the per-child-stream install gate for {@link Http2PingCloseRewrapHandler}.
+ *
+ * In production the handler is installed from reactor-netty's {@code .observe(...)} hook
+ * only when ALL of the following hold:
+ *
+ * - the connection-lifecycle state is {@link HttpClientState#STREAM_CONFIGURED}
+ * (a child stream was just opened);
+ * - PING-health is effectively enabled -- kill-switch
+ * {@code COSMOS.HTTP2_PING_HEALTH_ENABLED} on, a positive PING interval, and HTTP/2
+ * enabled for the client;
+ * - the channel is a child stream ({@code parent() != null}).
+ *
+ * These tests drive the real production method with an {@link EmbeddedChannel} so the
+ * disablement path -- flipping the kill-switch (or any gate input) off must be a true
+ * revert-to-baseline that installs nothing on the child pipeline -- is guarded in CI
+ * without needing a live HTTP/2 server.
+ */
+public class Http2PingCloseRewrapInstallTest {
+
+ private static final String PING_HEALTH_ENABLED = "COSMOS.HTTP2_PING_HEALTH_ENABLED";
+ private static final String PING_INTERVAL_SECONDS = "COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS";
+
+ private String priorPingHealthEnabled;
+ private String priorPingIntervalSeconds;
+
+    @BeforeMethod(groups = "unit")
+    public void before_Method() {
+        // Remember whatever the two PING system properties hold right now, so after_Method
+        // can put those exact values back once the test has overwritten them.
+        priorPingIntervalSeconds = System.getProperty(PING_INTERVAL_SECONDS);
+        priorPingHealthEnabled = System.getProperty(PING_HEALTH_ENABLED);
+    }
+
+    @AfterMethod(groups = "unit", alwaysRun = true)
+    public void after_Method() {
+        restore(PING_INTERVAL_SECONDS, priorPingIntervalSeconds);
+        restore(PING_HEALTH_ENABLED, priorPingHealthEnabled);
+    }
+
+    @Test(groups = "unit")
+    public void installsHandler_whenStreamConfiguredAndPingHealthEnabled() {
+        enablePingHealth();
+        ChildChannel child = newChildStream();
+        try {
+            // The rewrap handler must sit at the head so it sees channelInactive before the
+            // handlers behind it. Seed a sentinel first: on an empty pipeline addFirst(...) and
+            // addLast(...) both land first, which would make the position assertions vacuous.
+            child.pipeline().addLast("sentinel", new io.netty.channel.ChannelInboundHandlerAdapter());
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                child, HttpClientState.STREAM_CONFIGURED, http2Enabled());
+            assertThat(child.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNotNull();
+            assertThat(child.pipeline().first()).isSameAs(Http2PingCloseRewrapHandler.INSTANCE);
+            assertThat(child.pipeline().names().get(0)).isEqualTo(Http2PingCloseRewrapHandler.HANDLER_NAME);
+        } finally {
+            releaseAll(child);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_whenKillSwitchOff() {
+        // Kill-switch case: the interval stays positive, so the only gate input that is off is
+        // COSMOS.HTTP2_PING_HEALTH_ENABLED -- and that alone must keep the handler off the pipeline.
+        System.setProperty(PING_INTERVAL_SECONDS, "1");
+        System.setProperty(PING_HEALTH_ENABLED, "false");
+        final ChildChannel stream = newChildStream();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                stream, HttpClientState.STREAM_CONFIGURED, http2Enabled());
+
+            assertThat(stream.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_whenPingIntervalNonPositive() {
+        // Kill-switch on but interval 0: without a positive PING interval the gate treats
+        // PING-health as disabled, so the rewrap handler must not be installed.
+        System.setProperty(PING_INTERVAL_SECONDS, "0");
+        System.setProperty(PING_HEALTH_ENABLED, "true");
+        final ChildChannel stream = newChildStream();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                stream, HttpClientState.STREAM_CONFIGURED, http2Enabled());
+
+            assertThat(stream.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_whenHttp2DisabledForClient() {
+        // Both PING system properties are on, yet this client's config has HTTP/2 disabled;
+        // the per-client flag alone must be enough for the gate to skip the install.
+        enablePingHealth();
+        final ChildChannel stream = newChildStream();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                stream, HttpClientState.STREAM_CONFIGURED, http2Disabled());
+
+            assertThat(stream.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_whenStateNotStreamConfigured() {
+        // PING-health is fully enabled, but the observed state is CONFIGURED rather than
+        // STREAM_CONFIGURED; any state other than STREAM_CONFIGURED must leave the pipeline alone.
+        enablePingHealth();
+        final ChildChannel stream = newChildStream();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                stream, HttpClientState.CONFIGURED, http2Enabled());
+
+            assertThat(stream.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_andDoesNotEvaluatePredicate_whenStateNotStreamConfigured() {
+        // Short-circuit order check: http2Cfg is deliberately null here. The test relies on
+        // isPingHealthEffectivelyEnabled throwing for a null config (that method is not
+        // visible from this file), so finishing without an exception shows the state gate
+        // returned before the PING-health predicate was evaluated -- i.e. the cheap state
+        // comparison stays first for every non-STREAM_CONFIGURED event.
+        enablePingHealth();
+        final ChildChannel stream = newChildStream();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                stream, HttpClientState.CONFIGURED, null);
+
+            assertThat(stream.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    @Test(groups = "unit")
+    public void skipsInstall_whenChannelHasNoParent() {
+        // A channel without a parent is not an H2 child stream; the defensive parent() != null
+        // guard must keep the rewrap handler off it even when the state is STREAM_CONFIGURED.
+        enablePingHealth();
+        final EmbeddedChannel orphan = new EmbeddedChannel();
+        try {
+            ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                orphan, HttpClientState.STREAM_CONFIGURED, http2Enabled());
+
+            assertThat(orphan.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME)).isNull();
+        } finally {
+            orphan.finishAndReleaseAll();
+        }
+    }
+
+    @Test(groups = "unit")
+    public void install_isIdempotent() {
+        // Even if STREAM_CONFIGURED were observed more than once for the same stream, the
+        // pipeline must end up holding exactly one instance of the @Sharable rewrap handler.
+        enablePingHealth();
+        final ChildChannel stream = newChildStream();
+        try {
+            for (int attempt = 0; attempt < 2; attempt++) {
+                ReactorNettyClient.installHttp2PingCloseRewrapHandlerIfNeeded(
+                    stream, HttpClientState.STREAM_CONFIGURED, http2Enabled());
+            }
+
+            long installed = stream.pipeline().toMap().values().stream()
+                .filter(Http2PingCloseRewrapHandler.class::isInstance)
+                .count();
+            assertThat(installed).isEqualTo(1);
+        } finally {
+            releaseAll(stream);
+        }
+    }
+
+    private static void enablePingHealth() {
+        System.setProperty(PING_INTERVAL_SECONDS, "1");
+        System.setProperty(PING_HEALTH_ENABLED, "true");
+    }
+
+    // Pin the enabled flag explicitly so the gate ignores the ambient COSMOS.HTTP2_ENABLED property.
+    // Constructing the config also runs its static initializer, registering the bridge accessor.
+    private static Http2ConnectionConfig http2Enabled() {
+        final Http2ConnectionConfig cfg = new Http2ConnectionConfig();
+        return cfg.setEnabled(true);
+    }
+
+    private static Http2ConnectionConfig http2Disabled() {
+        final Http2ConnectionConfig cfg = new Http2ConnectionConfig();
+        return cfg.setEnabled(false);
+    }
+
+    private static ChildChannel newChildStream() {
+        final ChildChannel stream = new ChildChannel();
+        stream.setParentChannel(new EmbeddedChannel());
+        return stream;
+    }
+
+    private static void restore(String key, String prior) {
+        if (prior != null) {
+            System.setProperty(key, prior);
+        } else {
+            System.clearProperty(key);
+        }
+    }
+
+    private static void releaseAll(ChildChannel child) {
+        final Channel tcpParent = child.parent();
+        child.finishAndReleaseAll();
+        if (tcpParent instanceof EmbeddedChannel) {
+            EmbeddedChannel.class.cast(tcpParent).finishAndReleaseAll();
+        }
+    }
+
+    /**
+     * Test double for an HTTP/2 child stream: an {@link EmbeddedChannel} that reports a
+     * caller-supplied {@link #parent()}. The parent starts out {@code null} and is attached
+     * through {@link #setParentChannel(Channel)} once the instance exists, so nothing is
+     * captured before the superclass constructor runs (the pitfall an anonymous subclass
+     * closing over a local variable would hit).
+     */
+    private static final class ChildChannel extends EmbeddedChannel {
+        private Channel attachedParent;
+
+        void setParentChannel(Channel parent) {
+            attachedParent = parent;
+        }
+
+        @Override
+        public Channel parent() {
+            return attachedParent;
+        }
+    }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapHandler.java
index 02f6cd67dbcf..7e6b0dd511c4 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapHandler.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingCloseRewrapHandler.java
@@ -8,11 +8,12 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
- * Per-request HTTP/2 child-stream handler that translates a parent-TCP-channel close
+ * Per-child-stream HTTP/2 handler that translates a parent-TCP-channel close
* driven by {@link Http2PingHandler} into a typed {@link Http2PingTimeoutChannelClosedException}.
*
* Installed at the head of each H2 child-stream pipeline by
- * {@code ReactorNettyClient}'s {@code .doOnRequest(...)} hook. When the parent (TCP)
+ * {@code ReactorNettyClient}'s {@code .observe(...)} hook on {@code STREAM_CONFIGURED}
+ * (the per-child-stream lifecycle event). When the parent (TCP)
* channel is closed by {@link Http2PingHandler} after consecutive PING-ACK timeouts
* or consecutive PING-send failures, the H2 multiplex codec propagates
* {@code channelInactive} to every child stream.
@@ -29,7 +30,8 @@
* JVM-wide {@link #INSTANCE} is reused across all H2 child streams.
*
* For non-H2 channels (parent is {@code null}) this handler is never installed; the
- * install site in {@code ReactorNettyClient} gates on {@code ch.parent() != null}.
+ * install site in {@code ReactorNettyClient} only runs at {@code STREAM_CONFIGURED}
+ * (H2 child streams) and additionally gates on {@code ch.parent() != null}.
*/
@ChannelHandler.Sharable
final class Http2PingCloseRewrapHandler extends ChannelInboundHandlerAdapter {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHandler.java
index 82782d14c71b..a20ed23b7715 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHandler.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHandler.java
@@ -52,7 +52,8 @@ public class Http2PingHandler extends ChannelDuplexHandler {
*
* Consumed by {@link Http2PingCloseRewrapHandler}, a {@code @Sharable} handler
* installed at the head of each H2 child-stream pipeline by {@code
- * ReactorNettyClient.doOnRequest(...)}. When the parent channel closes with this
+ * ReactorNettyClient}'s {@code .observe(...)} hook on {@code STREAM_CONFIGURED}. When
+ * the parent channel closes with this
* attribute set, the rewrap handler fires {@code exceptionCaught} with a typed
* {@link Http2PingTimeoutChannelClosedException} so the in-flight request's
* response {@code Mono} fails with the typed exception (instead of a bare
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
index e95989c5aa3c..bdd6e68453b2 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
@@ -223,35 +223,98 @@ private void configureChannelPipelineHandlers() {
// Duplicate handler name is the only possible cause.
}
}
+
}))
- .doOnRequest((req, conn) -> {
- // Install a @Sharable head-of-pipeline rewrap handler on each H2
- // child-stream pipeline. When Http2PingHandler closes the parent
- // (TCP) channel after consecutive PING-ACK timeouts or PING-send
- // failures, the H2 multiplex codec propagates channelInactive to
- // every child stream; the rewrap handler intercepts that and fires
- // exceptionCaught with a typed
- // Http2PingTimeoutChannelClosedException so reactor-netty's
- // HttpClientOperations fails the response Mono with the typed
- // exception (instead of bare PrematureCloseException). The rest of the
- // stack maps that to GATEWAY_HTTP2_PING_TIMEOUT_CHANNEL_CLOSED so
- // ClientRetryPolicy can suppress region mark-down.
- //
- // Gate on ch.parent() != null so this only runs on H2 child streams
- // (H1.1 connections have null parent and never need the rewrap).
- Channel ch = conn.channel();
- if (ch.parent() != null && ch.pipeline().get(Http2PingCloseRewrapHandler.HANDLER_NAME) == null) {
- try {
- ch.pipeline().addFirst(
- Http2PingCloseRewrapHandler.HANDLER_NAME,
- Http2PingCloseRewrapHandler.INSTANCE);
- } catch (IllegalArgumentException ignored) {
- // TOCTOU race: between the get()==null check above and addFirst(),
- // a concurrent doOnRequest may have installed the handler.
- // Duplicate handler name is the only possible cause.
- }
- }
- });
+ // Install a @Sharable head-of-pipeline rewrap handler on each H2
+ // child-stream pipeline. STREAM_CONFIGURED is the per-child-stream
+ // lifecycle event reactor-netty fires once when a stream is opened, so
+ // connection.channel() here is the child stream (its parent is the
+ // parent TCP channel). doOnConnected cannot be used for this install:
+ // it fires only on the parent TCP channel (State.CONFIGURED) and never
+ // on a child stream, so a ch.parent() != null gate inside doOnConnected
+ // would never be satisfied and the handler would never install.
+ //
+ // The rewrap MUST live on the child stream pipeline, not the parent.
+ // When Http2PingHandler closes the parent (TCP) channel after
+ // consecutive PING-ACK timeouts or PING-send failures, the H2 multiplex
+ // codec propagates channelInactive to each child stream independently.
+ // Child streams are separate Netty Channels, so that close does not
+ // surface on the parent pipeline -- reactor-netty's per-stream
+ // HttpClientOperations turns the child channelInactive into a bare
+ // PrematureCloseException. This head-of-child-pipeline handler
+ // intercepts channelInactive first and fires exceptionCaught with a
+ // typed Http2PingTimeoutChannelClosedException before HttpClientOperations
+ // observes the close, so the response Mono fails with the typed
+ // exception. The rest of the stack maps that to
+ // GATEWAY_HTTP2_PING_TIMEOUT_CHANNEL_CLOSED so ClientRetryPolicy can
+ // suppress region mark-down.
+ //
+ // The install/skip decision (state gate, PING-health gate, parent and
+ // idempotency guards) is extracted into
+ // installHttp2PingCloseRewrapHandlerIfNeeded so it can be unit-tested
+ // without a live HTTP/2 connection.
+ .observe((connection, state) ->
+ installHttp2PingCloseRewrapHandlerIfNeeded(connection.channel(), state, http2Cfg));
+ }
+ }
+
+ /**
+ * Installs the {@link Http2PingCloseRewrapHandler} at the head of an HTTP/2
+ * child-stream pipeline when, and only when, PING-health is effectively enabled.
+ * Invoked from the {@code .observe(...)} hook in
+ * {@link #configureChannelPipelineHandlers()} for every connection-lifecycle event;
+ * see that hook for why the install must happen on the child stream at
+ * {@link HttpClientState#STREAM_CONFIGURED} rather than via {@code doOnConnected} on
+ * the parent TCP channel.
+ *
+ *
+ * This is a no-op unless all of the following hold:
+ *
+ * - {@code state} is {@link HttpClientState#STREAM_CONFIGURED} (a child stream was
+ * just opened);
+ * - {@link Http2PingHandler#isPingHealthEffectivelyEnabled(Http2ConnectionConfig)}
+ * is {@code true} -- the rewrap handler only has a PING-timeout close signal to
+ * translate while the PING sender is active, so when PING-health is disabled (via
+ * {@code COSMOS.HTTP2_PING_HEALTH_ENABLED=false}, a non-positive PING interval, or
+ * HTTP/2 itself being disabled) the install is skipped and the kill-switch is a
+ * true revert-to-baseline with no extra per-stream pipeline hop;
+ * - {@code channel.parent()} is non-null (defensive: STREAM_CONFIGURED already
+ * implies a child stream) and the handler is not already installed.
+ *
+ * The PING-health predicate is evaluated only after the state check, so the common
+ * non-stream lifecycle events stay off this path. The gate is evaluated once per stream
+ * at install time: toggling the kill-switch at runtime only affects streams configured
+ * afterwards and, since streams are single-use, behavior converges within one stream
+ * lifetime.
+ *
+ * @param channel the channel reactor-netty associated with the lifecycle event (the
+ * child stream when {@code state} is STREAM_CONFIGURED)
+ * @param state the connection-lifecycle state being observed
+ * @param http2Cfg the per-client HTTP/2 configuration backing the PING-health gate
+ */
+ static void installHttp2PingCloseRewrapHandlerIfNeeded(
+ Channel channel,
+ ConnectionObserver.State state,
+ Http2ConnectionConfig http2Cfg) {
+
+ if (state != HttpClientState.STREAM_CONFIGURED
+ || !Http2PingHandler.isPingHealthEffectivelyEnabled(http2Cfg)) {
+ return;
+ }
+
+ ChannelPipeline childPipeline = channel.pipeline();
+ // Defensive parent() != null check (STREAM_CONFIGURED already implies a child stream),
+ // plus a name lookup so a repeated STREAM_CONFIGURED does not add the handler twice.
+ if (channel.parent() != null
+ && childPipeline.get(Http2PingCloseRewrapHandler.HANDLER_NAME) == null) {
+ try {
+ childPipeline.addFirst(
+ Http2PingCloseRewrapHandler.HANDLER_NAME,
+ Http2PingCloseRewrapHandler.INSTANCE);
+ } catch (IllegalArgumentException ignored) {
+ // Benign duplicate install: the handler was added under this name between the
+ // get()==null check above and addFirst(), so it is already in place and there is
+ // nothing left to do. A duplicate handler name is the only cause.
+ }
 }
 }