diff --git a/jobs/loggr-forwarder-agent-windows/monit b/jobs/loggr-forwarder-agent-windows/monit index 93e2a72f1..60c362071 100644 --- a/jobs/loggr-forwarder-agent-windows/monit +++ b/jobs/loggr-forwarder-agent-windows/monit @@ -28,6 +28,8 @@ "EMIT_OTEL_TRACES" => "#{p("emit_otel_traces")}", "EMIT_OTEL_METRICS" => "#{p("emit_otel_metrics")}", "EMIT_OTEL_LOGS" => "#{p("emit_otel_logs")}", + "OTEL_RETRY_MAX_RETRIES" => "#{p("otel_retry.max_retries")}", + "OTEL_RETRY_QUEUE_SIZE" => "#{p("otel_retry.queue_size")}", "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", "METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt", diff --git a/jobs/loggr-forwarder-agent-windows/spec b/jobs/loggr-forwarder-agent-windows/spec index e85d3adc1..245cf66eb 100644 --- a/jobs/loggr-forwarder-agent-windows/spec +++ b/jobs/loggr-forwarder-agent-windows/spec @@ -49,6 +49,14 @@ properties: description: "Emit logs to downstream OpenTelemetry consumers" default: true + otel_retry.max_retries: + description: "Maximum number of retry attempts before a failed OTel Collector export batch is dropped" + default: 7 + + otel_retry.queue_size: + description: "Per-signal retry queue capacity (number of export batches that can be buffered per signal type)" + default: 1024 + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/spec b/jobs/loggr-forwarder-agent/spec index fadee9e24..af2c3737d 100644 --- a/jobs/loggr-forwarder-agent/spec +++ b/jobs/loggr-forwarder-agent/spec @@ -49,6 +49,14 @@ properties: description: "Emit logs to downstream OpenTelemetry consumers" default: true + otel_retry.max_retries: + description: "Maximum number of retry attempts before a failed OTel Collector export batch is dropped" + default: 7 + + otel_retry.queue_size: + description: "Per-signal retry queue capacity (number of export batches that can be buffered per signal type)" + default: 1024 + tls.ca_cert: description: | TLS loggregator root CA certificate. It is required for key/cert diff --git a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb index 7e8f502f9..9ce12267c 100644 --- a/jobs/loggr-forwarder-agent/templates/bpm.yml.erb +++ b/jobs/loggr-forwarder-agent/templates/bpm.yml.erb @@ -34,6 +34,9 @@ "EMIT_OTEL_METRICS" => p("emit_otel_metrics"), "EMIT_OTEL_LOGS" => p("emit_otel_logs"), + "OTEL_RETRY_MAX_RETRIES" => p("otel_retry.max_retries"), + "OTEL_RETRY_QUEUE_SIZE" => p("otel_retry.queue_size"), + "METRICS_PORT" => "#{p("metrics.port")}", "METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt", "METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt", diff --git a/src/cmd/forwarder-agent/app/config.go b/src/cmd/forwarder-agent/app/config.go index 022e27bd4..1b96c767a 100644 --- a/src/cmd/forwarder-agent/app/config.go +++ b/src/cmd/forwarder-agent/app/config.go @@ -19,6 +19,12 @@ type GRPC struct { CipherSuites []string `env:"AGENT_CIPHER_SUITES, report"` } +// OtelRetry holds tunable parameters for the OTel Collector gRPC retry logic. +type OtelRetry struct { + MaxRetries int `env:"OTEL_RETRY_MAX_RETRIES, report"` + RetryQueueSize int `env:"OTEL_RETRY_QUEUE_SIZE, report"` +} + // Config holds the configuration for the forwarder agent type Config struct { UseRFC3339 bool `env:"USE_RFC3339"` @@ -33,6 +39,7 @@ type Config struct { EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"` EmitOTelMetrics bool `env:"EMIT_OTEL_METRICS, report"` EmitOTelLogs bool `env:"EMIT_OTEL_LOGS, report"` + OtelRetry OtelRetry } // LoadConfig will load the configuration for the forwarder agent from the @@ -44,6 +51,10 @@ func LoadConfig() Config { Host: "127.0.0.1", Port: 3458, }, + OtelRetry: OtelRetry{ + MaxRetries: 7, + RetryQueueSize: 1024, + }, } if err := envstruct.Load(&cfg); err != nil { panic(fmt.Sprintf("Failed to load config from environment: %s", err)) diff --git a/src/cmd/forwarder-agent/app/forwarder_agent.go b/src/cmd/forwarder-agent/app/forwarder_agent.go index 0cf889b80..db3315cb1 100644 --- a/src/cmd/forwarder-agent/app/forwarder_agent.go +++ b/src/cmd/forwarder-agent/app/forwarder_agent.go @@ -44,6 +44,7 @@ type ForwarderAgent struct { emitOTelTraces bool emitOTelMetrics bool emitOTelLogs bool + otelRetry OtelRetry } type Metrics interface { @@ -77,6 +78,7 @@ func NewForwarderAgent( emitOTelTraces: cfg.EmitOTelTraces, emitOTelMetrics: cfg.EmitOTelMetrics, emitOTelLogs: cfg.EmitOTelLogs, + otelRetry: cfg.OtelRetry, } } @@ -100,7 +102,7 @@ func (s *ForwarderAgent) Run() { })) dests := downstreamDestinations(s.downstreamFilePattern, s.log) - writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitOTelMetrics, s.emitOTelLogs, s.log) + writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitOTelMetrics, s.emitOTelLogs, s.otelRetry, s.log) tagger := egress_v2.NewTagger(s.tags) ew := egress_v2.NewEnvelopeWriter( multiWriter{writers: writers}, @@ -213,13 +215,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination { return dests } -func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitOTelMetrics, emitOTelLogs bool, l *log.Logger) []Writer { +func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitOTelMetrics, emitOTelLogs bool, otelRetry OtelRetry, l *log.Logger) []Writer { var writers []Writer for _, d := range dests { var w Writer switch d.Protocol { case "otelcol": - w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitOTelMetrics, emitOTelLogs, l) + w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitOTelMetrics, emitOTelLogs, otelRetry, l) default: w = loggregatorClient(d, grpc, m, l) } @@ -228,7 +230,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces return writers } -func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitMetrics, emitLogs bool, l *log.Logger) Writer { +func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitMetrics, emitLogs bool, otelRetry OtelRetry, l *log.Logger) Writer { clientCreds, err := tlsconfig.Build( tlsconfig.WithInternalServiceDefaults(), tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile), @@ -242,7 +244,11 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emi occl := log.New(l.Writer(), fmt.Sprintf("[OTEL COLLECTOR CLIENT] -> %s: ", dest.Ingress), l.Flags()) - w, err := otelcolclient.NewGRPCWriter(dest.Ingress, clientCreds, occl) + writerCfg := otelcolclient.GRPCWriterConfig{ + MaxRetries: otelRetry.MaxRetries, + RetryQueueSize: otelRetry.RetryQueueSize, + } + w, err := otelcolclient.NewGRPCWriter(dest.Ingress, clientCreds, writerCfg, occl) if err != nil { l.Fatalf("Failed to create OTel Collector gRPC writer for %s: %s", dest.Ingress, err) } diff --git a/src/pkg/otelcolclient/otelcolclient.go b/src/pkg/otelcolclient/otelcolclient.go index ba2de7365..51be9e9c1 100644 --- a/src/pkg/otelcolclient/otelcolclient.go +++ b/src/pkg/otelcolclient/otelcolclient.go @@ -20,9 +20,30 @@ import ( metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" ) +const ( + retryInitialDelay = 1 * time.Second + retryMaxDelay = 30 * time.Second +) + +// GRPCWriterConfig holds tunable parameters for the GRPCWriter retry behaviour. +type GRPCWriterConfig struct { + // MaxRetries is the maximum number of retry attempts before a batch is dropped. + MaxRetries int + // RetryQueueSize is the per-signal channel capacity for pending retry batches. + RetryQueueSize int +} + +// retryItem holds a failed export closure and a count of retry attempts already made. +type retryItem struct { + exportFn func() error + attempts int +} + type GRPCWriter struct { // The client API for the OTel Collector metrics service msc colmetricspb.MetricsServiceClient @@ -41,10 +62,28 @@ type GRPCWriter struct { // The logger to use for errors l *log.Logger + + // Retry configuration. + maxRetries int + initialRetryDelay time.Duration + maxRetryDelay time.Duration + + // Per-signal queues for async retry workers. Items are enqueued by withRetry + // and consumed by runRetryWorker goroutines started in NewGRPCWriter. The withRetry + // method enqueues a failed batch to a per-signal buffered channel and returns + // immediately, releasing the SignalBatcher mutex so the DiodeWriter ring + // buffer continues to drain without stalling. + // Sized to absorb the full retry window: 7 attempts over 91 s (1+2+4+8+16+30+30) + // at the 100 ms flush interval produces at most 910 batches. 1024 clears that + // with a small margin. A nil channel disables async retry (used in tests that + // construct GRPCWriter directly without starting workers). + metricsRetry chan retryItem + logsRetry chan retryItem + tracesRetry chan retryItem } // NewGRPCWriter dials the provided gRPC address and returns a *GRPCWriter. -func NewGRPCWriter(addr string, tlsConfig *tls.Config, l *log.Logger) (*GRPCWriter, error) { +func NewGRPCWriter(addr string, tlsConfig *tls.Config, cfg GRPCWriterConfig, l *log.Logger) (*GRPCWriter, error) { cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) if err != nil { return nil, err @@ -58,58 +97,205 @@ func NewGRPCWriter(addr string, tlsConfig *tls.Config, l *log.Logger) (*GRPCWrit }() w := &GRPCWriter{ - msc: colmetricspb.NewMetricsServiceClient(cc), - tsc: coltracepb.NewTraceServiceClient(cc), - lsc: collogspb.NewLogsServiceClient(cc), - ctx: ctx, - cancel: cancel, - l: l, + msc: colmetricspb.NewMetricsServiceClient(cc), + tsc: coltracepb.NewTraceServiceClient(cc), + lsc: collogspb.NewLogsServiceClient(cc), + ctx: ctx, + cancel: cancel, + l: l, + maxRetries: cfg.MaxRetries, + initialRetryDelay: retryInitialDelay, + maxRetryDelay: retryMaxDelay, + metricsRetry: make(chan retryItem, cfg.RetryQueueSize), + logsRetry: make(chan retryItem, cfg.RetryQueueSize), + tracesRetry: make(chan retryItem, cfg.RetryQueueSize), } + go w.runRetryWorker("metrics", w.metricsRetry) + go w.runRetryWorker("logs", w.logsRetry) + go w.runRetryWorker("traces", w.tracesRetry) cancel = nil return w, nil } func (w GRPCWriter) WriteLogs(batch []*logspb.ResourceLogs) { - resp, err := w.lsc.Export(w.ctx, &collogspb.ExportLogsServiceRequest{ - ResourceLogs: batch, - }) - if err == nil { - err = errorOnLogsRejection(resp) - } - if err != nil { - w.l.Println("Write error:", err) - } + w.withRetry(func() error { + resp, err := w.lsc.Export(w.ctx, &collogspb.ExportLogsServiceRequest{ + ResourceLogs: batch, + }) + if err != nil { + return err + } + return errorOnLogsRejection(resp) + }, w.logsRetry) } func (w GRPCWriter) WriteMetrics(batch []*metricspb.Metric) { - resp, err := w.msc.Export(w.ctx, &colmetricspb.ExportMetricsServiceRequest{ - ResourceMetrics: []*metricspb.ResourceMetrics{ - { - ScopeMetrics: []*metricspb.ScopeMetrics{ - { - Metrics: batch, + w.withRetry(func() error { + resp, err := w.msc.Export(w.ctx, &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Metrics: batch, + }, }, }, }, - }, - }) - if err == nil { - err = errorOnRejection(resp) + }) + if err != nil { + return err + } + return errorOnRejection(resp) + }, w.metricsRetry) +} + +func (w GRPCWriter) WriteTrace(batch []*tracepb.ResourceSpans) { + w.withRetry(func() error { + resp, err := w.tsc.Export(w.ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: batch, + }) + if err != nil { + return err + } + return errorOnTraceRejection(resp) + }, w.tracesRetry) +} + +// isRetryable reports whether a gRPC error is transient and worth retrying. +// Only codes.Unavailable is retried — the expected code when the OTel +// Collector is down or restarting. Partial-success rejections (plain errors +// from errorOn*Rejection) return false. +func isRetryable(err error) bool { + s, ok := status.FromError(err) + if !ok { + return false } - if err != nil { - w.l.Println("Write error:", err) + return s.Code() == codes.Unavailable +} + +// isContextError reports whether an error is due to context cancellation so +// that shutdown paths can be distinguished from genuine write failures. +func isContextError(err error) bool { + if errors.Is(err, context.Canceled) { + return true } + s, ok := status.FromError(err) + return ok && s.Code() == codes.Canceled } -func (w GRPCWriter) WriteTrace(batch []*tracepb.ResourceSpans) { - resp, err := w.tsc.Export(w.ctx, &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: batch, - }) +// withRetry makes a single export attempt. If the error is retryable the batch +// is handed off to the background retry worker via queue so the caller (running +// inside the SignalBatcher flush path, holding its mutex) is not blocked during +// backoff. Non-retryable errors and context errors are handled inline. +func (w GRPCWriter) withRetry(exportFn func() error, queue chan<- retryItem) { + err := exportFn() if err == nil { - err = errorOnTraceRejection(resp) + return } - if err != nil { + if isContextError(err) { + return + } + if !isRetryable(err) { w.l.Println("Write error:", err) + return + } + select { + case queue <- retryItem{exportFn: exportFn}: + default: + w.l.Println("Write error (retry queue full):", err) + } +} + +// drainRetryQueue non-blockingly moves all pending items from src into dst. +func drainRetryQueue(dst []retryItem, src <-chan retryItem) []retryItem { + for { + select { + case item := <-src: + dst = append(dst, item) + default: + return dst + } + } +} + +// runRetryWorker retries batches that were queued by withRetry. It maintains a +// pool of pending batches and replays them all on each backoff tick so that +// when the collector recovers the entire backlog is flushed in one sweep rather +// than one batch per backoff cycle. +// +// The shared pool delay resets to initialRetryDelay each time the pool drains +// to empty. Items that exhaust maxRetries are logged and discarded. The +// goroutine exits silently when the writer's context is cancelled. +func (w *GRPCWriter) runRetryWorker(signal string, queue <-chan retryItem) { + var pool []retryItem + delay := w.initialRetryDelay + + for { + // Merge any newly queued items into the pool. + prevLen := len(pool) + pool = drainRetryQueue(pool, queue) + if prevLen == 0 && len(pool) > 0 { + delay = w.initialRetryDelay + } + + if len(pool) == 0 { + // Block until there is work to do or the context is cancelled. + select { + case <-w.ctx.Done(): + return + case item := <-queue: + pool = append(pool, item) + delay = w.initialRetryDelay + } + pool = drainRetryQueue(pool, queue) + } + + // Wait before the retry attempt. + select { + case <-w.ctx.Done(): + return + case <-time.After(delay): + } + + // Drain items that arrived during the sleep. + pool = drainRetryQueue(pool, queue) + + // Attempt every item in the pool; keep the ones that still need more retries. + var remaining []retryItem + var lastError error + for _, item := range pool { + if isContextError(w.ctx.Err()) { + return + } + err := item.exportFn() + if err == nil { + continue + } + if isContextError(err) { + return + } + item.attempts++ + if !isRetryable(err) || item.attempts >= w.maxRetries { + w.l.Printf("Dropping %s batch after %d attempts: %v", signal, item.attempts, err) + continue + } + lastError = err + remaining = append(remaining, item) + } + + if len(remaining) > 0 { + w.l.Printf("Retrying %d %s batches in %s, last err: %v", len(remaining), signal, delay, lastError) + } + if len(pool) > 0 && len(remaining) == 0 { + w.l.Printf("%s retry pool drained after recovery", signal) + } + pool = remaining + + if len(pool) == 0 { + delay = w.initialRetryDelay + } else { + delay = min(delay*2, w.maxRetryDelay) + } } } diff --git a/src/pkg/otelcolclient/otelcolclient_test.go b/src/pkg/otelcolclient/otelcolclient_test.go index 6aa265524..a220575bd 100644 --- a/src/pkg/otelcolclient/otelcolclient_test.go +++ b/src/pkg/otelcolclient/otelcolclient_test.go @@ -5,6 +5,7 @@ import ( "errors" "log" "math" + "sync" "time" "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" @@ -20,6 +21,8 @@ import ( metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/testing/protocmp" ) @@ -1056,6 +1059,169 @@ var _ = Describe("Client", func() { }) }) + Describe("retry behavior", func() { + var ( + retryMSC *spyMetricsServiceClient + retryLSC *spyLogsServiceClient + retryTSC *spyTraceServiceClient + retryW *GRPCWriter + retryCancel context.CancelFunc + ) + + BeforeEach(func() { + retryMSC = &spyMetricsServiceClient{ + requests: make(chan *colmetricspb.ExportMetricsServiceRequest, 10), + response: &colmetricspb.ExportMetricsServiceResponse{}, + } + retryLSC = &spyLogsServiceClient{ + requests: make(chan *collogspb.ExportLogsServiceRequest, 10), + response: &collogspb.ExportLogsServiceResponse{}, + } + retryTSC = &spyTraceServiceClient{ + requests: make(chan *coltracepb.ExportTraceServiceRequest, 10), + response: &coltracepb.ExportTraceServiceResponse{}, + } + ctx, cancel := context.WithCancel(context.Background()) + retryCancel = cancel + retryW = &GRPCWriter{ + msc: retryMSC, + tsc: retryTSC, + lsc: retryLSC, + ctx: ctx, + cancel: cancel, + l: log.New(GinkgoWriter, "", 0), + maxRetries: 3, + initialRetryDelay: time.Millisecond, + maxRetryDelay: 5 * time.Millisecond, + metricsRetry: make(chan retryItem, 16), + logsRetry: make(chan retryItem, 16), + tracesRetry: make(chan retryItem, 16), + } + go retryW.runRetryWorker("metrics", retryW.metricsRetry) + go retryW.runRetryWorker("logs", retryW.logsRetry) + go retryW.runRetryWorker("traces", retryW.tracesRetry) + }) + + AfterEach(func() { + retryCancel() + }) + + Context("when a metrics export fails with a retryable error then succeeds on retry", func() { + BeforeEach(func() { + retryMSC.responseErrs = []error{ + status.Error(codes.Unavailable, "collector temporarily unavailable"), + nil, + } + }) + + It("retries in the background and does not log a write error", func() { + retryW.WriteMetrics([]*metricspb.Metric{{Name: "test-metric"}}) + + Eventually(func() int { + retryMSC.mu.Lock() + defer retryMSC.mu.Unlock() + return retryMSC.exportCount + }).Should(Equal(2)) + Expect(buf).NotTo(gbytes.Say("Write error")) + }) + }) + + Context("when all retries are exhausted", func() { + BeforeEach(func() { + retryMSC.responseErr = status.Error(codes.Unavailable, "collector down") + }) + + It("logs a drop message after the final retry attempt", func() { + retryW.WriteMetrics([]*metricspb.Metric{{Name: "test-metric"}}) + Eventually(buf).Should(gbytes.Say("Dropping metrics batch after.*collector down")) + }) + }) + + Context("when the error is not retryable", func() { + BeforeEach(func() { + retryMSC.responseErr = status.Error(codes.InvalidArgument, "bad metric") + }) + + It("logs the error immediately without queuing a retry", func() { + retryW.WriteMetrics([]*metricspb.Metric{{Name: "test-metric"}}) + + retryMSC.mu.Lock() + count := retryMSC.exportCount + retryMSC.mu.Unlock() + + Expect(count).To(Equal(1)) + Expect(buf).To(gbytes.Say("Write error:.*bad metric")) + }) + }) + + Context("when a logs export fails with ResourceExhausted", func() { + BeforeEach(func() { + retryLSC.responseErr = status.Error(codes.ResourceExhausted, "rate limited") + }) + + It("logs the error immediately without queuing a retry", func() { + retryW.WriteLogs([]*logspb.ResourceLogs{{}}) + + retryLSC.mu.Lock() + count := retryLSC.exportCount + retryLSC.mu.Unlock() + + Expect(count).To(Equal(1)) + Expect(buf).To(gbytes.Say("Write error:.*rate limited")) + }) + }) + + Context("when a trace export fails with Aborted", func() { + BeforeEach(func() { + retryTSC.responseErr = status.Error(codes.Aborted, "transaction aborted") + }) + + It("logs the error immediately without queuing a retry", func() { + retryW.WriteTrace([]*tracepb.ResourceSpans{{}}) + + retryTSC.mu.Lock() + count := retryTSC.exportCount + retryTSC.mu.Unlock() + + Expect(count).To(Equal(1)) + Expect(buf).To(gbytes.Say("Write error:.*transaction aborted")) + }) + }) + + Context("when the retry queue is full", func() { + BeforeEach(func() { + retryW.metricsRetry = make(chan retryItem, 1) + retryMSC.responseErr = status.Error(codes.Unavailable, "collector down") + }) + + It("logs a queue-full error for the overflow batch", func() { + retryW.WriteMetrics([]*metricspb.Metric{{Name: "m1"}}) + retryW.WriteMetrics([]*metricspb.Metric{{Name: "m2"}}) + Eventually(buf).Should(gbytes.Say("retry queue full")) + }) + }) + + Context("when the context is cancelled while retries are pending", func() { + BeforeEach(func() { + retryMSC.responseErr = status.Error(codes.Unavailable, "collector down") + }) + + It("stops the retry worker without logging a write error", func() { + retryW.WriteMetrics([]*metricspb.Metric{{Name: "test-metric"}}) + Eventually(func() int { + retryMSC.mu.Lock() + defer retryMSC.mu.Unlock() + return retryMSC.exportCount + }).Should(BeNumerically(">=", 1)) + + retryCancel() + // Allow goroutine to observe cancellation, then confirm no error was logged + // for the cancelled in-flight retry. + Consistently(buf, "50ms").ShouldNot(gbytes.Say("Write error")) + }) + }) + }) + Describe("Close", func() { It("cancels the gRPC context", func() { envelope := &loggregator_v2.Envelope{ @@ -1078,42 +1244,93 @@ var _ = Describe("Client", func() { }) type spyMetricsServiceClient struct { - requests chan *colmetricspb.ExportMetricsServiceRequest - response *colmetricspb.ExportMetricsServiceResponse - responseErr error - ctx context.Context + mu sync.Mutex + requests chan *colmetricspb.ExportMetricsServiceRequest + response *colmetricspb.ExportMetricsServiceResponse + responseErr error + responseErrs []error // dequeued on successive calls; falls back to responseErr when empty + exportCount int + ctx context.Context } func (c *spyMetricsServiceClient) Export(ctx context.Context, in *colmetricspb.ExportMetricsServiceRequest, opts ...grpc.CallOption) (*colmetricspb.ExportMetricsServiceResponse, error) { - c.requests <- in + c.mu.Lock() + c.exportCount++ + var err error + if len(c.responseErrs) > 0 { + err = c.responseErrs[0] + c.responseErrs = c.responseErrs[1:] + } else { + err = c.responseErr + } + c.mu.Unlock() + + select { + case c.requests <- in: + default: + } c.ctx = ctx - return c.response, c.responseErr + return c.response, err } type spyLogsServiceClient struct { - requests chan *collogspb.ExportLogsServiceRequest - response *collogspb.ExportLogsServiceResponse - responseErr error - ctx context.Context + mu sync.Mutex + requests chan *collogspb.ExportLogsServiceRequest + response *collogspb.ExportLogsServiceResponse + responseErr error + responseErrs []error + exportCount int + ctx context.Context } func (c *spyLogsServiceClient) Export(ctx context.Context, in *collogspb.ExportLogsServiceRequest, opts ...grpc.CallOption) (*collogspb.ExportLogsServiceResponse, error) { - c.requests <- in + c.mu.Lock() + c.exportCount++ + var err error + if len(c.responseErrs) > 0 { + err = c.responseErrs[0] + c.responseErrs = c.responseErrs[1:] + } else { + err = c.responseErr + } + c.mu.Unlock() + + select { + case c.requests <- in: + default: + } c.ctx = ctx - return c.response, c.responseErr + return c.response, err } type spyTraceServiceClient struct { - requests chan *coltracepb.ExportTraceServiceRequest - response *coltracepb.ExportTraceServiceResponse - responseErr error - ctx context.Context + mu sync.Mutex + requests chan *coltracepb.ExportTraceServiceRequest + response *coltracepb.ExportTraceServiceResponse + responseErr error + responseErrs []error + exportCount int + ctx context.Context } func (c *spyTraceServiceClient) Export(ctx context.Context, in *coltracepb.ExportTraceServiceRequest, opts ...grpc.CallOption) (*coltracepb.ExportTraceServiceResponse, error) { - c.requests <- in + c.mu.Lock() + c.exportCount++ + var err error + if len(c.responseErrs) > 0 { + err = c.responseErrs[0] + c.responseErrs = c.responseErrs[1:] + } else { + err = c.responseErr + } + c.mu.Unlock() + + select { + case c.requests <- in: + default: + } c.ctx = ctx - return c.response, c.responseErr + return c.response, err } func span(tsr *coltracepb.ExportTraceServiceRequest) *tracepb.Span {