xds/extproc: adds ClientInterceptor and ClientStream implementation for normal mode for gRFC A93 #9174

Open
eshitachandwani wants to merge 27 commits into grpc:master from eshitachandwani:newStream

@eshitachandwani

Member

This PR adds an implementation of NewStream and ClientStream for normal mode for A93: xds-ext-proc.
This PR does not include channel retention, metrics, or observability mode.

#ext-proc-a93

RELEASE NOTES: None

@eshitachandwani eshitachandwani added this to the 1.83 Release milestone Jun 8, 2026
@eshitachandwani eshitachandwani added Type: Feature New features or improvements in behavior Area: xDS Includes everything xDS related, including LB policies used with xDS. labels Jun 8, 2026
@codecov

codecov Bot commented Jun 8, 2026

Codecov Report

❌ Patch coverage is 69.20000% with 231 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.01%. Comparing base (484f150) to head (c4e5ab0).
⚠️ Report is 4 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/xds/httpfilter/extproc/ext_proc.go | 65.79% | 162 Missing and 61 partials ⚠️ |
| internal/xds/httpfilter/extconfig.go | 95.69% | 2 Missing and 2 partials ⚠️ |
| ...ternal/xds/httpfilter/extproc/internal/internal.go | 0.00% | 4 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #9174      +/-   ##
==========================================
- Coverage   83.19%   83.01%   -0.18%     
==========================================
  Files         420      421       +1     
  Lines       34010    34747     +737     
==========================================
+ Hits        28295    28846     +551     
- Misses       4281     4412     +131     
- Partials     1434     1489      +55     

| Files with missing lines | Coverage Δ |
|---|---|
| internal/resolver/config_selector.go | 100.00% <ø> (ø) |
| internal/xds/httpfilter/extproc/config.go | 89.47% <ø> (+5.26%) ⬆️ |
| stream.go | 81.96% <100.00%> (-0.23%) ⬇️ |
| xds/xds.go | 40.90% <ø> (ø) |
| internal/xds/httpfilter/extconfig.go | 94.91% <95.69%> (+2.91%) ⬆️ |
| ...ternal/xds/httpfilter/extproc/internal/internal.go | 0.00% <0.00%> (ø) |
| internal/xds/httpfilter/extproc/ext_proc.go | 68.42% <65.79%> (-8.50%) ⬇️ |

... and 24 files with indirect coverage changes

Comment thread internal/resolver/config_selector.go
Comment thread internal/xds/httpfilter/extproc/ext_proc.go
Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
},
}
if err = procStream.Send(headerReq); err != nil {
return cs.handleInitError(fmt.Errorf("failed to send client headers to external processor server: %v", err), newStream, opts)

Contributor

I'm trying to follow the code when NewStream hits an initialization error and therefore handleInitError is called. Let's say failure_mode_allow is set to true. In this case, handleInitError will cancel the proc stream's context, will create the dataplane stream and will set procStreamBypass to true.

Now, let's say the application tries to send a message. In SendMsg, it will see that the proc stream is bypassed and therefore will call waitForDataplaneStream. But for cases where the proc stream is bypassed, can we be sure that a dataplane stream definitely exists? I see that this is true for the init errors. But just wanted to make sure if it is true for all cases. If that is the case, in SendMsg instead of this block:

	if extClosed || cs.config.processingModes.requestBodyMode == modeSkip {
		s, err := cs.waitForDataplaneStream(cs.ctx)
		if err != nil {
			return err
		}
		return s.SendMsg(m)
	}

Does it make sense to separate it out into two blocks:

	if extClosed {
		// Relies on the dataplane stream always existing once the proc stream is closed.
		return cs.dataplaneStream.SendMsg(m)
	}
	if cs.config.processingModes.requestBodyMode == modeSkip {
		s, err := cs.waitForDataplaneStream(cs.ctx)
		if err != nil {
			return err
		}
		return s.SendMsg(m)
	}

Member Author

For now, there is no case where extClosed is true but the dataplane stream is nil. But I think we should still keep the waitForDataplaneStream check, because the proc filter is a highly concurrent system and it might be good to have a check. It will not introduce latency: waitForDataplaneStream will return immediately because the dataplaneReady channel will already have been closed.
WDYT?

Contributor

We cannot have checks to cover things that we don't know about. We should have some reasoning about why a check is required. Saying that this is a highly concurrent system and therefore I will check everything possible everywhere only complicates the code and makes it harder to reason about.

Member Author

Changed, in both SendMsg and CloseSend.

Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
}
if err != io.EOF && (cs.ignoreFailureMode.Load() || !cs.config.failureModeAllow) {
cs.procStreamErr.Store(status.Errorf(codes.Internal, "extproc: external processor RPC failed: %v", err))
cs.procStreamFailed.Fire()

Contributor

Shouldn't this event fire irrespective of whether cs.ignoreFailureMode.Load() || !cs.config.failureModeAllow is true?

Member Author

This event indicates that we should fail the RPC. But if failure mode allow is true and we do not have to ignore it, the events should continue on the dataplane stream rather than failing with an error.
To signal that processing should continue on the dataplane stream, we have triggerDrain().

return
}
cs.procStreamBypass.Store(true)
cs.triggerDrain()

Contributor

I'm wondering if we can guarantee that the proc stream is either marked closed (err == io.EOF) or failed (err != io.EOF), do we need to trigger this drain process here? I'm trying to see if we can restrict the drainTriggeredCh only to cases where the proc server sent us the request_drain bit.

Member Author

I don't think we can do that, because restricting drainTriggeredCh to only server-initiated drains would cause deadlocks in fail-open (bypass) scenarios. For example:

Setup: requestHeaderMode: SEND, failureModeAllow: true (fail-open enabled).
Situation: During NewStream, the client sends request headers to the processor stream and calls processInitialHeaders() to block waiting for the processor's mutated headers response. While waiting, the processor stream fails or crashes.
Deadlock:
1. recvFromProcServerLoop receives the connection error and calls cs.failStream(err).
2. Since fail-open is enabled and no body messages have been sent yet, failStream does not fire the hard failure event (procStreamFailed). Instead, it attempts to trigger a bypass by setting procStreamBypass to true.
3. If we do not trigger a drain here, drainTriggeredCh remains open.
4. Consequently, processInitialHeaders remains blocked forever in its select statement, waiting for either a processor response (which will never come) or a drain/bypass event. The client stream initialization hangs.

A similar situation arises with responseHeaders.

We could introduce a separate channel (like bypassTriggeredCh), but since both events require the exact same action (stop sending to the processor and redirect all traffic directly to the dataplane), splitting them would force us to duplicate select cases throughout.
Or we can rename this to bypassProcCh to unify the two use cases it serves.
WDYT?
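
The deadlock described above comes down to a select that has nothing left to wake it. A minimal sketch of the unified-channel idea, assuming a single bypassProcCh closed exactly once; `respCh`, `triggerBypass`, and `errBypassed` are hypothetical names and the real filter carries far more state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBypassed is a hypothetical sentinel telling the caller to continue on
// the dataplane stream without the processor.
var errBypassed = errors.New("processor bypassed")

type clientStream struct {
	respCh       chan string   // hypothetical: header responses from the processor
	bypassProcCh chan struct{} // closed on server-requested drain or fail-open bypass
	bypassOnce   sync.Once
}

// triggerBypass closes bypassProcCh at most once, so it is safe to call from
// both the drain path and the failure path.
func (cs *clientStream) triggerBypass() {
	cs.bypassOnce.Do(func() { close(cs.bypassProcCh) })
}

// processInitialHeaders waits for the processor's response or for a bypass.
// Without the second case it would block forever if the processor died.
func (cs *clientStream) processInitialHeaders() (string, error) {
	select {
	case h := <-cs.respCh:
		return h, nil
	case <-cs.bypassProcCh:
		return "", errBypassed
	}
}

func main() {
	cs := &clientStream{respCh: make(chan string), bypassProcCh: make(chan struct{})}
	go cs.triggerBypass() // simulates the receive loop observing a stream failure
	_, err := cs.processInitialHeaders()
	fmt.Println(err)
}
```

Because both triggers close the same channel, every select in the filter needs only one extra case, which is the duplication concern raised above.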

Contributor

Yeah, unifying the two cases, if possible, would be nicer. Fewer things to think about. This code is already huge :) and is only going to get huger.

Member Author

Changed the variable names to show that it bypasses the proc stream instead of triggering a drain.

Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
@easwars easwars assigned eshitachandwani and unassigned easwars Jun 23, 2026
Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated

// failStream handles stream failures, recording errors or bypassing external
// processor based on failureModeAllow configuration.
func (cs *clientStream) failStream(err error) {

Contributor

Nit: Should we call this failProcStream to be more explicit about which stream is failing here?

Member Author

Changed.

Contributor

Hmm ... I don't see it.

Member Author

Ohh, I made the change and then, I guess, reverted it while making other changes. Changing it now.

Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
Comment thread internal/xds/httpfilter/extproc/ext_proc.go
Comment thread internal/xds/httpfilter/extproc/ext_proc.go Outdated
…planeCreated incase of err and sucesss , store error , other review comments

for k, v := range reqFields {
val, err := structpb.NewValue(v)
if err != nil {
continue

Contributor

Nit: Add a comment here or add to the docstring that we encode as many attributes as we can and ignore the ones that can't, similar to Envoy.

Member Author

I had the comment typed out and then, for some reason, decided against it. Adding it now.

@eshitachandwani eshitachandwani requested a review from easwars June 25, 2026 21:26
@mbissa mbissa self-requested a review June 26, 2026 07:44
if cs.procStreamFailed.HasFired() {
return
}
if err != io.EOF && (cs.ignoreFailureMode.Load() || !cs.config.failureModeAllow) {

Contributor

If we get EOF but drain was not initiated, we still need to treat it as a non-OK status as per the gRFC. Are we handling that here? It should be handled in the success scenario. Also, I'm curious about EOF in a headers-only RPC: that should not be failed for want of a drain, right? And by the looks of it, failProcStream is a conditional failure, and the name should reflect that: attemptProcStreamFailure or something similar. failProcStream sounds like a guaranteed failure and is confusing.

Member Author

The drain request for EOF is still being discussed and will probably be finalised after the new bidirectional drain is finalised. See the discussion here: https://chat.google.com/room/AAAAbkw9L3c/uuoZ0GXjSdE. So I have not changed it yet.

Also, the intention behind naming it failProcStream is that the proc stream has definitely failed or closed. We need to decide whether to fail the RPC or let it bypass. attemptProcStreamFailure might indicate that the proc stream is being failed conditionally. WDYT?

if cs.protocolConfigSent.CompareAndSwap(false, true) {
req.ProtocolConfig = &v3procservicepb.ProtocolConfiguration{
RequestBodyMode: convertBodyMode(cs.config.processingModes.requestBodyMode),
ResponseBodyMode: convertBodyMode(cs.config.processingModes.responseBodyMode),

Contributor

I was just looking at the validations for response_body_mode during configuration parsing. We don't have a validation for response_trailer_mode: it should be SEND when the response_body_mode is GRPC.
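
A minimal sketch of the kind of check being asked for, assuming simplified enums. The type names, constants, and `validateResponseModes` are hypothetical and do not reflect the PR's config parser; only the rule itself comes from the comment.

```go
package main

import (
	"errors"
	"fmt"
)

// bodyMode and headerMode are hypothetical stand-ins for the parsed
// processing-mode values.
type bodyMode int

const (
	bodyModeNone bodyMode = iota
	bodyModeGRPC
)

type headerMode int

const (
	headerModeSkip headerMode = iota
	headerModeSend
)

// validateResponseModes rejects configs where the response body is sent in
// GRPC mode but response trailers are not sent to the processor.
func validateResponseModes(body bodyMode, trailer headerMode) error {
	if body == bodyModeGRPC && trailer != headerModeSend {
		return errors.New("response_trailer_mode must be SEND when response_body_mode is GRPC")
	}
	return nil
}

func main() {
	fmt.Println(validateResponseModes(bodyModeGRPC, headerModeSkip))
	fmt.Println(validateResponseModes(bodyModeGRPC, headerModeSend))
}
```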

Member Author

Ohh right! I planned to send a separate PR for that. Here is the PR: #9209

// Signal that the response trailer is modified and ready to be sent to
// the client.
cs.responseTrailerModified.Fire()
cs.procStream.CloseSend()

Contributor

The API for stream advises: "It is also not safe to call CloseSend concurrently with SendMsg." Should this be guarded to avoid a race, since we are calling it from multiple places? The same goes for the dataplane stream.

Member Author

I have made sure that all the Sends to the dataplaneStream are complete before calling CloseSend, so it will theoretically never be called concurrently with Send for the dataplaneStream. But we can add a mutex if we want to be sure.

For the proc stream CloseSend, I have changed the implementation such that when the trailers are received from the dataplane server, we send nil to the procSendCh to indicate that we need to call CloseSend on the proc stream. This way it will only be called after all the sends to the proc stream are done.
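
The nil-sentinel approach described here can be sketched as a single sender goroutine that owns the send side of the stream. Only procSendCh and the nil convention come from the comment; `fakeProcStream` and `sendLoop` are hypothetical, and messages are plain strings to keep the example self-contained.

```go
package main

import "fmt"

// fakeProcStream records calls; a hypothetical stand-in for the processor stream.
type fakeProcStream struct{ calls []string }

func (f *fakeProcStream) Send(msg string) error {
	f.calls = append(f.calls, "Send:"+msg)
	return nil
}

func (f *fakeProcStream) CloseSend() error {
	f.calls = append(f.calls, "CloseSend")
	return nil
}

// sendLoop is the only goroutine that touches the send side of the stream, so
// Send and CloseSend can never run concurrently. A nil item means "no more
// messages, half-close now". Errors are ignored to keep the sketch short.
func sendLoop(s *fakeProcStream, procSendCh <-chan *string, done chan<- struct{}) {
	defer close(done)
	for m := range procSendCh {
		if m == nil {
			_ = s.CloseSend()
			return
		}
		_ = s.Send(*m)
	}
}

func main() {
	s := &fakeProcStream{}
	procSendCh := make(chan *string)
	done := make(chan struct{})
	go sendLoop(s, procSendCh, done)

	headers, body := "headers", "body"
	procSendCh <- &headers
	procSendCh <- &body
	procSendCh <- nil // trailers received: ask the loop to half-close
	<-done            // wait for the loop before reading s.calls

	fmt.Println(s.calls)
}
```

Funnelling the half-close through the same channel as the messages gives ordering for free: CloseSend runs only after every queued send has been handed to the stream.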

// It also contains the initial metadata specified in the config.
procCtx, cancel := context.WithCancel(ctx)
if i.config.server.Timeout != 0 {
procCtx, cancel = context.WithTimeout(ctx, i.config.server.Timeout)

Contributor

If we do this, are we not overwriting context.WithCancel(ctx)?

Member Author

Yes, we are, because if we have a timeout, we need to use that to create the proc RPC. And context.WithTimeout also returns a cancellable context, with a timeout.
Added some comments to make it a little more clear.

var err error
if cs.dataplaneStream, err = newStream(ctx, opts...); err != nil {
cs.dataplaneCreationErr = err
cs.cancel()

Contributor

Now that we call cancel and close the dataplane setup, waitForDataplaneStream will surface a non-deterministic error, depending on which happens first.

Member Author

Right! Now that we have a separate error holder for the dataplane stream, we should check for that too in the ctx.Done case.
Changed.

}

// TestDrainingFlowControlNoMessageLoss tests the scenario where a processor
// server sends RequestDrain: true during active flow control backpressure.

Contributor

We are guaranteed to test the draining and message-loss logic, but not really backpressure.

Member Author

Right! Changed the name of the test to reflect the same.

// cannot receive. Verifies that backpressure correctly propagates across the
// filter: the client's Send call blocks, and receiving from the dataplane
// server also blocks.
func (s) TestFlowControl(t *testing.T) {

Contributor

None of the tests will fail for a race, because they don't drive request-body forwarding concurrently with response-trailer/close processing. We should add tests that check all the close/cancel/send races where these happen in parallel.

Member Author

I am not sure how to write a deterministic concurrent test for this. I have added a TestConcurrency which calls Recv from the main goroutine while Send, CloseSend, and context cancellation run in separate concurrent goroutines, and we assert that the RPC fails. Let me know what you think or if you have something else in mind.

cs.failProcStream(fmt.Errorf("extproc: external processor returned invalid status instead of CONTINUE for response headers"))
return
}
if err = cs.config.mutationRules.ApplyAdditions(header.GetResponse().GetHeaderMutation().GetSetHeaders(), cs.responseHeader); err != nil {

Contributor

For a trailers-only response, will this incorrectly fail the RPC? Also, the test for a trailers-only message doesn't capture this scenario effectively.

Member Author

I have asked what the proc behaviour should be for a trailers-only message.

// external processor server returns GrpcMessageCompressed: true while
// failure_mode_allow is false. Verifies that the stream is cancelled and
// subsequent data plane RPC calls fail with Internal.
func (s) TestStreamFailureGrpcMessageCompressedDeny(t *testing.T) {

Contributor

All the negative tests are written separately. Can they be combined into table-driven tests? It will help with making sense of coverage for review. The same goes for the three TestImmediateResponse* tests. It will reduce 9 different tests to 2 tests with cleaner code.

Member Author

Tried to combine as many as I could.

// returned. Otherwise, the disallowed mutation is silently ignored.
//
// The input metadata must not be nil.
func (hmr *HeaderMutationRules) ApplyAdditions(hvos []*v3corepb.HeaderValueOption, input metadata.MD) error {

Contributor

The changes added here are not tested in extconfig_test.go. Should they be, or are you covering them in end-to-end tests?

Member Author

I had a test for it in TestStreamModification, but I have now added an extensive test in extconfig_test.go.

@mbissa mbissa assigned eshitachandwani and unassigned easwars and mbissa Jun 26, 2026
Labels

Area: xDS Includes everything xDS related, including LB policies used with xDS. Type: Feature New features or improvements in behavior

3 participants