From 31e58c49171f0e37557682e77e4ac0830a6ea268 Mon Sep 17 00:00:00 2001 From: chitcommit <208086304+chitcommit@users.noreply.github.com> Date: Thu, 4 Jun 2026 03:32:21 +0000 Subject: [PATCH] =?UTF-8?q?feat(daemon):=20loop=20body=20wires=20executeIn?= =?UTF-8?q?tent=20end-to-end=20(claim=20=E2=86=92=20dispatch=20=E2=86=92?= =?UTF-8?q?=20heartbeat)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacked on #106. Replaces the injected-executor abstraction in daemon/loop.ts with a direct call to meta/intent.ts::executeIntent, closing the meta-orchestrator loop. Status transitions, audit-row writes, and the second sovereignty gate are all owned by executeIntent → dispatch; the loop's responsibility is leader lifecycle, intent claiming, heartbeats, and outcome classification. Four outcomes are handled distinctly: - ok=true (executed) → bump processed counter, reset error backoff - ok=true (replayed) → bump replayed counter, no backoff, no double-count - ok=false (refused) → bump refused counter, no backoff (steady-state) - ok=false (exec error) → bump errored counter, bounded exp backoff Sovereignty refusals are identified by canonical error prefixes emitted by meta/executors/dispatch.ts ("sovereignty re-reckon:" / "sovereignty snapshot stale ..."). Refusals are NOT treated as transient faults — they are valid outcomes and do not trigger backoff. The loop honors options.signal via AbortController throughout, including inside the sleep helper, so SIGTERM from daemon/runtime/entrypoint.ts (PR #105) unwinds cleanly through releaseLeadership. tests/daemon/loop.spec.ts — real Neon integration. Seeds two pending intents against the update_obligation_status executor, runs runLeaderLoop with maxIntents=2, asserts: - both intents reach status='done' - each produces exactly one cc_actions_log row (attempt=1, key set, status='completed') - cc_obligations rows actually moved to 'deferred' - cc_node_leases shows leadership released on clean exit - log stream contains intent_heartbeat_before / intent_heartbeat_after pairs Out of scope (not in this PR): - new executors (mercury, etc.) - production deploy - multi-node coordination beyond single-node leader - schema additions on cc_intents / cc_actions_log Co-Authored-By: Claude Opus 4.7 (1M context) --- daemon/loop.ts | 296 +++++++++++++++++++++++++------------- tests/daemon/loop.spec.ts | 208 +++++++++++++++++++++++++++ 2 files changed, 401 insertions(+), 103 deletions(-) create mode 100644 tests/daemon/loop.spec.ts diff --git a/daemon/loop.ts b/daemon/loop.ts index 84c6de2..7015117 100644 --- a/daemon/loop.ts +++ b/daemon/loop.ts @@ -1,19 +1,31 @@ /** - * Cluster daemon — persistent leader loop skeleton. + * Cluster daemon — persistent leader loop. * * Lifecycle: * 1. Try to claim leadership (claimLeadership). * 2. On success: enter inner loop. * a. Heartbeat the lease every `heartbeatMs`. - * b. Claim the next pending Intent (claimNextIntent) and dispatch it - * through the supplied executor. - * c. Mark intent done/failed. + * b. Reclaim stuck intents (idempotent). + * c. Claim the next pending Intent (claimNextIntent). + * d. Drive it through `executeIntent`, which dispatches via the executor + * registry, applies the second sovereignty gate, and atomically updates + * `cc_intents.status` + writes `cc_actions_log`. * 3. On failure / lost lease: park `parkMs`, then retry. - * 4. On AbortSignal: heartbeat is interrupted, lease is released, loop exits. + * 4. On AbortSignal: lease released, loop exits. * - * The executor is injected so this PR introduces no coupling to the existing - * ActionAgent — the wiring to ActionAgent comes in a follow-up PR per - * ADR-001's out-of-scope list. + * The loop classifies four outcomes from `executeIntent`'s `ExecutorResult`: + * + * | Outcome | ok | replayed | Action | + * |---------------------------------|-------|----------|-----------------------------------| + * | Fresh successful execution | true | false | Heartbeat, count, continue | + * | Replayed (terminal audit exists)| true | true | Log, continue, no count, no retry | + * | Sovereignty refusal | false | false | Log refusal, continue, no backoff | + * | Executor error | false | false | Log error, bounded exp backoff | + * + * Refusals are distinguished from generic executor errors by inspecting + * `result.error` for the canonical refusal prefixes emitted by + * `meta/executors/dispatch.ts` (`sovereignty re-reckon:` and + * `sovereignty snapshot stale ...`). * * @canonical-uri chittycanon://docs/architecture/chittycommand/ADR-001 */ @@ -27,24 +39,14 @@ import { } from './leader'; import { claimNextIntent, - completeIntent, - failIntent, - markIntentDispatched, + executeIntent, reclaimStuckIntents, type Intent, type IntentEnv, } from '../meta/intent'; +import type { ExecutorResult } from '../meta/executors/types'; -export type LoopEnv = LeaderEnv & IntentEnv; - -export interface IntentExecutor { - /** - * Execute one claimed Intent. Implementations should be idempotent and - * return a `dispatchedTaskId` (e.g. an ActionAgent task ID) so the loop - * can persist it. Throwing causes the loop to mark the intent failed. - */ - (intent: Intent): Promise<{ dispatchedTaskId: string }>; -} +export type LoopEnv = LeaderEnv & IntentEnv & Record; export interface RunLeaderLoopOptions { /** ChittyID of the running node (Location type — L). */ @@ -59,21 +61,37 @@ export interface RunLeaderLoopOptions { heartbeatMs?: number; /** Park-and-retry interval ms when not leader. Default 5000. */ parkMs?: number; + /** Initial backoff ms for executor errors. Default 1000. */ + errorBackoffMs?: number; + /** Max backoff ms cap for executor errors. Default 30000. */ + errorBackoffMaxMs?: number; /** Optional cap on intent iterations — useful for tests. */ maxIntents?: number; + /** + * Optional cap on loop iterations regardless of intents processed — useful + * for tests when the queue might drain to empty before reaching maxIntents. + */ + maxIterations?: number; /** AbortSignal to terminate the loop cleanly. */ signal?: AbortSignal; /** Role to claim. Defaults to META_LEADER_ROLE. */ role?: string; /** Optional log sink. */ log?: (msg: string, meta?: Record) => void; - /** Executor for claimed intents. */ - executor: IntentExecutor; + /** + * Optional override for actorChittyId passed to executeIntent. When omitted, + * the dispatcher reads it from `intent.metadata.actorChittyId` / + * `intent.metadata.ownerChittyId` if needed. + */ + actorChittyId?: string; } export interface RunLeaderLoopResult { intentsProcessed: number; - reason: 'aborted' | 'maxIntents' | 'leaseLost' | 'error'; + intentsReplayed: number; + intentsRefused: number; + intentsErrored: number; + reason: 'aborted' | 'maxIntents' | 'maxIterations' | 'leaseLost' | 'error'; error?: string; } @@ -90,7 +108,12 @@ export async function runLeaderLoop( const parkMs = options.parkMs ?? 5_000; const signal = options.signal; - let intentsProcessed = 0; + const counters = { + intentsProcessed: 0, + intentsReplayed: 0, + intentsRefused: 0, + intentsErrored: 0, + }; while (!signal?.aborted) { // 1. Acquire leadership. @@ -116,26 +139,35 @@ export async function runLeaderLoop( continue; } - log('leader_acquired', { role, nodeId: options.nodeId, expiresAt: lease.leaseExpiresAt }); + log('leader_acquired', { + role, + nodeId: options.nodeId, + expiresAt: lease.leaseExpiresAt, + }); // 2. Inner loop: heartbeat + drain intents until lease lost or aborted. - const innerResult = await innerLoop(env, options, role, leaseSeconds, heartbeatMs, intentsProcessed); - intentsProcessed = innerResult.intentsProcessed; + const innerResult = await innerLoop(env, options, role, leaseSeconds, heartbeatMs, counters); - if (innerResult.reason === 'aborted' || innerResult.reason === 'maxIntents') { + if ( + innerResult.reason === 'aborted' || + innerResult.reason === 'maxIntents' || + innerResult.reason === 'maxIterations' + ) { // Best-effort release on clean exit — pass sessionId so the release - // refuses to clear a newer leader's lease (fixes codex-p2 PR#101 finding-2). + // refuses to clear a newer leader's lease. try { - await releaseLeadership(env, options.nodeId, { role, sessionId: options.sessionId }); + await releaseLeadership(env, options.nodeId, { + role, + sessionId: options.sessionId, + }); } catch (err) { log('release_error', { error: err instanceof Error ? err.message : String(err) }); } - return { intentsProcessed, reason: innerResult.reason }; + return { ...counters, reason: innerResult.reason }; } if (innerResult.reason === 'error') { log('inner_loop_error', { error: innerResult.error }); - // Park briefly, then attempt to reclaim. await sleep(parkMs, signal); continue; } @@ -145,7 +177,14 @@ export async function runLeaderLoop( await sleep(parkMs, signal); } - return { intentsProcessed, reason: 'aborted' }; + return { ...counters, reason: 'aborted' }; +} + +interface Counters { + intentsProcessed: number; + intentsReplayed: number; + intentsRefused: number; + intentsErrored: number; } async function innerLoop( @@ -154,22 +193,28 @@ async function innerLoop( role: string, leaseSeconds: number, heartbeatMs: number, - startCount: number, -): Promise<{ intentsProcessed: number; reason: 'aborted' | 'maxIntents' | 'leaseLost' | 'error'; error?: string }> { + counters: Counters, +): Promise<{ reason: 'aborted' | 'maxIntents' | 'maxIterations' | 'leaseLost' | 'error'; error?: string }> { const log = options.log ?? noopLog; const signal = options.signal; - let intentsProcessed = startCount; + const errorBackoffStartMs = options.errorBackoffMs ?? 1_000; + const errorBackoffMaxMs = options.errorBackoffMaxMs ?? 30_000; + let currentErrorBackoffMs = errorBackoffStartMs; let lastHeartbeat = Date.now(); + let iterations = 0; - // Heartbeat cadence inside executor.execute() — half the lease so a slow - // executor can't let the lease lapse mid-flight. - // fixes codex-p2 PR#101 finding-3 + // Heartbeat cadence inside executeIntent() — half the lease so a slow + // dispatch can't let the lease lapse mid-flight. const innerHeartbeatMs = Math.max(1_000, Math.floor((leaseSeconds * 1000) / 2)); while (!signal?.aborted) { + iterations += 1; + if (options.maxIterations && iterations > options.maxIterations) { + return { reason: 'maxIterations' }; + } + // Heartbeat if due. Session-scoped so a restarted process can't extend // a lease that already belongs to a newer leader. - // fixes codex-p2 PR#101 finding-5 if (Date.now() - lastHeartbeat >= heartbeatMs) { try { const renewed = await heartbeat(env, options.nodeId, { @@ -178,23 +223,19 @@ async function innerLoop( sessionId: options.sessionId, }); if (!renewed) { - return { intentsProcessed, reason: 'leaseLost' }; + return { reason: 'leaseLost' }; } lastHeartbeat = Date.now(); log('heartbeat_ok', { expiresAt: renewed.leaseExpiresAt }); } catch (err) { return { - intentsProcessed, reason: 'error', error: err instanceof Error ? err.message : String(err), }; } } - // Reclaim intents stuck in running/claimed past 2x the lease window before - // we ask for new work. Idempotent and cheap; if nothing is stuck this is - // a single UPDATE returning 0 rows. - // fixes codex-p2 PR#101 finding-1 + // Reclaim intents stuck past 2x the lease window before claiming new work. try { const reclaimed = await reclaimStuckIntents(env, leaseSeconds * 2); if (reclaimed > 0) log('intents_reclaimed', { count: reclaimed }); @@ -202,13 +243,13 @@ async function innerLoop( log('reclaim_error', { error: err instanceof Error ? err.message : String(err) }); } - // Claim and dispatch one intent. + // Claim one intent. The intent moves pending -> claimed atomically; + // executeIntent picks it up from there. let intent: Intent | null; try { intent = await claimNextIntent(env); } catch (err) { return { - intentsProcessed, reason: 'error', error: err instanceof Error ? err.message : String(err), }; @@ -221,13 +262,19 @@ async function innerLoop( continue; } - log('intent_claimed', { intentId: intent.id, intentType: intent.intentType }); + log('intent_claimed', { + intentId: intent.id, + intentType: intent.intentType, + }); + // Pre-execution heartbeat marker. The DB heartbeat is driven by + // `lastHeartbeat`; this log line marks the boundary for the operational + // record. + log('intent_heartbeat_before', { intentId: intent.id }); - // Background heartbeat ticker covering the executor.execute() span. - // Uses the current session token so the heartbeat is rejected if a newer - // leader has taken over. - // fixes codex-p2 PR#101 finding-3, finding-5 - const executorHeartbeat = setInterval(() => { + // Background heartbeat ticker covering the dispatch() span. Uses the + // current session token so the heartbeat is rejected if a newer leader + // has taken over. + const dispatchHeartbeat = setInterval(() => { heartbeat(env, options.nodeId, { role, leaseSeconds, @@ -248,63 +295,106 @@ async function innerLoop( }); }, innerHeartbeatMs); - // fixes codex-p2 PR#103 P1-B — capture the dispatched_task_id from - // markIntentDispatched as an execution token. completeIntent / failIntent - // gate on it so a stale leader returning from executor() after a fresher - // leader has reclaimed + redispatched the intent cannot mark the fresher - // execution done / failed. The token is set on dispatch and cleared by - // reclaimStuckIntents, so it's monotonic-per-execution. - let dispatchedTaskId: string | null = null; + let result: ExecutorResult; try { - const result = await options.executor(intent); - const dispatched = await markIntentDispatched(env, intent.id, result.dispatchedTaskId); - if (!dispatched) { - // Status was no longer 'claimed' — another leader reclaimed and is - // (re)driving this intent. Do not touch completion. - log('intent_dispatch_lost', { - intentId: intent.id, - dispatchedTaskId: result.dispatchedTaskId, - }); - } else { - dispatchedTaskId = result.dispatchedTaskId; - const completed = await completeIntent(env, intent.id, dispatchedTaskId); - if (!completed) { - log('intent_completion_ignored_stale', { - intentId: intent.id, - dispatchedTaskId, - }); - } else { - intentsProcessed += 1; - log('intent_completed', { - intentId: intent.id, - dispatchedTaskId, - }); - } - } + result = await executeIntent(env, intent.id, { + actorChittyId: options.actorChittyId, + }); } catch (err) { + // executeIntent / dispatch threw unhandled (e.g., DB connectivity, no + // executor registered — a wiring bug). cc_intents was NOT necessarily + // flipped to terminal; leave it as-is so reclaimStuckIntents recovers. const msg = err instanceof Error ? err.message : String(err); - // If we already captured a dispatch token, gate failure on it so we - // can't fail a fresher leader's running execution. Otherwise we failed - // before dispatch (intent still 'claimed') and the legacy unguarded - // status-only WHERE applies. - const failed = dispatchedTaskId - ? await failIntent(env, intent.id, msg, dispatchedTaskId).catch(() => null) - : await failIntent(env, intent.id, msg).catch(() => null); - if (!failed) { - log('intent_failure_ignored_stale', { intentId: intent.id, error: msg }); - } else { - log('intent_failed', { intentId: intent.id, error: msg }); - } + counters.intentsErrored += 1; + log('intent_dispatch_threw', { intentId: intent.id, error: msg }); + clearInterval(dispatchHeartbeat); + log('intent_heartbeat_after', { intentId: intent.id, outcome: 'threw' }); + await sleep(currentErrorBackoffMs, signal); + currentErrorBackoffMs = Math.min(currentErrorBackoffMs * 2, errorBackoffMaxMs); + continue; } finally { - clearInterval(executorHeartbeat); + clearInterval(dispatchHeartbeat); + } + + // Classify the outcome. + if (result.ok && result.replayed) { + // Replay short-circuit: prior terminal audit row exists. Not an error, + // not new work — just continue. Do NOT bump processed counter; do NOT + // trigger error backoff. + counters.intentsReplayed += 1; + log('intent_replayed', { + intentId: intent.id, + idempotencyKey: result.idempotencyKey, + actionLogId: result.actionLogId, + }); + currentErrorBackoffMs = errorBackoffStartMs; + } else if (result.ok) { + // Fresh successful execution. + counters.intentsProcessed += 1; + log('intent_completed', { + intentId: intent.id, + idempotencyKey: result.idempotencyKey, + actionLogId: result.actionLogId, + }); + currentErrorBackoffMs = errorBackoffStartMs; + } else if (isSovereigntyRefusal(result.error)) { + // executeIntent already flipped cc_intents to 'failed' via dispatch's + // refusal path. Log + continue, no backoff (refusals are a valid + // steady-state outcome, not a transient fault). + counters.intentsRefused += 1; + log('intent_refused', { + intentId: intent.id, + reason: result.error, + actionLogId: result.actionLogId, + }); + currentErrorBackoffMs = errorBackoffStartMs; + } else { + // Executor error path (payload validation failure, downstream API + // failure, domain failure like "Obligation not found"). cc_intents was + // flipped to 'failed' inside executeIntent. Apply bounded exponential + // backoff so a stream of failing intents doesn't hot-loop the daemon. + counters.intentsErrored += 1; + log('intent_failed', { + intentId: intent.id, + error: result.error, + actionLogId: result.actionLogId, + }); + await sleep(currentErrorBackoffMs, signal); + currentErrorBackoffMs = Math.min(currentErrorBackoffMs * 2, errorBackoffMaxMs); } - if (options.maxIntents && intentsProcessed >= options.maxIntents) { - return { intentsProcessed, reason: 'maxIntents' }; + log('intent_heartbeat_after', { + intentId: intent.id, + ok: result.ok, + replayed: !!result.replayed, + }); + + const totalTerminal = + counters.intentsProcessed + + counters.intentsReplayed + + counters.intentsRefused + + counters.intentsErrored; + if (options.maxIntents && totalTerminal >= options.maxIntents) { + return { reason: 'maxIntents' }; } } - return { intentsProcessed, reason: 'aborted' }; + return { reason: 'aborted' }; +} + +/** + * Identify sovereignty refusals emitted by meta/executors/dispatch.ts. + * Canonical refusal strings: + * - "sovereignty snapshot stale and no actorChittyId available for re-reckon" + * - "sovereignty re-reckon: requires_human (...)" + * - "sovereignty re-reckon: blocked (...)" + */ +function isSovereigntyRefusal(error: string | undefined): boolean { + if (!error) return false; + return ( + error.startsWith('sovereignty re-reckon:') || + error.startsWith('sovereignty snapshot stale') + ); } function sleep(ms: number, signal?: AbortSignal): Promise { diff --git a/tests/daemon/loop.spec.ts b/tests/daemon/loop.spec.ts new file mode 100644 index 0000000..37286f1 --- /dev/null +++ b/tests/daemon/loop.spec.ts @@ -0,0 +1,208 @@ +/** + * Integration test for daemon/loop.ts wired end-to-end through executeIntent. + * + * Covers (per stacked PR on #106): + * - runLeaderLoop acquires leadership against real cc_node_leases + * - Two seeded pending intents (real Goals / Plans / Obligations) are claimed + * and dispatched via executeIntent → dispatch → update_obligation_status + * - Each intent produces exactly one cc_actions_log row with intent_id set, + * attempt=1, idempotency_key set, action_type='status_change', + * status='completed' + * - cc_obligations rows actually move to the target status + * - cc_intents move to status='done' + * - cc_node_leases shows the leader released its hold on clean exit + * + * Real Neon only. Skipped without DATABASE_URL — same pattern as + * tests/meta/intent-lifecycle.spec.ts and tests/meta/executor.spec.ts. + */ + +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { neon } from '@neondatabase/serverless'; +import { runLeaderLoop } from '../../daemon/loop'; +import { + createGoal, + createPlan, + createIntent, + getIntent, + type IntentEnv, + type SovereigntyAssessmentSnapshot, +} from '../../meta/intent'; +import { META_LEADER_ROLE } from '../../daemon/leader'; +// Importing the executor barrel ensures registration side effects run. +import '../../meta/executors'; +import { UPDATE_OBLIGATION_STATUS_INTENT } from '../../meta/executors/update-obligation-status'; + +const DATABASE_URL = process.env.DATABASE_URL; +const SKIP = !DATABASE_URL || process.env.SKIP_INTEGRATION === '1'; + +const env: IntentEnv & Record = { DATABASE_URL }; +const OWNER = '01-A-NB-0001-P-66-1-1'; +// Use a Location-typed ChittyID for the node, per canon (L = Location). +const NODE_ID = '01-A-NB-T01-L-66-1-1'; +const TEST_TAG = `loop-test-${Date.now()}-${Math.floor(Math.random() * 1e6)}`; + +const obligationIds: string[] = []; + +async function cleanup() { + if (!DATABASE_URL) return; + const sql = neon(DATABASE_URL); + await sql`DELETE FROM cc_goals WHERE owner_chitty_id = ${OWNER} AND title LIKE ${TEST_TAG + '%'}`; + for (const id of obligationIds) { + await sql`DELETE FROM cc_actions_log WHERE target_id = ${id}::uuid`; + await sql`DELETE FROM cc_obligations WHERE id = ${id}::uuid`; + } + // Release any lease this test left behind so re-runs are clean. + await sql` + UPDATE cc_node_leases + SET node_id = NULL, node_descriptor = NULL, session_id = NULL, + lease_expires_at = NULL, heartbeat_at = NULL + WHERE node_id = ${NODE_ID}`; +} + +describe.skipIf(SKIP)('daemon/loop — end-to-end through executeIntent (real Neon)', () => { + beforeAll(async () => { + await cleanup(); + const sql = neon(DATABASE_URL!); + // Insert two real cc_obligations rows for the executor to update. + for (let i = 0; i < 2; i++) { + const rows = await sql` + INSERT INTO cc_obligations (payee, category, due_date, status, metadata) + VALUES (${TEST_TAG + '-payee-' + i}, 'utilities', CURRENT_DATE + 7, 'pending', '{}'::jsonb) + RETURNING id`; + obligationIds.push(String(rows[0].id)); + } + }); + + afterAll(async () => { + await cleanup(); + }); + + it('drains two pending intents, writes audit rows, heartbeats, releases on clean exit', async () => { + expect(obligationIds).toHaveLength(2); + + const goal = await createGoal(env, { + ownerChittyId: OWNER, + title: `${TEST_TAG}-goal`, + }); + const plan = await createPlan(env, { + goalId: goal.id, + title: `${TEST_TAG}-plan`, + }); + + // Pre-seed fresh sovereignty so dispatch() takes the autonomous path + // without calling out to trust.chitty.cc. + const sovereignty: SovereigntyAssessmentSnapshot = { + decision: 'autonomous', + trustScore: 0.95, + reasoning: 'pre-seeded for daemon loop integration test', + assessedAt: new Date().toISOString(), + }; + + const intentIds: string[] = []; + for (let i = 0; i < 2; i++) { + const intent = await createIntent(env, { + planId: plan.id, + goalId: goal.id, + intentType: UPDATE_OBLIGATION_STATUS_INTENT, + payload: { + obligation_id: obligationIds[i], + status: 'deferred', + notes: `${TEST_TAG}-intent-${i}`, + }, + sovereigntyAssessment: sovereignty, + metadata: { actorChittyId: OWNER }, + priority: i, // lower number first + }); + expect(intent.status).toBe('pending'); + intentIds.push(intent.id); + } + + // Drive the loop until both intents reach terminal status or we hit the + // bounded iteration cap. maxIntents=2 lets the loop exit cleanly after + // both are processed; maxIterations provides a hard safety net. + const logLines: Array<{ msg: string; meta?: Record }> = []; + const controller = new AbortController(); + const sessionId = `${process.pid}@${Date.now()}-test`; + + const result = await runLeaderLoop(env, { + nodeId: NODE_ID, + nodeDescriptor: 'integration-test', + sessionId, + leaseSeconds: 30, + heartbeatMs: 1_000, + parkMs: 250, + maxIntents: 2, + maxIterations: 50, + signal: controller.signal, + actorChittyId: OWNER, + log: (msg, meta) => logLines.push({ msg, meta }), + }); + + expect(result.reason).toBe('maxIntents'); + expect(result.intentsProcessed).toBe(2); + expect(result.intentsErrored).toBe(0); + expect(result.intentsRefused).toBe(0); + + // Both intents should be terminal=done. + for (const id of intentIds) { + const after = await getIntent(env, id); + expect(after?.status).toBe('done'); + } + + // Each intent produced exactly one cc_actions_log row. + const sql = neon(DATABASE_URL!); + for (const id of intentIds) { + const rows = (await sql` + SELECT id, intent_id, attempt, idempotency_key, status, action_type + FROM cc_actions_log + WHERE intent_id = ${id}::uuid + `) as unknown as Array<{ + id: string; + intent_id: string; + attempt: number; + idempotency_key: string; + status: string; + action_type: string; + }>; + expect(rows.length).toBe(1); + expect(rows[0].attempt).toBe(1); + expect(rows[0].idempotency_key).toMatch(/^[0-9a-f]{64}$/); + expect(rows[0].status).toBe('completed'); + expect(rows[0].action_type).toBe('status_change'); + } + + // Obligations actually moved to 'deferred'. + for (const id of obligationIds) { + const rows = (await sql` + SELECT status FROM cc_obligations WHERE id = ${id}::uuid + `) as unknown as Array<{ status: string }>; + expect(rows[0].status).toBe('deferred'); + } + + // cc_node_leases shows leadership was released on clean exit (maxIntents + // path -> releaseLeadership). The row persists with node_id=NULL. + const leaseRows = (await sql` + SELECT node_id, session_id, heartbeat_at + FROM cc_node_leases WHERE role = ${META_LEADER_ROLE} + `) as unknown as Array<{ + node_id: string | null; + session_id: string | null; + heartbeat_at: string | null; + }>; + expect(leaseRows.length).toBeGreaterThan(0); + expect(leaseRows[0].node_id).toBeNull(); + expect(leaseRows[0].session_id).toBeNull(); + + // Log evidence of heartbeat activity bracketing each intent. + const heartbeatBefore = logLines.filter((l) => l.msg === 'intent_heartbeat_before').length; + const heartbeatAfter = logLines.filter((l) => l.msg === 'intent_heartbeat_after').length; + expect(heartbeatBefore).toBe(2); + expect(heartbeatAfter).toBe(2); + const leaderAcquired = logLines.filter((l) => l.msg === 'leader_acquired').length; + expect(leaderAcquired).toBeGreaterThanOrEqual(1); + + // Sanity: abort the controller — no-op for this run (already returned), + // but proves the wiring compiles. + controller.abort(); + }, 60_000); +});