Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions analyzer/database_introspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class TableInfo:
schema: str = "public"
row_count: Optional[int] = None
size_mb: Optional[float] = None
last_analyzed: Optional[str] = None # ISO timestamp of last stats refresh, or None
columns: List[Dict[str, Any]] = None
indexes: List[Dict[str, Any]] = None
foreign_keys: List[Dict[str, Any]] = None
Expand Down Expand Up @@ -180,6 +181,19 @@ def get_tables(self, schema: str = None) -> List[TableInfo]:
f"Could not get size info for table {table_name}: {e}"
)

# Best-effort planner statistics: number-of-distinct-values
# per column and the last stats-refresh time. Both degrade
# to absent (None / unset) on backends or permission sets
# that can't report them — never raises.
ndv = self._get_column_ndv(cursor, table_name)
if ndv:
for col in table.columns:
if col["name"] in ndv:
col["ndv"] = ndv[col["name"]]
table.last_analyzed = self._get_table_stats_freshness(
cursor, table_name
)

tables.append(table)
self._tables_cache[table_name] = table

Expand Down Expand Up @@ -313,6 +327,106 @@ def _get_table_size(self, cursor, table_name: str) -> Optional[float]:
logger.debug(f"Error getting size for {table_name}: {e}")
return None

def _get_column_ndv(self, cursor, table_name: str) -> Dict[str, Optional[int]]:
"""Best-effort number-of-distinct-values per column from planner stats.

Returns ``{column_name: ndv}`` where ndv is an absolute estimate (int)
or ``None`` when the backend reports it but can't quantify. Columns
absent from the result simply have no NDV known.

* PostgreSQL: ``pg_stats.n_distinct`` — positive = absolute estimate;
negative = fraction of row_count (PG convention), which we convert.
* MySQL: ``information_schema.STATISTICS.CARDINALITY`` for the leading
column of each index (the only place MySQL exposes per-column NDV).
* SQLite: no per-column NDV available → ``{}``.
"""
engine = self.config["engine"]
try:
if engine == "postgresql":
cursor.execute(
"""
SELECT attname, n_distinct
FROM pg_stats
WHERE tablename = %s
""",
[table_name],
)
out: Dict[str, Optional[int]] = {}
row_count = self._get_table_row_count(cursor, table_name) or 0
for col_name, n_distinct in cursor.fetchall():
if n_distinct is None:
out[col_name] = None
elif n_distinct >= 0:
out[col_name] = int(n_distinct)
else:
# Negative: fraction of total rows.
out[col_name] = int(round(abs(n_distinct) * row_count))
return out
elif engine == "mysql":
cursor.execute(
"""
SELECT column_name, MAX(cardinality)
FROM information_schema.statistics
WHERE table_name = %s AND seq_in_index = 1
GROUP BY column_name
""",
[table_name],
)
return {
name: (int(card) if card is not None else None)
for name, card in cursor.fetchall()
}
else: # sqlite and others
return {}
except Exception as e:
logger.debug(f"Error getting NDV for {table_name}: {e}")
return {}

def _get_table_stats_freshness(self, cursor, table_name: str) -> Optional[str]:
"""Best-effort ISO timestamp of the table's last statistics refresh.

* PostgreSQL: ``pg_stat_user_tables.last_analyze`` (manual ANALYZE),
coalesced with ``last_autoanalyze``.
* MySQL (InnoDB): ``mysql.innodb_table_stats.last_update`` — the real
persistent-stats refresh time. NOT ``information_schema.tables``
``.update_time``, which is the last *data* write, not a stats refresh.
If the user lacks read on ``mysql.innodb_table_stats`` this returns
``None`` (the staleness insight then reports "skipped").
* SQLite: no ANALYZE timestamp is tracked → ``None``.
"""
engine = self.config["engine"]
try:
if engine == "postgresql":
cursor.execute(
"""
SELECT COALESCE(last_analyze, last_autoanalyze)
FROM pg_stat_user_tables
WHERE relname = %s
""",
[table_name],
)
elif engine == "mysql":
cursor.execute(
"""
SELECT last_update
FROM mysql.innodb_table_stats
WHERE table_name = %s
""",
[table_name],
)
else: # sqlite and others
return None

