Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apps/backend/src/__tests__/conversations.cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,14 @@ describe('GET /conversations/:id/search', () => {
vi.clearAllMocks();
});

it('returns 501 for E2EE environments', async () => {
it('returns 410 Gone for E2EE environments', async () => {
const res = await request(makeApp()).get('/conversations/conv-1/search?q=hello');

expect(res.status).toBe(501);
expect(res.status).toBe(410);
expect(res.body).toEqual({
error: 'Server-side search removed; search is now client-side over decrypted messages',
docs: 'https://github.com/DripWave/clicked/blob/main/docs/message-encryption-migration.md'
});
});
});

Expand Down
5 changes: 4 additions & 1 deletion apps/backend/src/routes/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,10 @@ conversationsRouter.get('/:id/messages', async (req: AuthRequest, res) => {
});

conversationsRouter.get('/:id/search', async (req: AuthRequest, res) => {
res.status(501).json({ error: 'Search is not supported in E2EE conversations' });
res.status(410).json({
error: 'Server-side search removed; search is now client-side over decrypted messages',
docs: 'https://github.com/DripWave/clicked/blob/main/docs/message-encryption-migration.md'
});
});

// PATCH /conversations/:id/settings — update muted/archived state for the authenticated user
Expand Down
3 changes: 2 additions & 1 deletion apps/backend/src/routes/devices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ async function emitDeviceChangeEvent(userId: string, change: 'device_added' | 'd
.values({
conversationId: m.conversationId,
senderId: userId,
content: JSON.stringify({ userId, change }),
contentType: 'system',
ciphertext: JSON.stringify({ userId, change }),
})
.returning();

Expand Down
76 changes: 44 additions & 32 deletions apps/backend/src/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,39 +61,51 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r
return;
}

// ── persist ────────────────────────────────────────────────────────────────
const [message] = await db
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId ?? null,
contentType: contentType?.trim().toLowerCase() || 'text',
ciphertext: ciphertext || null,
})
.returning();

if (envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);
const devicesList = await db.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
columns: { id: true, userId: true },
// ── persist in transaction ─────────────────────────────────────────────────
let message;
try {
message = await db.transaction(async (tx) => {
const [insertedMessage] = await tx
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId ?? null,
contentType: contentType?.trim().toLowerCase() || 'text',
ciphertext: ciphertext || null,
fileId: fileId ?? null,
})
.returning();

if (envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);
const devicesList = await tx.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
columns: { id: true, userId: true },
});
const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId]));

const validEnvelopes = envelopes
.filter((env) => deviceToUser.has(env.recipientDeviceId))
.map((env) => ({
messageId,
recipientDeviceId: env.recipientDeviceId,
recipientUserId: deviceToUser.get(env.recipientDeviceId)!,
ciphertext: env.ciphertext,
}));

if (validEnvelopes.length > 0) {
await tx.insert(messageEnvelopes).values(validEnvelopes);
}
}

return insertedMessage;
});
const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId]));

const validEnvelopes = envelopes
.filter((env) => deviceToUser.has(env.recipientDeviceId))
.map((env) => ({
messageId,
recipientDeviceId: env.recipientDeviceId,
recipientUserId: deviceToUser.get(env.recipientDeviceId)!,
ciphertext: env.ciphertext,
}));

if (validEnvelopes.length > 0) {
await db.insert(messageEnvelopes).values(validEnvelopes);
}
} catch (error) {
console.error('Transaction failed for message insert:', error);
res.status(500).json({ error: 'Failed to persist message' });
return;
}

// ── broadcast via Socket.IO ────────────────────────────────────────────────
Expand Down
221 changes: 135 additions & 86 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,63 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
fileId = fileRow?.id ?? payloadFileId;
}

const [message] = await db
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId,
contentType: resolvedContentType,
ciphertext: effectiveCiphertext,
fileId: fileId ?? null,
})
.returning();

let message;
let recipientDeviceIds: string[] = [];
try {
message = await db.transaction(async (tx) => {
const [insertedMessage] = await tx
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId,
contentType: resolvedContentType,
ciphertext: effectiveCiphertext,
fileId: fileId ?? null,
})
.returning();

if (envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);
const devicesList = await tx.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
columns: { id: true, userId: true },
});
const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId]));

const validEnvelopes = envelopes
.filter((env) => deviceToUser.has(env.recipientDeviceId))
.map((env) => ({
messageId,
recipientDeviceId: env.recipientDeviceId,
recipientUserId: deviceToUser.get(env.recipientDeviceId)!,
ciphertext: env.ciphertext,
}));

if (validEnvelopes.length > 0) {
await tx.insert(messageEnvelopes).values(validEnvelopes);
recipientDeviceIds = validEnvelopes.map((e) => e.recipientDeviceId);
}
}

