emam07/DocFlow

DocFlow

Asynchronous document processing backend — production-style, locally runnable at zero recurring cost.

DocFlow lets authenticated users upload PDFs, images, or text; enqueues a background job for each upload; extracts metadata and text in a worker; stores the results; and exposes everything via a versioned REST API with OpenAPI docs, Prometheus metrics, and structured logs. The full stack — api, worker, Postgres, Redis, MinIO, Nginx, Prometheus, Grafana — runs with docker compose up --build.

Why this project is interesting to look at. It is a deliberately boring implementation of a deliberately ambitious problem. Every feature a real backend needs — retries, idempotency, ownership checks, presigned URLs, dashboards, migrations, CI — is there and wired correctly. Nothing in it is a demo mock.


Architecture

flowchart LR
    client([Client])

    subgraph edge[Edge]
      nginx[Nginx\n:8080]
    end

    subgraph app[Application]
      api[FastAPI API\n:8000]
      worker[Celery Worker\nmetrics :9100]
    end

    subgraph state[State]
      postgres[(Postgres 16)]
      redis[(Redis 7)]
      minio[(MinIO S3)]
    end

    subgraph observ[Observability]
      prom[Prometheus\n:9090]
      graf[Grafana\n:3000]
    end

    client -->|HTTPS terminates<br/>elsewhere in prod| nginx
    nginx -->|X-Request-ID<br/>X-Forwarded-*| api

    api --> postgres
    api --> minio
    api -->|enqueue| redis
    redis -->|consume| worker
    worker --> postgres
    worker --> minio

    prom -->|scrape /metrics| api
    prom -->|scrape :9100| worker
    graf --> prom

Data flow for one job. POST /documents/upload streams bytes to MinIO and writes a row to documents. POST /jobs writes a row to jobs and enqueues a Celery task on Redis. The worker picks up, transitions the job to processing, downloads the object, runs the content-type-specific handler, writes a row to processing_results, and flips the job to succeeded (or, after attempt-capped retries, failed).
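
The upload step is described as streaming, with a SHA-256 digest and a size check applied along the way. A minimal sketch of that idea, using only the standard library, might look like the following. The names `hash_and_measure`, `UploadTooLarge`, and `CHUNK_SIZE` are hypothetical and not taken from the DocFlow code; only the 25 MiB default comes from this README.

```python
import hashlib
import io
from typing import BinaryIO

MAX_UPLOAD_BYTES = 26_214_400  # 25 MiB, the documented default
CHUNK_SIZE = 64 * 1024         # assumption: the README does not state a chunk size


class UploadTooLarge(Exception):
    """Hypothetical error raised when the stream exceeds the cap."""


def hash_and_measure(stream: BinaryIO, max_bytes: int = MAX_UPLOAD_BYTES) -> tuple[str, int]:
    """Read the stream chunk by chunk and return (sha256 hex digest, size in bytes)."""
    digest = hashlib.sha256()
    total = 0
    while chunk := stream.read(CHUNK_SIZE):
        total += len(chunk)
        if total > max_bytes:
            # Abort as soon as the cap is crossed instead of buffering the whole body.
            raise UploadTooLarge(f"upload exceeds {max_bytes} bytes")
        digest.update(chunk)
        # A real handler would also forward each chunk to object storage here.
    return digest.hexdigest(), total


if __name__ == "__main__":
    print(hash_and_measure(io.BytesIO(b"hello docflow")))
```

Hashing per chunk keeps memory use flat regardless of file size, which is presumably why the upload is streamed rather than read in one call.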


Tech stack and rationale

| Concern | Choice | Why |
| --- | --- | --- |
| HTTP framework | FastAPI | Typed, Pydantic-validated request/response; free OpenAPI. |
| Schemas | Pydantic v2 | Validation + serialization + OpenAPI in one layer. |
| ORM + migrations | SQLAlchemy 2.0 + Alembic | Typed declarative mapping; reproducible schema history from commit #1. |
| Database | PostgreSQL 16 | Transactions, FKs, JSONB, strong indexing. |
| Queue / broker | Celery 5 + Redis 7 | Mature retry/backoff/chain; Redis is the simplest broker that works. |
| Object storage | MinIO | S3-compatible; identical to AWS S3 for our code. |
| Reverse proxy | Nginx | Models the production boundary: proxy headers, size limits, security headers. |
| Metrics | Prometheus + Grafana | Pull-model metrics, PromQL, provisioned dashboard. |
| Logs | python-json-logger | One structured JSON object per line; grep-and-aggregate friendly. |
| Auth | JWT (HS256) | Stateless verification; simple for this scope. |
| Rate limit | slowapi | Per-IP caps on expensive endpoints. |
| PDF / image / OCR | pypdf, Pillow, pytesseract | Reliable open-source extraction; OCR guarded by an env flag. |
| Dev tooling | ruff, mypy, pytest, pre-commit, Docker Compose | Fast, strict, reproducible. |

Detailed rationale for every choice is in docs/architecture_decisions.md.


Repository layout

docflow/
├── app/                       # Python package shared by api + worker
│   ├── api/                   # FastAPI routers (HTTP concerns only)
│   ├── core/                  # config, security, logging, errors, metrics, request-id
│   ├── db/                    # SQLAlchemy models + session
│   ├── schemas/               # Pydantic request/response schemas
│   ├── services/              # business logic (auth, document, job, processing, storage)
│   ├── workers/               # Celery app + tasks
│   ├── utils/                 # pure helpers
│   └── main.py                # FastAPI factory + middleware wiring
├── migrations/                # Alembic (env.py + versions/)
├── tests/                     # unit + integration (SQLite + fake storage for speed)
├── infra/                     # nginx.conf, prometheus.yml, grafana provisioning
├── scripts/                   # seed_demo.py, smoke_test.sh
├── docs/                      # engineering journal, ADRs, runbook, errors log
├── docker-compose.yml
├── Dockerfile  /  Dockerfile.worker
├── Makefile
├── pyproject.toml
├── alembic.ini
├── .env.example
└── .github/workflows/ci.yml

Quickstart

# 1. From the root of your clone, set up env
cp .env.example .env
# Generate a real JWT secret:
python -c "import secrets; print(secrets.token_urlsafe(48))" \
  | sed "s/^/JWT_SECRET=/" >> .env   # or edit .env manually

# 2. Bring up the full stack (api, worker, postgres, redis, minio, nginx, prometheus, grafana)
make up

# 3. Apply migrations
make migrate

# 4. Seed a demo user and a demo job
make seed

# 5. Open Swagger (`open` is macOS; on Linux use xdg-open)
open http://localhost:8080/docs

Services surfaced to the host:

| URL | What |
| --- | --- |
| http://localhost:8080/docs | OpenAPI docs (through Nginx) |
| http://localhost:8080/api/v1/health/ready | Readiness with DB/Redis/MinIO checks |
| http://localhost:9001 | MinIO console (admin: docflow / docflow123) |
| http://localhost:9090 | Prometheus |
| http://localhost:3000 | Grafana (auto-provisioned dashboard) |

Full operations reference: docs/runbook.md.


Environment variables

See .env.example for the complete list with defaults. The important ones:

| Var | Default | Purpose |
| --- | --- | --- |
| JWT_SECRET | (required) | Signing secret; generate with secrets.token_urlsafe(48). |
| DATABASE_URL | postgresql+psycopg://docflow:docflow@postgres:5432/docflow | SQLAlchemy URL. |
| CELERY_BROKER_URL / CELERY_RESULT_BACKEND | redis://redis:6379/1 / …/2 | Broker + result store. |
| S3_ENDPOINT_URL / S3_BUCKET | MinIO / docflow-documents | Storage. |
| MAX_UPLOAD_BYTES | 26214400 (25 MiB) | Upload cap (enforced by app; Nginx is slightly larger). |
| ALLOWED_CONTENT_TYPES | application/pdf,image/png,image/jpeg,text/plain | Accepted uploads. |
| JOB_MAX_ATTEMPTS | 3 | Terminal cap across all attempts. |
| TESSERACT_ENABLED | true | OCR on images; worker image installs Tesseract. |

API overview

| Method | Path | Auth | What |
| --- | --- | --- | --- |
| POST | /api/v1/auth/register | | Create user. |
| POST | /api/v1/auth/login | | Exchange credentials for an access + refresh token pair. |
| POST | /api/v1/auth/refresh | | Rotate the refresh token; issue a new access token. |
| POST | /api/v1/auth/logout | | Blocklist the access jti; optionally revoke a refresh jti. |
| GET | /api/v1/auth/me | | Current user. |
| POST | /api/v1/documents/upload | | Multipart upload; streaming SHA-256, size + type validation, storage in MinIO. |
| GET | /api/v1/documents | | List owner's documents (paginated). |
| GET | /api/v1/documents/{id} | | Detail. |
| GET | /api/v1/documents/{id}/download-url | | Short-lived presigned URL to the file. |
| GET | /api/v1/documents/{id}/result | | Latest processing result. |
| POST | /api/v1/jobs | | Create a job (supports Idempotency-Key header). |
| GET | /api/v1/jobs | | List jobs (status filter + pagination). |
| GET | /api/v1/jobs/{id} | | Job detail; includes result when terminal. |
| GET | /api/v1/jobs/{id}/events | | SSE stream of event: status frames until terminal. |
| GET | /api/v1/admin/failed-jobs | ✔ admin | DLQ entries (filter by reason, since). |
| GET | /api/v1/admin/failed-jobs/depth | ✔ admin | Current DLQ depth. |
| POST | /api/v1/admin/failed-jobs/{id}/requeue | ✔ admin | Reset job + re-enqueue. |
| GET | /api/v1/health/live | | Liveness (process responds). |
| GET | /api/v1/health/ready | | Readiness (DB + Redis + MinIO). |
| GET | /metrics | | Prometheus exposition. |

Errors use a single envelope:

{ "error": { "code": "not_found", "message": "Document not found.", "request_id": "abc..." } }
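
Because every error shares this shape, a client can decode failures with one small helper. The sketch below is illustrative only: `ApiError` and `parse_envelope` are hypothetical names, and the three fields are the ones shown in the sample envelope above.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ApiError:
    """Hypothetical client-side view of the error envelope."""
    code: str
    message: str
    request_id: str

    def to_envelope(self) -> dict:
        # Rebuild the wire format shown in the README.
        return {"error": {"code": self.code, "message": self.message, "request_id": self.request_id}}


def parse_envelope(body: str) -> ApiError:
    """Decode a JSON error body into an ApiError; raises KeyError if a field is missing."""
    payload = json.loads(body)["error"]
    return ApiError(payload["code"], payload["message"], payload["request_id"])


if __name__ == "__main__":
    raw = '{ "error": { "code": "not_found", "message": "Document not found.", "request_id": "abc" } }'
    print(parse_envelope(raw))
```

Keeping `request_id` on the parsed error makes it easy to quote in a bug report and match against the structured logs.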

Sample curl flow

# Register + login
curl -sX POST localhost:8080/api/v1/auth/register \
     -H 'content-type: application/json' \
     -d '{"email":"alice@example.com","password":"supersecret1"}'

TOKEN=$(curl -sX POST localhost:8080/api/v1/auth/login \
          -H 'content-type: application/json' \
          -d '{"email":"alice@example.com","password":"supersecret1"}' \
        | jq -r .access_token)

# Upload
DOC=$(curl -sX POST localhost:8080/api/v1/documents/upload \
        -H "authorization: bearer $TOKEN" \
        -F "file=@/path/to/file.pdf;type=application/pdf" \
      | jq -r .id)

# Create job (idempotency-safe)
JOB=$(curl -sX POST localhost:8080/api/v1/jobs \
        -H "authorization: bearer $TOKEN" \
        -H "content-type: application/json" \
        -H "idempotency-key: my-client-uuid" \
        -d "{\"document_id\":\"$DOC\"}" \
      | jq -r .id)

# Poll
watch -n1 "curl -s localhost:8080/api/v1/jobs/$JOB -H 'authorization: bearer $TOKEN' | jq .status"

# Fetch result
curl -s localhost:8080/api/v1/documents/$DOC/result -H "authorization: bearer $TOKEN" | jq

The scripts/smoke_test.sh script performs the full flow and verifies idempotent replay.


How background jobs work

  1. Client POSTs /jobs with a document_id (and optional Idempotency-Key).
  2. API: ownership + readiness checks → inserts a jobs row with status=queued → enqueues Celery task process_document_job(job_id) on Redis (task_id = job_id for enqueue-idempotency).
  3. Worker pulls the task:
    • Looks up the job row; short-circuits if already terminal.
    • Sets status=processing, increments attempts, commits.
    • Calls processing_service.run_processing_pipeline, which downloads from MinIO and dispatches to a handler (PDF / text / image).
    • On success: writes processing_results, sets status=succeeded, records completed_at.
    • On TransientProcessingError: if attempts < max_attempts, calls self.retry(countdown=...) with exponential backoff + jitter; otherwise marks failed with last_error.
    • On PermanentProcessingError (corrupt PDF, unsupported type) or anything else: marks failed immediately.

State lives in Postgres. The broker is the delivery channel, not the source of truth.
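
The worker steps above can be read as a small state machine. The following is a simplified, in-memory sketch of that decision logic, not the project's actual task: `Job` and `run_attempt` are hypothetical names, there is no database or Celery involved, and returning "retry" stands in for the `self.retry(countdown=...)` call. What the job's status is between retries is not stated in the README, so the sketch simply leaves it at processing.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class TransientProcessingError(Exception):
    """Retryable failure (exception name taken from the README)."""


class PermanentProcessingError(Exception):
    """Non-retryable failure (exception name taken from the README)."""


TERMINAL = {"succeeded", "failed"}


@dataclass
class Job:
    status: str = "queued"
    attempts: int = 0
    max_attempts: int = 3  # mirrors the JOB_MAX_ATTEMPTS default
    last_error: Optional[str] = None


def run_attempt(job: Job, pipeline: Callable[[], None]) -> str:
    """Run one attempt and return "skipped", "retry", or "done"."""
    if job.status in TERMINAL:
        return "skipped"  # redelivered task for a finished job: do nothing
    job.status = "processing"
    job.attempts += 1
    try:
        pipeline()
    except TransientProcessingError as exc:
        if job.attempts < job.max_attempts:
            return "retry"  # the real task would schedule a delayed retry here
        job.status, job.last_error = "failed", str(exc)
        return "done"
    except Exception as exc:  # permanent or unknown errors fail immediately
        job.status, job.last_error = "failed", str(exc)
        return "done"
    job.status = "succeeded"
    return "done"
```

Counting attempts on the job record rather than in the broker is what lets a redelivered or requeued task pick up the correct count.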


Retries and idempotency

Retries

  • JOB_MAX_ATTEMPTS (default 3) is the hard cap. It is tracked in our DB rather than by Celery's max_retries counter.
  • Exponential backoff with jitter: min(60, 2^attempt) * rand(0.5, 1.0) seconds.
  • Only TransientProcessingError retries. Permanent / unknown errors fail fast.
  • After the cap, the job is terminal-failed with last_error populated.
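
The backoff formula above translates almost directly into code. This is a minimal sketch; `retry_delay` is a hypothetical name, and whether `attempt` starts at 0 or 1 in the real worker is an assumption the README does not settle.

```python
import random
from typing import Optional


def retry_delay(attempt: int, cap: float = 60.0, rng: Optional[random.Random] = None) -> float:
    """Return min(cap, 2^attempt) * rand(0.5, 1.0) seconds."""
    source = rng if rng is not None else random  # the module exposes uniform() too
    base = min(cap, 2 ** attempt)
    # Jitter spreads retries out so failed jobs do not all wake at the same instant.
    return base * source.uniform(0.5, 1.0)


if __name__ == "__main__":
    for attempt in range(1, 8):
        print(attempt, round(retry_delay(attempt), 2))
```

With the default cap, the delay never exceeds 60 seconds and never drops below half of the uncapped base for that attempt.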

Idempotency

  • POST /jobs accepts an Idempotency-Key header scoped to the user.
  • UNIQUE(owner_id, idempotency_key) enforces at-most-once in the DB.
  • On replay within TTL → the same job is returned with HTTP 200 (vs. 201 on first creation), so clients can distinguish.
  • Same key with a different document_id → 409 Conflict.
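
To make the three outcomes concrete, here is an in-memory sketch of the idempotency rules. It is not the project's service code: `JobStore` and `IdempotencyConflict` are hypothetical names, a dictionary stands in for the UNIQUE(owner_id, idempotency_key) constraint, and the TTL mentioned above is left out for brevity.

```python
import uuid
from typing import Optional


class IdempotencyConflict(Exception):
    """Hypothetical error that an API layer would map to HTTP 409."""


class JobStore:
    def __init__(self) -> None:
        # Keyed by (owner_id, idempotency_key), like the unique constraint.
        self._by_key: dict[tuple[str, str], dict] = {}

    def create_job(self, owner_id: str, document_id: str,
                   idempotency_key: Optional[str] = None) -> tuple[int, dict]:
        """Return (http_status, job): 201 on first creation, 200 on replay."""
        if idempotency_key is not None:
            existing = self._by_key.get((owner_id, idempotency_key))
            if existing is not None:
                if existing["document_id"] != document_id:
                    raise IdempotencyConflict("key reused with a different document_id")
                return 200, existing
        job = {"id": str(uuid.uuid4()), "owner_id": owner_id,
               "document_id": document_id, "status": "queued"}
        if idempotency_key is not None:
            self._by_key[(owner_id, idempotency_key)] = job
        return 201, job
```

In a real database the lookup and insert race with each other, which is why the constraint, not the application check, is the final arbiter.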

ADR-008 and ADR-009 carry the full reasoning.


Observability

  • Structured JSON logs. One object per line: timestamp, level, logger, message, request_id, + custom fields.
  • Request correlation. Middleware assigns or honors X-Request-ID; Nginx forwards it; task enqueues carry it into the worker log context.
  • Prometheus metrics. Emitted from both api (/metrics) and worker (:9100):
    • http_requests_total{method,route,status_code} (counter)
    • http_request_duration_seconds{method,route} (histogram)
    • jobs_created_total{result}: created vs. idempotent_replay
    • jobs_succeeded_total
    • jobs_failed_total{reason}: transient, transient_exhausted, permanent, unhandled
    • job_processing_duration_seconds{result}
  • Grafana. Pre-provisioned dashboard (DocFlow — System Overview) with request rates, error rates, and job p95 latency.
  • Health.
    • /health/live — does the process respond? (liveness)
    • /health/ready — DB + Redis + MinIO reachable? (readiness)
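
The logging and request-correlation bullets above can be illustrated with the standard library alone. This sketch does not use python-json-logger, the library the project actually relies on; `JsonFormatter`, `assign_request_id`, and `build_logger` are hypothetical names, and a context variable stands in for whatever the real middleware uses to carry the request ID.

```python
import contextvars
import io
import json
import logging
import uuid
from typing import Optional, TextIO

# Holds the current request ID for the duration of a request (or task).
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line, including the current request ID."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": request_id_var.get(),
        })


def assign_request_id(incoming: Optional[str]) -> str:
    """Honor an incoming X-Request-ID value, or mint a new one."""
    rid = incoming or uuid.uuid4().hex
    request_id_var.set(rid)
    return rid


def build_logger(stream: TextIO) -> logging.Logger:
    logger = logging.getLogger("docflow.sketch")
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    import sys
    log = build_logger(sys.stdout)
    assign_request_id(None)
    log.info("upload accepted")
```

Passing the same ID along with an enqueued task and setting it again in the worker is what lets one grep follow a job across both processes.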

Testing

make test          # pytest (40+ tests)
make test-cov      # with coverage
make lint          # ruff check
make type          # mypy (strict)
make check         # all three

Tests run against SQLite with dialect-compile shims for PG-specific types (JSONB/UUID) and a fake in-memory storage — fast, parallel, no Docker required. CI additionally boots a real Postgres service and runs alembic upgrade/downgrade to catch migration drift.
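
The fake in-memory storage mentioned above is a common pattern: code depends on a small interface, and tests supply a dictionary-backed implementation. A rough sketch follows; the `Storage` protocol, its `put`/`get` methods, and `store_document` are hypothetical and may not match the project's real storage service.

```python
from typing import Protocol


class Storage(Protocol):
    """Hypothetical minimal interface that an S3/MinIO client wrapper would satisfy."""

    def put(self, key: str, data: bytes) -> None: ...
    def get(self, key: str) -> bytes: ...


class InMemoryStorage:
    """Test double: keeps objects in a dict instead of talking to MinIO."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self._objects[key] = data

    def get(self, key: str) -> bytes:
        return self._objects[key]  # raises KeyError for unknown keys


def store_document(storage: Storage, key: str, data: bytes) -> int:
    """Example caller: writes an object and returns the stored size."""
    storage.put(key, data)
    return len(storage.get(key))
```

Because `Protocol` uses structural typing, `InMemoryStorage` needs no inheritance to pass a strict mypy check wherever `Storage` is expected.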


Production-grade extras

These five capabilities ship in the box on top of the core scope and are individually worth interview discussions:

| Extra | What lives where | Why it's interesting |
| --- | --- | --- |
| SSE for job status | app/api/v1/jobs.py::stream_job_events | Push-based status without WebSockets; bounded by terminal state, client disconnect, and a 60-s hard cap. Keepalives prevent proxy idle-timeouts; X-Accel-Buffering: no disables Nginx response buffering. |
| Refresh tokens + revocation | app/services/{auth,token_store}.py | Two-token model (access 15 min / refresh 30 days) with jti-keyed Redis blocklist; refresh rotates on every use; logout blocklists the access token's remaining lifetime. TokenStore is a Protocol; tests swap an in-memory implementation. |
| Dead-letter queue + admin | app/services/dlq_service.py, app/api/v1/admin.py, migrations/...0002_dlq_and_admin.py | Permanently-failed jobs land in dead_letter_jobs; admins list/inspect/requeue via /api/v1/admin/failed-jobs; Prometheus alerts (infra/prometheus/alerts.yml) fire on depth. |
| Testcontainers full-fidelity tests | tests_containers/ (separate from tests/) | Real Postgres + MinIO via testcontainers. JSONB-operator query roundtrip and Alembic up/down roundtrip; both are impossible against the SQLite shim path. Marked containers, deselected by default. |
| OpenTelemetry tracing | app/core/tracing.py, docker-compose.yml::jaeger | OTLP/gRPC exporter to Jaeger. FastAPI / SQLAlchemy / Redis / Botocore / Celery / requests instrumented; LoggingInstrumentor adds otelTraceID to every log line. W3C Trace Context propagates api → worker via Celery instrumentation. |

Run them:

make test                # 59 fast tests (containers excluded)
make test-containers     # 3 full-fidelity tests against real Postgres + MinIO via Docker

Open Jaeger at http://localhost:16686 after make up.
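
For readers unfamiliar with SSE, the job-status stream boils down to a generator that emits text frames until the job is terminal or a time limit is hit. The sketch below shows that shape in plain Python. It is not the code in stream_job_events: `status_frames` and its parameters are hypothetical, polling is an assumption about how status is obtained, and client-disconnect handling is omitted.

```python
import json
import time
from typing import Callable, Iterator

TERMINAL_STATUSES = {"succeeded", "failed"}


def status_frames(
    get_status: Callable[[], str],
    poll_interval: float = 1.0,
    hard_cap: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Yield SSE frames until the job is terminal or hard_cap seconds pass."""
    deadline = clock() + hard_cap
    last = None
    while clock() < deadline:
        status = get_status()
        if status != last:
            payload = json.dumps({"status": status})
            yield f"event: status\ndata: {payload}\n\n"
            last = status
        else:
            # Comment frame: ignored by clients, but keeps proxies from timing out.
            yield ": keepalive\n\n"
        if status in TERMINAL_STATUSES:
            return
        sleep(poll_interval)
```

Each frame ends with a blank line, which is how an SSE client knows the event is complete; lines starting with a colon are comments and never reach the application.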


Tradeoffs and future work

Honest list:

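  • Refresh tokens / token revocation. Not part of the core scope, which uses short-lived access tokens only; now covered by the refresh-token extra above (two-token model with a Redis blocklist).
  • Dead-letter queue. In the core scope, permanently failed jobs are only flagged in the DB; the DLQ extra above routes them to dead_letter_jobs.
  • Distributed tracing. The core scope has request IDs + metrics only; OpenTelemetry tracing is available through the tracing extra above.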
  • Per-route rate limits are basic. SlowAPIMiddleware globally; granular limits would come with a Redis backend.
  • Sync SQLAlchemy. Simpler for api + worker sharing; async would raise the per-process concurrency ceiling.
  • Single worker pool / priority-queue-less. Everything lands on celery queue; larger systems split by priority and tenant.
  • Tests use SQLite for speed. CI additionally runs migrations against real Postgres, and the opt-in testcontainers suite covers Postgres + MinIO. Full-fidelity integration tests against all services remain a future upgrade.

See docs/engineering_journal.md#production-evolution for the longer narrative on scaling this up.


DocFlow was built to be discussed in detail — feel free to open any file; every nontrivial choice has a comment or doc pointing to the reason.
