feat: refactor to pull-based worker architecture #41

@rorybyrne

Summary

Refactor the event processing architecture from push-based to pull-based workers. This addresses several issues identified during the review of PR #35: partial-failure complexity, batching at the wrong layer, and limited scalability.

Problems with Current Design

1. Partial Failure Complexity

The current BatchEventListener processes multiple events together. When processing fails partway through a batch, the worker has no per-event outcome to act on:

  • Events that were processed successfully aren't marked as delivered
  • Events that should be skipped get mixed in with failures
  • The worker can't distinguish between "skip these" and "retry these"

2. Batching at the Wrong Layer

Batching was introduced for GPU efficiency (embedding generation), but batching at the event processing layer conflates two separate concerns:

  • Reliable event delivery (outbox pattern) - at-least-once semantics
  • Efficient batch operations (GPU optimization) - throughput concern

3. Push-Based Limits Scalability

The current design has a single worker that fetches events and pushes them to listeners. For distributed or out-of-process workers this means:

  • The worker becomes a coordinator and a bottleneck
  • A transport layer is needed between the worker and the backends
  • Batching config is split between the worker and the remote backends

4. Backend-Specific Batching Not Possible

Different backends have different needs (the vector backend wants batching, the keyword backend doesn't), but the current design batches at the event-type level, not the backend level.

Proposed Architecture

Flip from push to pull. Each worker/backend claims events it cares about directly from the outbox. PostgreSQL becomes the message broker via SELECT ... FOR UPDATE SKIP LOCKED.

┌─────────────────────────────────────────────────────┐
│                    Outbox (PostgreSQL)              │
│                                                     │
│  SELECT ... WHERE event_type = ? AND routing_key = ?│
│  FOR UPDATE SKIP LOCKED                             │
└─────────────────────────────────────────────────────┘
        ▲               ▲               ▲
        │ claim         │ claim         │ claim
        │               │               │
┌───────┴───────┐ ┌─────┴─────┐ ┌───────┴───────┐
│  Validation   │ │  Vector   │ │   Keyword     │
│  Worker       │ │  Worker   │ │   Worker      │
└───────────────┘ └───────────┘ └───────────────┘
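
The claim query in the diagram could be assembled roughly as follows. This is a minimal sketch, not the final implementation: the table name `events`, the columns `id`, `payload` and `delivered_at`, and the function `build_claim_query` are assumptions made for illustration; only `event_type`, `routing_key` and `FOR UPDATE SKIP LOCKED` come from the proposal. Placeholders use the `%s` style accepted by psycopg.

```python
from typing import List, Optional, Sequence, Tuple


def build_claim_query(
    event_types: Sequence[str],
    routing_key: Optional[str],
    limit: int,
) -> Tuple[str, List[object]]:
    """Build a parameterized claim query (hypothetical schema, see lead-in)."""
    if not event_types:
        raise ValueError("at least one event type is required")
    if limit < 1:
        raise ValueError("limit must be positive")

    # Assumed column: delivered_at is NULL until the event has been handled.
    where = ["delivered_at IS NULL", "event_type = ANY(%s)"]
    params: List[object] = [list(event_types)]

    # routing_key is optional: only filter on it when the worker asks for one.
    if routing_key is not None:
        where.append("routing_key = %s")
        params.append(routing_key)

    params.append(limit)
    sql = (
        "SELECT id, event_type, routing_key, payload "
        "FROM events "
        "WHERE " + " AND ".join(where) + " "
        "ORDER BY id "
        "LIMIT %s "
        # Rows locked by another transaction are skipped instead of waited on.
        "FOR UPDATE SKIP LOCKED"
    )
    return sql, params
```

Row locks taken by FOR UPDATE are held only until the surrounding transaction ends, so in this sketch a worker would need to process the claimed events and mark them delivered inside the same transaction.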

Key Design Decisions

  1. Workers Claim, Not Receive - Workers actively claim events they're interested in
  2. Routing Key for Sub-Filtering - Events have optional routing_key for finer-grained claiming
  3. Concurrency via SKIP LOCKED - Multiple workers run concurrently without coordination
  4. Batching is Worker's Choice - Each worker decides its own batching strategy
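
To illustrate decision 4, the sketch below shows two workers sharing the same poll step but choosing different batch sizes. All names here (`Event`, `ClaimFn`, `make_queue_claim`, `poll_once`) are hypothetical, and the claim function is an in-memory stand-in rather than the real outbox.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Event:
    id: int
    payload: str


# Hypothetical claim function: returns up to `limit` unclaimed events.
ClaimFn = Callable[[int], List[Event]]


def make_queue_claim(events: List[Event]) -> ClaimFn:
    """In-memory stand-in for an outbox claim, for illustration only."""
    pending = list(events)

    def claim(limit: int) -> List[Event]:
        taken = pending[:limit]
        del pending[:limit]
        return taken

    return claim


def poll_once(
    claim: ClaimFn,
    handle: Callable[[List[Event]], None],
    batch_size: int,
) -> int:
    """Claim up to batch_size events and pass them to the worker's handler."""
    events = claim(batch_size)
    if events:
        handle(events)
    return len(events)


# A vector worker might claim many events per poll to fill a GPU batch,
# while a keyword worker might claim one event at a time.
vector_batches: List[List[int]] = []
keyword_batches: List[List[int]] = []

vector_claim = make_queue_claim([Event(i, f"doc-{i}") for i in range(4)])
keyword_claim = make_queue_claim([Event(i, f"doc-{i}") for i in range(4)])

poll_once(vector_claim, lambda evs: vector_batches.append([e.id for e in evs]), batch_size=32)
poll_once(keyword_claim, lambda evs: keyword_batches.append([e.id for e in evs]), batch_size=1)
```

The batch size lives entirely with the worker, so the outbox only needs to honor a `limit`.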

Changes Required

Outbox / Event Repository

  • Add claim() method with event_types, routing_key, limit, max_wait parameters
  • Add routing_key column to events table (nullable, indexed)
  • Use FOR UPDATE SKIP LOCKED for concurrent access
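
One possible shape for claim() is sketched below, with an in-memory stand-in that mimics the intended semantics so the interface can be exercised without a database. The parameter names come from the list above; everything else is an assumption, including the `Event` fields, the `Outbox` protocol name, and the reading of `max_wait` as "block up to this many seconds if nothing is claimable".

```python
import threading
import time
from dataclasses import dataclass
from typing import List, Optional, Protocol, Sequence


@dataclass
class Event:
    id: int
    event_type: str
    routing_key: Optional[str] = None
    claimed: bool = False


class Outbox(Protocol):
    """Hypothetical interface for the proposed claim() method."""

    def claim(
        self,
        event_types: Sequence[str],
        routing_key: Optional[str] = None,
        limit: int = 1,
        max_wait: float = 0.0,
    ) -> List[Event]: ...


class InMemoryOutbox:
    """Stand-in for the PostgreSQL outbox; a lock plays the role of row locks."""

    def __init__(self, events: Sequence[Event] = ()) -> None:
        self._events = list(events)
        self._cond = threading.Condition()

    def add(self, event: Event) -> None:
        with self._cond:
            self._events.append(event)
            self._cond.notify_all()

    def _take(self, event_types, routing_key, limit) -> List[Event]:
        taken: List[Event] = []
        for event in self._events:
            if len(taken) >= limit:
                break
            if event.claimed or event.event_type not in event_types:
                continue
            if routing_key is not None and event.routing_key != routing_key:
                continue
            event.claimed = True  # no other claimer will see this event again
            taken.append(event)
        return taken

    def claim(self, event_types, routing_key=None, limit=1, max_wait=0.0):
        deadline = time.monotonic() + max_wait
        with self._cond:
            while True:
                taken = self._take(event_types, routing_key, limit)
                remaining = deadline - time.monotonic()
                if taken or remaining <= 0:
                    return taken
                # Assumed max_wait semantics: wait for new events until the deadline.
                self._cond.wait(remaining)
```

For example, a vector worker could call `outbox.claim(["record.created"], routing_key="vector", limit=32)` (the event type name is made up here), and a second worker issuing the same call concurrently would receive a disjoint set of events.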

Worker Refactoring

  • Replace single BackgroundWorker with WorkerPool managing multiple workers
  • Each worker configured with event types and optional routing key
  • Workers run their own poll loops
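
A rough sketch of how a WorkerPool with independent poll loops might look, using one thread per worker. Only the WorkerPool name comes from this issue; the `Worker` class, the `poll_once` callable, and the `idle_sleep` back-off are assumptions, and a real implementation could equally use asyncio or separate processes.

```python
import threading
from typing import Callable, List, Sequence


class Worker:
    """Runs its own poll loop; poll_once returns the number of events handled."""

    def __init__(self, name: str, poll_once: Callable[[], int], idle_sleep: float = 0.01) -> None:
        self.name = name
        self.poll_once = poll_once
        self.idle_sleep = idle_sleep

    def run(self, stop: threading.Event) -> None:
        while not stop.is_set():
            if self.poll_once() == 0:
                # Nothing claimed: back off briefly, but wake early on shutdown.
                stop.wait(self.idle_sleep)


class WorkerPool:
    """Starts and stops a set of workers; it does not route events to them."""

    def __init__(self, workers: Sequence[Worker]) -> None:
        self._workers = list(workers)
        self._stop = threading.Event()
        self._threads: List[threading.Thread] = []

    def start(self) -> None:
        for worker in self._workers:
            thread = threading.Thread(
                target=worker.run, args=(self._stop,), name=worker.name, daemon=True
            )
            thread.start()
            self._threads.append(thread)

    def stop(self, timeout: float = 1.0) -> None:
        self._stop.set()
        for thread in self._threads:
            thread.join(timeout)
```

The pool only manages lifecycle here. Each worker would wrap its own claim call and handler in `poll_once`, which keeps coordination in the outbox rather than in the pool.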

Remove BatchEventListener

  • Remove BatchEventListener protocol
  • Remove IndexRecordBatch listener
  • Each worker processes events (single or batch) as it sees fit

Migration Path

  1. Add routing_key column to events table
  2. Implement claim() method on outbox
  3. Create new worker infrastructure alongside existing
  4. Migrate listeners one-by-one to pull-based workers
  5. Remove old BackgroundWorker and BatchEventListener

Acceptance Criteria

  • Events table has routing_key column with index
  • Outbox has claim() method using FOR UPDATE SKIP LOCKED
  • WorkerPool manages multiple workers with independent poll loops
  • Workers can filter by event type and routing key
  • Each worker controls its own batching strategy
  • BatchEventListener and IndexRecordBatch removed
  • Multiple workers can run concurrently without coordination
  • All existing tests pass with the new architecture
