feat: vector ingestion + queue scaffolding (PR-A)#14

Merged
rtpa25 merged 26 commits into main from feat/vector-ingestion-and-queue
May 13, 2026
@rtpa25 rtpa25 commented May 13, 2026

Sets up forward-only vector indexing of every human message and every final assistant message (the answer-carrying one, not the intermediate tool-call carriers) via a Cloudflare Queues producer/consumer, on top of Cloudflare Vectorize + SQLite FTS5 in the per-user Kernel DO. Also adds a hybridSearch function (BM25 + Vectorize + RRF + rerank-2.5-lite) and a debug HTTP route for operator verification.

PR-A is pure infrastructure — agent behavior is unchanged. PR-B (separate spec, separate PR) wires the agent-facing surface on top: context-message injection (<memory>, <sessions_recent>, <sessions_fragments>), session tools (sessions_search, session_info, sessions_list), and the read-side prompt block. PR-B reuses Kernel.searchMessages verbatim — same function, same scores, same ordering.

Key design decisions

  • Vectorize: index agent-os-messages, 1024-dim cosine.
  • Embeddings: voyage-4 via the official voyageai SDK, asymmetric inputType ("document" ingest, "query" search).
  • Reranker: rerank-2.5-lite (precision stage after RRF fusion of BM25 + vector recall).
  • BM25 half: SQLite FTS5 virtual table inside the Kernel DO with UNINDEXED message_id + porter unicode61 tokenizer. Mirrored from a new content_text column on message via three AFTER-INSERT/UPDATE/DELETE triggers.
  • Projection (role-aware): user → concat all text parts; assistant → concat only the trailing text parts (after the last tool-result), dropping mid-turn narration.
  • No backfill — pre-PR-A messages stay unindexed forever. Accepted.
  • Queue topology: one queue + DLQ, max_batch_size: 100, max_retries: 3. Future async-job features (thread naming, integration-skill authoring) drop in as additional switch (batch.queue) cases.
  • Queue handler is a thin shim that RPCs Kernel.processIngestionBatch(items) so all SQL + Voyage + Vectorize work lives in one place inside the DO.
  • Search architecture: hybridSearch(args) is a pure function exposed via Kernel.searchMessages DO method. GET /api/__debug/search?q=...&top_k=N&exclude_thread=... returns JSON for operator smoke. Retained, not removed before merge — PR-B reuses the same DO method.
  • Idempotency: safe under at-least-once delivery. The consumer filters on WHERE indexed_at IS NULL, and Vectorize upserts are naturally idempotent by id.

Operator provisioning (before deploy)

wrangler vectorize create agent-os-messages --dimensions=1024 --metric=cosine
wrangler vectorize create-metadata-index agent-os-messages \
  --property-name=threadId --type=string
wrangler queues create ingest-vectors
wrangler queues create ingest-vectors-dlq
wrangler secret put VOYAGE_API_KEY

The create-metadata-index step is required for hybridSearch's cross-thread $ne filter to fire at runtime.
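
To make the dependency on the metadata index concrete, here is a minimal sketch of how the vector half of hybridSearch might build its query options. The helper name `buildVectorQueryOptions` and the local `VectorQueryOptions` type are hypothetical; only the `threadId` property and the `$ne` operator come from this PR.

```typescript
// Minimal local shape of the options passed to a Vectorize query() call.
// Declared here so the sketch type-checks without @cloudflare/workers-types.
interface VectorQueryOptions {
  topK: number;
  filter?: { threadId: { $ne: string } };
}

// Hypothetical helper: builds options for the vector recall stage.
// Per the PR text, the $ne filter relies on a metadata index on `threadId`.
function buildVectorQueryOptions(
  topK: number,
  excludeThreadId?: string,
): VectorQueryOptions {
  if (!excludeThreadId) return { topK };
  return { topK, filter: { threadId: { $ne: excludeThreadId } } };
}

// Assumed call site inside the DO:
//   await env.VECTORIZE.query(queryVector, buildVectorQueryOptions(20, threadId));
```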

Out of scope (PR-B)

  • <memory>, <sessions_recent>, <sessions_fragments> injection in context-messages.ts
  • Agent-facing session tools (sessions_search, session_info, sessions_list)
  • Read-side system-prompt block teaching the agent how to consume injected context
  • Backfill of pre-PR-A message history

🤖 Generated with Claude Code

rtpa25 and others added 26 commits May 13, 2026 15:38
Removes the docs/**/*.md gitignore that kept specs/plans/notes
local-only and adds all existing docs to the repo. From here on,
brainstorming specs and implementation plans live in git alongside
the code so PRs can carry their design context.

Backfills 26 existing files under docs/superpowers/ (specs, plans, notes).
Greenfield infrastructure spec for forward-only indexing of human and
final-assistant chat messages into Cloudflare Vectorize, via a
Cloudflare Queues producer/consumer pattern that doubles as scaffolding
for future async jobs (thread naming, integration-skill authoring).

PR-A scope is pure infra — agent behavior is unchanged. PR-B (separate
spec) builds cross-thread search, memory injection, session tools, and
the read-side prompt block on top.

Key decisions pinned:

- Vectorize: 1024-dim cosine, voyage-4 embeddings (rerank deferred to PR-B).
- SQLite FTS5 in DO for BM25 half: UNINDEXED message_id column +
  porter unicode61 tokenizer.
- content_text column on message as single source of truth, kept
  in sync by AFTER INSERT/UPDATE/DELETE triggers on message.
- Role-aware projection: user → all text parts concat;
  assistant → trailing text parts only (drops mid-turn narration
  before the final answer).
- Queue handler is a thin shim that RPCs the Kernel DO; all SQL
  + Voyage + Vectorize work lives in processIngestionBatch().
- No backfill — pre-PR-A messages stay unindexed forever.
- Idempotency via indexed_at IS NULL filter on consumer.
Implementation plan for the spec at
docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md.

9 tasks, each with bite-sized checkbox steps:
1. Drizzle migration 0008 (columns + FTS5 + triggers)
2. project-message-text utility (role-aware projection)
3. Voyage embedBatch client
4. Wrangler bindings + env declarations (vectorize, queues, VOYAGE_API_KEY)
5. Kernel.processIngestionBatch() DO method
6. queue() handler + ingest-vectors router
7. Producer hook: user message (handlers.ts)
8. Producer hook: assistant message (turn.ts streaming + complete)
9. End-to-end smoke (operator-driven, includes temporary debug SQL RPC)

Per agent-os conventions: no unit tests in the plan (pnpm check-types
between tasks is the safety net), subagents dispatched on opus, every
implementation commit carries the Claude attribution footer.
Two updates to the PR-A spec:

1. Voyage client now uses the official @voyageai/typescript-sdk
   (npm package: voyageai). Drops the raw-fetch client design in
   favor of VoyageAIClient.embed() with camelCase params per the
   SDK's EmbedRequest type (inputType, outputDimension). Pattern
   matches the existing parallel-web SDK usage in web-search.ts.

2. Adds a hybridSearch() function + GET /api/__debug/search route
   so the operator can verify the index works end-to-end before
   PR-B builds the agent-facing surface on top. Hybrid algorithm
   is BM25 (FTS5) + Vectorize, RRF-fused, NO reranker — the
   reranker stays in PR-B's scope. Route is retained (not removed
   pre-merge) — PR-B reuses the searchMessages DO method.

Updated sections:
- §6 Voyage client (SDK shape)
- §9 NEW: Hybrid search function
- §10 NEW: Debug search route
- §13 Smoke (now exercises the search route end-to-end)
- §14 File-level list (adds search/hybrid.ts, voyageai dep)
- §15 Out-of-scope (rerank explicitly deferred to PR-B)
Plan now reflects:
- Task 3 rewritten: install voyageai dep, use VoyageAIClient.embed()
  with camelCase params (inputType, outputDimension). Drops the raw
  VoyageError class — bubble the SDK's errors unchanged.
- Task 9 NEW: hybridSearch function (BM25 + Vectorize + RRF, no
  reranker). Pure function over DB + env so it's reusable by PR-B.
- Task 10 NEW: Kernel.searchMessages DO method + /api/__debug/search
  HTTP route. Retained, not removed pre-merge.
- Task 11 (was Task 9, smoke): now exercises the debug route end-to-end
  for cross-thread filter + multi-tool-call assistant projection +
  streaming-state non-ingestion.

11 tasks total (was 9). Subagent-driven-development on opus per
agent-os convention.
Reranker is no longer deferred to PR-B — it's added to hybridSearch
in PR-A so the debug search route exercises the EXACT query path that
PR-B will consume via context injection and session tools. The only
delta between PRs is the wrapping surface (debug HTTP vs. tools +
context-messages.ts).

Changes:

- Voyage SDK helper (apps/kernel/src/utils/voyage.ts) now exports
  both embed() and rerank(). rerank uses VoyageAIClient.rerank()
  with returnDocuments=false (caller has the docs locally).
- hybridSearch is now two-stage: recall (BM25 + Vectorize + RRF fuse
  to top 20 candidates) then precision (rerank-2.5-lite to user's
  topK). HybridHit gains a rerankScore field; hits[] is ordered by
  rerankScore desc.
- New failure mode in §11: rerank call fail → bubbles to debug
  route 500. Indexing path is unaffected (rerank is read-side only).
- Smoke step 4 now asserts rerankScore is populated and explains
  the troubleshooting path for a rerank-only failure.

Rationale: rerank is one extra SDK call against an already-instantiated
client — negligible scope cost for verifying production-quality
behavior in the same PR. The alternative (PR-B finds a rerank surface
bug post-merge) is more expensive.
Two early-fail guards for the two assumptions in the plan that I'm
least confident about, surfaced for review in the conversation:

1. Task 3 (Voyage helper): embed() now throws loudly if voyage-4
   returns vectors whose dim differs from EMBEDDING_DIM (the
   Vectorize index dim). Catches Matryoshka-dim mismatches at the
   helper boundary instead of at the Vectorize.upsert call site
   where the error message is less actionable.

2. Task 8 (turn.ts producer hook): new step before the typecheck
   to specifically check that kernel.env.INGEST_VECTORS is
   accessible from runChatTurn. If Agent<Env> doesn't expose env
   publicly, the step documents two refactor options (public getter
   on Kernel, or pass env as a runChatTurn parameter).

Step renumbering: Task 8 Step 4 (was Typecheck) split into Step 4
(env verification) + Step 5 (typecheck) + Step 6 (commit).
…essage.content_text via triggers

Migration 0008 adds the two storage columns the vector-ingestion
pipeline needs and creates a SQLite FTS5 virtual table mirroring
content_text. AFTER INSERT/UPDATE/DELETE triggers keep message_fts
in sync with the source.
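
As an illustration only, the FTS5 table and its three triggers could look roughly like the statements below, written as a TypeScript constant. The table name `message_fts`, the `content_text` column, the UNINDEXED `message_id` column, and the tokenizer come from this PR; the `message.id` primary-key name and the trigger names are assumptions, and the column additions are omitted because their exact types are not stated here.

```typescript
// Sketch of the FTS5 half of the migration. Assumptions: `message` has a
// primary key named `id`; trigger names are invented for illustration.
const MIGRATION_0008_FTS_SKETCH: string[] = [
  // Standalone FTS5 table: message_id is stored but not tokenized.
  `CREATE VIRTUAL TABLE message_fts USING fts5(
     message_id UNINDEXED,
     content_text,
     tokenize = 'porter unicode61'
   )`,
  // Mirror new rows into the index.
  `CREATE TRIGGER message_fts_ai AFTER INSERT ON message BEGIN
     INSERT INTO message_fts (message_id, content_text)
     VALUES (new.id, new.content_text);
   END`,
  // On update, replace the mirrored row.
  `CREATE TRIGGER message_fts_au AFTER UPDATE ON message BEGIN
     DELETE FROM message_fts WHERE message_id = old.id;
     INSERT INTO message_fts (message_id, content_text)
     VALUES (new.id, new.content_text);
   END`,
  // On delete, drop the mirrored row.
  `CREATE TRIGGER message_fts_ad AFTER DELETE ON message BEGIN
     DELETE FROM message_fts WHERE message_id = old.id;
   END`,
];
```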

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§4)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User/system messages project all text parts. Assistant messages project
only the trailing text parts (drops mid-turn narration before the final
answer). Returns empty string when no eligible parts exist — callers
gate enqueue on result.trim().length > 0.
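
A minimal sketch of this role-aware projection follows. The part types are simplified stand-ins for the `ai` package's message parts, the `"\n"` separator is an assumption, and "trailing" is interpreted here as "every text part after the last non-text part".

```typescript
// Simplified stand-ins for the real message part types (assumption).
type TextPart = { type: "text"; text: string };
type ToolPart = { type: "tool-call" | "tool-result" };
type Part = TextPart | ToolPart;
type Role = "user" | "system" | "assistant";

const isText = (p: Part): p is TextPart => p.type === "text";

// Sketch: user/system messages keep all text parts; assistant messages keep
// only the text parts that follow the last non-text part.
function projectMessageTextSketch(role: Role, parts: Part[]): string {
  if (role !== "assistant") {
    return parts.filter(isText).map((p) => p.text).join("\n");
  }
  // Index of the last non-text part, or -1 when every part is text.
  const lastNonText = parts.map((p) => isText(p)).lastIndexOf(false);
  return parts
    .slice(lastNonText + 1)
    .filter(isText)
    .map((p) => p.text)
    .join("\n");
}
```

A pure-tool-call assistant message projects to the empty string under this sketch, which is what lets the producer skip the enqueue.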

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§4.4)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`p.type === "text"` already narrows `p` to TextUIPart via the
discriminated union from the `ai` package — the `as { text: string }`
cast was no-op noise (the first branch's `.map((p) => p.text)` shows
the narrowing works without help).

Caught in Task 2 code quality review.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two thin wrappers around VoyageAIClient:
- embed(apiKey, texts, inputType) — used by both the queue consumer
  (input_type=document) and hybridSearch (input_type=query). Centralizes
  the voyage-4 / 1024-dim constants and sorts response data by index.
- rerank(apiKey, query, documents, topK) — used by hybridSearch as the
  precision stage after RRF fusion. rerank-2.5-lite, returnDocuments=false
  since the caller already has the docs locally.

Pattern matches the existing parallel-web SDK usage in
apps/kernel/src/tools/web-search.ts. Errors bubble unchanged — the
queue consumer relies on them to trigger CF Queues retry; hybridSearch
lets them surface as a 500 on the debug route.

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§6, §6.1)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_MODEL

Two fixes from Task 3 code quality review:

1. rerank() now throws when a result item has `d.index == null`. The
   SDK types it optional but Voyage's API always populates it on rerank
   responses — the score is meaningless without it. Previous `?? 0`
   would have silently misattributed the score to documents[0] on
   the impossible-but-not-rejected case. Embed-path validates by
   length + dim; rerank now validates by index presence. Symmetric
   "fail loud on protocol violation" posture.

2. RERANK_MODEL is internal — only referenced inside rerank(). Dropping
   `export` keeps the module's public surface honest. EMBEDDING_MODEL
   and EMBEDDING_DIM stay exported (Task 4 Vectorize provisioning +
   future telemetry).
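
The "fail loud on a missing index" posture from fix 1 can be sketched as below. The `RerankItem` shape and the function name are assumptions made for illustration, not the module's actual types.

```typescript
// Assumed result shape: `index` is typed optional, as the commit describes.
interface RerankItem {
  index?: number | null;
  relevanceScore: number;
}

interface ScoredDocument {
  document: string;
  rerankScore: number;
}

// Sketch: attach each score to its document, throwing instead of defaulting
// a missing index to 0 (which would misattribute the score to documents[0]).
function attachRerankScores(
  documents: string[],
  items: RerankItem[],
): ScoredDocument[] {
  return items.map((item) => {
    if (item.index == null) {
      throw new Error("rerank result is missing `index`");
    }
    const document = documents[item.index];
    if (document === undefined) {
      throw new Error(`rerank index ${item.index} is out of range`);
    }
    return { document, rerankScore: item.relevanceScore };
  });
}
```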

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds:
- vectorize binding (VECTORIZE -> agent-os-messages, 1024-dim cosine)
- queues producer (INGEST_VECTORS -> ingest-vectors)
- queues consumer (ingest-vectors, batch=100, timeout=10s, retries=3, DLQ)
- VOYAGE_API_KEY secret declaration (env.d.ts + .dev.vars.example)

Operator must provision out-of-band before deploy:
  wrangler vectorize create agent-os-messages --dimensions=1024 --metric=cosine
  wrangler queues create ingest-vectors
  wrangler queues create ingest-vectors-dlq
  wrangler secret put VOYAGE_API_KEY

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§5)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Consumer-side of the ingest-vectors queue. Called via DO RPC from the
queue() handler in apps/kernel/src/index.ts.

Idempotency:
- Filters rows on indexed_at IS NULL before doing any Voyage/Vectorize work.
- Vectorize upsert is naturally idempotent (id-keyed).
- Empty-text rows get indexed_at set without an embed call so retry loops
  don't form on rows that slipped past the producer's content gate.

Failure modes:
- Errors from voyage.embed() or env.VECTORIZE.upsert() bubble -> CF Queues
  retries the batch (exp. backoff, 3 retries, then DLQ).
- Partial commit (Vectorize upsert succeeded but indexed_at UPDATE failed):
  retry re-runs the embed wastefully but reaches the same end state.

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (sec 8.3, 11)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Worker's queue() export is a thin router that switches on batch.queue
and delegates to per-queue handlers. The ingest-vectors handler is a
shim: it RPCs into the Kernel DO's processIngestionBatch() method so
all SQL + Voyage + Vectorize work lives in one place.

Pattern is extension-friendly: future queues (thread naming, integration
skill authoring) drop in as additional switch cases.
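
A rough sketch of that router shape, under stated assumptions: `Batch` is a local stand-in for the Workers `MessageBatch` type, `rpc` stands in for the DO call to processIngestionBatch(), `onUnknownQueue` is a hypothetical logging hook, and retrying the whole batch on failure is assumed from the failure modes described for the consumer.

```typescript
// Local stand-in for the subset of MessageBatch this sketch uses.
interface Batch<T> {
  queue: string;
  messages: ReadonlyArray<{ body: T }>;
  ackAll(): void;
  retryAll(): void;
}
interface IngestItem { messageId: string }

// Thin shim: forward the bodies to the DO, then ack or retry the whole batch.
async function handleIngestVectors(
  batch: Batch<IngestItem>,
  rpc: (items: IngestItem[]) => Promise<void>,
): Promise<void> {
  try {
    await rpc(batch.messages.map((m) => m.body));
    batch.ackAll();
  } catch (err) {
    batch.retryAll(); // let the queue redeliver; retries then DLQ
  }
}

// Router: one case per queue; new async jobs add further cases.
async function routeQueue(
  batch: Batch<IngestItem>,
  rpc: (items: IngestItem[]) => Promise<void>,
  onUnknownQueue: (name: string) => void,
): Promise<void> {
  switch (batch.queue) {
    case "ingest-vectors":
      return handleIngestVectors(batch, rpc);
    default:
      // Ack rather than retry so a misconfigured queue cannot cause a retry storm.
      onUnknownQueue(batch.queue);
      batch.ackAll();
  }
}
```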

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§8)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code quality review of Task 6 flagged the manual `for (const msg of
batch.messages) msg.ack()` loops as more code + more allocations than
the documented CF Queues idiom `batch.ackAll()` / `batch.retryAll()`.
Three call sites switched (one in ingest-vectors-handler.ts success
path, one in its failure path, one in index.ts queue() default branch).

Functionally identical, intent-clearer, and aligns with CF docs
examples. Added a one-line comment on the index.ts default branch
explaining the ack-on-unknown-queue choice (prevents retry storms on
misconfiguration; the error log is the alarm).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
handleChatRequest computes content_text via projectMessageText before
the user-INSERT transaction opens, sets it on the row, and enqueues
{ messageId } to INGEST_VECTORS after commit. Enqueue is gated on
non-empty content_text and wrapped in try/catch so a Queues outage
doesn't fail the chat request.
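
The gate-then-swallow pattern can be sketched as a small helper. `QueueProducer` is a local stand-in for the `INGEST_VECTORS` binding, and the helper name and `log` callback are hypothetical; in the handler this would run only after the transaction commits.

```typescript
// Local stand-in for a Cloudflare Queues producer binding.
interface QueueProducer<T> {
  send(body: T): Promise<void>;
}

// Sketch: enqueue only when there is text to index, and never let a Queues
// failure propagate into the chat request. Returns whether a message was sent.
async function enqueueForIngestion(
  queue: QueueProducer<{ messageId: string }>,
  messageId: string,
  contentText: string,
  log: (message: string, context: Record<string, string>) => void,
): Promise<boolean> {
  if (contentText.trim().length === 0) return false;
  try {
    await queue.send({ messageId });
    return true;
  } catch (err) {
    log("ingest enqueue failed", { messageId, error: String(err) });
    return false;
  }
}
```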

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§7.1)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task 7 code quality review suggested adding `threadId` and `runId` to
the enqueue-failed log. Both are in scope at this point (threadId is
a handler param, runId is returned from the just-committed tx).
Operator triaging "thread X looks search-broken" can correlate without
joining back through the message table.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sets content_text on both the streaming-state and final-state assistant
INSERTs in runChatTurn (the latter wins via onConflictDoUpdate). Only
the final state (state='complete', inside onFinish) enqueues to
INGEST_VECTORS — intermediate streaming snapshots are never indexed.

Enqueue is gated on non-empty content_text (trailing-text projection
naturally excludes pure-tool-call assistant messages) and wrapped in
try/catch with messageId/threadId/runId logged for operator correlation.

The kernel.env verification hook was a no-op: env is already redeclared
as a public field on the Kernel class (kernel.ts:98), so turn.ts —
which is a free function taking kernel as a parameter — can reach
kernel.env.INGEST_VECTORS directly without needing a getEnv() getter.

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§7.2)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pure function over a Drizzle DB handle and the bindings. Two-stage
retrieval:

  RECALL: BM25 (SQLite FTS5) and vector (Voyage query embed +
  Vectorize.query) in parallel, fused via Reciprocal Rank Fusion
  (k=60), top 20 candidates hydrated from the message table.

  PRECISION: rerank-2.5-lite reorders the 20 candidates by relevance,
  returns the user's requested topK best-first.

Returns HybridHit[] with both recall scores (bm25Rank, vectorRank,
rrfScore) and precision score (rerankScore) attached so the operator
can see the full pipeline's behavior.
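
For readers unfamiliar with Reciprocal Rank Fusion, the recall-stage fusion can be sketched as follows. This is an illustrative version, not the code in hybrid.ts: it assumes 1-based ranks and takes each list as message ids already ordered best-first; the field names mirror the HybridHit fields mentioned above.

```typescript
interface FusedCandidate {
  id: string;
  bm25Rank: number | null;
  vectorRank: number | null;
  rrfScore: number;
}

// Sketch of RRF: each list contributes 1 / (k + rank) for every id it
// contains; ids found by both lists accumulate both contributions.
function rrfFuse(
  bm25Ids: string[],
  vectorIds: string[],
  k = 60,
  limit = 20,
): FusedCandidate[] {
  const byId = new Map<string, FusedCandidate>();
  const add = (ids: string[], field: "bm25Rank" | "vectorRank"): void => {
    ids.forEach((id, i) => {
      const rank = i + 1; // 1-based rank (assumption)
      const hit = byId.get(id) ?? { id, bm25Rank: null, vectorRank: null, rrfScore: 0 };
      hit[field] = rank;
      hit.rrfScore += 1 / (k + rank);
      byId.set(id, hit);
    });
  };
  add(bm25Ids, "bm25Rank");
  add(vectorIds, "vectorRank");
  return Array.from(byId.values())
    .sort((a, b) => b.rrfScore - a.rrfScore)
    .slice(0, limit);
}
```

With `rrfFuse(["a", "b"], ["b", "c"])`, `b` scores 1/62 + 1/61 and ranks first, ahead of `a` (1/61) and `c` (1/62).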

excludeThreadId filters cross-thread search results (Vectorize via
metadata $ne filter; BM25 via post-filter since FTS5 has no native
metadata filter on UNINDEXED columns).

Same function PR-B will consume via context-messages.ts injection and
agent-facing session tools — verbatim, no shape change between PRs.

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§9)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small fixes flagged in Task 9 review:

1. hybridSearch now early-returns [] on whitespace-only queries. Was
   asymmetric: BM25 short-circuited via escapeFtsQuery, but vector path
   would still send an empty string to Voyage and get a 400. Single
   args.query.trim().length === 0 guard at the function top covers both.

2. system-role rows defensively filtered at hydration via flatMap.
   Today producer hooks only enqueue user/assistant — but if a future
   ingestion path adds system rows, this filter excludes them at the
   search boundary instead of relying on the producer-side invariant
   holding from several files away. Dropped the `as "user"|"assistant"`
   cast at the final mapping site — TS now narrows naturally.

3. Plan Task 4 + Task 11 amended to include the `wrangler vectorize
   create-metadata-index agent-os-messages --property-name=threadId
   --type=string` provisioning step. Without it, the $ne metadata
   filter in runVector silently produces empty matches at runtime —
   not something the Task 11 smoke test would catch without this hint.
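
The behavior of escapeFtsQuery is not shown in this PR, so the following is only one plausible sketch of FTS5 query escaping combined with the whitespace guard from fix 1. It quotes each token as an FTS5 string (doubling embedded double quotes) and joins tokens with OR; the OR choice and the function name are assumptions.

```typescript
// Sketch: turn free text into a safe FTS5 MATCH expression, or null when
// the query is empty or whitespace-only (callers then return [] early).
function escapeFtsQuerySketch(query: string): string | null {
  const tokens = query.split(/\s+/).filter((t) => t.length > 0);
  if (tokens.length === 0) return null;
  return tokens
    .map((t) => `"${t.replace(/"/g, '""')}"`) // FTS5 string literal quoting
    .join(" OR ");
}
```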

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wraps hybridSearch() so the function is callable both from this debug
route (PR-A operator smoke) and from PR-B's context-message injection
and session tools.

The route accepts ?q=<query>&top_k=N&exclude_thread=<id> and returns
{ count, hits: HybridHit[] } as JSON. No auth — operator-only namespace
under /api/__debug/*. Retained, not removed before merge — PR-B reuses
the underlying searchMessages method.
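
A sketch of how the route's query string might be turned into search arguments follows. The parameter names `q`, `top_k`, and `exclude_thread` come from the route above; the default of 10, the clamp to 20, and the function and type names are assumptions for illustration.

```typescript
// Structural stand-in for URLSearchParams, so the sketch has no runtime deps.
interface ParamSource {
  get(name: string): string | null;
}

interface SearchArgs {
  query: string;
  topK: number;
  excludeThreadId?: string;
}

// Sketch: returns null when `q` is missing or blank; otherwise normalizes
// top_k into [1, maxTopK] and passes exclude_thread through when present.
function parseSearchParams(
  params: ParamSource,
  defaultTopK = 10, // assumed default
  maxTopK = 20,     // assumed ceiling (the recall stage keeps 20 candidates)
): SearchArgs | null {
  const query = (params.get("q") ?? "").trim();
  if (query.length === 0) return null;
  const parsed = parseInt(params.get("top_k") ?? "", 10);
  const topK = isNaN(parsed)
    ? defaultTopK
    : Math.min(Math.max(parsed, 1), maxTopK);
  const excludeThreadId = params.get("exclude_thread");
  return excludeThreadId ? { query, topK, excludeThreadId } : { query, topK };
}

// Assumed call site: parseSearchParams(new URL(request.url).searchParams)
```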

Spec: docs/superpowers/specs/2026-05-13-vector-ingestion-and-queue-design.md (§10)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…deThreadId

Task 10 code-quality review suggested adding the other two query params
to the failure log for easier repro. Cheap change; surfaces things like
"topK=50 hit rerank ceiling" or "excludeThreadId pointed at a thread
without indexed messages" without a back-and-forth with the operator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1. CRITICAL — hybrid.ts BM25 SQL referenced m.thread_id, but the actual
   SQLite column is `threadId` (camelCase). Drizzle uses the JS field
   name verbatim when text() has no explicit SQL name argument. Every
   /api/__debug/search call would have returned 500 on first hit.
   Fixed by quoting the camelCase column name: m."threadId" AS thread_id.

2. IMPORTANT — turn-compaction.ts forkAndCompact() was a third
   user-message INSERT path with no producer hook. When auto-compact
   fires at the 800k-token boundary, the triggering user message was
   invisible to vector search forever. Added the same producer-hook
   pattern from handlers.ts (set contentText on insert, enqueue
   post-commit if non-empty, swallow Queues errors).

3. MINOR — dropped `export` from EMBEDDING_MODEL and EMBEDDING_DIM
   in voyage.ts. Grep confirmed neither is referenced outside the
   module. Matches the RERANK_MODEL un-export from Task 3's review.

4. MINOR — flipped index.ts default-arm comment from "the error log
   above is the alarm" to "below" — the console.error sits on the
   line after the comment.

All four caught by the final whole-PR reviewer (commits 95ee16b..87a27dc),
none by the per-task reviews. Smoke (Task 11) is now ready to run.
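
To illustrate item 1, a BM25 query with the quoted camelCase column might look like the constant below, paired with the post-filter used for cross-thread exclusion. Only `m."threadId" AS thread_id` is taken from the fix itself; the rest of the SQL (join shape, `message.id`, `bm25()` ordering) and the helper name are assumptions.

```typescript
// Sketch of the BM25 half. SQLite folds unquoted identifiers
// case-insensitively, but thread_id and threadId are different names, so the
// camelCase column must be referenced (here, quoted) exactly as created.
const BM25_SQL_SKETCH = `
  SELECT message_fts.message_id AS message_id,
         m."threadId"           AS thread_id
  FROM message_fts
  JOIN message AS m ON m.id = message_fts.message_id
  WHERE message_fts MATCH ?
  ORDER BY bm25(message_fts)
  LIMIT ?`;

interface Bm25Row {
  message_id: string;
  thread_id: string;
}

// Post-filter: FTS5 cannot filter on the joined thread column natively here,
// so the excluded thread is removed after the query returns.
function excludeThread(rows: Bm25Row[], excludeThreadId?: string): Bm25Row[] {
  if (!excludeThreadId) return rows;
  return rows.filter((r) => r.thread_id !== excludeThreadId);
}
```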

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n in Workers

Discovered at deploy time:
  ✘ Could not resolve "../api"
    node_modules/voyageai/dist/esm/extended/index.mjs:2:14

The voyageai 0.2.1 ESM bundle ships extensionless imports (../api,
../errors, ./ExtendedClient, etc.) that fail Workers' strict ESM
resolution. Stainless-generated SDKs are a known offender here.

Voyage's REST API is two endpoints (embeddings, rerank). Raw fetch
matches the Firecrawl/Parallel-web patterns elsewhere in this repo
with no npm-compat tax:
  - POST https://api.voyageai.com/v1/embeddings
  - POST https://api.voyageai.com/v1/rerank

The public surface of voyage.ts is unchanged — kernel.ts and hybrid.ts
import the same `embed` / `rerank` functions with the same signatures.
Response shape adapted from camelCase (SDK convention) to snake_case
(REST convention): `relevance_score`, `total_tokens`, `input_type`,
`output_dimension`. VoyageError now has 5 codes (added rerank_failed).
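
A minimal sketch of the raw-fetch embed call is shown below. It differs from the real `embed(apiKey, texts, inputType)` signature in two labeled ways: the `fetchImpl` parameter (injected so the sketch can run without a network) and the `baseUrl` parameter are hypothetical additions. The request fields follow the snake_case REST names listed above, and errors are plain `Error`s rather than the module's VoyageError.

```typescript
// Minimal fetch-like type; in a Worker the global fetch would be passed in.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

const EMBEDDING_MODEL = "voyage-4";
const EMBEDDING_DIM = 1024;

async function embedSketch(
  fetchImpl: FetchLike,
  apiKey: string,
  texts: string[],
  inputType: "document" | "query",
  baseUrl = "https://api.voyageai.com", // hypothetical parameter
): Promise<number[][]> {
  const res = await fetchImpl(`${baseUrl}/v1/embeddings`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiKey}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      model: EMBEDDING_MODEL,
      input: texts,
      input_type: inputType,
      output_dimension: EMBEDDING_DIM,
    }),
  });
  if (!res.ok) throw new Error(`voyage embeddings failed: HTTP ${res.status}`);

  const json = (await res.json()) as {
    data: Array<{ index: number; embedding: number[] }>;
  };
  // Restore input order, then validate count and dimension before returning.
  const vectors = json.data
    .slice()
    .sort((a, b) => a.index - b.index)
    .map((d) => d.embedding);
  if (vectors.length !== texts.length) {
    throw new Error(`expected ${texts.length} vectors, got ${vectors.length}`);
  }
  for (const v of vectors) {
    if (v.length !== EMBEDDING_DIM) {
      throw new Error(`expected dim ${EMBEDDING_DIM}, got ${v.length}`);
    }
  }
  return vectors;
}
```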

The original spec actually called for raw fetch; the switch to the SDK was
prompted by user preference during planning. That trade didn't survive
contact with Workers' bundler.

voyageai dep dropped from package.json (pnpm remove voyageai).
api.voyageai.com returns 403 for keys issued through the MongoDB Atlas
"Model API Keys" dashboard — the al-... format. Voyage AI was acquired
by MongoDB; those keys only authenticate against ai.mongodb.com.

Same Bearer auth, same request/response shape — just the host changes.
Verified end-to-end: GET /api/__debug/search?q=hello now returns
{"count":0,"hits":[]} (200 OK) instead of 500 voyage auth failed.

Refs:
https://www.mongodb.com/docs/voyageai/api-reference/overview/
https://www.mongodb.com/docs/voyageai/api-and-clients/

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rtpa25 rtpa25 merged commit 85e7dd2 into main May 13, 2026
3 checks passed
@rtpa25 rtpa25 deleted the feat/vector-ingestion-and-queue branch May 14, 2026 02:28