Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hub/src/sse/sseManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export class SSEManager {
}
}

if (event.type === 'message-received') {
if (event.type === 'message-received' || event.type === 'scheduled-matured') {
return connection.all || connection.sessionId === event.sessionId
}

Expand Down
10 changes: 9 additions & 1 deletion hub/src/store/messageStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Database } from 'bun:sqlite'

import type { StoredMessage } from './types'
import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getFirstMessages, getDeliverableMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, getMatureScheduledMessages, getImmediateQueuedLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages'
import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getFirstMessages, getDeliverableMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, getMatureScheduledMessages, getImmediateQueuedLocalMessages, countFutureScheduledBySessionIds, countFutureScheduledLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages'

export class MessageStore {
private readonly db: Database
Expand Down Expand Up @@ -42,6 +42,14 @@ export class MessageStore {
return getImmediateQueuedLocalMessages(this.db, sessionId)
}

countFutureScheduledLocalMessages(sessionId: string, now: number = Date.now()): number {
return countFutureScheduledLocalMessages(this.db, sessionId, now)
}

countFutureScheduledBySessionIds(sessionIds: string[], now: number = Date.now()): Map<string, number> {
return countFutureScheduledBySessionIds(this.db, sessionIds, now)
}

cancelQueuedMessage(sessionId: string, messageId: string): CancelQueuedMessageResult {
return cancelQueuedMessage(this.db, sessionId, messageId)
}
Expand Down
57 changes: 57 additions & 0 deletions hub/src/store/messages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,60 @@ describe('getDeliverableMessagesAfter: CLI backfill excludes future-scheduled ro
expect(empty).toHaveLength(0)
})
})

describe('countFutureScheduledLocalMessages', () => {
it('counts only future scheduled uninvoked local messages', () => {
const store = makeStore()
const session = makeSession(store, 'sched-count')
const now = Date.now()

store.messages.addMessage(
session.id,
{ role: 'user', content: { type: 'text', text: 'immediate queued' } },
'local-immediate'
)
store.messages.addMessage(
session.id,
{ role: 'user', content: { type: 'text', text: 'future scheduled' } },
'local-future',
now + 60_000
)
store.messages.addMessage(
session.id,
{ role: 'user', content: { type: 'text', text: 'mature scheduled' } },
'local-mature',
now - 1
)

expect(store.messages.countFutureScheduledLocalMessages(session.id, now)).toBe(1)
})

it('batch query returns counts keyed by session id', () => {
const store = makeStore()
const sessionA = makeSession(store, 'sched-batch-a')
const sessionB = makeSession(store, 'sched-batch-b')
const now = Date.now()

store.messages.addMessage(
sessionA.id,
{ role: 'user', content: { type: 'text', text: 'a1' } },
'a-1',
now + 60_000
)
store.messages.addMessage(
sessionA.id,
{ role: 'user', content: { type: 'text', text: 'a2' } },
'a-2',
now + 120_000
)
store.messages.addMessage(
sessionB.id,
{ role: 'user', content: { type: 'text', text: 'immediate' } },
'b-1'
)

const counts = store.messages.countFutureScheduledBySessionIds([sessionA.id, sessionB.id], now)
expect(counts.get(sessionA.id)).toBe(2)
expect(counts.get(sessionB.id)).toBeUndefined()
})
})
47 changes: 47 additions & 0 deletions hub/src/store/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,53 @@ export function getImmediateQueuedLocalMessages(
return rows.map(toStoredMessage)
}

/** Count uninvoked local messages scheduled for a future time (session list indicator). */
export function countFutureScheduledLocalMessages(
db: Database,
sessionId: string,
now: number
): number {
const row = db.prepare(`
SELECT COUNT(*) AS count
FROM messages
WHERE session_id = ?
AND invoked_at IS NULL
AND local_id IS NOT NULL
AND scheduled_at IS NOT NULL
AND scheduled_at > ?
`).get(sessionId, now) as { count: number } | undefined
return row?.count ?? 0
}

/** Batch variant for GET /sessions — one query for all session IDs in a namespace. */
export function countFutureScheduledBySessionIds(
db: Database,
sessionIds: string[],
now: number
): Map<string, number> {
const counts = new Map<string, number>()
if (sessionIds.length === 0) {
return counts
}

const placeholders = sessionIds.map(() => '?').join(',')
const rows = db.prepare(`
SELECT session_id, COUNT(*) AS count
FROM messages
WHERE session_id IN (${placeholders})
AND invoked_at IS NULL
AND local_id IS NOT NULL
AND scheduled_at IS NOT NULL
AND scheduled_at > ?
Comment thread
heavygee marked this conversation as resolved.
GROUP BY session_id
`).all(...sessionIds, now) as { session_id: string; count: number }[]

for (const row of rows) {
counts.set(row.session_id, row.count)
}
return counts
}

