diff --git a/agent-node/src/db.js b/agent-node/src/db.js index 189d06e..2bbdd21 100644 --- a/agent-node/src/db.js +++ b/agent-node/src/db.js @@ -42,15 +42,16 @@ const SCHEMA = ` id INTEGER PRIMARY KEY AUTOINCREMENT, stream_id TEXT NOT NULL, repository TEXT NOT NULL, - pr_number INTEGER NOT NULL, + pr_number INTEGER, + event_ref TEXT, chain_id INTEGER NOT NULL, chain_name TEXT NOT NULL, tx_hash TEXT, block_number INTEGER, gas_used TEXT, - voucher_expiry INTEGER, - created_at INTEGER NOT NULL DEFAULT (unixepoch()), - UNIQUE (stream_id, repository, pr_number) + voucher_expiry INTEGER, + extension_seconds INTEGER, + created_at INTEGER NOT NULL DEFAULT (unixepoch()) ); CREATE TABLE IF NOT EXISTS stream_registry ( @@ -96,6 +97,21 @@ const SCHEMA = ` account TEXT, created_at INTEGER NOT NULL DEFAULT (unixepoch()) ); + + -- Verified work earned but not yet applied on-chain. Each verified deliverable + -- is banked here first, then drawn down as the stream's runway + weekly cap + -- allow. Overflow (e.g. PRs beyond the weekly cap) carries into later weeks + -- instead of being lost. + CREATE TABLE IF NOT EXISTS banked_work ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + stream_id TEXT NOT NULL, + source TEXT, + repository TEXT, + event_ref TEXT NOT NULL, + extension_seconds INTEGER NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + UNIQUE (stream_id, event_ref) + ); `; /** @@ -169,6 +185,46 @@ export async function initDb() { try { await db.execute(sql); } catch { /* column already exists */ } } + // ── processed_extensions rebuild ─────────────────────────────────────────── + // Older DBs created pr_number as INTEGER NOT NULL with UNIQUE(stream_id, + // repository, pr_number). Webhook extensions pass prNumber=null, so the row + // violated NOT NULL and was silently dropped by INSERT OR IGNORE — extensions + // never reached the activity feed. Rebuild with pr_number nullable; event_ref + // is the real dedup key. Atomic via batch — rolls back if anything fails. + try { + const info = await db.execute('PRAGMA table_info(processed_extensions)'); + const prCol = info.rows.find(r => r.name === 'pr_number'); + if (prCol && Number(prCol.notnull) === 1) { + console.log('[db] Rebuilding processed_extensions (pr_number → nullable, event_ref key)…'); + await db.batch([ + `CREATE TABLE processed_extensions_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + stream_id TEXT NOT NULL, repository TEXT NOT NULL, + pr_number INTEGER, event_ref TEXT, + chain_id INTEGER NOT NULL, chain_name TEXT NOT NULL, + tx_hash TEXT, block_number INTEGER, gas_used TEXT, + voucher_expiry INTEGER, extension_seconds INTEGER, + created_at INTEGER NOT NULL DEFAULT (unixepoch()) + )`, + `INSERT INTO processed_extensions_new + (id, stream_id, repository, pr_number, event_ref, chain_id, chain_name, + tx_hash, block_number, gas_used, voucher_expiry, extension_seconds, created_at) + SELECT id, stream_id, repository, pr_number, event_ref, chain_id, chain_name, + tx_hash, block_number, gas_used, voucher_expiry, extension_seconds, created_at + FROM processed_extensions`, + 'DROP TABLE processed_extensions', + 'ALTER TABLE processed_extensions_new RENAME TO processed_extensions', + ], 'write'); + console.log('[db] ✓ processed_extensions rebuilt'); + } + } catch (err) { + console.warn('[db] processed_extensions rebuild skipped:', err.message); + } + // Dedup on the real key. SQLite allows multiple NULL event_refs (legacy rows). + try { + await db.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_proc_ext_event ON processed_extensions (stream_id, repository, event_ref)'); + } catch { /* duplicate legacy data — replay guard still dedups */ } + console.log('[db] ✓ Schema initialized'); } @@ -253,6 +309,62 @@ export async function getWeeklyExtendedSeconds(streamId, weekStartTime) { return Number(result.rows[0]?.total ?? 0); } +// ─── Banked Work ────────────────────────────────────────────────────────────── +// Verified deliverables earned but not yet applied on-chain. The agent drains +// these as the stream's runway and weekly cap allow, so bursty work (or work +// beyond the weekly cap) is carried forward rather than lost. + +/** Queue a verified deliverable. Idempotent on (stream_id, event_ref). */ +export async function bankWork({ streamId, source, repository, eventRef, extensionSeconds }) { + const db = getDb(); + if (!db) return; + await db.execute({ + sql: `INSERT OR IGNORE INTO banked_work (stream_id, source, repository, event_ref, extension_seconds) + VALUES (?, ?, ?, ?, ?)`, + args: [streamId, source ?? null, repository ?? null, String(eventRef), Number(extensionSeconds)], + }); +} + +/** Has this event already been banked (still waiting to apply)? */ +export async function isWorkBanked(streamId, eventRef) { + const db = getDb(); + if (!db) return false; + const r = await db.execute({ + sql: 'SELECT 1 FROM banked_work WHERE stream_id = ? AND event_ref = ? LIMIT 1', + args: [streamId, String(eventRef)], + }); + return r.rows.length > 0; +} + +/** Banked entries for a stream, oldest first (FIFO drain order). */ +export async function getBankedWork(streamId) { + const db = getDb(); + if (!db) return []; + const r = await db.execute({ + sql: 'SELECT * FROM banked_work WHERE stream_id = ? ORDER BY created_at ASC, id ASC', + args: [streamId], + }); + return r.rows; +} + +/** Remove a banked entry once it has been applied on-chain. */ +export async function deleteBankedWork(id) { + const db = getDb(); + if (!db) return; + await db.execute({ sql: 'DELETE FROM banked_work WHERE id = ?', args: [id] }); +} + +/** Registry rows for every stream that has at least one banked entry waiting. */ +export async function getStreamsWithBankedWork() { + const db = getDb(); + if (!db) return []; + const r = await db.execute( + `SELECT s.* FROM stream_registry s + WHERE s.stream_id IN (SELECT DISTINCT stream_id FROM banked_work)`, + ); + return r.rows; +} + // ─── Stream Repo Lookup ─────────────────────────────────────────────────────── /** diff --git a/agent-node/src/server.js b/agent-node/src/server.js index d14194b..6423855 100644 --- a/agent-node/src/server.js +++ b/agent-node/src/server.js @@ -15,10 +15,10 @@ import helmet from 'helmet'; import rateLimit from 'express-rate-limit'; import { verifyMilestone, VerificationError } from './verifyMilestone.js'; -import { verifyGitHubWebhook, verifyJiraWebhook, verifyBitbucketWebhook, verifyFigmaWebhook, extendFromEvent, checkStream } from './verificationEngine.js'; +import { verifyGitHubWebhook, verifyJiraWebhook, verifyBitbucketWebhook, verifyFigmaWebhook, extendFromEvent, checkStream, drainAllBankedWork } from './verificationEngine.js'; import { signExtensionVoucher, getSignerAddress } from './agentSigner.js'; import { submitExtension, getAllBalances, readStreamBatch } from './chainSubmitter.js'; -import { initDb, isAlreadyProcessed, recordExtension, getExtensionCount, registerStream, getStream, getStreamsByRepo, getStreamsBySource, getStreamsForAddress, getDb, upsertProfile, getProfile, getProfileByUsername, getProfileByApiKey, searchProfiles, isUsernameTaken, addToWaitlist, getWaitlistCount, saveOAuthTokens, disconnectOAuth, saveRepoInstallation, removeRepoInstallation, saveJiraWebhookIds, getProfileByJiraWebhookId, getInstallationIdForRepo } from './db.js'; +import { initDb, isAlreadyProcessed, recordExtension, getExtensionCount, registerStream, getStream, getStreamsByRepo, getStreamsBySource, getStreamsForAddress, getDb, upsertProfile, getProfile, getProfileByUsername, getProfileByApiKey, searchProfiles, isUsernameTaken, addToWaitlist, getWaitlistCount, saveOAuthTokens, disconnectOAuth, saveRepoInstallation, removeRepoInstallation, saveJiraWebhookIds, getProfileByJiraWebhookId, getInstallationIdForRepo, getBankedWork } from './db.js'; import { publicProfile } from './encryption.js'; import publicApiRouter from './publicApi.js'; import { startStreamListeners } from './streamListener.js'; @@ -1527,15 +1527,18 @@ app.get('/api/v1/stream-status/:streamId', async (req, res) => { try { const db = getDb(); - const [stream, extResult] = await Promise.all([ + const [stream, extResult, banked] = await Promise.all([ getStream(streamId), db.execute({ sql: 'SELECT * FROM processed_extensions WHERE stream_id = ? ORDER BY created_at DESC', args: [streamId], }), + getBankedWork(streamId), ]); - return res.json({ streamId, stream, extensions: extResult.rows }); + // banked = verified deliverables earned but not yet applied on-chain (queued + // behind the runway / weekly cap). Returned newest-first to match extensions. + return res.json({ streamId, stream, extensions: extResult.rows, banked: [...banked].reverse() }); } catch (err) { console.error('[stream-status] Error:', err); return res.status(500).json({ error: 'Failed to fetch stream status' }); @@ -1877,4 +1880,12 @@ app.listen(PORT, async () => { } console.log('═══════════════════════════════════════════════════'); + + // Periodic banked-work drainer — applies earned work that couldn't be applied + // when it was verified (weekly cap hit, or stream still had runway) once a new + // week resets the cap or the runway frees up, even with no new webhook. + const DRAIN_INTERVAL_MS = parseInt(process.env.BANK_DRAIN_INTERVAL_MS ?? String(10 * 60 * 1000), 10); + setInterval(() => { + drainAllBankedWork().catch(err => console.error('[drain] sweep failed:', err.message)); + }, DRAIN_INTERVAL_MS).unref(); }); diff --git a/agent-node/src/verificationEngine.js b/agent-node/src/verificationEngine.js index 4e8ab3a..bdb4f1b 100644 --- a/agent-node/src/verificationEngine.js +++ b/agent-node/src/verificationEngine.js @@ -14,7 +14,7 @@ * Rule-based. No LLM. Deterministic. No scheduled polling — triggered by events. */ -import { getLastExtensionTime, isAlreadyProcessed, recordExtension, getProfile, getInstallationIdForRepo, getWeeklyExtendedSeconds } from './db.js'; +import { getLastExtensionTime, isAlreadyProcessed, recordExtension, getProfile, getInstallationIdForRepo, getWeeklyExtendedSeconds, bankWork, isWorkBanked, getBankedWork, deleteBankedWork, getStreamsWithBankedWork } from './db.js'; import { readStreamBatch, submitExtension } from './chainSubmitter.js'; import { signExtensionVoucher } from './agentSigner.js'; import { getInstallationToken } from './githubApp.js'; @@ -481,7 +481,7 @@ async function _checkStream({ streamId, chainId, source, target, sender, periodS * @param {string} sourceLabel - log prefix e.g. 'jira' | 'bitbucket' */ export async function extendFromEvent(dbRow, eventRef, sourceLabel) { - const { stream_id: streamId, chain_id: chainId, verification_target: target, period_seconds: periodSeconds, hours_per_week: hoursPerWeek } = dbRow; + const { stream_id: streamId, verification_target: target, period_seconds: periodSeconds, hours_per_week: hoursPerWeek } = dbRow; if (_inFlight.has(streamId)) { console.log(`[verify:${sourceLabel}] Skipping duplicate for ${streamId?.slice(0, 10)}… (already in-flight)`); @@ -490,91 +490,134 @@ export async function extendFromEvent(dbRow, eventRef, sourceLabel) { _inFlight.add(streamId); try { - // On-chain state - let onChain; - try { - const results = await readStreamBatch([streamId], Number(chainId ?? 421614)); - onChain = results[0]; - } catch { return; } - - if (!onChain || onChain.sender === '0x0000000000000000000000000000000000000000') return; - - const now = Math.floor(Date.now() / 1000); - const streamValidUntil = Number(onChain.streamValidUntil ?? 0n); - const startTime = Number(onChain.startTime ?? 0n); - const totalDeposited = BigInt(onChain.totalDeposited ?? 0n); - const nonce = Number(onChain.nonce ?? 0n); - const period = Number(periodSeconds ?? 604800); + // Replay guard — already applied on-chain, or already waiting in the bank? + if (await isAlreadyProcessed(streamId, target, eventRef)) { + console.log(`[verify:${sourceLabel}] Already processed ${eventRef} for ${streamId?.slice(0, 10)}… — skipping`); + return; + } + if (await isWorkBanked(streamId, eventRef)) { + console.log(`[verify:${sourceLabel}] Already banked ${eventRef} for ${streamId?.slice(0, 10)}… — skipping`); + return; + } + + // Bank the verified deliverable first so it can never be lost, then drain. const dailyWorkSeconds = hoursPerWeek ? Math.round((hoursPerWeek / 5) * 3600) : null; - const extensionSeconds = dailyWorkSeconds ?? period; + const extensionSeconds = dailyWorkSeconds ?? Number(periodSeconds ?? 604800); + await bankWork({ streamId, source: sourceLabel, repository: target, eventRef, extensionSeconds }); + console.log(`[verify:${sourceLabel}] Banked ${eventRef} (+${extensionSeconds}s earned) for ${streamId?.slice(0, 10)}…`); - const isPending = streamValidUntil <= startTime && totalDeposited > 0n; - const isExpiring = !isPending && streamValidUntil > now && (streamValidUntil - now) <= WARN_WINDOW_S; - const isFrozen = !isPending && streamValidUntil <= now && (now - streamValidUntil) <= FROZEN_LOOKBACK_S && totalDeposited > 0n; + await applyBankedWork(dbRow, sourceLabel); + } finally { + _inFlight.delete(streamId); + } +} - if (!isPending && !isExpiring && !isFrozen) { - console.log(`[verify:${sourceLabel}] Stream ${streamId?.slice(0, 10)}… has healthy runway — skipping`); - return; - } +/** + * Draw down a stream's banked work onto the chain, FIFO, respecting: + * - runway: only while the stream is pending/expiring/frozen (never stacks + * beyond WARN_WINDOW_S of runway), + * - weekly cap: never exceeds hours_per_week × 3600 in a rolling 7-day window. + * Whatever can't be applied now stays banked for a later week / when runway frees. + * + * The caller is responsible for the _inFlight guard. + */ +async function applyBankedWork(dbRow, sourceLabel = 'drain') { + const { stream_id: streamId, chain_id: chainId, hours_per_week: hoursPerWeek } = dbRow; - // Replay guard - if (await isAlreadyProcessed(streamId, target, eventRef)) { - console.log(`[verify:${sourceLabel}] Already processed ${eventRef} for ${streamId?.slice(0, 10)}… — skipping`); - return; + const banked = await getBankedWork(streamId); + if (!banked.length) return; + + let onChain; + try { + const results = await readStreamBatch([streamId], Number(chainId ?? 421614)); + onChain = results[0]; + } catch { return; } + if (!onChain || onChain.sender === '0x0000000000000000000000000000000000000000') return; + + let nonce = Number(onChain.nonce ?? 0n); + let streamValidUntil = Number(onChain.streamValidUntil ?? 0n); + const startTime = Number(onChain.startTime ?? 0n); + const totalDeposited = BigInt(onChain.totalDeposited ?? 0n); + if (totalDeposited === 0n) return; + + const maxWeeklySeconds = hoursPerWeek ? Math.round(hoursPerWeek * 3600) : null; + + for (const entry of banked) { + const now = Math.floor(Date.now() / 1000); + + const isPending = streamValidUntil <= startTime; + const isExpiring = !isPending && streamValidUntil > now && (streamValidUntil - now) <= WARN_WINDOW_S; + const isFrozen = !isPending && streamValidUntil <= now && (now - streamValidUntil) <= FROZEN_LOOKBACK_S; + if (!isPending && !isExpiring && !isFrozen) { + // Healthy runway — leave the remaining entries banked for later. + break; } - // Weekly hours cap - if (hoursPerWeek && dailyWorkSeconds) { - const weekDuration = 7 * 86400; - const weeksSinceStart = Math.floor((now - startTime) / weekDuration); - const weekStart = startTime + weeksSinceStart * weekDuration; - const weekSecondsUsed = await getWeeklyExtendedSeconds(streamId, weekStart); - const maxWeeklySeconds = Math.round(hoursPerWeek * 3600); - - if (weekSecondsUsed + extensionSeconds > maxWeeklySeconds) { - console.log(`[verify:${sourceLabel}] Weekly cap reached for ${streamId?.slice(0, 10)}… (${weekSecondsUsed}/${maxWeeklySeconds}s) — skipping`); - return; + // Weekly cap — overflow carries to a later week. + if (maxWeeklySeconds) { + const weekDuration = 7 * 86400; + const weeksSinceStart = Math.floor((now - startTime) / weekDuration); + const weekStart = startTime + weeksSinceStart * weekDuration; + const weekSecondsUsed = await getWeeklyExtendedSeconds(streamId, weekStart); + if (weekSecondsUsed + entry.extension_seconds > maxWeeklySeconds) { + console.log(`[verify:${sourceLabel}] Weekly cap reached for ${streamId?.slice(0, 10)}… — ${banked.length} item(s) stay banked`); + break; } } const stateLabel = isPending ? 'pending' : isExpiring ? 'expiring' : 'frozen'; - console.log(`[verify:${sourceLabel}] Extending ${stateLabel} stream ${streamId?.slice(0, 10)}… +${extensionSeconds}s via ${eventRef}`); - - const extensionDurationSeconds = extensionSeconds; const expiry = now + VOUCHER_TTL_S; - let signature; + let signature, onChainResult; try { - signature = await signExtensionVoucher({ streamId, extensionDurationSeconds, nonce, expiry }); + signature = await signExtensionVoucher({ streamId, extensionDurationSeconds: entry.extension_seconds, nonce, expiry }); } catch (err) { console.error(`[verify:${sourceLabel}] Signing failed for ${streamId?.slice(0, 10)}…: ${err.message}`); - return; + break; } - - let onChainResult; try { - onChainResult = await submitExtension({ streamId, extensionDurationSeconds, nonce, expiry, signature }); + onChainResult = await submitExtension({ streamId, extensionDurationSeconds: entry.extension_seconds, nonce, expiry, signature }); } catch (err) { console.error(`[verify:${sourceLabel}] On-chain submission failed for ${streamId?.slice(0, 10)}…: ${err.message}`); - return; + break; } await recordExtension({ streamId, - repository: target, - prNumber: null, - eventRef, - chainId: onChainResult.chainId, - chainName: onChainResult.chainName, - txHash: onChainResult.txHash, - blockNumber: onChainResult.blockNumber, - gasUsed: onChainResult.gasUsed, - voucherExpiry: expiry, - extensionSeconds, + repository: entry.repository, + prNumber: null, + eventRef: entry.event_ref, + chainId: onChainResult.chainId, + chainName: onChainResult.chainName, + txHash: onChainResult.txHash, + blockNumber: onChainResult.blockNumber, + gasUsed: onChainResult.gasUsed, + voucherExpiry: expiry, + extensionSeconds: entry.extension_seconds, }); + await deleteBankedWork(entry.id); + console.log(`[verify:${sourceLabel}] ✓ Applied banked ${entry.event_ref} (${stateLabel}) +${entry.extension_seconds}s | tx=${onChainResult.txHash}`); - console.log(`[verify:${sourceLabel}] ✓ Extended stream ${streamId?.slice(0, 10)}… | tx=${onChainResult.txHash}`); - } finally { - _inFlight.delete(streamId); + // Advance local state for the next iteration's runway check. + nonce += 1; + const base = (isFrozen || isPending) ? now : streamValidUntil; + streamValidUntil = base + entry.extension_seconds; + } +} + +/** + * Periodic drainer — applies banked work for any stream that now has room + * (runway freed up, or a new week reset the cap) even if no new webhook arrived. + * Called on an interval from server startup. + */ +export async function drainAllBankedWork() { + let streams; + try { streams = await getStreamsWithBankedWork(); } catch { return; } + for (const row of streams) { + if (_inFlight.has(row.stream_id)) continue; + _inFlight.add(row.stream_id); + try { await applyBankedWork(row, 'drain'); } + catch (err) { console.error(`[drain] ${row.stream_id?.slice(0, 10)}…: ${err.message}`); } + finally { _inFlight.delete(row.stream_id); } } } diff --git a/frontend/src/pages/app/StreamDetail.jsx b/frontend/src/pages/app/StreamDetail.jsx index b57b334..74d2cbb 100644 --- a/frontend/src/pages/app/StreamDetail.jsx +++ b/frontend/src/pages/app/StreamDetail.jsx @@ -203,6 +203,7 @@ export default function StreamDetail() { const [idCopied, setIdCopied] = useState(false); const [agentStatus, setAgentStatus] = useState(null); // null | 'registered' | 'unregistered' const [extensions, setExtensions] = useState(null); // null = loading, [] = none + const [banked, setBanked] = useState([]); // verified work queued, not yet on-chain const [registering, setRegistering] = useState(false); const [manualTarget, setManualTarget] = useState(''); const [localVerificationTarget, setLocalVerificationTarget] = useState(''); @@ -269,8 +270,9 @@ export default function StreamDetail() { .then(data => { setAgentStatus(data?.stream ? 'registered' : 'unregistered'); setExtensions(Array.isArray(data?.extensions) ? data.extensions : []); + setBanked(Array.isArray(data?.banked) ? data.banked : []); }) - .catch(() => { setAgentStatus(null); setExtensions([]); }); + .catch(() => { setAgentStatus(null); setExtensions([]); setBanked([]); }); }, [id, stream?.streamId]); async function registerWithAgent() { @@ -791,6 +793,42 @@ export default function StreamDetail() { )} + {/* Queued — verified deliverables earned but not yet applied on-chain + (held behind the stream's runway / weekly cap). */} + {banked.length > 0 && ( +
No extensions yet.
+{banked.length > 0 ? 'No extensions applied on-chain yet.' : 'No extensions yet.'}
- {isPending + {banked.length > 0 + ? 'Queued work above applies as the stream needs runway.' + : isPending ? 'Submit verified work to trigger the first extension.' : 'Extensions appear here as work is verified.'}