Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vitepress/config/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export const enConfig: LocaleSpecificConfig<DefaultTheme.Config> = {
{ text: 'ADR-024: Auth & Authz Strategy', link: '/en/contributing/adr/024-auth-authz-strategy' },
{ text: 'ADR-025: Package Versioning Strategy', link: '/en/contributing/adr/025-package-versioning-strategy' },
{ text: 'ADR-026: EventBus Architecture', link: '/en/contributing/adr/026-eventbus-architecture' },
{ text: 'ADR-027: External Contracts vs EventBus', link: '/en/contributing/adr/027-external-contracts-vs-eventbus' },
],
},
],
Expand Down
58 changes: 58 additions & 0 deletions en/contributing/adr/027-external-contracts-vs-eventbus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# ADR-027: External Message Contracts vs the Internal EventBus

## Status

Accepted -- 2026-06-12

## Context

[ADR-026](./026-eventbus-architecture.md) established the EventBus as a proto-first, adapter-based internal event system: `EventBus.publish()` takes a protobuf message, serializes it with `toBinary`, and routes it by proto type. The four adapters (NATS, Kafka, Redis, AMQP) implement a deliberately minimal `EventAdapter` interface that moves opaque `Uint8Array` payloads.

A production adopter then needed to publish to a **partner's** broker under an externally agreed AsyncAPI contract: a named exchange and durable queue, `contentType: application/json`, `deliveryMode=2`, `mandatory`, and per-message publisher confirms. This drove the `events-amqp-external-contract` change, which added adapter-level serialization (`contentType` + `encode`/`decode`), explicit topology declaration, `queueOverrides`, automatic recovery, and reliable publishing to `@connectum/events-amqp`.

That work surfaced a recurring design question that will reappear for every adapter:

- Should the **core** `@connectum/events` interfaces grow to express external-contract variability — per-publish `contentType`, per-subscribe queue names, a `wireFormat: "binary" | "json"` switch on `EventBus`?
- Or do external contracts belong to a different layer than the internal bus?

Without a stated principle, each adapter would answer this differently, and the core interfaces would accrete passthrough options that weaken the typed proto contract.

## Decision

**The EventBus is an internal protobuf bus. External message contracts are served at the adapter layer (or by publishing directly through an adapter), not by extending the core EventBus interfaces.**

Concretely:

1. **`EventBus.publish()` stays protobuf-only.** No `wireFormat` switch is introduced. Putting JSON (or any non-protobuf encoding) on the bus would force every subscriber, middleware, retry, and DLQ path to understand multiple wire formats, and would make `RawEvent.payload` ambiguous. protobuf-es `toJson` only solves the producer side; the consumer side would require format detection (content-sniffing), which the AMQP change already rejected as implicit magic that must fail loudly instead.

