From 176dcb59010c3b43b7c5c1db4e8047d88b2b0b78 Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Mon, 18 May 2026 14:06:25 +0200 Subject: [PATCH] fix(phlower): collapse hourly_workers to worker group to plug week-long memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TaskAggregate.hourly_workers was keyed on the full Celery hostname, which in K8s embeds a per-pod hash. Rolling deploys and karpenter scaling churn pod names constantly, so each hourly Counter accumulated hundreds of unique string keys that never collapsed back down. With aggregate_retention_hours=168 the structure grew linearly for ~5 days before tripping the 6 GB pod limit — matches the observed OOM cadence. Switch the aggregate keys to worker_group (already derived in events.py and stored on InvocationRecord) so the Counter has a tiny, stable set of keys instead. SQLite invocation rows still carry the full hostname for per-invocation drill-in; only the aggregate collapses. Recovery path was leaking the same way — _flush_counts replays the SQLite worker column verbatim. Apply extract_worker_group there too, otherwise every restart would rehydrate the leak from history. Bump SNAPSHOT_VERSION so existing on-disk hourly_workers blobs are discarded and aggregates get rebuilt clean from row replay. Pyroscope memory profiling was discussed alongside this fix but the pyroscope-io Python client only ships CPU sampling — not memory. Leaving it out; tracemalloc/memray would be a separate change. --- src/phlower/snapshot.py | 2 +- src/phlower/sqlite_recovery.py | 17 +++++++++-------- src/phlower/store.py | 9 +++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/phlower/snapshot.py b/src/phlower/snapshot.py index 682570d..2e2f18e 100644 --- a/src/phlower/snapshot.py +++ b/src/phlower/snapshot.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -SNAPSHOT_VERSION = 1 +SNAPSHOT_VERSION = 2 # v2: hourly_workers keys are worker groups, not pod hostnames def _serialize_tdigest(td: TDigest | None) -> str | None: diff --git a/src/phlower/sqlite_recovery.py b/src/phlower/sqlite_recovery.py index 75c703f..68feec6 100644 --- a/src/phlower/sqlite_recovery.py +++ b/src/phlower/sqlite_recovery.py @@ -12,6 +12,7 @@ from .snapshot import deserialize_aggregate from .sqlite_store import SQLiteStore from .store import DIGEST_HOT_WINDOW, Store +from .workers import extract_worker_group logger = logging.getLogger(__name__) @@ -100,12 +101,15 @@ def _load_counts(store: Store, sqlite_store: SQLiteStore, conn, since_ts: float) except ValueError: continue + # Resolve worker group here (outside the store lock taken in _flush_counts) + # so the regex doesn't run on the hot path that blocks live ingest. + worker = row["worker"] batch.append(( task_name, state, row["minute_ts"], row["cnt"], - row["worker"], + extract_worker_group(worker) if worker else None, row["queue"], row["exception_type"], )) @@ -130,11 +134,11 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: """ hot_cutoff = int(time.time()) - DIGEST_HOT_WINDOW with store._lock: - for (task_name, state, minute_ts, cnt, worker, queue, exception_type) in batch: + for (task_name, state, minute_ts, cnt, worker_group, queue, exception_type) in batch: agg = store._get_or_create_task(task_name) + hour_ts = minute_ts // 3600 * 3600 if minute_ts >= hot_cutoff: - # Recent data → per-minute buckets bucket = agg._get_or_create_bucket(minute_ts) bucket.count += cnt if state == TaskState.SUCCESS: @@ -144,8 +148,6 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: elif state == TaskState.RETRY: bucket.retry += cnt else: - # Old data → hourly rollups directly - hour_ts = minute_ts // 3600 * 3600 hb = agg.hourly_counts.get(hour_ts) if hb is None: hb = HourBucket(timestamp=hour_ts) @@ -158,9 +160,8 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: elif state == TaskState.RETRY: hb.retry += cnt - hour_ts = minute_ts // 3600 * 3600 - if worker: - agg._hourly_counter(agg.hourly_workers, hour_ts)[worker] += cnt + if worker_group: + agg._hourly_counter(agg.hourly_workers, hour_ts)[worker_group] += cnt if queue: agg._hourly_counter(agg.hourly_queues, hour_ts)[queue] += cnt if exception_type: diff --git a/src/phlower/store.py b/src/phlower/store.py index e021344..30289bc 100644 --- a/src/phlower/store.py +++ b/src/phlower/store.py @@ -558,10 +558,11 @@ def process_succeeded( agg.active_count = max(0, agg.active_count - 1) rec = self._ensure_record(task_id, name) - worker = rec.worker + # K8s pod hashes churn — aggregate by stable group, not hostname. + worker_group = rec.worker_group queue = rec.queue agg.record_terminal_event( - TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker, queue=queue + TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker_group, queue=queue ) rec.state = TaskState.SUCCESS @@ -599,13 +600,13 @@ def process_failed( agg.active_count = max(0, agg.active_count - 1) rec = self._ensure_record(task_id, name) - worker = rec.worker + worker_group = rec.worker_group queue = rec.queue agg.record_terminal_event( TaskState.FAILURE, ts, runtime_ms=runtime_ms, - worker=worker, + worker=worker_group, queue=queue, exception_type=exception_type, )