Skip to content

feat(federation): ADR-F6 stream connection pool (RECON-F6)#44

Merged
ontave merged 2 commits into
mainfrom
feature/recon-f6-federation-stream-pool
Jun 1, 2026
Merged

feat(federation): ADR-F6 stream connection pool (RECON-F6)#44
ontave merged 2 commits into
mainfrom
feature/recon-f6-federation-stream-pool

Conversation

@ontave
Copy link
Copy Markdown
Contributor

@ontave ontave commented May 29, 2026

Summary

  • Implements ADR-F6 (lab/ADR-F6-federation-stream-pool.md) in full
  • D1: semaphore channel enforces FEDERATION_MAX_CONCURRENT_STREAMS (default 50, range 1-1000); 3rd+ concurrent stream receives RESOURCE_EXHAUSTED
  • D2: token-bucket admission rate limiter (golang.org/x/time/rate) enforces FEDERATION_ADMISSION_RATE connections/sec (default 5, burst 2x); over-rate streams receive RESOURCE_EXHAUSTED
  • D4: conductor_federation_stream_active_count Prometheus gauge + conductor_federation_stream_reconnects_total counter vec (labeled cluster_id), registered on ctrlmetrics.Registry
  • ParseFederationMaxStreams / ParseFederationAdmissionRate: env-var parsers with range validation and safe defaults
  • ActiveStreamCount() int64: health-check accessor returning current atomic active count
  • golang.org/x/time v0.14.0 promoted from indirect to direct dependency

Test plan

  • TestFederationServer_RejectsWhenLimitReached: limit=2, 3rd acquire returns false
  • TestFederationServer_AdmitsUpToLimit: 2 goroutines both admitted
  • TestActiveStreamCount_DecreasesOnDisconnect: atomic inc/dec + semaphore round-trip
  • TestParseFederationMaxStreams: 7 cases (empty, valid, out-of-range, invalid)
  • TestParseFederationAdmissionRate: 6 cases (empty, valid, zero, negative, invalid)
  • All existing federation tests pass (stream_test, tls_test, integration stream tests)
  • CI green

Closes RECON-F6.

ontave added 2 commits May 29, 2026 10:08
Adds semaphore-bounded concurrent stream limit (D1), token-bucket
admission rate limiter (D2), and Prometheus metrics (D4) to
FederationServer. Env vars FEDERATION_MAX_CONCURRENT_STREAMS (default 50,
range 1-1000) and FEDERATION_ADMISSION_RATE (default 5, must be >0)
configure the pool at startup. ActiveStreamCount() exposes the current
gauge value for health checks.

Five unit tests cover: semaphore rejection at limit, concurrent admission
up to limit, activeCount increment/decrement on connect/disconnect, and
both env-var parser edge cases.
@ontave ontave merged commit 9870e97 into main Jun 1, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant