diff --git a/.vitepress/config/en.ts b/.vitepress/config/en.ts index b15c30e..299661c 100644 --- a/.vitepress/config/en.ts +++ b/.vitepress/config/en.ts @@ -205,6 +205,7 @@ export const enConfig: LocaleSpecificConfig = { { text: 'ADR-024: Auth & Authz Strategy', link: '/en/contributing/adr/024-auth-authz-strategy' }, { text: 'ADR-025: Package Versioning Strategy', link: '/en/contributing/adr/025-package-versioning-strategy' }, { text: 'ADR-026: EventBus Architecture', link: '/en/contributing/adr/026-eventbus-architecture' }, + { text: 'ADR-027: External Contracts vs EventBus', link: '/en/contributing/adr/027-external-contracts-vs-eventbus' }, ], }, ], diff --git a/en/contributing/adr/027-external-contracts-vs-eventbus.md b/en/contributing/adr/027-external-contracts-vs-eventbus.md new file mode 100644 index 0000000..80a49a1 --- /dev/null +++ b/en/contributing/adr/027-external-contracts-vs-eventbus.md @@ -0,0 +1,58 @@ +# ADR-027: External Message Contracts vs the Internal EventBus + +## Status + +Accepted -- 2026-06-12 + +## Context + +[ADR-026](./026-eventbus-architecture.md) established the EventBus as a proto-first, adapter-based internal event system: `EventBus.publish()` takes a protobuf message, serializes it with `toBinary`, and routes it by proto type. The four adapters (NATS, Kafka, Redis, AMQP) implement a deliberately minimal `EventAdapter` interface that moves opaque `Uint8Array` payloads. + +A production adopter then needed to publish to a **partner's** broker under an externally agreed AsyncAPI contract: a named exchange and durable queue, `contentType: application/json`, `deliveryMode=2`, `mandatory`, and per-message publisher confirms. This drove the `events-amqp-external-contract` change, which added adapter-level serialization (`contentType` + `encode`/`decode`), explicit topology declaration, `queueOverrides`, automatic recovery, and reliable publishing to `@connectum/events-amqp`. + +That work surfaced a recurring design question that will reappear for every adapter: + +- Should the **core** `@connectum/events` interfaces grow to express external-contract variability — per-publish `contentType`, per-subscribe queue names, a `wireFormat: "binary" | "json"` switch on `EventBus`? +- Or do external contracts belong to a different layer than the internal bus? + +Without a stated principle, each adapter would answer this differently, and the core interfaces would accrete passthrough options that weaken the typed proto contract. + +## Decision + +**The EventBus is an internal protobuf bus. External message contracts are served at the adapter layer (or by publishing directly through an adapter), not by extending the core EventBus interfaces.** + +Concretely: + +1. **`EventBus.publish()` stays protobuf-only.** No `wireFormat` switch is introduced. Putting JSON (or any non-protobuf encoding) on the bus would force every subscriber, middleware, retry, and DLQ path to understand multiple wire formats, and would make `RawEvent.payload` ambiguous. protobuf-es `toJson` only solves the producer side; the consumer side would require format detection (content-sniffing), which the AMQP change already rejected as implicit magic that must fail loudly instead. + +2. **External contracts are produced through the adapter.** When an application must emit JSON (or a partner's contentType) against an external contract, it serializes the bytes itself and publishes through the adapter, which controls the wire `contentType` and optional `encode`/`decode` transcoding. This is an outbound producer to a foreign system — it does not need the EventBus's typed routing, middleware, or DLQ. + +3. **Core interfaces stay minimal; variability lives in adapter options.** `PublishOptions` / `RawSubscribeOptions` are not extended with generic passthrough bags, declaration merging, or a viral `EventBus` generic. Contract variability that is genuinely whole-channel (the whole queue is JSON, the whole group reads a named queue) is expressed by adapter-level options (`serialization`, `queueOverrides`, `topology`). A future need for genuine *per-message* variability — if one ever arises with a concrete use case — would be added as typed options on a specific adapter's constructor, not on the core interface. + +4. **`PublishOptions.sync` is removed.** It was a no-op across all four adapters: each already confirms publishes per-message (NATS `PubAck`, Kafka `producer.send`, Redis `XADD`, AMQP per-message broker ack with typed errors). A resolved `publish()` already means the broker accepted the message; there was no fire-and-forget mode to opt out of. Removed ahead of the first stable release. + +## Consequences + +### Positive + +- The internal bus keeps one wire format; subscribers, middleware, retry, and DLQ reason about exactly one payload shape. +- Core `EventAdapter` / `PublishOptions` interfaces stay small and typed — no passthrough erosion across four heterogeneous adapters. +- External-contract concerns (content type, topology, confirms, recovery) are isolated to the adapter that actually talks to the foreign broker. + +### Negative / Trade-offs + +- **Direct adapter publishing bypasses EventBus middleware** (producer-side retry / DLQ). For an external at-least-once producer this is compensated by the adapter's per-message confirms and typed error taxonomy (`AmqpUnroutableError`, `AmqpPublishNackError`, `AmqpPublishTimeoutError`, …), which let the application implement an "advance cursor after confirm" loop. This trade-off must be stated explicitly in the events-amqp guide. +- Per-message wire-format variability is intentionally unsupported. If a real use case appears, it is added per-adapter, not retrofitted into the core interface. + +## Alternatives Considered + +- **`adapterOptions?: Record` passthrough on `PublishOptions`** — rejected: kills type safety, silently ignores typos, becomes a junk drawer; contradicts the project's fail-fast discipline. +- **Module augmentation / declaration merging per adapter** — rejected: fragile under tsup `.d.ts` compilation, augmentations from two adapters in one app collide, poor discoverability. +- **Generic `EventBus`** — rejected: the generic goes viral through `EventBusOptions`, `createServer({ eventBus })`, and middleware for disproportionate cost. +- **`wireFormat: "binary" | "json"` on EventBus** — rejected as an anti-feature (see Decision 1). + +## References + +- [ADR-026: EventBus Architecture](./026-eventbus-architecture.md) +- [ADR-003: Package Decomposition](./003-package-decomposition.md) (layer rules) +- Change: `events-amqp-external-contract` diff --git a/en/contributing/adr/index.md b/en/contributing/adr/index.md index 1478a50..bbec388 100644 --- a/en/contributing/adr/index.md +++ b/en/contributing/adr/index.md @@ -25,6 +25,7 @@ Architecture Decision Records (ADRs) capture important design decisions with the | 024 | [Auth/Authz Strategy](/en/contributing/adr/024-auth-authz-strategy) | 2026-02-15 | @connectum/auth package with JWT, RBAC, context propagation | | 025 | [Package Versioning Strategy](/en/contributing/adr/025-package-versioning-strategy) | 2026-02-20 | Two-phase versioning: Fixed for rc, Hybrid after 1.0.0 stable | | 026 | [EventBus Architecture](/en/contributing/adr/026-eventbus-architecture) | 2026-03-07 | Proto-first EventBus with pluggable broker adapters | +| 027 | [External Contracts vs EventBus](/en/contributing/adr/027-external-contracts-vs-eventbus) | 2026-06-12 | External contracts at adapter layer; EventBus stays protobuf-only; remove `sync` | ## Creating a New ADR diff --git a/en/guide/events/adapters.md b/en/guide/events/adapters.md index 7f3dc09..fe79e41 100644 --- a/en/guide/events/adapters.md +++ b/en/guide/events/adapters.md @@ -194,7 +194,7 @@ const adapter = RedisAdapter({ ## AMQP / RabbitMQ Adapter -Uses the AMQP 0-9-1 protocol via [amqplib](https://amqp-node.github.io/amqplib/) for durable messaging with topic exchanges, competing consumers, and native dead letter exchange (DLX) support. +Uses the AMQP 0-9-1 protocol via [amqplib](https://amqp-node.github.io/amqplib/) for durable messaging with topic exchanges, competing consumers, and native dead letter exchange (DLX) support. Provides per-message publisher confirms, automatic connection recovery (enabled by default), explicit external topology (`topology` / `topologyMode` / `queueOverrides`), and serialization control for external AMQP contracts -- see [@connectum/events-amqp](/en/packages/events-amqp) for the full reference. ```bash pnpm add @connectum/events-amqp @@ -227,7 +227,16 @@ const adapter = AmqpAdapter({ | `exchangeOptions` | `AmqpExchangeOptions` | `undefined` | Exchange declaration options | | `queueOptions` | `AmqpQueueOptions` | `undefined` | Queue declaration options (durable, TTL, max length, DLX) | | `consumerOptions` | `AmqpConsumerOptions` | `undefined` | Consumer tuning (prefetch, exclusive) | -| `publisherOptions` | `AmqpPublisherOptions` | `undefined` | Publisher options (persistent, mandatory) | +| `publisherOptions` | `AmqpPublisherOptions` | `undefined` | Publisher options (persistent, mandatory, return correlation) | +| `serialization` | `AmqpSerializationOptions` | `undefined` | `contentType` label and optional wire transcoding | +| `topology` | `AmqpTopology` | `undefined` | Explicit topology: exchanges, queues with raw arguments, bindings | +| `topologyMode` | `"assert" \| "check" \| "skip"` | `"assert"` | How topology is established (declare / verify existence / none) | +| `queueOverrides` | `Record` | `undefined` | Map a consumer group to an externally named queue | +| `recovery` | `boolean \| AmqpRecoveryOptions` | `true` | Automatic connection recovery; `false` disables | +| `lifecycle` | `AmqpLifecycleCallbacks` | `undefined` | Connection lifecycle callbacks | +| `publishTimeoutMs` | `number` | `30000` | Per-publish broker-outcome deadline | + +Every `publish()` resolves on its own broker acknowledgement (per-message confirms) and rejects with a typed error (`AmqpUnroutableError`, `AmqpPublishNackError`, `AmqpPublishTimeoutError`, `AmqpConnectionError`). See the [package page](/en/packages/events-amqp) for the error taxonomy, recovery semantics, and an external-contract recipe. ### LavinMQ Compatibility diff --git a/en/packages/events-amqp.md b/en/packages/events-amqp.md index 26d5269..44322ae 100644 --- a/en/packages/events-amqp.md +++ b/en/packages/events-amqp.md @@ -5,7 +5,7 @@ description: AMQP / RabbitMQ adapter for @connectum/events EventBus # @connectum/events-amqp -AMQP 0-9-1 adapter for the Connectum EventBus. Provides persistent at-least-once delivery through [RabbitMQ](https://www.rabbitmq.com/) (or compatible brokers like [LavinMQ](https://lavinmq.com/)) using topic exchanges, competing consumers, dead letter exchanges, and metadata propagation via AMQP message headers. +AMQP 0-9-1 adapter for the Connectum EventBus. Provides persistent at-least-once delivery through [RabbitMQ](https://www.rabbitmq.com/) (or compatible brokers like [LavinMQ](https://lavinmq.com/)) using topic exchanges, competing consumers, dead letter exchanges, explicit external topology, automatic connection recovery, per-message publisher confirms, and metadata propagation via AMQP message headers. **Layer**: 2 (Broker Adapters) @@ -27,7 +27,7 @@ pnpm add @connectum/events-amqp **Peer dependency**: `@connectum/events` -**Transitive dependency**: [amqplib](https://www.npmjs.com/package/amqplib) (installed automatically) +**Transitive dependency**: [amqplib](https://www.npmjs.com/package/amqplib) `^2.0.1` (installed automatically; v2 provides native connection recovery) ## Quick Start @@ -72,12 +72,19 @@ Pass the result to `createEventBus({ adapter })`. |--------|------|---------|-------------| | `url` | `string` | *(required)* | AMQP connection URL (e.g., `"amqp://guest:guest@localhost:5672"`) | | `socketOptions` | `Record` | `undefined` | Low-level socket options passed to `amqplib.connect()` (TLS certificates, timeouts) | -| `exchange` | `string` | `"connectum.events"` | Exchange name. Created automatically on `connect()` if it does not exist | +| `exchange` | `string` | `"connectum.events"` | Exchange name. Created automatically on `connect()` if it does not exist (in `assert` mode) | | `exchangeType` | `"topic" \| "direct" \| "fanout" \| "headers"` | `"topic"` | Exchange type. `"topic"` enables wildcard routing keys | | `exchangeOptions` | `AmqpExchangeOptions` | `undefined` | Exchange declaration options | -| `queueOptions` | `AmqpQueueOptions` | `undefined` | Queue declaration options | +| `queueOptions` | `AmqpQueueOptions` | `undefined` | Default queue declaration options | | `consumerOptions` | `AmqpConsumerOptions` | `undefined` | Consumer tuning options | | `publisherOptions` | `AmqpPublisherOptions` | `undefined` | Publisher tuning options | +| `serialization` | `AmqpSerializationOptions` | `undefined` | `contentType` label and optional wire transcoding | +| `topology` | `AmqpTopology` | `undefined` | Explicit topology declared on connect (and re-applied after recovery) | +| `topologyMode` | `"assert" \| "check" \| "skip"` | `"assert"` | How topology is established | +| `queueOverrides` | `Record` | `undefined` | Map a consumer group to an externally named queue | +| `recovery` | `boolean \| AmqpRecoveryOptions` | `true` | Automatic connection recovery (amqplib native); `false` disables | +| `lifecycle` | `AmqpLifecycleCallbacks` | `undefined` | Connection lifecycle callbacks | +| `publishTimeoutMs` | `number` | `30000` | Per-publish broker-outcome deadline in milliseconds | ### `AmqpExchangeOptions` @@ -108,7 +115,87 @@ Pass the result to `createEventBus({ adapter })`. | Option | Type | Default | Description | |--------|------|---------|-------------| | `persistent` | `boolean` | `true` | Mark messages as persistent (`deliveryMode: 2`). Messages survive broker restarts when the queue is durable | -| `mandatory` | `boolean` | `false` | Return unroutable messages to the publisher via the `return` event | +| `mandatory` | `boolean` | `false` | Reject the publish with `AmqpUnroutableError` when the broker cannot route the message to any queue | +| `correlationHeader` | `boolean` | `true` | Correlate `basic.return` frames to mandatory publishes via a private `x-connectum-publish-id` header (visible on the wire); `false` switches to single-flight serialization of mandatory publishes | + +### `AmqpSerializationOptions` + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `contentType` | `string` | `"application/protobuf"` | AMQP `contentType` message property | +| `encode` | `(payload: Uint8Array) => Uint8Array` | `undefined` | Transform the outgoing wire body. Failures reject the publish with `AmqpSerializationError` | +| `decode` | `(content: Uint8Array) => Uint8Array` | `undefined` | Transform the incoming wire body before it reaches the handler. Failures nack the message without requeue (DLX or drop) | + +::: info contentType is a label, not a converter +The adapter receives payloads as bytes -- the EventBus serializes protobuf upstream. Setting `contentType: "application/json"` does not make the EventBus emit JSON. For external JSON contracts the application serializes JSON itself, publishes the pre-serialized bytes through the adapter directly, and sets `contentType` to match. See [External AMQP Contract](#external-amqp-contract). +::: + +### `AmqpTopology` + +Declarative topology applied on `connect()` and re-applied after every recovery: + +| Field | Type | Description | +|-------|------|-------------| +| `exchanges` | `AmqpExchangeDeclaration[]` | Exchanges: `name`, `type`, `durable`, `autoDelete`, raw `arguments` passthrough | +| `queues` | `AmqpQueueDeclaration[]` | Queues: `name`, `durable`, `autoDelete`, `exclusive`, raw `arguments` passthrough (e.g. `x-dead-letter-exchange`) | +| `bindings` | `AmqpBindingDeclaration[]` | Bindings from a `source` exchange with a `routingKey` to either a destination `queue` or a destination `exchange` (exchange-to-exchange) | + +::: warning Topology queues and subscribe() +Queues declared in `topology.queues` are asserted once with their full arguments when topology is applied. `subscribe()` does **not** re-assert them -- it only binds the subscription patterns. Re-asserting without the original arguments would be a conflicting redeclare (`PRECONDITION_FAILED` 406). +::: + +### Topology Modes + +| Mode | Behavior | +|------|----------| +| `"assert"` *(default)* | Declare topology idempotently (`assertExchange` / `assertQueue` / bind) | +| `"check"` | Existence-only verification (`checkExchange` / `checkQueue`); fails fast with `AmqpTopologyError` on missing objects | +| `"skip"` | No topology operations at all; the application owns topology | + +::: warning check mode verifies existence only +AMQP offers no passive introspection: `"check"` mode confirms that exchanges and queues *exist*, but argument equivalence and binding presence are NOT verifiable. A conflicting redeclare made elsewhere still fails with `PRECONDITION_FAILED` (406). +::: + +### `AmqpQueueOverride` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `queue` | `string` | *(required)* | Externally defined queue name to consume from | +| `arguments` | `Record` | `undefined` | Raw AMQP arguments used when asserting the queue (`assert` mode only) | +| `durable` | `boolean` | `true` | Queue durability | + +By default a consumer group consumes from `${exchange}.${group}`. A `queueOverrides` entry attaches the subscription to a queue from an external contract instead: + +```typescript +const adapter = AmqpAdapter({ + url: 'amqp://localhost:5672', + queueOverrides: { + partner: { queue: 'partner.inbound.v1' }, + }, +}); +// group "partner" now consumes from "partner.inbound.v1" +``` + +### `AmqpRecoveryOptions` + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `initialDelay` | `number` | `100` | First reconnect delay in ms | +| `maxDelay` | `number` | `30000` | Delay cap in ms | +| `factor` | `number` | `2` | Exponential backoff factor | +| `jitter` | `number` | `0.2` | Randomization factor (0..1) | +| `maxRetries` | `number` | `Infinity` | Give up after this many attempts | + +### `AmqpLifecycleCallbacks` + +| Callback | Signature | Fires when | +|----------|-----------|------------| +| `onConnected` | `() => void` | Connection established (initial connect and after each recovery) | +| `onDisconnected` | `(cause: Error) => void` | Connection lost | +| `onReconnecting` | `(info: { attempt: number; delay: number; error: Error }) => void` | A reconnect attempt is scheduled | +| `onReconnectFailed` | `(cause: Error) => void` | Recovery exhausted (`maxRetries` reached) | + +Connection errors are surfaced through these callbacks -- never console-only. ## Configuration Examples @@ -146,6 +233,19 @@ const adapter = AmqpAdapter({ persistent: true, mandatory: false, }, + recovery: { + initialDelay: 100, + maxDelay: 30_000, + factor: 2, + jitter: 0.2, + }, + lifecycle: { + onConnected: () => console.log('AMQP connected'), + onDisconnected: (cause) => console.error('AMQP disconnected', cause), + onReconnecting: ({ attempt, delay }) => console.warn(`Reconnect #${attempt} in ${delay}ms`), + onReconnectFailed: (cause) => console.error('AMQP recovery exhausted', cause), + }, + publishTimeoutMs: 30_000, }); ``` @@ -190,6 +290,108 @@ const adapter = AmqpAdapter({ }); ``` +## External AMQP Contract + +A complete recipe for integrating with an externally defined AMQP contract (AsyncAPI-style): direct exchange, named durable queue with DLQ arguments, JSON `contentType`, mandatory routing, and per-message confirms. The application serializes JSON itself and publishes through the adapter directly: + +```typescript +import { AmqpAdapter, AmqpUnroutableError } from '@connectum/events-amqp'; + +const adapter = AmqpAdapter({ + url: 'amqp://broker:5672', + exchange: 'partner.direct', + exchangeType: 'direct', + serialization: { contentType: 'application/json' }, + topology: { + exchanges: [{ name: 'partner.dlx', type: 'direct' }], + queues: [ + { name: 'partner.dead.v1', durable: true }, + { + name: 'partner.inbound.v1', + durable: true, + arguments: { + 'x-dead-letter-exchange': 'partner.dlx', + 'x-dead-letter-routing-key': 'inbound.dead', + }, + }, + ], + bindings: [ + { queue: 'partner.dead.v1', source: 'partner.dlx', routingKey: 'inbound.dead' }, + { queue: 'partner.inbound.v1', source: 'partner.direct', routingKey: 'inbound' }, + ], + }, + queueOverrides: { + partner: { queue: 'partner.inbound.v1' }, + }, + publisherOptions: { persistent: true, mandatory: true }, +}); + +await adapter.connect(); + +// Consume from the external queue (group "partner" → partner.inbound.v1) +await adapter.subscribe( + ['inbound'], + async (event, ack) => { + const message = JSON.parse(new TextDecoder().decode(event.payload)); + // ... + await ack(); + }, + { group: 'partner' }, +); + +// Publish pre-serialized JSON bytes; resolves on broker ack, +// rejects with AmqpUnroutableError if no queue is bound +const body = new TextEncoder().encode(JSON.stringify({ code: '0104603...' })); +await adapter.publish('inbound', body); +``` + +::: warning Wire-visible correlation header +With `mandatory: true` and the default `correlationHeader: true`, every mandatory publish carries a private `x-connectum-publish-id` header that external consumers will see. Either document the header in the contract, or set `publisherOptions.correlationHeader: false` for a clean wire -- mandatory publishes are then serialized single-flight (throughput trade-off). +::: + +## Reliable Publishing + +The adapter publishes on a confirm channel with **per-message confirms**: every `publish()` resolves when the broker acknowledges that specific message and rejects when the broker nacks it. There is no confirm batching -- each publish has its own outcome. + +- No broker outcome (ack/nack/return/connection loss) within `publishTimeoutMs` (default 30000 ms) → rejects with `AmqpPublishTimeoutError`. The message state is UNKNOWN -- it may or may not have been routed; an at-least-once producer should republish. +- Publishing during a disconnected window (or while recovery is in progress) fails fast with `AmqpConnectionError`. In-flight publishes at the moment of a connection loss also reject with `AmqpConnectionError`. +- With `mandatory: true`, an unroutable message rejects with `AmqpUnroutableError` (carries `.routingKey`). + +::: tip Per-message confirms +Confirms are always per-message: every `publish()` resolves on its own broker ack (or rejects with a typed error). There is no fire-and-forget mode. The legacy `PublishOptions.sync` flag was removed from `@connectum/events` ahead of the first stable release. +::: + +### Error Taxonomy + +Every terminal publish/topology outcome is distinguishable by error class -- what an at-least-once producer needs for an "advance cursor after confirm" pattern: + +| Error | Meaning | +|-------|---------| +| `AmqpAdapterError` | Base class for all adapter errors | +| `AmqpConnectionError` | Connection absent, lost, or recovery in progress / exhausted | +| `AmqpUnroutableError` | Broker returned a `mandatory` message as unroutable (`basic.return`); has `.routingKey` | +| `AmqpPublishNackError` | Broker negatively acknowledged (nacked) a published message | +| `AmqpPublishTimeoutError` | No broker outcome within `publishTimeoutMs`; message state UNKNOWN | +| `AmqpTopologyError` | Topology declaration or verification failed (missing object in `check` mode, conflicting redeclare in `assert` mode) | +| `AmqpSerializationError` | Payload encoding failed in a custom `serialization.encode` hook | + +## Connection Recovery + +Recovery is delegated to amqplib v2 native opt-in recovery and is **enabled by default**. Pass `recovery: false` to restore single-shot, no-reconnect behavior, or pass an `AmqpRecoveryOptions` object to tune the backoff. + +On every successful (re)connect the adapter: + +1. Re-creates its publish and consumer channels. +2. Re-applies topology (per `topologyMode`). +3. Replays all active subscriptions. + +Connection behavior: + +- **With recovery enabled**, `connect()` retries with backoff until the broker becomes reachable -- convenient for `docker-compose` startup ordering where the broker may not be up yet. +- **With `recovery: false`**, `connect()` rejects immediately if the broker is unreachable, and a lost connection is not restored. + +Observe connection state through the `lifecycle` callbacks (`onConnected`, `onDisconnected`, `onReconnecting`, `onReconnectFailed`). + ## Adapter Lifecycle The `AmqpAdapter` follows the `EventAdapter` interface lifecycle managed by `createEventBus()`: @@ -200,13 +402,13 @@ connect() → publish() / subscribe() → disconnect() | Method | Description | |--------|-------------| -| `connect(context?)` | Opens an AMQP connection, creates a channel, asserts the exchange. Uses `context.serviceName` as `clientProperties.connection_name` if not set explicitly. | -| `disconnect()` | Cancels all active consumers, closes the channel and connection | -| `publish()` | Publishes a serialized event to the exchange with `eventType` as the routing key. Metadata is propagated as AMQP message headers | -| `subscribe()` | Declares queues (named or auto-delete), binds them to the exchange with topic patterns, and starts consuming with explicit ack/nack | +| `connect(context?)` | Opens an AMQP connection (retrying per recovery policy), creates the confirm channel, applies topology. Uses `context.serviceName` as `clientProperties.connection_name` if not set explicitly. | +| `disconnect()` | Cancels all active consumers, closes channels and the connection | +| `publish()` | Publishes a serialized event to the exchange with `eventType` as the routing key; resolves on the broker's per-message confirm. Metadata is propagated as AMQP message headers | +| `subscribe()` | Declares queues (named, auto-delete, or contract-overridden), binds them to the exchange with topic patterns, and starts consuming with explicit ack/nack | ::: info Automatic Exchange Creation -On `connect()`, the adapter asserts the configured exchange (default: `"connectum.events"` with type `"topic"`). If the exchange does not exist, it is created. For production, you may want to pre-create exchanges with specific policies via RabbitMQ management. +On `connect()`, the adapter asserts the configured exchange (default: `"connectum.events"` with type `"topic"`) plus any explicit `topology` -- in the default `"assert"` mode. If your topology is provisioned externally (RabbitMQ management, IaC), use `topologyMode: "check"` to fail fast on missing objects, or `"skip"` to perform no topology operations at all. ::: ## AMQP Concepts @@ -221,6 +423,7 @@ An **exchange** receives published messages and routes them to bound queues base A **queue** stores messages until they are consumed. The adapter creates queues automatically: - **With `group`**: named queue `{exchange}.{group}` -- durable, shared across instances (competing consumers) +- **With `group` + `queueOverrides[group]`**: the externally named queue from the override (external contract) - **Without `group`**: anonymous auto-delete queue -- exclusive to this consumer, deleted on disconnect ### Routing Keys @@ -266,11 +469,13 @@ const adapter = AmqpAdapter({ }); ``` +For externally defined DLQ topologies, declare the queue with raw arguments instead (see [External AMQP Contract](#external-amqp-contract)). + This works alongside the EventBus-level DLQ middleware. For broker-native DLQ handling, configure `queueOptions.deadLetterExchange`. For application-level DLQ, use the `middleware.dlq` option in `createEventBus()`. ### At-Least-Once Delivery -The adapter uses **manual acknowledgment** (AMQP `noAck: false`). Each message must be acknowledged by the handler. If the handler throws an error, the message is negatively acknowledged (`nack`) and requeued for redelivery (subject to `maxDeliver` / retry limits). +The adapter uses **manual acknowledgment** (AMQP `noAck: false`). Each message must be acknowledged by the handler. If the handler throws an error, the message is negatively acknowledged (`nack`) and requeued for redelivery (subject to `maxDeliver` / retry limits). A message that fails a custom `serialization.decode` hook is nacked without requeue (DLX or drop) -- a payload that cannot be decoded will never succeed. ### Delivery Attempts @@ -307,17 +512,32 @@ const routes = (events: EventRouter) => { }; ``` +Internal headers (`x-event-id`, `x-published-at`, `x-connectum-publish-id`) are set by the adapter on publish and stripped from metadata on delivery. + ## Exports Summary | Export | Description | |--------|-------------| | `AmqpAdapter` | Factory function that creates an AMQP adapter | | `toAmqpPattern` | Converts EventBus wildcard pattern (`>`) to AMQP routing key pattern (`#`) | +| `AmqpTopologyMode` | Topology mode constants (`ASSERT` / `CHECK` / `SKIP`) | +| `AmqpAdapterError` | Base class for all adapter errors | +| `AmqpConnectionError` | Connection absent, lost, or recovery in progress / exhausted | +| `AmqpUnroutableError` | Mandatory message returned as unroutable (has `.routingKey`) | +| `AmqpPublishNackError` | Broker nacked a published message | +| `AmqpPublishTimeoutError` | No broker outcome within `publishTimeoutMs` | +| `AmqpTopologyError` | Topology declaration or verification failed | +| `AmqpSerializationError` | Payload encoding/decoding failed in a custom hook | | `AmqpAdapterOptions` | Configuration options type | | `AmqpExchangeOptions` | Exchange declaration options type | | `AmqpQueueOptions` | Queue declaration options type | | `AmqpConsumerOptions` | Consumer tuning options type | | `AmqpPublisherOptions` | Publisher tuning options type | +| `AmqpSerializationOptions` | Serialization options type | +| `AmqpTopology` | Declarative topology type (with `AmqpExchangeDeclaration`, `AmqpQueueDeclaration`, `AmqpBindingDeclaration`) | +| `AmqpQueueOverride` | External queue override type | +| `AmqpRecoveryOptions` | Recovery backoff options type | +| `AmqpLifecycleCallbacks` | Connection lifecycle callbacks type | ## Related Packages diff --git a/en/packages/events.md b/en/packages/events.md index 00b31dc..ab48647 100644 --- a/en/packages/events.md +++ b/en/packages/events.md @@ -150,7 +150,6 @@ function createEventBus(options: EventBusOptions): EventBus & EventBusLike; |--------|------|---------|-------------| | `topic` | `string` | `schema.typeName` | Override topic name | | `key` | `string` | `undefined` | Partition/routing key for ordered delivery | -| `sync` | `boolean` | `false` | Wait for broker confirmation | | `group` | `string` | `undefined` | Named group tag for workflow grouping | | `metadata` | `Record` | `undefined` | Additional metadata / headers |