export function getMaxSeq(db: Database, sessionId: string): number {
const row = db.prepare(
'SELECT COALESCE(MAX(seq), 0) AS maxSeq FROM messages WHERE session_id = ?'
Expand Down
53 changes: 53 additions & 0 deletions hub/src/sync/messageService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,59 @@ describe('MessageService.releaseMatureScheduledMessages', () => {
expect(cliEmitted).toHaveLength(1)
})

it('emits scheduled-matured once per session for web session-list refresh', async () => {
const store = makeStore()
const session = makeSession(store, 'release-sse')
const publisher = makePublisher()
const { io } = makeTrackingIo()

const now = Date.now()
const past = now - 1000
store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'one' } }, 'local-a', past)
store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'two' } }, 'local-b', past)

const service = new MessageService(store, io, publisher as any)
service.releaseMatureScheduledMessages(now)

const matured = publisher.events.filter((event) => event.type === 'scheduled-matured')
expect(matured).toEqual([{ type: 'scheduled-matured', sessionId: session.id }])
})

it('does NOT re-emit scheduled-matured on later ticks while CLI ack is pending', async () => {
const store = makeStore()
const session = makeSession(store, 'release-sse-no-repeat')
const publisher = makePublisher()
const { io } = makeTrackingIo()

const now = Date.now()
const past = now - 1000
store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-repeat', past)

const service = new MessageService(store, io, publisher as any)
service.releaseMatureScheduledMessages(now)
service.releaseMatureScheduledMessages(now + 60_000)

const matured = publisher.events.filter((event) => event.type === 'scheduled-matured')
expect(matured).toHaveLength(1)
})

it('emits scheduled-matured when first scan is long after scheduled_at', async () => {
const store = makeStore()
const session = makeSession(store, 'release-sse-late-scan')
const publisher = makePublisher()
const { io } = makeTrackingIo()

const now = Date.now()
const past = now - 60_000
store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-late', past)

const service = new MessageService(store, io, publisher as any)
service.releaseMatureScheduledMessages(now)

const matured = publisher.events.filter((event) => event.type === 'scheduled-matured')
expect(matured).toEqual([{ type: 'scheduled-matured', sessionId: session.id }])
})

