diff --git a/.env.example b/.env.example index d3e01af..95be6c0 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,9 @@ HL=en-US # Cadences (seconds) POLL_LIVE_INTERVAL=30 POLL_FRAME_INTERVAL=10 + +# Live push (WebSocket /live/{game_id}) +# Prod (separate worker + api processes): set REDIS_URL so the bus bridges them. +# REDIS_URL=redis://localhost:6379/0 +# Dev single-process: run ingestion inside the API so live push works without Redis. +RUN_WORKER_IN_API=false diff --git a/CHANGELOG.md b/CHANGELOG.md index ecbddac..c964649 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ this project adheres to [Semantic Versioning](https://semver.org/). - Async Alembic migrations with the initial schema (production source of truth). - OpenAPI tags and richer API metadata (description, contact, license). - API endpoint tests (response shapes + pagination). +- WebSocket `/live/{game_id}` live push (derived events + score snapshots). +- Live event bus with in-process (dev/test) and Redis (prod) backends. +- Optional `RUN_WORKER_IN_API` to run ingestion inside the API process for single-process live push. ### Changed diff --git a/Dockerfile b/Dockerfile index 6a8c697..37d6f66 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,5 +8,5 @@ RUN pip install --no-cache-dir . COPY app ./app -# Surcharger la commande pour le worker : python -m app.ingestion.worker +# Override the command for the worker: python -m app.ingestion.worker CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/README.md b/README.md index be7dfdd..d8b2106 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ make worker # worker → ingests when a match is live | GET | `/matches/{id}` | match + games | | GET | `/games/{id}/events` | derived events (KILL/TOWER/DRAGON/BARON/INHIB) | | GET | `/games/{id}/frames` | raw frames | +| WS | `/live/{id}` | live push: `subscribed` ack, then `event` / `score` messages | ## Prod (Docker, Postgres) diff --git a/app/api/routes.py b/app/api/routes.py index f324d02..7634aee 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -7,12 +7,15 @@ from __future__ import annotations +import asyncio +import contextlib from collections.abc import AsyncIterator -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession +from app.bus import get_bus from app.db import models from app.db.base import SessionLocal from app.schemas.api import ( @@ -196,3 +199,38 @@ async def game_frames( for f in rows ] return Page[FrameOut](items=items, total=total, limit=limit, offset=offset) + + +@router.websocket("/live/{game_id}") +async def live_feed(websocket: WebSocket, game_id: str): + """Stream derived events and score snapshots for a game in real time. + + On connect, sends a `{"type": "subscribed"}` ack, then forwards each + `event` / `score` message published by the ingestion worker for this game. + """ + await websocket.accept() + await websocket.send_json({"type": "subscribed", "game_id": game_id}) + bus = get_bus() + + async with bus.subscribe(game_id) as subscription: + + async def _forward() -> None: + # Sending on a half-closed socket raises; treat it as a disconnect. + with contextlib.suppress(WebSocketDisconnect, RuntimeError): + while True: + await websocket.send_json(await subscription.get()) + + async def _watch_disconnect() -> None: + # No inbound messages expected; this just detects the client closing + # the socket so we can stop forwarding. + with contextlib.suppress(WebSocketDisconnect): + while True: + await websocket.receive_text() + + tasks = [asyncio.create_task(_forward()), asyncio.create_task(_watch_disconnect())] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + for task in done: + task.exception() # retrieve so it isn't flagged as "never retrieved" diff --git a/app/bus.py b/app/bus.py new file mode 100644 index 0000000..d60edfd --- /dev/null +++ b/app/bus.py @@ -0,0 +1,104 @@ +"""Live event bus: bridges the ingestion worker to WebSocket clients. + +Two implementations: +- `InProcessBus`: asyncio-only, for single-process dev/test (and the optional + worker-in-API mode). Subscribers get an in-memory queue. +- `RedisBus`: Redis pub/sub, for production where the worker and the API run as + separate processes and need a broker between them. + +`get_bus()` selects one based on `settings.redis_url`. +""" + +from __future__ import annotations + +import asyncio +import json +from collections import defaultdict +from contextlib import asynccontextmanager +from typing import Any + +from app.config import settings + +Message = dict[str, Any] + + +class _QueueSubscription: + def __init__(self) -> None: + self._queue: asyncio.Queue[Message] = asyncio.Queue() + + def push(self, message: Message) -> None: + self._queue.put_nowait(message) + + async def get(self) -> Message: + return await self._queue.get() + + +class InProcessBus: + name = "in-process" + + def __init__(self) -> None: + self._subs: dict[str, set[_QueueSubscription]] = defaultdict(set) + + async def publish(self, game_id: str, message: Message) -> None: + for sub in list(self._subs.get(game_id, ())): + sub.push(message) + + @asynccontextmanager + async def subscribe(self, game_id: str): + sub = _QueueSubscription() + self._subs[game_id].add(sub) + try: + yield sub + finally: + self._subs[game_id].discard(sub) + if not self._subs[game_id]: + self._subs.pop(game_id, None) + + +class _RedisSubscription: + def __init__(self, pubsub) -> None: + self._pubsub = pubsub + + async def get(self) -> Message: + while True: + msg = await self._pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) + if msg and msg.get("type") == "message": + return json.loads(msg["data"]) + + +class RedisBus: + name = "redis" + + def __init__(self, url: str) -> None: + import redis.asyncio as aioredis + + self._redis = aioredis.from_url(url, decode_responses=True) + + @staticmethod + def _channel(game_id: str) -> str: + return f"live:{game_id}" + + async def publish(self, game_id: str, message: Message) -> None: + await self._redis.publish(self._channel(game_id), json.dumps(message)) + + @asynccontextmanager + async def subscribe(self, game_id: str): + pubsub = self._redis.pubsub() + try: + await pubsub.subscribe(self._channel(game_id)) + yield _RedisSubscription(pubsub) + finally: + # aclose() unsubscribes and releases the connection even if + # subscribe() above failed. + await pubsub.aclose() + + +_bus: InProcessBus | RedisBus | None = None + + +def get_bus() -> InProcessBus | RedisBus: + """Process-wide singleton bus (Redis in prod, in-process otherwise).""" + global _bus + if _bus is None: + _bus = RedisBus(settings.redis_url) if settings.redis_url else InProcessBus() + return _bus diff --git a/app/config.py b/app/config.py index 484459e..ff75150 100644 --- a/app/config.py +++ b/app/config.py @@ -21,5 +21,12 @@ class Settings(BaseSettings): poll_live_interval: float = 30.0 # detection of live games poll_frame_interval: float = 10.0 # native cadence of the window feed + # Live push (WebSocket). With REDIS_URL set, the worker and API processes are + # bridged via Redis pub/sub; otherwise an in-process bus is used. + redis_url: str | None = None + # Dev convenience: run the ingestion worker inside the API process so the + # in-process bus works end-to-end without Redis. + run_worker_in_api: bool = False + settings = Settings() diff --git a/app/ingestion/pollers.py b/app/ingestion/pollers.py index d7a2d3c..fedfdea 100644 --- a/app/ingestion/pollers.py +++ b/app/ingestion/pollers.py @@ -17,7 +17,7 @@ from app.db.base import SessionLocal from app.db.repository import Repository from app.engine.diff import EventDeriver -from app.schemas.domain import GameRef +from app.schemas.domain import DerivedEvent, GameRef, NormalizedFrame from app.sources.base import SourceAdapter from app.timeutil import round10, utcnow @@ -27,10 +27,46 @@ MAX_IDLE_SLICES = 6 +def _event_message(ev: DerivedEvent) -> dict: + return { + "type": "event", + "game_id": ev.game_id, + "ts": ev.ts.isoformat(), + "event_type": ev.type, + "side": ev.side, + "info": ev.info, + } + + +def _score_message(frame: NormalizedFrame) -> dict: + return { + "type": "score", + "game_id": frame.game_id, + "ts": frame.ts.isoformat(), + "state": frame.state, + "blue": {"kills": frame.blue.kills, "towers": frame.blue.towers, "gold": frame.blue.gold}, + "red": {"kills": frame.red.kills, "towers": frame.red.towers, "gold": frame.red.gold}, + } + + class GamePoller: - def __init__(self, adapter: SourceAdapter, game: GameRef): + def __init__(self, adapter: SourceAdapter, game: GameRef, bus): self.adapter = adapter self.game = game + self.bus = bus + + async def _publish(self, events: list[DerivedEvent], frame: NormalizedFrame) -> None: + """Push to the live bus. Must never break ingestion: swallow + log failures.""" + try: + for ev in events: + await self.bus.publish(self.game.game_id, _event_message(ev)) + await self.bus.publish(self.game.game_id, _score_message(frame)) + except Exception: # noqa: BLE001 - a bus blip must not stop persistence + log.warning( + "bus.publish failed for game %s; ingestion continues", + self.game.game_id, + exc_info=True, + ) async def run(self) -> None: deriver = EventDeriver() @@ -61,8 +97,10 @@ async def run(self) -> None: await repo.save_metadata(self.game.game_id, sl.metadata) for frame in sl.frames: - await repo.save_events(deriver.push(frame)) + events = deriver.push(frame) + await repo.save_events(events) await repo.save_frame(frame) + await self._publish(events, frame) last = sl.frames[-1] if last.state == "finished": @@ -78,8 +116,9 @@ async def run(self) -> None: class LiveGameTracker: - def __init__(self, adapter: SourceAdapter): + def __init__(self, adapter: SourceAdapter, bus): self.adapter = adapter + self.bus = bus self._tasks: dict[str, asyncio.Task] = {} async def run(self) -> None: @@ -100,11 +139,13 @@ async def run(self) -> None: game.red_code, ) self._tasks[game.game_id] = asyncio.create_task( - GamePoller(self.adapter, game).run() + GamePoller(self.adapter, game, self.bus).run() ) for gid, task in list(self._tasks.items()): if task.done(): self._tasks.pop(gid) + if not task.cancelled() and task.exception() is not None: + log.error("game poller %s crashed: %r", gid, task.exception()) await asyncio.sleep(settings.poll_live_interval) diff --git a/app/ingestion/worker.py b/app/ingestion/worker.py index 8f9e663..96b355d 100644 --- a/app/ingestion/worker.py +++ b/app/ingestion/worker.py @@ -10,6 +10,7 @@ import httpx +from app.bus import get_bus from app.db.base import init_db from app.ingestion.pollers import LiveGameTracker from app.sources.lol_feed import LolFeedAdapter @@ -21,7 +22,7 @@ async def main() -> None: await init_db() async with httpx.AsyncClient(timeout=20) as client: adapter = LolFeedAdapter(client) - await LiveGameTracker(adapter).run() + await LiveGameTracker(adapter, get_bus()).run() if __name__ == "__main__": diff --git a/app/main.py b/app/main.py index 1ace8ff..d49e1b7 100644 --- a/app/main.py +++ b/app/main.py @@ -2,12 +2,19 @@ from __future__ import annotations +import asyncio +import contextlib from contextlib import asynccontextmanager +import httpx from fastapi import FastAPI from app.api.routes import router +from app.bus import get_bus +from app.config import settings from app.db.base import init_db +from app.ingestion.pollers import LiveGameTracker +from app.sources.lol_feed import LolFeedAdapter TAGS_METADATA = [ {"name": "catalog", "description": "Leagues, teams and players."}, @@ -21,7 +28,25 @@ async def lifespan(app: FastAPI): # Dev convenience: create tables on startup (SQLite). In production, use the # Alembic migrations (see `alembic/`) as the source of truth instead. await init_db() - yield + + worker_task: asyncio.Task | None = None + client: httpx.AsyncClient | None = None + if settings.run_worker_in_api: + # Single-process mode: run ingestion in-process so the in-process bus + # bridges worker -> WebSocket without Redis. + client = httpx.AsyncClient(timeout=20) + tracker = LiveGameTracker(LolFeedAdapter(client), get_bus()) + worker_task = asyncio.create_task(tracker.run()) + + try: + yield + finally: + if worker_task is not None: + worker_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await worker_task + if client is not None: + await client.aclose() app = FastAPI( diff --git a/docker-compose.yml b/docker-compose.yml index de4e3aa..a86809f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,7 @@ services: build: . environment: DATABASE_URL: postgresql+asyncpg://esport:esport@db:5432/esport + REDIS_URL: redis://redis:6379/0 ports: ["8000:8000"] depends_on: [db, redis] @@ -24,6 +25,7 @@ services: command: python -m app.ingestion.worker environment: DATABASE_URL: postgresql+asyncpg://esport:esport@db:5432/esport + REDIS_URL: redis://redis:6379/0 depends_on: [db, redis] volumes: diff --git a/pyproject.toml b/pyproject.toml index bd4618c..8fe09c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "aiosqlite>=0.20", # Local DB by default (dev) "asyncpg>=0.29", # Postgres (prod) "alembic>=1.13", # Schema migrations + "redis>=5.0", # Live pub/sub bus (prod live push) "pydantic>=2.6", "pydantic-settings>=2.2", ] diff --git a/tests/test_bus.py b/tests/test_bus.py new file mode 100644 index 0000000..cfbbbbb --- /dev/null +++ b/tests/test_bus.py @@ -0,0 +1,26 @@ +"""Tests for the in-process live event bus.""" + +from app.bus import InProcessBus + + +async def test_publish_reaches_subscriber(): + bus = InProcessBus() + async with bus.subscribe("g1") as sub: + await bus.publish("g1", {"type": "event", "info": "first blood"}) + msg = await sub.get() + assert msg == {"type": "event", "info": "first blood"} + + +async def test_games_are_isolated(): + bus = InProcessBus() + async with bus.subscribe("g1") as sub1, bus.subscribe("g2") as sub2: + await bus.publish("g2", {"x": 1}) + assert await sub2.get() == {"x": 1} + assert sub1._queue.empty() # g1 received nothing + + +async def test_subscription_is_cleaned_up(): + bus = InProcessBus() + async with bus.subscribe("g1"): + pass + assert "g1" not in bus._subs # no lingering subscribers diff --git a/tests/test_ws.py b/tests/test_ws.py new file mode 100644 index 0000000..1fbc454 --- /dev/null +++ b/tests/test_ws.py @@ -0,0 +1,11 @@ +"""WebSocket live endpoint smoke test.""" + +from fastapi.testclient import TestClient + +from app.main import app + + +def test_live_ws_sends_subscribed_ack(): + with TestClient(app) as client, client.websocket_connect("/live/game-1") as ws: + assert ws.receive_json() == {"type": "subscribed", "game_id": "game-1"} + ws.close() # clean client close -> handler detects disconnect and returns