result = cursor.fetchone()
if result and result[0] is not None:
value = result[0]
# psycopg/mysqlclient return datetime objects.
return value.isoformat() if hasattr(value, "isoformat") else str(value)
return None
except Exception as e:
logger.debug(f"Error getting stats freshness for {table_name}: {e}")
return None

def analyze_query_context(self, sql_query: str) -> Dict[str, Any]:
"""
Analyze query in the context of the actual database schema.
Expand Down
20 changes: 20 additions & 0 deletions analyzer/migrations/0007_queryanalysis_schema_insights.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 5.2.7 on 2026-05-21 08:40

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``schema_insights`` JSON field to the ``queryanalysis`` model."""

    # Runs after the analyzer app's 0006_mlalert migration.
    dependencies = [("analyzer", "0006_mlalert")]

    operations = [
        migrations.AddField(
            model_name="queryanalysis",
            name="schema_insights",
            # Optional field; the ``dict`` callable is used as the default.
            field=models.JSONField(
                default=dict,
                blank=True,
                help_text="Schema-aware non-index insights",
            ),
        ),
    ]
3 changes: 3 additions & 0 deletions analyzer/models/query_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class QueryAnalysis(models.Model):
index_recommendations = models.JSONField(
default=dict, blank=True, help_text="Schema-aware index recommendations"
)
schema_insights = models.JSONField(
default=dict, blank=True, help_text="Schema-aware non-index insights"
)
performance_notes = models.TextField(
blank=True, help_text="Performance analysis notes"
)
Expand Down
20 changes: 16 additions & 4 deletions analyzer/services/live_schema_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
logger = logging.getLogger(__name__)

# Lifetime of a cached snapshot: two hours, the same convention that
# query_analysis_cache follows.
CACHE_TTL_SECONDS = 2 * 60 * 60

# Cache-key namespace. The v2 snapshot adds per-column NDV (inside the column
# dicts) and a per-table ``last_analyzed`` value. Moving to a new prefix
# retires v1 entries (they expire within the TTL), so the first grade on each
# connection re-introspects and picks up the richer payload.
CACHE_PREFIX = "live_schema:v2:"


@dataclass
Expand All @@ -54,7 +57,8 @@ class TableSnapshot:
schema: str = ""
row_count: Optional[int] = None
size_mb: Optional[float] = None
columns: List[Dict] = field(default_factory=list)
last_analyzed: Optional[str] = None # ISO timestamp of last stats refresh
columns: List[Dict] = field(default_factory=list) # each may carry an "ndv" key
indexes: List[IndexSnapshot] = field(default_factory=list)
foreign_keys: List[Dict] = field(default_factory=list)

Expand Down Expand Up @@ -99,13 +103,19 @@ def hydrate_statistics_manager(self, manager) -> None:
)

for tbl in self.tables.values():
last_analyzed = datetime.utcnow()
if tbl.last_analyzed:
try:
last_analyzed = datetime.fromisoformat(tbl.last_analyzed)
except (ValueError, TypeError):
pass
manager.table_stats[tbl.name] = TableStatistics(
table_name=tbl.name,
row_count=tbl.row_count or 0,
page_count=0,
avg_row_size=0,
total_size_mb=tbl.size_mb or 0.0,
last_analyzed=datetime.utcnow(),
last_analyzed=last_analyzed,
)
for idx in tbl.indexes:
manager.index_stats[tbl.name].append(
Expand All @@ -126,12 +136,13 @@ def hydrate_statistics_manager(self, manager) -> None:
)
)
for col in tbl.columns:
ndv = col.get("ndv")
manager.column_stats[tbl.name][col["name"]] = ColumnStatistics(
column_name=col["name"],
table_name=tbl.name,
data_type=str(col.get("type", "")),
nullable=bool(col.get("nullable", True)),
distinct_values=tbl.row_count or 0,
distinct_values=(ndv if ndv is not None else (tbl.row_count or 0)),
null_percentage=0.0,
avg_length=col.get("max_length") or 0,
max_length=col.get("max_length") or 0,
Expand Down Expand Up @@ -229,6 +240,7 @@ def build_live_context(
schema=t.schema,
row_count=t.row_count,
size_mb=t.size_mb,
last_analyzed=t.last_analyzed,
columns=list(t.columns),
indexes=[
IndexSnapshot(
Expand Down
Loading
Loading