Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions cliext/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type ClientOptionsBuilder struct {
// Logger is the slog logger to use for the client. If set, it will be
// wrapped with the SDK's structured logger adapter.
Logger *slog.Logger

// PayloadCodec is populated by Build when a remote payload codec is
// configured. Callers can use it to decode payloads outside the gRPC
// interceptor chain (e.g. payloads nested inside opaque proto bytes).
PayloadCodec converter.PayloadCodec
}

type oauthCredentials struct {
Expand Down Expand Up @@ -248,13 +253,18 @@ func (b *ClientOptionsBuilder) Build(ctx context.Context) (client.Options, error
if err != nil {
return client.Options{}, fmt.Errorf("invalid codec headers: %w", err)
}
interceptor, err := newPayloadCodecInterceptor(
payloadCodec := newRemotePayloadCodec(
profile.Namespace, profile.Codec.Endpoint, profile.Codec.Auth, codecHeaders)
interceptor, err := converter.NewPayloadCodecGRPCClientInterceptor(
converter.PayloadCodecGRPCClientInterceptorOptions{
Codecs: []converter.PayloadCodec{payloadCodec},
})
if err != nil {
return client.Options{}, fmt.Errorf("failed creating payload codec interceptor: %w", err)
}
clientOpts.ConnectionOptions.DialOptions = append(
clientOpts.ConnectionOptions.DialOptions, grpc.WithChainUnaryInterceptor(interceptor))
b.PayloadCodec = payloadCodec
}

// Set connect timeout for GetSystemInfo if provided.
Expand All @@ -278,16 +288,16 @@ func parseKeyValuePairs(pairs []string) (map[string]string, error) {
return result, nil
}

// newPayloadCodecInterceptor creates a gRPC interceptor for remote payload codec.
func newPayloadCodecInterceptor(
// newRemotePayloadCodec constructs a remote payload codec from the configured endpoint,
// auth, and headers.
func newRemotePayloadCodec(
namespace string,
codecEndpoint string,
codecAuth string,
codecHeaders map[string]string,
) (grpc.UnaryClientInterceptor, error) {
) converter.PayloadCodec {
codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace)

payloadCodec := converter.NewRemotePayloadCodec(
return converter.NewRemotePayloadCodec(
converter.RemotePayloadCodecOptions{
Endpoint: codecEndpoint,
ModifyRequest: func(req *http.Request) error {
Expand All @@ -302,11 +312,6 @@ func newPayloadCodecInterceptor(
},
},
)
return converter.NewPayloadCodecGRPCClientInterceptor(
converter.PayloadCodecGRPCClientInterceptorOptions{
Codecs: []converter.PayloadCodec{payloadCodec},
},
)
}

func (c *oauthCredentials) getToken(ctx context.Context) (string, error) {
Expand Down
207 changes: 110 additions & 97 deletions go.mod

Large diffs are not rendered by default.

475 changes: 244 additions & 231 deletions go.sum

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions internal/temporalcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@ import (
// so often used by callers after this call to know the currently configured
// namespace.
func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, error) {
cl, _, err := dialClientWithCodec(cctx, c)
return cl, err
}

// dialClientWithCodec is like [dialClient] but also returns the configured remote
// payload codec, or nil if no codec is configured. The codec is needed to decode
// payloads nested inside system Nexus operation bytes for the human-readable display
// path. It is NOT applied via a gRPC interceptor to avoid mutating the -o json output
// used by SDK replayers.
func dialClientWithCodec(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, converter.PayloadCodec, error) {
if cctx.RootCommand == nil {
return nil, fmt.Errorf("root command unexpectedly missing when dialing client")
return nil, nil, fmt.Errorf("root command unexpectedly missing when dialing client")
}

// Set default identity if not provided
Expand All @@ -47,7 +57,7 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e
}
clientOpts, err := builder.Build(cctx)
if err != nil {
return nil, err
return nil, nil, err
}

// We do not put codec on data converter here, it is applied via
Expand Down Expand Up @@ -78,14 +88,14 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e

cl, err := client.DialContext(dialCtx, clientOpts)
if err != nil {
return nil, err
return nil, nil, err
}

// Since this namespace value is used by many commands after this call,
// we are mutating it to be the derived one
c.Namespace = clientOpts.Namespace

return cl, nil
return cl, builder.PayloadCodec, nil
}

func fixedHeaderOverrideInterceptor(
Expand Down
2 changes: 1 addition & 1 deletion internal/temporalcli/commands.schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *SharedServerSuite) TestSchedule_List() {
out = res.Stdout.String()
assert.Contains(t, out, schedId)
}, 10*time.Second, time.Second)
s.ContainsOnSameLine(out, schedId, "DevWorkflow", "0s" /*jitter*/, "false", "nil" /*memo*/)
s.ContainsOnSameLine(out, schedId, "DevWorkflow", "0s" /*jitter*/, "false", "{}" /*memo*/)
s.ContainsOnSameLine(out, "TestSchedule_List")

// table
Expand Down
Loading
Loading