if (envelopes && envelopes.length > 0) {
return insertedMessage;
});
} catch (error) {
console.error('Transaction failed for message insert:', error);
socket.emit('error', { event: 'send_message', message: 'Failed to persist message' });
return;
}

if (!message) {
socket.emit('error', { event: 'send_message', message: 'Failed to persist message' });
return;
}

socket.emit('message_ack', { messageId, sequenceNumber: message.sequenceNumber });

// Publish to Redis after transaction commit
if (redis && envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);
const devicesList = await db.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
Expand All @@ -188,31 +229,16 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
ciphertext: env.ciphertext,
}));

if (validEnvelopes.length > 0) {
await db.insert(messageEnvelopes).values(validEnvelopes);

if (redis && message) {
for (const env of validEnvelopes) {
publishToDevice(redis, env.recipientDeviceId, {
messageId: message.id,
conversationId,
ciphertext: env.ciphertext,
sequenceNumber: message.sequenceNumber,
}).catch(() => {});
}
}

recipientDeviceIds = validEnvelopes.map((e) => e.recipientDeviceId);
for (const env of validEnvelopes) {
publishToDevice(redis, env.recipientDeviceId, {
messageId: message.id,
conversationId,
ciphertext: env.ciphertext,
sequenceNumber: message.sequenceNumber,
}).catch(() => {});
}
}

if (!message) {
socket.emit('error', { event: 'send_message', message: 'Failed to persist message' });
return;
}

socket.emit('message_ack', { messageId, sequenceNumber: message.sequenceNumber });

await deliverMessage(io, message, conversationId);

const members = await db.query.conversationMembers.findMany({
Expand Down Expand Up @@ -279,44 +305,54 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

const [message] = await db
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId,
contentType: contentType || original.contentType,
ciphertext: ciphertext || null,
editsMessageId: rootMessageId,
})
.returning();

let message;
let recipientDeviceIds: string[] = [];
try {
message = await db.transaction(async (tx) => {
const [insertedMessage] = await tx
.insert(messages)
.values({
id: messageId,
conversationId,
senderId: userId,
senderDeviceId: deviceId,
contentType: contentType || original.contentType,
ciphertext: ciphertext || null,
editsMessageId: rootMessageId,
})
.returning();

if (envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);

const devicesList = await tx.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
columns: { id: true, userId: true },
});

const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId]));

const validEnvelopes = envelopes
.filter((env) => deviceToUser.has(env.recipientDeviceId))
.map((env) => ({
messageId,
recipientDeviceId: env.recipientDeviceId,
recipientUserId: deviceToUser.get(env.recipientDeviceId)!,
ciphertext: env.ciphertext,
}));

if (envelopes && envelopes.length > 0) {
const deviceIds = envelopes.map((e) => e.recipientDeviceId);
if (validEnvelopes.length > 0) {
await tx.insert(messageEnvelopes).values(validEnvelopes);
recipientDeviceIds = validEnvelopes.map((e) => e.recipientDeviceId);
}
}

const devicesList = await db.query.userDevices.findMany({
where: inArray(userDevices.id, deviceIds),
columns: { id: true, userId: true },
return insertedMessage;
});

const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId]));

const validEnvelopes = envelopes
.filter((env) => deviceToUser.has(env.recipientDeviceId))
.map((env) => ({
messageId,
recipientDeviceId: env.recipientDeviceId,
recipientUserId: deviceToUser.get(env.recipientDeviceId)!,
ciphertext: env.ciphertext,
}));

if (validEnvelopes.length > 0) {
await db.insert(messageEnvelopes).values(validEnvelopes);
recipientDeviceIds = validEnvelopes.map((e) => e.recipientDeviceId);
}
} catch (error) {
console.error('Transaction failed for message edit:', error);
socket.emit('error', { event: 'edit_message', message: 'Failed to persist message edit' });
return;
}

if (message) {
Expand Down Expand Up @@ -429,25 +465,38 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

const [message] = await db
.insert(messages)
.values({
conversationId,
senderId: userId,
content: content.trim(),
contentType,
fileId,
})
.returning();
let message;
try {
message = await db.transaction(async (tx) => {
const [insertedMessage] = await tx
.insert(messages)
.values({
conversationId,
senderId: userId,
ciphertext: content.trim(),
contentType,
fileId,
})
.returning();

return insertedMessage;
});
} catch (error) {
console.error('Transaction failed for file message:', error);
socket.emit('error', { event: 'send_file_message', message: 'Failed to persist file message' });
return;
}

io.to(conversationId).emit('new_message', message);
if (message) {
io.to(conversationId).emit('new_message', message);

const members = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, conversationId),
columns: { userId: true },
});
const members = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, conversationId),
columns: { userId: true },
});

await invalidateConversationCaches(members.map((member) => member.userId));
await invalidateConversationCaches(members.map((member) => member.userId));
}
},
);

Expand Down
Loading