diff --git a/src/routers/analytics.py b/src/routers/analytics.py index bfbbc3b..7b1eaba 100644 --- a/src/routers/analytics.py +++ b/src/routers/analytics.py @@ -5,7 +5,7 @@ from typing import Annotated, Any from fastapi import APIRouter, Depends, HTTPException, Query, status -from sqlalchemy import or_ +from sqlalchemy import bindparam, or_, text from sqlalchemy.orm import Session from ..database import get_db @@ -327,6 +327,168 @@ def _forecast_query( return query +def _execute_sql( + db: Session, + sql: str, + params: dict[str, Any], + expanding_params: set[str] | None = None, +): + statement = text(sql) + for name in expanding_params or set(): + statement = statement.bindparams(bindparam(name, expanding=True)) + return db.execute(statement, params) + + +def _append_partner_scope_sql( + clauses: list[str], + params: dict[str, Any], + expanding_params: set[str], + user: User, + partner_id: int | None, + partner_columns: tuple[str, ...], +) -> None: + if _is_admin(user): + if partner_id is not None: + params["partner_id"] = partner_id + clauses.append( + "(" + " OR ".join(f"{column} = :partner_id" for column in partner_columns) + ")" + ) + return + + partner_ids = sorted(_active_partner_ids(user)) + if not partner_ids: + clauses.append("FALSE") + return + + params["partner_ids"] = partner_ids + expanding_params.add("partner_ids") + clauses.append( + "(" + " OR ".join(f"{column} IN :partner_ids" for column in partner_columns) + ")" + ) + + +def _zone_where_sql( + user: User, + partner_id: int | None, + zone_id: int | None, + camera_id: int | None, + active_only: bool, + params: dict[str, Any], + expanding_params: set[str], +) -> str: + clauses: list[str] = [] + + if active_only: + clauses.append("pz.is_active IS TRUE") + if zone_id is not None: + params["zone_id"] = zone_id + clauses.append("pz.parking_zone_id = :zone_id") + if camera_id is not None: + params["camera_id"] = camera_id + clauses.append("pz.camera_id = :camera_id") + + _append_partner_scope_sql( + clauses, + params, + expanding_params, + user, + partner_id, + ("pz.partner_id", "c.partner_id"), + ) + + return " AND ".join(clauses) if clauses else "TRUE" + + +def _observation_where_sql( + user: User, + partner_id: int | None, + zone_id: int | None, + camera_id: int | None, + active_only: bool, + params: dict[str, Any], + expanding_params: set[str], +) -> str: + clauses = [ + "oo.observed_at >= :from_", + "oo.observed_at <= :to", + ] + + if active_only: + clauses.append("pz.is_active IS TRUE") + if zone_id is not None: + params["zone_id"] = zone_id + clauses.append("oo.zone_id = :zone_id") + if camera_id is not None: + params["camera_id"] = camera_id + clauses.append("COALESCE(oo.camera_id, pz.camera_id) = :camera_id") + + _append_partner_scope_sql( + clauses, + params, + expanding_params, + user, + partner_id, + ("oo.partner_id", "pz.partner_id", "c.partner_id"), + ) + + return " AND ".join(clauses) + + +def _forecast_where_sql( + user: User, + partner_id: int | None, + zone_id: int | None, + camera_id: int | None, + forecast_created_at: datetime | None, + active_only: bool, + params: dict[str, Any], + expanding_params: set[str], +) -> str: + clauses = [ + "f.predicted_for >= :from_", + "f.predicted_for <= :to", + ] + + if active_only: + clauses.append("pz.is_active IS TRUE") + if zone_id is not None: + params["zone_id"] = zone_id + clauses.append("f.zone_id = :zone_id") + if camera_id is not None: + params["camera_id"] = camera_id + clauses.append("COALESCE(f.camera_id, pz.camera_id) = :camera_id") + if forecast_created_at is not None: + params["forecast_created_at"] = forecast_created_at + clauses.append("f.generated_at = :forecast_created_at") + + _append_partner_scope_sql( + clauses, + params, + expanding_params, + user, + partner_id, + ("f.partner_id", "pz.partner_id", "c.partner_id"), + ) + + return " AND ".join(clauses) + + +def _bucket_sql(column: str = "observed_at") -> str: + return ( + "to_timestamp(" + f"floor(extract(epoch FROM {column}) / :bucket_seconds) * :bucket_seconds" + ")" + ) + + +def _as_float(value: Any) -> float | None: + return None if value is None else float(value) + + +def _as_int(value: Any) -> int | None: + return None if value is None else int(value) + + def _get_detection_or_404(db: Session, detection_run_id: int) -> OccupancyObservation: detection = db.query(OccupancyObservation).filter( OccupancyObservation.observation_id == detection_run_id @@ -699,63 +861,129 @@ def get_analytics_summary( from_, to = _validate_period(from_, to) _validate_common_filters(db, current_user, partner_id, zone_id, camera_id) - zones = _zone_query( - db, + params: dict[str, Any] = {"from_": from_, "to": to} + expanding_params: set[str] = set() + zone_where = _zone_where_sql( current_user, - partner_id=partner_id, - zone_id=zone_id, - camera_id=camera_id, + partner_id, + zone_id, + camera_id, active_only=True, - ).all() - observations = _observation_query( - db, + params=params, + expanding_params=expanding_params, + ) + observation_where = _observation_where_sql( current_user, - from_, - to, - partner_id=partner_id, - zone_id=zone_id, - camera_id=camera_id, - ).all() - - zone_ids = {zone.parking_zone_id for zone in zones} - observations_by_zone = _group_observations_by_zone(observations) - - total_capacity = sum(zone.capacity for zone in zones) - current_occupied_count = sum(zone.occupied for zone in zones) if zones else None - current_free_count = ( - sum(max(zone.capacity - zone.occupied, 0) for zone in zones) - if zones - else None + partner_id, + zone_id, + camera_id, + active_only=True, + params=params, + expanding_params=expanding_params, ) - occupancy_values = [ - _occupancy_percent(obs.occupied, obs.capacity) - for obs in observations - ] - all_intervals: list[float] = [] - for zone_observations in observations_by_zone.values(): - all_intervals.extend(_intervals_for_observations(zone_observations)) - - update_times: list[datetime] = [] - for zone in zones: - latest_obs = _latest_observation(observations_by_zone.get(zone.parking_zone_id, [])) - last_update = latest_obs.observed_at if latest_obs is not None else zone.occupancy_updated_at - if last_update is not None: - update_times.append(_to_utc(last_update)) - - scoped_observations = [obs for obs in observations if obs.zone_id in zone_ids] + row = _execute_sql( + db, + f""" + WITH scoped_zones AS ( + SELECT + pz.parking_zone_id AS zone_id, + pz.capacity, + pz.occupied, + pz.occupancy_updated_at + FROM parking_zones pz + LEFT JOIN cameras c ON c.camera_id = pz.camera_id + WHERE {zone_where} + ), + scoped_obs AS ( + SELECT + oo.zone_id, + oo.observation_id, + oo.capacity, + oo.occupied, + oo.confidence, + oo.observed_at, + oo.ingested_at + FROM occupancy_observations oo + JOIN parking_zones pz ON pz.parking_zone_id = oo.zone_id + LEFT JOIN cameras c ON c.camera_id = COALESCE(oo.camera_id, pz.camera_id) + WHERE {observation_where} + ), + latest_by_zone AS ( + SELECT DISTINCT ON (zone_id) + zone_id, + observed_at + FROM scoped_obs + ORDER BY zone_id, observed_at DESC, ingested_at DESC, observation_id DESC + ), + intervals AS ( + SELECT + EXTRACT(EPOCH FROM ( + observed_at - LAG(observed_at) OVER ( + PARTITION BY zone_id + ORDER BY observed_at, ingested_at, observation_id + ) + )) AS interval_sec + FROM scoped_obs + ), + zone_updates AS ( + SELECT + sz.zone_id, + COALESCE(lbz.observed_at, sz.occupancy_updated_at) AS last_update_at + FROM scoped_zones sz + LEFT JOIN latest_by_zone lbz ON lbz.zone_id = sz.zone_id + ) + SELECT + COUNT(sz.zone_id)::integer AS active_zones_count, + COALESCE(SUM(sz.capacity), 0)::integer AS total_capacity, + CASE WHEN COUNT(sz.zone_id) = 0 THEN NULL ELSE SUM(sz.occupied)::integer END AS current_occupied_count, + CASE + WHEN COUNT(sz.zone_id) = 0 THEN NULL + ELSE SUM(GREATEST(sz.capacity - sz.occupied, 0))::integer + END AS current_free_count, + ( + SELECT ROUND(AVG( + CASE + WHEN so.capacity > 0 THEN so.occupied::double precision / so.capacity * 100.0 + ELSE NULL + END + )::numeric, 2)::double precision + FROM scoped_obs so + ) AS avg_occupancy_percent, + (SELECT MAX(last_update_at) FROM zone_updates) AS freshest_update_at, + (SELECT MIN(last_update_at) FROM zone_updates WHERE last_update_at IS NOT NULL) AS oldest_update_at, + ( + SELECT ROUND(AVG(interval_sec)::numeric, 2)::double precision + FROM intervals + WHERE interval_sec IS NOT NULL AND interval_sec >= 0 + ) AS avg_update_interval_sec, + ( + SELECT ROUND(MAX(interval_sec)::numeric, 2)::double precision + FROM intervals + WHERE interval_sec IS NOT NULL AND interval_sec >= 0 + ) AS max_update_interval_sec, + ( + SELECT ROUND(AVG(confidence)::numeric, 2)::double precision + FROM scoped_obs + ) AS avg_confidence + FROM scoped_zones sz + """, + params, + expanding_params, + ).one() + data = row._mapping return AnalyticsSummary( - active_zones_count=len(zones), - total_capacity=total_capacity, - current_occupied_count=current_occupied_count, - current_free_count=current_free_count, - avg_occupancy_percent=_avg(occupancy_values), - freshest_update_at=max(update_times) if update_times else None, - oldest_update_at=min(update_times) if update_times else None, - avg_update_interval_sec=round(sum(all_intervals) / len(all_intervals), 2) if all_intervals else None, - max_update_interval_sec=round(max(all_intervals), 2) if all_intervals else None, - avg_confidence=_avg([obs.confidence for obs in scoped_observations]), + active_zones_count=data["active_zones_count"], + total_capacity=data["total_capacity"], + current_occupied_count=data["current_occupied_count"], + current_free_count=data["current_free_count"], + avg_occupancy_percent=_as_float(data["avg_occupancy_percent"]), + freshest_update_at=data["freshest_update_at"], + oldest_update_at=data["oldest_update_at"], + avg_update_interval_sec=_as_float(data["avg_update_interval_sec"]), + max_update_interval_sec=_as_float(data["max_update_interval_sec"]), + avg_confidence=_as_float(data["avg_confidence"]), ) @@ -778,43 +1006,65 @@ def get_occupancy_history( _validate_common_filters(db, current_user, partner_id, zone_id, camera_id) actual_granularity = _normalize_granularity(granularity, from_, to) - observations = _observation_query( - db, + params: dict[str, Any] = { + "from_": from_, + "to": to, + "bucket_seconds": GRANULARITY_SECONDS[actual_granularity], + } + expanding_params: set[str] = set() + observation_where = _observation_where_sql( current_user, - from_, - to, - partner_id=partner_id, - zone_id=zone_id, - camera_id=camera_id, - ).order_by(OccupancyObservation.observed_at.asc()).all() - zone_camera_ids = _zone_camera_map(db, {obs.zone_id for obs in observations}) + partner_id, + zone_id, + camera_id, + active_only=False, + params=params, + expanding_params=expanding_params, + ) + bucket_expr = _bucket_sql("oo.observed_at") - groups: dict[tuple[datetime, int, int], list[OccupancyObservation]] = defaultdict(list) - for obs in observations: - camera = obs.camera_id or zone_camera_ids.get(obs.zone_id) - if camera is None: - continue - groups[(_bucket_start(obs.observed_at, actual_granularity), obs.zone_id, camera)].append(obs) + rows = _execute_sql( + db, + f""" + SELECT + {bucket_expr} AS timestamp, + oo.zone_id AS zone_id, + COALESCE(oo.camera_id, pz.camera_id) AS camera_id, + ROUND(AVG(oo.occupied))::integer AS occupied_count, + GREATEST(MAX(oo.capacity) - ROUND(AVG(oo.occupied))::integer, 0)::integer AS free_count, + MAX(oo.capacity)::integer AS capacity, + ROUND(AVG( + CASE + WHEN oo.capacity > 0 THEN oo.occupied::double precision / oo.capacity * 100.0 + ELSE NULL + END + )::numeric, 2)::double precision AS occupancy_percent, + ROUND(AVG(oo.confidence)::numeric, 2)::double precision AS confidence_avg, + COUNT(*)::integer AS observations_count + FROM occupancy_observations oo + JOIN parking_zones pz ON pz.parking_zone_id = oo.zone_id + LEFT JOIN cameras c ON c.camera_id = COALESCE(oo.camera_id, pz.camera_id) + WHERE {observation_where} + GROUP BY timestamp, oo.zone_id, COALESCE(oo.camera_id, pz.camera_id) + ORDER BY timestamp ASC, oo.zone_id ASC, COALESCE(oo.camera_id, pz.camera_id) ASC + """, + params, + expanding_params, + ).all() - points: list[OccupancyHistoryPoint] = [] - for (timestamp, grouped_zone_id, grouped_camera_id), grouped_observations in sorted(groups.items()): - latest = _latest_observation(grouped_observations) - occupied_count = _avg_int([obs.occupied for obs in grouped_observations]) - capacity = latest.capacity if latest is not None else 0 - occupancy_values = [ - _occupancy_percent(obs.occupied, obs.capacity) - for obs in grouped_observations - ] + points = [] + for row in rows: + data = row._mapping points.append(OccupancyHistoryPoint( - timestamp=timestamp, - zone_id=grouped_zone_id, - camera_id=grouped_camera_id, - occupied_count=occupied_count, - free_count=_free_count(capacity, occupied_count), - capacity=capacity, - occupancy_percent=_avg(occupancy_values), - confidence_avg=_avg([obs.confidence for obs in grouped_observations]), - observations_count=len(grouped_observations), + timestamp=data["timestamp"], + zone_id=data["zone_id"], + camera_id=data["camera_id"], + occupied_count=data["occupied_count"], + free_count=data["free_count"], + capacity=data["capacity"], + occupancy_percent=_as_float(data["occupancy_percent"]), + confidence_avg=_as_float(data["confidence_avg"]), + observations_count=data["observations_count"], )) return OccupancyHistoryResponse(granularity=actual_granularity, points=points) @@ -837,43 +1087,82 @@ def get_occupancy_forecast( from_, to = _validate_period(from_, to) _validate_common_filters(db, current_user, partner_id, zone_id, None) - forecasts = _forecast_query( - db, + params: dict[str, Any] = {"from_": from_, "to": to} + expanding_params: set[str] = set() + forecast_where = _forecast_where_sql( current_user, - from_, - to, - partner_id=partner_id, - zone_id=zone_id, - forecast_created_at=forecast_created_at, - ).order_by(Forecast.predicted_for.asc(), Forecast.generated_at.desc()).all() - - if forecast_created_at is None: - forecasts = _latest_forecasts(forecasts) + partner_id, + zone_id, + None, + forecast_created_at, + active_only=False, + params=params, + expanding_params=expanding_params, + ) + selected_filter = "ranked.rn = 1" if forecast_created_at is None else "TRUE" - forecasts = sorted(forecasts, key=lambda item: (_to_utc(item.predicted_for), item.forecast_id)) + rows = _execute_sql( + db, + f""" + WITH ranked AS ( + SELECT + f.forecast_id, + f.zone_id, + f.predicted_for, + f.capacity, + f.predicted_occupied, + f.model_version, + f.generated_at, + ROW_NUMBER() OVER ( + PARTITION BY f.zone_id, f.predicted_for + ORDER BY f.generated_at DESC, f.forecast_id DESC + ) AS rn + FROM forecasts f + JOIN parking_zones pz ON pz.parking_zone_id = f.zone_id + LEFT JOIN cameras c ON c.camera_id = COALESCE(f.camera_id, pz.camera_id) + WHERE {forecast_where} + ) + SELECT + predicted_for AS timestamp, + zone_id, + predicted_occupied::double precision AS predicted_occupied_count, + GREATEST(capacity - predicted_occupied, 0)::double precision AS predicted_free_count, + ROUND( + CASE + WHEN capacity > 0 THEN predicted_occupied::double precision / capacity * 100.0 + ELSE NULL + END::numeric, + 2 + )::double precision AS predicted_occupancy_percent, + model_version, + generated_at AS forecast_created_at + FROM ranked + WHERE {selected_filter} + ORDER BY predicted_for ASC, forecast_id ASC + """, + params, + expanding_params, + ).all() - if not forecasts: + if not rows: return OccupancyForecastResponse( available=False, reason="insufficient_history", points=[], ) - points = [ - OccupancyForecastPoint( - timestamp=_to_utc(forecast.predicted_for), - zone_id=forecast.zone_id, - predicted_occupied_count=float(forecast.predicted_occupied), - predicted_free_count=float(max(forecast.capacity - forecast.predicted_occupied, 0)), - predicted_occupancy_percent=_occupancy_percent( - forecast.predicted_occupied, - forecast.capacity, - ), - model_version=forecast.model_version, - forecast_created_at=_to_utc(forecast.generated_at), - ) - for forecast in forecasts - ] + points = [] + for row in rows: + data = row._mapping + points.append(OccupancyForecastPoint( + timestamp=data["timestamp"], + zone_id=data["zone_id"], + predicted_occupied_count=_as_float(data["predicted_occupied_count"]), + predicted_free_count=_as_float(data["predicted_free_count"]), + predicted_occupancy_percent=_as_float(data["predicted_occupancy_percent"]), + model_version=data["model_version"], + forecast_created_at=data["forecast_created_at"], + )) return OccupancyForecastResponse(available=True, reason=None, points=points)