Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ HL=en-US
# Cadences (seconds)
POLL_LIVE_INTERVAL=30
POLL_FRAME_INTERVAL=10

# Live push (WebSocket /live/{game_id})
# Prod (separate worker + api processes): set REDIS_URL so the bus bridges them.
# REDIS_URL=redis://localhost:6379/0
# Dev single-process: run ingestion inside the API so live push works without Redis.
RUN_WORKER_IN_API=false
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ this project adheres to [Semantic Versioning](https://semver.org/).
- Async Alembic migrations with the initial schema (production source of truth).
- OpenAPI tags and richer API metadata (description, contact, license).
- API endpoint tests (response shapes + pagination).
- WebSocket `/live/{game_id}` live push (derived events + score snapshots).
- Live event bus with in-process (dev/test) and Redis (prod) backends.
- Optional `RUN_WORKER_IN_API` to run ingestion inside the API process for single-process live push.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ RUN pip install --no-cache-dir .

COPY app ./app

# Surcharger la commande pour le worker : python -m app.ingestion.worker
# Override the command for the worker: python -m app.ingestion.worker
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ make worker # worker → ingests when a match is live
| GET | `/matches/{id}` | match + games |
| GET | `/games/{id}/events` | derived events (KILL/TOWER/DRAGON/BARON/INHIB) |
| GET | `/games/{id}/frames` | raw frames |
| WS | `/live/{id}` | live push: `subscribed` ack, then `event` / `score` messages |

## Prod (Docker, Postgres)

Expand Down
40 changes: 39 additions & 1 deletion app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

from __future__ import annotations

import asyncio
import contextlib
from collections.abc import AsyncIterator

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.bus import get_bus
from app.db import models
from app.db.base import SessionLocal
from app.schemas.api import (
Expand Down Expand Up @@ -196,3 +199,38 @@ async def game_frames(
for f in rows
]
return Page[FrameOut](items=items, total=total, limit=limit, offset=offset)


@router.websocket("/live/{game_id}")
async def live_feed(websocket: WebSocket, game_id: str):
"""Stream derived events and score snapshots for a game in real time.

On connect, sends a `{"type": "subscribed"}` ack, then forwards each
`event` / `score` message published by the ingestion worker for this game.
"""
await websocket.accept()
await websocket.send_json({"type": "subscribed", "game_id": game_id})
bus = get_bus()

async with bus.subscribe(game_id) as subscription:

async def _forward() -> None:
# Sending on a half-closed socket raises; treat it as a disconnect.
with contextlib.suppress(WebSocketDisconnect, RuntimeError):
while True:
await websocket.send_json(await subscription.get())

async def _watch_disconnect() -> None:
# No inbound messages expected; this just detects the client closing
# the socket so we can stop forwarding.
with contextlib.suppress(WebSocketDisconnect):
while True:
await websocket.receive_text()

tasks = [asyncio.create_task(_forward()), asyncio.create_task(_watch_disconnect())]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
for task in done:
task.exception() # retrieve so it isn't flagged as "never retrieved"
104 changes: 104 additions & 0 deletions app/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Live event bus: bridges the ingestion worker to WebSocket clients.

Two implementations:
- `InProcessBus`: asyncio-only, for single-process dev/test (and the optional
worker-in-API mode). Subscribers get an in-memory queue.
- `RedisBus`: Redis pub/sub, for production where the worker and the API run as
separate processes and need a broker between them.

`get_bus()` selects one based on `settings.redis_url`.
"""

from __future__ import annotations

import asyncio
import json
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Any

from app.config import settings

Message = dict[str, Any]


class _QueueSubscription:
def __init__(self) -> None:
self._queue: asyncio.Queue[Message] = asyncio.Queue()

def push(self, message: Message) -> None:
self._queue.put_nowait(message)

async def get(self) -> Message:
return await self._queue.get()


class InProcessBus:
name = "in-process"

def __init__(self) -> None:
self._subs: dict[str, set[_QueueSubscription]] = defaultdict(set)

async def publish(self, game_id: str, message: Message) -> None:
for sub in list(self._subs.get(game_id, ())):
sub.push(message)

@asynccontextmanager
async def subscribe(self, game_id: str):
sub = _QueueSubscription()
self._subs[game_id].add(sub)
try:
yield sub
finally:
self._subs[game_id].discard(sub)
if not self._subs[game_id]:
self._subs.pop(game_id, None)


class _RedisSubscription:
def __init__(self, pubsub) -> None:
self._pubsub = pubsub

async def get(self) -> Message:
while True:
msg = await self._pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if msg and msg.get("type") == "message":
return json.loads(msg["data"])


class RedisBus:
name = "redis"

def __init__(self, url: str) -> None:
import redis.asyncio as aioredis

self._redis = aioredis.from_url(url, decode_responses=True)

@staticmethod
def _channel(game_id: str) -> str:
return f"live:{game_id}"

async def publish(self, game_id: str, message: Message) -> None:
await self._redis.publish(self._channel(game_id), json.dumps(message))

@asynccontextmanager
async def subscribe(self, game_id: str):
pubsub = self._redis.pubsub()
try:
await pubsub.subscribe(self._channel(game_id))
yield _RedisSubscription(pubsub)
finally:
# aclose() unsubscribes and releases the connection even if
# subscribe() above failed.
await pubsub.aclose()


_bus: InProcessBus | RedisBus | None = None


def get_bus() -> InProcessBus | RedisBus:
"""Process-wide singleton bus (Redis in prod, in-process otherwise)."""
global _bus
if _bus is None:
_bus = RedisBus(settings.redis_url) if settings.redis_url else InProcessBus()
return _bus
7 changes: 7 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,12 @@ class Settings(BaseSettings):
poll_live_interval: float = 30.0 # detection of live games
poll_frame_interval: float = 10.0 # native cadence of the window feed

# Live push (WebSocket). With REDIS_URL set, the worker and API processes are
# bridged via Redis pub/sub; otherwise an in-process bus is used.
redis_url: str | None = None
# Dev convenience: run the ingestion worker inside the API process so the
# in-process bus works end-to-end without Redis.
run_worker_in_api: bool = False


settings = Settings()
51 changes: 46 additions & 5 deletions app/ingestion/pollers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from app.db.base import SessionLocal
from app.db.repository import Repository
from app.engine.diff import EventDeriver
from app.schemas.domain import GameRef
from app.schemas.domain import DerivedEvent, GameRef, NormalizedFrame
from app.sources.base import SourceAdapter
from app.timeutil import round10, utcnow

Expand All @@ -27,10 +27,46 @@
MAX_IDLE_SLICES = 6


def _event_message(ev: DerivedEvent) -> dict:
return {
"type": "event",
"game_id": ev.game_id,
"ts": ev.ts.isoformat(),
"event_type": ev.type,
"side": ev.side,
"info": ev.info,
}


def _score_message(frame: NormalizedFrame) -> dict:
return {
"type": "score",
"game_id": frame.game_id,
"ts": frame.ts.isoformat(),
"state": frame.state,
"blue": {"kills": frame.blue.kills, "towers": frame.blue.towers, "gold": frame.blue.gold},
"red": {"kills": frame.red.kills, "towers": frame.red.towers, "gold": frame.red.gold},
}


class GamePoller:
def __init__(self, adapter: SourceAdapter, game: GameRef):
def __init__(self, adapter: SourceAdapter, game: GameRef, bus):
self.adapter = adapter
self.game = game
self.bus = bus

async def _publish(self, events: list[DerivedEvent], frame: NormalizedFrame) -> None:
"""Push to the live bus. Must never break ingestion: swallow + log failures."""
try:
for ev in events:
await self.bus.publish(self.game.game_id, _event_message(ev))
await self.bus.publish(self.game.game_id, _score_message(frame))
except Exception: # noqa: BLE001 - a bus blip must not stop persistence
log.warning(
"bus.publish failed for game %s; ingestion continues",
self.game.game_id,
exc_info=True,
)

async def run(self) -> None:
deriver = EventDeriver()
Expand Down Expand Up @@ -61,8 +97,10 @@ async def run(self) -> None:
await repo.save_metadata(self.game.game_id, sl.metadata)

for frame in sl.frames:
await repo.save_events(deriver.push(frame))
events = deriver.push(frame)
await repo.save_events(events)
await repo.save_frame(frame)
await self._publish(events, frame)

last = sl.frames[-1]
if last.state == "finished":
Expand All @@ -78,8 +116,9 @@ async def run(self) -> None:


class LiveGameTracker:
def __init__(self, adapter: SourceAdapter):
def __init__(self, adapter: SourceAdapter, bus):
self.adapter = adapter
self.bus = bus
self._tasks: dict[str, asyncio.Task] = {}

async def run(self) -> None:
Expand All @@ -100,11 +139,13 @@ async def run(self) -> None:
game.red_code,
)
self._tasks[game.game_id] = asyncio.create_task(
GamePoller(self.adapter, game).run()
GamePoller(self.adapter, game, self.bus).run()
)

for gid, task in list(self._tasks.items()):
if task.done():
self._tasks.pop(gid)
if not task.cancelled() and task.exception() is not None:
log.error("game poller %s crashed: %r", gid, task.exception())

await asyncio.sleep(settings.poll_live_interval)
3 changes: 2 additions & 1 deletion app/ingestion/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import httpx

from app.bus import get_bus
from app.db.base import init_db
from app.ingestion.pollers import LiveGameTracker
from app.sources.lol_feed import LolFeedAdapter
Expand All @@ -21,7 +22,7 @@ async def main() -> None:
await init_db()
async with httpx.AsyncClient(timeout=20) as client:
adapter = LolFeedAdapter(client)
await LiveGameTracker(adapter).run()
await LiveGameTracker(adapter, get_bus()).run()


if __name__ == "__main__":
Expand Down
27 changes: 26 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

from __future__ import annotations

import asyncio
import contextlib
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

from app.api.routes import router
from app.bus import get_bus
from app.config import settings
from app.db.base import init_db
from app.ingestion.pollers import LiveGameTracker
from app.sources.lol_feed import LolFeedAdapter

TAGS_METADATA = [
{"name": "catalog", "description": "Leagues, teams and players."},
Expand All @@ -21,7 +28,25 @@ async def lifespan(app: FastAPI):
# Dev convenience: create tables on startup (SQLite). In production, use the
# Alembic migrations (see `alembic/`) as the source of truth instead.
await init_db()
yield

worker_task: asyncio.Task | None = None
client: httpx.AsyncClient | None = None
if settings.run_worker_in_api:
# Single-process mode: run ingestion in-process so the in-process bus
# bridges worker -> WebSocket without Redis.
client = httpx.AsyncClient(timeout=20)
tracker = LiveGameTracker(LolFeedAdapter(client), get_bus())
worker_task = asyncio.create_task(tracker.run())

try:
yield
finally:
if worker_task is not None:
worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await worker_task
if client is not None:
await client.aclose()


app = FastAPI(
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
build: .
environment:
DATABASE_URL: postgresql+asyncpg://esport:esport@db:5432/esport
REDIS_URL: redis://redis:6379/0
ports: ["8000:8000"]
depends_on: [db, redis]

Expand All @@ -24,6 +25,7 @@ services:
command: python -m app.ingestion.worker
environment:
DATABASE_URL: postgresql+asyncpg://esport:esport@db:5432/esport
REDIS_URL: redis://redis:6379/0
depends_on: [db, redis]

volumes:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"aiosqlite>=0.20", # Local DB by default (dev)
"asyncpg>=0.29", # Postgres (prod)
"alembic>=1.13", # Schema migrations
"redis>=5.0", # Live pub/sub bus (prod live push)
"pydantic>=2.6",
"pydantic-settings>=2.2",
]
Expand Down
Loading