Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/phlower/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

logger = logging.getLogger(__name__)

SNAPSHOT_VERSION = 1
SNAPSHOT_VERSION = 2 # v2: hourly_workers keys are worker groups, not pod hostnames


def _serialize_tdigest(td: TDigest | None) -> str | None:
Expand Down
17 changes: 9 additions & 8 deletions src/phlower/sqlite_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .snapshot import deserialize_aggregate
from .sqlite_store import SQLiteStore
from .store import DIGEST_HOT_WINDOW, Store
from .workers import extract_worker_group

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -100,12 +101,15 @@ def _load_counts(store: Store, sqlite_store: SQLiteStore, conn, since_ts: float)
except ValueError:
continue

# Resolve worker group here (outside the store lock taken in _flush_counts)
# so the regex doesn't run on the hot path that blocks live ingest.
worker = row["worker"]
batch.append((
task_name,
state,
row["minute_ts"],
row["cnt"],
row["worker"],
extract_worker_group(worker) if worker else None,
row["queue"],
row["exception_type"],
))
Expand All @@ -130,11 +134,11 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None:
"""
hot_cutoff = int(time.time()) - DIGEST_HOT_WINDOW
with store._lock:
for (task_name, state, minute_ts, cnt, worker, queue, exception_type) in batch:
for (task_name, state, minute_ts, cnt, worker_group, queue, exception_type) in batch:
agg = store._get_or_create_task(task_name)
hour_ts = minute_ts // 3600 * 3600

if minute_ts >= hot_cutoff:
# Recent data → per-minute buckets
bucket = agg._get_or_create_bucket(minute_ts)
bucket.count += cnt
if state == TaskState.SUCCESS:
Expand All @@ -144,8 +148,6 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None:
elif state == TaskState.RETRY:
bucket.retry += cnt
else:
# Old data → hourly rollups directly
hour_ts = minute_ts // 3600 * 3600
hb = agg.hourly_counts.get(hour_ts)
if hb is None:
hb = HourBucket(timestamp=hour_ts)
Expand All @@ -158,9 +160,8 @@ def _flush_counts(store: Store, batch: list[tuple]) -> None:
elif state == TaskState.RETRY:
hb.retry += cnt

hour_ts = minute_ts // 3600 * 3600
if worker:
agg._hourly_counter(agg.hourly_workers, hour_ts)[worker] += cnt
if worker_group:
agg._hourly_counter(agg.hourly_workers, hour_ts)[worker_group] += cnt
if queue:
agg._hourly_counter(agg.hourly_queues, hour_ts)[queue] += cnt
if exception_type:
Expand Down
9 changes: 5 additions & 4 deletions src/phlower/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,11 @@ def process_succeeded(
agg.active_count = max(0, agg.active_count - 1)

rec = self._ensure_record(task_id, name)
worker = rec.worker
# K8s pod hashes churn — aggregate by stable group, not hostname.
worker_group = rec.worker_group
queue = rec.queue
agg.record_terminal_event(
TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker, queue=queue
TaskState.SUCCESS, ts, runtime_ms=runtime_ms, worker=worker_group, queue=queue
)

rec.state = TaskState.SUCCESS
Expand Down Expand Up @@ -599,13 +600,13 @@ def process_failed(
agg.active_count = max(0, agg.active_count - 1)

rec = self._ensure_record(task_id, name)
worker = rec.worker
worker_group = rec.worker_group
queue = rec.queue
agg.record_terminal_event(
TaskState.FAILURE,
ts,
runtime_ms=runtime_ms,
worker=worker,
worker=worker_group,
queue=queue,
exception_type=exception_type,
)
Expand Down
Loading