From 52ab92e4a8618da9dc6de2e1e480d7ec18dce287 Mon Sep 17 00:00:00 2001 From: Johnalex-hub Date: Tue, 30 Jun 2026 10:13:38 +0100 Subject: [PATCH 1/2] wip(183): scaffold fanout service --- apps/backend/src/services/fanout.ts | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 apps/backend/src/services/fanout.ts diff --git a/apps/backend/src/services/fanout.ts b/apps/backend/src/services/fanout.ts new file mode 100644 index 0000000..536e16e --- /dev/null +++ b/apps/backend/src/services/fanout.ts @@ -0,0 +1,3 @@ +// #183 — fan-out service: resolve members -> devices -> envelopes +// Implementation in progress. +export {}; From 28a46c1823fc86afc53ec7546705a2eebafe34e3 Mon Sep 17 00:00:00 2001 From: Johnalex-hub Date: Thu, 2 Jul 2026 07:45:16 +0100 Subject: [PATCH 2/2] feat(183): implement fan-out service with device-set validation and atomic envelope insert --- apps/backend/src/services/fanout.ts | 93 ++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/services/fanout.ts b/apps/backend/src/services/fanout.ts index 536e16e..3cd0ef0 100644 --- a/apps/backend/src/services/fanout.ts +++ b/apps/backend/src/services/fanout.ts @@ -1,3 +1,92 @@ // #183 — fan-out service: resolve members -> devices -> envelopes -// Implementation in progress. -export {}; +// +// Given an unpersisted message and a sender-provided map of +// { recipientDeviceId -> ciphertext }, validates that the client encrypted +// to exactly the conversation's current active recipient devices (including +// the sender's *other* devices, for multi-device self-sync — but excluding +// the device that is doing the sending). If the client's device set is +// stale, returns device_set_mismatch with the authoritative device list +// instead of guessing or dropping ciphertext. On success, the message and +// its envelopes are persisted atomically. + +import { and, eq, inArray, isNull } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { conversationMembers, messages, messageEnvelopes, userDevices } from '../db/schema.js'; +import type { Message, NewMessage } from '../db/schema.js'; + +export interface FanoutSuccess { + ok: true; + message: Message; +} + +export interface FanoutDeviceSetMismatch { + ok: false; + error: 'device_set_mismatch'; + expectedDeviceIds: string[]; +} + +export type FanoutResult = FanoutSuccess | FanoutDeviceSetMismatch; + +/** + * Persists `newMessage` and its per-device envelopes in a single transaction, + * after verifying `envelopeCiphertexts` covers exactly the conversation's + * current active recipient devices. + * + * @param newMessage - Message row to insert (id may be omitted; defaultRandom). + * @param senderDeviceId - The device sending this message; excluded from the + * authoritative recipient set (it doesn't need its own envelope). + * @param envelopeCiphertexts - Sender-provided map of recipientDeviceId -> ciphertext. + */ +export async function fanoutMessage( + newMessage: NewMessage, + senderDeviceId: string | null, + envelopeCiphertexts: Record, +): Promise { + const members = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.conversationId, newMessage.conversationId), + columns: { userId: true }, + }); + const memberIds = members.map((m) => m.userId); + + const activeDevices = await db.query.userDevices.findMany({ + where: and(inArray(userDevices.userId, memberIds), isNull(userDevices.revokedAt)), + columns: { id: true, userId: true }, + }); + + const expectedDevices = activeDevices.filter((d) => d.id !== senderDeviceId); + const deviceToUser = new Map(expectedDevices.map((d) => [d.id, d.userId])); + const expectedDeviceIds = new Set(deviceToUser.keys()); + + const providedDeviceIds = Object.keys(envelopeCiphertexts); + const setsMatch = + providedDeviceIds.length === expectedDeviceIds.size && + providedDeviceIds.every((id) => expectedDeviceIds.has(id)); + + if (!setsMatch) { + return { + ok: false, + error: 'device_set_mismatch', + expectedDeviceIds: [...expectedDeviceIds], + }; + } + + const message = await db.transaction(async (tx) => { + const [inserted] = await tx.insert(messages).values(newMessage).returning(); + const persisted = inserted!; + + const envelopeRows = providedDeviceIds.map((deviceId) => ({ + messageId: persisted.id, + recipientDeviceId: deviceId, + recipientUserId: deviceToUser.get(deviceId)!, + ciphertext: envelopeCiphertexts[deviceId]!, + })); + + if (envelopeRows.length > 0) { + await tx.insert(messageEnvelopes).values(envelopeRows); + } + + return persisted; + }); + + return { ok: true, message }; +}