Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jobs/loggr-forwarder-agent-windows/monit
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
"EMIT_OTEL_TRACES" => "#{p("emit_otel_traces")}",
"EMIT_OTEL_METRICS" => "#{p("emit_otel_metrics")}",
"EMIT_OTEL_LOGS" => "#{p("emit_otel_logs")}",
"OTEL_RETRY_MAX_RETRIES" => "#{p("otel_retry.max_retries")}",
"OTEL_RETRY_QUEUE_SIZE" => "#{p("otel_retry.queue_size")}",
"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
"METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt",
Expand Down
8 changes: 8 additions & 0 deletions jobs/loggr-forwarder-agent-windows/spec
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ properties:
description: "Emit logs to downstream OpenTelemetry consumers"
default: true

otel_retry.max_retries:
description: "Maximum number of retry attempts before a failed OTel Collector export batch is dropped"
default: 7

otel_retry.queue_size:
description: "Per-signal retry queue capacity (number of export batches that can be buffered per signal type)"
default: 1024

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
8 changes: 8 additions & 0 deletions jobs/loggr-forwarder-agent/spec
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ properties:
description: "Emit logs to downstream OpenTelemetry consumers"
default: true

otel_retry.max_retries:
description: "Maximum number of retry attempts before a failed OTel Collector export batch is dropped"
default: 7

otel_retry.queue_size:
description: "Per-signal retry queue capacity (number of export batches that can be buffered per signal type)"
default: 1024

tls.ca_cert:
description: |
TLS loggregator root CA certificate. It is required for key/cert
Expand Down
3 changes: 3 additions & 0 deletions jobs/loggr-forwarder-agent/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
"EMIT_OTEL_METRICS" => p("emit_otel_metrics"),
"EMIT_OTEL_LOGS" => p("emit_otel_logs"),

"OTEL_RETRY_MAX_RETRIES" => p("otel_retry.max_retries"),
"OTEL_RETRY_QUEUE_SIZE" => p("otel_retry.queue_size"),

"METRICS_PORT" => "#{p("metrics.port")}",
"METRICS_CA_FILE_PATH" => "#{certs_dir}/metrics_ca.crt",
"METRICS_CERT_FILE_PATH" => "#{certs_dir}/metrics.crt",
Expand Down
11 changes: 11 additions & 0 deletions src/cmd/forwarder-agent/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type GRPC struct {
CipherSuites []string `env:"AGENT_CIPHER_SUITES, report"`
}

// OtelRetry holds tunable parameters for the OTel Collector gRPC retry logic.
// Both fields are populated from the environment variables named in their
// struct tags. LoadConfig seeds them with defaults (7 retries, queue size
// 1024) before the environment is read, and the values are passed on to the
// OTel Collector gRPC writer's configuration.
type OtelRetry struct {
// MaxRetries is the maximum number of retry attempts before a failed
// export batch is dropped.
MaxRetries int `env:"OTEL_RETRY_MAX_RETRIES, report"`
// RetryQueueSize is the per-signal retry queue capacity, i.e. the number
// of export batches that can be buffered per signal type.
RetryQueueSize int `env:"OTEL_RETRY_QUEUE_SIZE, report"`
}

// Config holds the configuration for the forwarder agent
type Config struct {
UseRFC3339 bool `env:"USE_RFC3339"`
Expand All @@ -33,6 +39,7 @@ type Config struct {
EmitOTelTraces bool `env:"EMIT_OTEL_TRACES, report"`
EmitOTelMetrics bool `env:"EMIT_OTEL_METRICS, report"`
EmitOTelLogs bool `env:"EMIT_OTEL_LOGS, report"`
OtelRetry OtelRetry
}

// LoadConfig will load the configuration for the forwarder agent from the
Expand All @@ -44,6 +51,10 @@ func LoadConfig() Config {
Host: "127.0.0.1",
Port: 3458,
},
OtelRetry: OtelRetry{
MaxRetries: 7,
RetryQueueSize: 1024,
},
}
if err := envstruct.Load(&cfg); err != nil {
panic(fmt.Sprintf("Failed to load config from environment: %s", err))
Expand Down
16 changes: 11 additions & 5 deletions src/cmd/forwarder-agent/app/forwarder_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ForwarderAgent struct {
emitOTelTraces bool
emitOTelMetrics bool
emitOTelLogs bool
otelRetry OtelRetry
}

type Metrics interface {
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewForwarderAgent(
emitOTelTraces: cfg.EmitOTelTraces,
emitOTelMetrics: cfg.EmitOTelMetrics,
emitOTelLogs: cfg.EmitOTelLogs,
otelRetry: cfg.OtelRetry,
}
}

Expand All @@ -100,7 +102,7 @@ func (s *ForwarderAgent) Run() {
}))

dests := downstreamDestinations(s.downstreamFilePattern, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitOTelMetrics, s.emitOTelLogs, s.log)
writers := downstreamWriters(dests, s.grpc, s.m, s.emitOTelTraces, s.emitOTelMetrics, s.emitOTelLogs, s.otelRetry, s.log)
tagger := egress_v2.NewTagger(s.tags)
ew := egress_v2.NewEnvelopeWriter(
multiWriter{writers: writers},
Expand Down Expand Up @@ -213,13 +215,13 @@ func downstreamDestinations(pattern string, l *log.Logger) []destination {
return dests
}

func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitOTelMetrics, emitOTelLogs bool, l *log.Logger) []Writer {
func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces, emitOTelMetrics, emitOTelLogs bool, otelRetry OtelRetry, l *log.Logger) []Writer {
var writers []Writer
for _, d := range dests {
var w Writer
switch d.Protocol {
case "otelcol":
w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitOTelMetrics, emitOTelLogs, l)
w = otelCollectorClient(d, grpc, m, emitOTelTraces, emitOTelMetrics, emitOTelLogs, otelRetry, l)
default:
w = loggregatorClient(d, grpc, m, l)
}
Expand All @@ -228,7 +230,7 @@ func downstreamWriters(dests []destination, grpc GRPC, m Metrics, emitOTelTraces
return writers
}

func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitMetrics, emitLogs bool, l *log.Logger) Writer {
func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emitMetrics, emitLogs bool, otelRetry OtelRetry, l *log.Logger) Writer {
clientCreds, err := tlsconfig.Build(
tlsconfig.WithInternalServiceDefaults(),
tlsconfig.WithIdentityFromFile(grpc.CertFile, grpc.KeyFile),
Expand All @@ -242,7 +244,11 @@ func otelCollectorClient(dest destination, grpc GRPC, m Metrics, emitTraces, emi

occl := log.New(l.Writer(), fmt.Sprintf("[OTEL COLLECTOR CLIENT] -> %s: ", dest.Ingress), l.Flags())

w, err := otelcolclient.NewGRPCWriter(dest.Ingress, clientCreds, occl)
writerCfg := otelcolclient.GRPCWriterConfig{
MaxRetries: otelRetry.MaxRetries,
RetryQueueSize: otelRetry.RetryQueueSize,
}
w, err := otelcolclient.NewGRPCWriter(dest.Ingress, clientCreds, writerCfg, occl)
if err != nil {
l.Fatalf("Failed to create OTel Collector gRPC writer for %s: %s", dest.Ingress, err)
}
Expand Down
Loading
Loading