2. **External contracts are produced through the adapter.** When an application must emit JSON (or a partner's contentType) against an external contract, it serializes the bytes itself and publishes through the adapter, which controls the wire `contentType` and optional `encode`/`decode` transcoding. This is an outbound producer to a foreign system — it does not need the EventBus's typed routing, middleware, or DLQ.

3. **Core interfaces stay minimal; variability lives in adapter options.** `PublishOptions` / `RawSubscribeOptions` are not extended with generic passthrough bags, declaration merging, or a viral `EventBus<TAdapterOpts>` generic. Contract variability that is genuinely whole-channel (the whole queue is JSON, the whole group reads a named queue) is expressed by adapter-level options (`serialization`, `queueOverrides`, `topology`). A future need for genuine *per-message* variability — if one ever arises with a concrete use case — would be added as typed options on a specific adapter's constructor, not on the core interface.

4. **`PublishOptions.sync` is removed.** It was a no-op across all four adapters: each already confirms publishes per-message (NATS `PubAck`, Kafka `producer.send`, Redis `XADD`, AMQP per-message broker ack with typed errors). A resolved `publish()` already means the broker accepted the message; there was no fire-and-forget mode to opt out of. Removed ahead of the first stable release.

## Consequences

### Positive

- The internal bus keeps one wire format; subscribers, middleware, retry, and DLQ reason about exactly one payload shape.
- Core `EventAdapter` / `PublishOptions` interfaces stay small and typed — no passthrough erosion across four heterogeneous adapters.
- External-contract concerns (content type, topology, confirms, recovery) are isolated to the adapter that actually talks to the foreign broker.

### Negative / Trade-offs

- **Direct adapter publishing bypasses EventBus middleware** (producer-side retry / DLQ). For an external at-least-once producer this is compensated by the adapter's per-message confirms and typed error taxonomy (`AmqpUnroutableError`, `AmqpPublishNackError`, `AmqpPublishTimeoutError`, …), which let the application implement an "advance cursor after confirm" loop. This trade-off must be stated explicitly in the events-amqp guide.
- Per-message wire-format variability is intentionally unsupported. If a real use case appears, it is added per-adapter, not retrofitted into the core interface.

## Alternatives Considered

- **`adapterOptions?: Record<string, unknown>` passthrough on `PublishOptions`** — rejected: kills type safety, silently ignores typos, becomes a junk drawer; contradicts the project's fail-fast discipline.
- **Module augmentation / declaration merging per adapter** — rejected: fragile under tsup `.d.ts` compilation, augmentations from two adapters in one app collide, poor discoverability.
- **Generic `EventBus<TAdapterOpts>`** — rejected: the generic goes viral through `EventBusOptions`, `createServer({ eventBus })`, and middleware for disproportionate cost.
- **`wireFormat: "binary" | "json"` on EventBus** — rejected as an anti-feature (see Decision 1).

## References

- [ADR-026: EventBus Architecture](./026-eventbus-architecture.md)
- [ADR-003: Package Decomposition](./003-package-decomposition.md) (layer rules)
- Change: `events-amqp-external-contract`
1 change: 1 addition & 0 deletions en/contributing/adr/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Architecture Decision Records (ADRs) capture important design decisions with the
| 024 | [Auth/Authz Strategy](/en/contributing/adr/024-auth-authz-strategy) | 2026-02-15 | @connectum/auth package with JWT, RBAC, context propagation |
| 025 | [Package Versioning Strategy](/en/contributing/adr/025-package-versioning-strategy) | 2026-02-20 | Two-phase versioning: Fixed for rc, Hybrid after 1.0.0 stable |
| 026 | [EventBus Architecture](/en/contributing/adr/026-eventbus-architecture) | 2026-03-07 | Proto-first EventBus with pluggable broker adapters |
| 027 | [External Contracts vs EventBus](/en/contributing/adr/027-external-contracts-vs-eventbus) | 2026-06-12 | External contracts at adapter layer; EventBus stays protobuf-only; remove `sync` |

## Creating a New ADR

Expand Down
13 changes: 11 additions & 2 deletions en/guide/events/adapters.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ const adapter = RedisAdapter({

## AMQP / RabbitMQ Adapter

Uses the AMQP 0-9-1 protocol via [amqplib](https://amqp-node.github.io/amqplib/) for durable messaging with topic exchanges, competing consumers, and native dead letter exchange (DLX) support.
Uses the AMQP 0-9-1 protocol via [amqplib](https://amqp-node.github.io/amqplib/) for durable messaging with topic exchanges, competing consumers, and native dead letter exchange (DLX) support. Provides per-message publisher confirms, automatic connection recovery (enabled by default), explicit external topology (`topology` / `topologyMode` / `queueOverrides`), and serialization control for external AMQP contracts -- see [@connectum/events-amqp](/en/packages/events-amqp) for the full reference.

```bash
pnpm add @connectum/events-amqp
Expand Down Expand Up @@ -227,7 +227,16 @@ const adapter = AmqpAdapter({
| `exchangeOptions` | `AmqpExchangeOptions` | `undefined` | Exchange declaration options |
| `queueOptions` | `AmqpQueueOptions` | `undefined` | Queue declaration options (durable, TTL, max length, DLX) |
| `consumerOptions` | `AmqpConsumerOptions` | `undefined` | Consumer tuning (prefetch, exclusive) |
| `publisherOptions` | `AmqpPublisherOptions` | `undefined` | Publisher options (persistent, mandatory) |
| `publisherOptions` | `AmqpPublisherOptions` | `undefined` | Publisher options (persistent, mandatory, return correlation) |
| `serialization` | `AmqpSerializationOptions` | `undefined` | `contentType` label and optional wire transcoding |
| `topology` | `AmqpTopology` | `undefined` | Explicit topology: exchanges, queues with raw arguments, bindings |
| `topologyMode` | `"assert" \| "check" \| "skip"` | `"assert"` | How topology is established (declare / verify existence / none) |
| `queueOverrides` | `Record<string, AmqpQueueOverride>` | `undefined` | Map a consumer group to an externally named queue |
| `recovery` | `boolean \| AmqpRecoveryOptions` | `true` | Automatic connection recovery; `false` disables |
| `lifecycle` | `AmqpLifecycleCallbacks` | `undefined` | Connection lifecycle callbacks |
| `publishTimeoutMs` | `number` | `30000` | Per-publish broker-outcome deadline |

Every `publish()` resolves on its own broker acknowledgement (per-message confirms) and rejects with a typed error (`AmqpUnroutableError`, `AmqpPublishNackError`, `AmqpPublishTimeoutError`, `AmqpConnectionError`). See the [package page](/en/packages/events-amqp) for the error taxonomy, recovery semantics, and an external-contract recipe.

### LavinMQ Compatibility

Expand Down
Loading
Loading