feat: foreign-broker relay from OutboxSubscriber#44
Merged
Conversation
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pins the FastStream-native cross-broker chain against OutboxSubscriber with no guardrail code in place: a plain handler return relays to Kafka via the @publisher decorator stack. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Confirms include_router-before-start() resolves the router-publisher's ConfigComposition to the real producer (Kafka side) and that broker_outbox.include_router wires foreign-decorated subscribers from an OutboxRouter the same as broker-direct ones. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds opt-in inbound-header propagation on OutboxSubscriber so handlers that return a plain value can forward outbox-row headers to the foreign publisher chain. Default False matches FastStream's broker-wide convention. The dispatch hook lives in a process_message override that Task 5 (OutboxResponse + foreign-publisher guard) also uses. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Replace body-only assertion with cmd.headers capture so the test actually verifies header propagation. - Add complementary negative test for the default-False case. - Switch # type: ignore[override] → project convention: suppression removed entirely since ty does not flag OutboxSubscriber.process_message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A handler that returns OutboxResponse(...) and is also decorated by a foreign-broker publisher would both insert an outbox row AND publish to the foreign broker on every dispatch. Detect the combo at dispatch time and raise a RuntimeError pointing at the two valid patterns. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Preflight check during OutboxBroker.start() walks subscribers for publishers whose _outer_config is foreign and logs one WARNING per unstarted foreign broker. Operators see the cause immediately instead of debugging an AttributeError on the first relay attempt. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Confirms the FastStream AckPolicy nack path (publisher-chain exception -> AcknowledgementMiddleware.__aexit__ -> outbox row nack/retry) end to end against a real Postgres-backed OutboxBroker plus TestKafkaBroker. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The outbox-to-foreign-broker relay is the canonical use case for this package; promote it to the front page so readers see the payoff line immediately. Keep the standalone-queue example as a secondary quickstart for users without a downstream broker. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rride The consume() and process_message overrides reproduce upstream SubscriberUsecase paths verbatim so _OutboxConfigError can be re-raised without losing FastStream's existing behavior. The upstream-mirrored branches (StopConsume/SystemExit in consume(); parser error and no-matching-handler fall-through in process_message) are unreachable from the outbox dispatch path. Internal outbox publishers in subscriber decorator chains (filtered by "outer is self.config") are also unreachable in normal usage — users should call broker.publish() directly instead. Marking these branches # pragma: no cover keeps the 100% gate clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ders Covers the spec contract that when propagate_inbound_headers=True, the subscriber only fills headers when result_msg.headers is empty — a handler that returns Response(value, headers=...) keeps its choice. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
@kafka_pub @broker_outbox.subscriber(...)) soOutboxSubscribercan be the source of an outbox-to-foreign-broker chain. The chain mechanism is upstream FastStream — no dispatch-path changes were needed for the bare relay.OutboxResponse+ foreign-publisher dual-fire (G1), WARN on unstarted foreign brokers atOutboxBroker.start()(G2), opt-inpropagate_inbound_headers: bool = Falsekwarg (G3).Usagenav; README quickstart rewritten to lead with the relay example.faststream-sqlbrokercomparison documented in the spec andCLAUDE.md.What changed
OutboxSubscriber.process_messageoverride (G3 + G1 hook),OutboxSubscriber.consumeoverride (re-raise_OutboxConfigError),OutboxBroker.startextension (G2 warning),propagate_inbound_headersplumbed throughOutboxSubscriberConfig→ factory → registrator → router → fastapi router.tests/test_relay.py(againstTestKafkaBroker) + 1 integration test intests/test_integration.py(real Postgres + simulated foreign-publish failure proving at-least-once via retry).docs/usage/relay.md(promoted to top of Usage nav), README rewrite, intro callout,CLAUDE.mdsubsection.Test plan
tests/test_relay.py— 8 tests: naked chain (with default-OFF header behavior),KafkaRouterpublisher shape,OutboxRoutersubscriber shape, header propagation TRUE, header propagation FALSE, explicit user-headers-win override, OutboxResponse + foreign publisher refused, unstarted foreign broker warns.tests/test_integration.py::test_relay_at_least_once_under_foreign_publish_failure— real Postgres-backedOutboxBroker+TestKafkaBroker; first foreign publish raises, retry succeeds, row clears.just lint-ciclean.just testpasses — 401 tests, 100% coverage.Non-blocking follow-up notes (from final review)
_OutboxConfigErrorinheritsRuntimeErrorso userexcept RuntimeErrorblocks observe it; documented inCLAUDE.md.Spec:
planning/specs/2026-06-04-foreign-broker-relay-design.mdPlan:
planning/plans/2026-06-04-foreign-broker-relay-plan.md🤖 Generated with Claude Code