面向耗时文档任务的异步处理与任务治理控制台
用户上传 PDF 或 TXT 后,Django 立即创建任务,Celery Worker 在后台异步处理,Redis 传递任务消息,SQLite 持久化权威任务状态、进度、日志与结果。
TaskForge Mini 聚焦批量文档解析中的几个常见工程问题:
- 同步请求等待解析,容易超时并占用 Web 请求资源;
- 任务处于等待、执行还是失败,用户难以判断;
- 短暂故障需要自动恢复,损坏文件则不应反复重试;
- 相同文件和处理参数被重复提交时会浪费计算资源;
- 处理日志、进度和结果缺少统一的查询入口。
本项目的重点是可观察、可恢复的任务生命周期,而不是追求领先的文档解析算法。它是一个两天范围内完成的后端工程 MVP,不代表已经投入生产。
- PDF、TXT 上传,包含扩展名、MIME、大小和 PDF 文件头校验;
- 使用分块读取计算 SHA-256 文件哈希;
- 通过 Celery 和 Redis 异步投递、执行任务;
- 在 SQLite 中持久化任务状态、进度、重试次数、日志和结果;
- 区分临时错误与永久错误,仅对临时错误自动重试;
- 3 秒、6 秒指数退避,以及
force_fail_once可重复演示的故障注入; - 按文件哈希、处理类型和参数哈希复用已有成功结果;
- 任务列表、详情、结果和日志 API;
- Django Template、原生 JavaScript 和 Bootstrap 5 单页控制台;
- Web、Worker、Redis 三服务 Docker Compose 启动方案。
- 正常异步处理:上传文档,观察
QUEUED → RUNNING → SUCCESS,查看解析预览和统计结果。 - 自动恢复:开启“故障演练”,观察任务进入
RETRYING,从日志确认第二次执行成功。 - 结果复用:再次上传相同内容,接口返回已有任务并标记
cache_hit=true,不再投递 Worker。
flowchart LR
Browser["Browser<br/>单页控制台"]
Web["Django Web<br/>API 与状态查询"]
DB[("SQLite<br/>权威任务状态")]
Redis[("Redis<br/>Celery 消息")]
Worker["Celery Worker"]
Parser["Document Parser<br/>TXT / PyMuPDF"]
Media[("Media Storage<br/>上传文件")]
Browser -->|"上传 PDF/TXT"| Web
Browser -->|"HTTP 轮询状态"| Web
Web -->|"写任务、日志"| DB
Web -->|"事务提交后投递"| Redis
Redis -->|"消费任务主键"| Worker
Worker -->|"读取文件"| Media
Worker -->|"调用解析器"| Parser
Worker -->|"写进度、日志、结果"| DB
Web -->|"查询权威状态"| DB
更完整的生命周期、重试时序和设计取舍见 架构设计。
- Python 3.11
- Django、Django REST Framework
- Celery、Redis
- SQLite
- PyMuPDF
- Django Templates、Bootstrap 5、原生 JavaScript、Fetch API、CSS
- Docker、Docker Compose
确保已经安装 Docker Desktop 或 Docker Engine,并在项目根目录执行:
docker compose up --build停止服务并保留数据:
docker compose down不要随意执行 docker compose down -v。-v 会删除 taskforge_db 和 taskforge_media 命名卷,其中包含 SQLite 数据库和上传文件。
Compose 启动 Web 前会执行数据库迁移;Worker 等待 Redis 和 Web 健康后启动。示例环境变量见 .env.example。
先准备项目依赖和数据库迁移,再分别打开终端启动三个进程。
-
启动本地 Redis,使其监听默认地址
redis://127.0.0.1:6379/0:redis-server
-
启动 Celery Worker:
python -m celery -A config worker -l INFO --pool=solo
Windows 开发环境使用
--pool=solo,减少多进程兼容问题。 -
启动 Django:
python manage.py migrate python manage.py runserver 8001
本地环境可以通过 CELERY_BROKER_URL、DJANGO_DB_PATH 等变量覆盖默认配置。
| 方法 | 路径 | 用途 |
|---|---|---|
GET |
/ |
单页任务控制台 |
GET |
/api/health/ |
服务健康检查 |
POST |
/api/tasks/ |
上传 PDF/TXT,创建任务或复用成功结果 |
GET |
/api/tasks/ |
最近任务、全局状态汇总,可按 status 过滤 |
GET |
/api/tasks/{task_id}/ |
查询单个任务状态和进度 |
GET |
/api/tasks/{task_id}/result/ |
查询解析结果;可选 include_full_text=true |
GET |
/api/tasks/{task_id}/logs/ |
查询最近任务日志,可按 level 过滤 |
接口错误采用统一结构:
{
"error": {
"code": "ERROR_CODE",
"message": "可读错误说明"
}
}运行 Django 配置检查与自动测试:
python manage.py check
python manage.py test本文档不声明测试数量、覆盖率、响应时间或性能结论;请以本地实际执行输出为准。
taskforge-mini/
├── config/ # Django、Celery 配置和根路由
├── tasks/
│ ├── migrations/ # 数据库迁移
│ ├── static/tasks/ # 控制台 CSS 与 JavaScript
│ ├── templates/tasks/ # Django 控制台模板
│ ├── models.py # DocumentTask、TaskResult、TaskLog
│ ├── serializers.py # API 输入与输出结构
│ ├── services.py # 上传、哈希、复用与投递服务
│ ├── selectors.py # 列表、统计、结果与日志查询
│ ├── parsers.py # TXT 与 PDF 解析
│ ├── tasks.py # Celery 执行与自动重试
│ ├── views.py # 页面和 API 视图
│ └── tests.py # 自动测试
├── docs/ # 产品、架构与演示文档
├── Dockerfile
├── compose.yaml
├── requirements.txt
├── manage.py
└── AGENTS.md # 开发范围和工程约束
当前版本是两天完成的 MVP:
- 使用 SQLite,而不是分布式或独立数据库服务;
- 使用 Django 开发服务器,而不是生产 Web Server;
- 没有用户注册、登录、角色或多租户;
- 没有 OCR,扫描版 PDF 可能只能得到空文本和警告日志;
- 没有云端正式部署;
- 没有任务取消、删除和分布式锁;
- HTTP 轮询适合演示规模,不代表大规模实时推送方案。
以下仅是未来演进方向,不是当前已经完成的能力:
- 将本地 media 迁移到对象存储;
- 将 SQLite 迁移到 PostgreSQL 或 MySQL;
- 增加更可靠的并发幂等控制和任务租约;
- 支持任务取消与人工恢复;
- 增加队列、失败率、耗时等监控指标;
- 在完成安全、配置和可观测性改造后部署到云端容器平台。