it('does NOT call markMessagesInvoked (pitfall #2 guard): message is re-emitted on next tick', async () => {
const store = makeStore()
const session = makeSession(store, 'release-no-mark')
Expand Down
24 changes: 24 additions & 0 deletions hub/src/sync/messageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ function toVisibleDecryptedMessages(messages: StoredMessageForDelivery[]): Decry
}

export class MessageService {
/** One scheduled-matured SSE per localId per hub process (cleared on cancel/consume paths here). */
private readonly scheduledMatureNotifiedLocalIds = new Set<string>()

constructor(
private readonly store: Store,
private readonly io: Server,
Expand All @@ -36,6 +39,12 @@ export class MessageService {
) {
}

private forgetScheduledMatureNotified(localIds: Iterable<string>): void {
for (const localId of localIds) {
this.scheduledMatureNotifiedLocalIds.delete(localId)
}
}

getMessages(sessionId: string, limit: number = 200): DecryptedMessage[] {
const stored = this.store.messages.getMessages(sessionId, limit)
return toVisibleDecryptedMessages(stored)
Expand Down Expand Up @@ -196,6 +205,7 @@ export class MessageService {
const now = Date.now()
if (scheduledAt !== null && scheduledAt > now) {
this.store.messages.deleteQueuedMessageById(sessionId, resolvedId)
this.forgetScheduledMatureNotified([localId])
this.publisher.emit({
type: 'message-cancelled',
sessionId,
Expand Down Expand Up @@ -224,6 +234,7 @@ export class MessageService {
const recheck = this.store.messages.lookupQueuedMessage(sessionId, resolvedId)
if (recheck.status === 'invoked') {
// CLI beat us — treat identically to Race-B (ack returned not-found).
this.forgetScheduledMatureNotified([localId])
this.publisher.emit({
type: 'messages-consumed',
sessionId,
Expand All @@ -233,6 +244,7 @@ export class MessageService {
return recheck
}
// Row is gone (absent) — clean cancel.
this.forgetScheduledMatureNotified([localId])
this.publisher.emit({
type: 'message-cancelled',
sessionId,
Expand All @@ -257,6 +269,7 @@ export class MessageService {
// DB write failed — let the HTTP 500 surface to the caller.
throw err
}
this.forgetScheduledMatureNotified([localId])
// Notify all SSE subscribers (other open tabs) that this queued row is now
// invoked so they remove it from the floating bar. Without this emit, only
// the tab that sent the DELETE request learns about the status change via the
Expand All @@ -282,6 +295,7 @@ export class MessageService {

// Phase 3: CLI confirmed removal. Now DELETE the DB row and broadcast SSE.
this.store.messages.deleteQueuedMessageById(sessionId, resolvedId)
this.forgetScheduledMatureNotified([localId])
this.publisher.emit({
type: 'message-cancelled',
sessionId,
Expand Down Expand Up @@ -455,6 +469,7 @@ export class MessageService {
.filter((id): id is string => typeof id === 'string')
if (localIds.length === 0) return null
this.store.messages.markMessagesInvoked(sessionId, localIds, invokedAt)
this.forgetScheduledMatureNotified(localIds)
this.publisher.emit({ type: 'messages-consumed', sessionId, localIds, invokedAt })
return { localIds, invokedAt }
}
Expand All @@ -476,7 +491,13 @@ export class MessageService {
* expected behaviour. */
releaseMatureScheduledMessages(now: number): void {
const mature = this.store.messages.getMatureScheduledMessages(now)
const maturedSessionIds = new Set<string>()
for (const msg of mature) {
const localId = msg.localId
if (typeof localId === 'string' && !this.scheduledMatureNotifiedLocalIds.has(localId)) {
this.scheduledMatureNotifiedLocalIds.add(localId)
maturedSessionIds.add(msg.sessionId)
}
const update = {
id: msg.id,
seq: msg.seq,
Expand All @@ -497,5 +518,8 @@ export class MessageService {
// NOTE: do NOT call markMessagesInvoked here (pitfall #2).
// CLI ack (messages-consumed) will handle invoked_at stamping.
}
for (const sessionId of maturedSessionIds) {
this.publisher.emit({ type: 'scheduled-matured', sessionId })
}
}
}
4 changes: 4 additions & 0 deletions hub/src/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ export class SyncEngine {
return this.sessionCache.getSessionsByNamespace(namespace)
}

getFutureScheduledMessageCounts(sessionIds: string[], now: number = Date.now()): Map<string, number> {
return this.store.messages.countFutureScheduledBySessionIds(sessionIds, now)
}

getSession(sessionId: string): Session | undefined {
return this.sessionCache.getSession(sessionId) ?? this.sessionCache.refreshSession(sessionId) ?? undefined
}
Expand Down
11 changes: 9 additions & 2 deletions hub/src/web/routes/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export function createSessionsRoutes(getSyncEngine: () => SyncEngine | null): Ho
const getPendingCount = (s: Session) => s.agentState?.requests ? Object.keys(s.agentState.requests).length : 0

const namespace = c.get('namespace')
const sessions = engine.getSessionsByNamespace(namespace)
const sessionRecords = engine.getSessionsByNamespace(namespace)
.sort((a, b) => {
// Active sessions first
if (a.active !== b.active) {
Expand All @@ -79,7 +79,14 @@ export function createSessionsRoutes(getSyncEngine: () => SyncEngine | null): Ho
// Then by updatedAt
return b.updatedAt - a.updatedAt
})
.map(toSessionSummary)
const scheduledCounts = engine.getFutureScheduledMessageCounts(sessionRecords.map((session) => session.id))
const sessions = sessionRecords.map((session) => {
const summary = toSessionSummary(session)
return {
...summary,
futureScheduledMessageCount: scheduledCounts.get(session.id) ?? 0
}
})

return c.json({ sessions })
})
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
"typecheck:cli": "cd cli && bun run typecheck",
"typecheck:hub": "cd hub && bun run typecheck",
"typecheck:web": "cd web && bun run typecheck",
"test": "bun run test:cli && bun run test:hub && bun run test:web",
"test": "bun run test:cli && bun run test:hub && bun run test:web && bun run test:shared",
"test:cli": "cd cli && bun run test",
"test:hub": "cd hub && bun run test",
"test:web": "cd web && bun run test",
"test:shared": "cd shared && bun run test",
"clean-session": "bun run hub/scripts/cleanup-sessions.ts",
"release-all": "cd cli && bun run release-all"
},
Expand Down
3 changes: 3 additions & 0 deletions shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
"./voice": "./src/voice.ts"
},
"sideEffects": false,
"scripts": {
"test": "bun test"
},
"dependencies": {
"zod": "^4.2.1"
}
Expand Down
3 changes: 3 additions & 0 deletions shared/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ export const SyncEventSchema = z.discriminatedUnion('type', [
SessionChangedSchema.extend({
type: z.literal('messages-invalidated')
}),
SessionChangedSchema.extend({
type: z.literal('scheduled-matured')
}),
SessionChangedSchema.extend({
type: z.literal('session-ended'),
reason: SessionEndReasonSchema.optional()
Expand Down
Loading
Loading