diff --git a/src/phlower/snapshot.py b/src/phlower/snapshot.py index 682570d..2e2f18e 100644 --- a/src/phlower/snapshot.py +++ b/src/phlower/snapshot.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -SNAPSHOT_VERSION = 1 +SNAPSHOT_VERSION = 2 # v2: hourly_workers keys are worker groups, not pod hostnames def _serialize_tdigest(td: TDigest | None) -> str | None: diff --git a/src/phlower/sqlite_recovery.py b/src/phlower/sqlite_recovery.py index 75c703f..68feec6 100644 --- a/src/phlower/sqlite_recovery.py +++ b/src/phlower/sqlite_recovery.py @@ -12,6 +12,7 @@ from .snapshot import deserialize_aggregate from .sqlite_store import SQLiteStore from .store import DIGEST_HOT_WINDOW, Store +from .workers import extract_worker_group logger = logging.getLogger(__name__) @@ -100,12 +101,15 @@ def _load_counts(store: Store, sqlite_store: SQLiteStore, conn, since_ts: float) except ValueError: continue + # Resolve worker group here (outside the store lock taken in _flush_counts) + # so the regex doesn't run on the hot path that blocks live ingest. + worker = row["worker"] batch.append(( task_name, state, row["minute_ts"], row["cnt"], - row["worker"], + extract_worker_group(worker) if worker else None, row["queue"], row["exception_type"], )) @@ -130,11 +134,11 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: """ hot_cutoff = int(time.time()) - DIGEST_HOT_WINDOW with store._lock: - for (task_name, state, minute_ts, cnt, worker, queue, exception_type) in batch: + for (task_name, state, minute_ts, cnt, worker_group, queue, exception_type) in batch: agg = store._get_or_create_task(task_name) + hour_ts = minute_ts // 3600 * 3600 if minute_ts >= hot_cutoff: - # Recent data → per-minute buckets bucket = agg._get_or_create_bucket(minute_ts) bucket.count += cnt if state == TaskState.SUCCESS: @@ -144,8 +148,6 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: elif state == TaskState.RETRY: bucket.retry += cnt else: - # Old data → hourly rollups directly - hour_ts = minute_ts // 3600 * 3600 hb = agg.hourly_counts.get(hour_ts) if hb is None: hb = HourBucket(timestamp=hour_ts) @@ -158,9 +160,8 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None: elif state == TaskState.RETRY: hb.retry += cnt - hour_ts = minute_ts // 3600 * 3600 - if worker: - agg._hourly_counter(agg.hourly_workers, hour_ts)[worker] += cnt + if worker_group: + agg._hourly_counter(agg.hourly_workers, hour_ts)[worker_group] += cnt if queue: agg._hourly_counter(agg.hourly_queues, hour_ts)[queue] += cnt if exception_type: diff --git a/src/phlower/store.py b/src/phlower/store.py index e021344..30289bc 100644 --- a/src/phlower/store.py +++ b/src/phlower/store.py @@ -558,10 +558,11 @@ def process_succeeded( agg.active_count = max(0, agg.active_count - 1) rec = self._ensure_record(task_id, name) - worker = rec.worker + # K8s pod hashes churn — aggregate by stable group, not hostname. + worker_group = rec.worker_group queue = rec.queue agg.record_terminal_event( - TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker, queue=queue + TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker_group, queue=queue ) rec.state = TaskState.SUCCESS @@ -599,13 +600,13 @@ def process_failed( agg.active_count = max(0, agg.active_count - 1) rec = self._ensure_record(task_id, name) - worker = rec.worker + worker_group = rec.worker_group queue = rec.queue agg.record_terminal_event( TaskState.FAILURE, ts, runtime_ms=runtime_ms, - worker=worker, + worker=worker_group, queue=queue, exception_type=exception_type, )