From a49b7d9031f4462157e89de31a6b90459b522247 Mon Sep 17 00:00:00 2001 From: Spyros Date: Wed, 10 Jun 2026 08:48:56 +0300 Subject: [PATCH 1/5] fix(web): restore Heroku cache-only map_analysis and add GA health monitor PR #823 reintroduced Neo4j/Redis fallback on Heroku cache misses, causing 503s when Neo4j DNS fails. Serve precomputed GA from Postgres only on Heroku and return 404 on cache miss. Add monitor_ga_health.py for production regression alerting (especially HTTP 503). Fixes OWASP/OpenCRE#923 Co-authored-by: Cursor --- Makefile | 6 + application/tests/monitor_ga_health_test.py | 18 ++ application/tests/web_main_test.py | 38 +++- application/web/web_main.py | 66 ++---- scripts/monitor_ga_health.py | 239 ++++++++++++++++++++ 5 files changed, 322 insertions(+), 45 deletions(-) create mode 100644 application/tests/monitor_ga_health_test.py create mode 100644 scripts/monitor_ga_health.py diff --git a/Makefile b/Makefile index 86c88a868..2a22dbd31 100644 --- a/Makefile +++ b/Makefile @@ -153,6 +153,12 @@ verify-ga-complete-prod: --base-url "https://opencre.org" \ --output-json "$(CURDIR)/tmp/prod-ga-completeness.json" +monitor-ga-health-prod: + @[ -d "./.venv" ] && . ./.venv/bin/activate || ([ -d "./venv" ] && . ./venv/bin/activate); \ + python scripts/monitor_ga_health.py \ + --base-url "https://opencre.org" \ + --output-json "$(CURDIR)/tmp/prod-ga-health.json" + verify-ga-parity-local: @[ -d "./.venv" ] && . ./.venv/bin/activate || ([ -d "./venv" ] && . ./venv/bin/activate); \ export CRE_CACHE_FILE="$${CRE_CACHE_FILE:-postgresql://cre:password@127.0.0.1:5432/cre}"; \ diff --git a/application/tests/monitor_ga_health_test.py b/application/tests/monitor_ga_health_test.py new file mode 100644 index 000000000..aa5c1ac7b --- /dev/null +++ b/application/tests/monitor_ga_health_test.py @@ -0,0 +1,18 @@ +import unittest + +from scripts import monitor_ga_health as monitor + + +class MonitorGaHealthTest(unittest.TestCase): + def test_material_result_detection(self) -> None: + self.assertTrue(monitor._http_gap_result_is_material({"a": 1})) + self.assertFalse(monitor._http_gap_result_is_material({})) + self.assertFalse(monitor._http_gap_result_is_material(None)) + + def test_503_bucket_is_regression_marker(self) -> None: + bucket = "http_503_regression" + self.assertEqual(bucket, "http_503_regression") + + +if __name__ == "__main__": + unittest.main() diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 5a1fc14ce..5568d0f35 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -743,23 +743,42 @@ def test_gap_analysis_fallback_backend_failure_returns_503( redis_conn_mock.side_effect = RuntimeError("redis down") db_gap_analysis_mock.side_effect = RuntimeError("neo unavailable") db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + with self.app.test_client() as client: + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("DYNO", None) + os.environ.pop("HEROKU", None) + response = client.get( + "/rest/v1/map_analysis?standard=aaa&standard=bbb", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(503, response.status_code) + db_gap_analysis_mock.assert_called_once() + + @patch.dict(os.environ, {"HEROKU": "True"}, clear=False) + @patch.object(db, "Node_collection") + @patch.object(redis, "from_url") + def test_gap_analysis_heroku_cache_miss_returns_404( + self, redis_conn_mock, db_mock + ) -> None: + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False with self.app.test_client() as client: response = client.get( "/rest/v1/map_analysis?standard=aaa&standard=bbb", headers={"Content-Type": "application/json"}, ) - self.assertEqual(503, response.status_code) - db_gap_analysis_mock.assert_called_once() + self.assertEqual(404, response.status_code) + redis_conn_mock.assert_not_called() @patch.dict(os.environ, {"DYNO": "web.1"}, clear=False) @patch.object(db, "Node_collection") @patch.object(redis, "from_url") - def test_gap_analysis_dyno_missing_standard_returns_404( + def test_gap_analysis_dyno_cache_miss_returns_404( self, redis_conn_mock, db_mock ) -> None: db_mock.return_value.get_gap_analysis_result.return_value = None db_mock.return_value.gap_analysis_exists.return_value = False - db_mock.return_value.standards.return_value = ["aaa"] with self.app.test_client() as client: response = client.get( "/rest/v1/map_analysis?standard=aaa&standard=bbb", @@ -768,6 +787,17 @@ def test_gap_analysis_dyno_missing_standard_returns_404( self.assertEqual(404, response.status_code) redis_conn_mock.assert_not_called() + @patch.dict(os.environ, {"HEROKU": "True"}, clear=False) + @patch.object(db, "Node_collection") + def test_map_analysis_opencre_heroku_cache_miss_returns_404(self, db_mock) -> None: + db_mock.return_value.gap_analysis_exists.return_value = False + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=bbb", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(404, response.status_code) + @patch.object(redis, "from_url") @patch.object(db, "Node_collection") def test_standards_from_db(self, node_mock, redis_conn_mock) -> None: diff --git a/application/web/web_main.py b/application/web/web_main.py index 273ad19ed..530a917be 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -40,7 +40,6 @@ session, send_file, ) -from werkzeug.exceptions import HTTPException from google.oauth2 import id_token from google_auth_oauthlib.flow import Flow from application.utils.spreadsheet import write_csv @@ -72,6 +71,11 @@ def _ga_timeout_seconds() -> int: return 129600 +def _is_heroku_deploy() -> bool: + """True on Heroku/read-only web dynos where GA must be served from SQL cache only.""" + return os.environ.get("DYNO") is not None or bool(os.environ.get("HEROKU")) + + def _compute_ga_without_redis( database: db.Node_collection, standards: list[str] ) -> dict: @@ -421,8 +425,16 @@ def map_analysis() -> Any: standards = standards[:2] standards_hash = gap_analysis.make_resources_key(standards) - # ----- PR #825: OpenCRE fast path ----- + # ----- PR #825: OpenCRE fast path (SQL cache only on Heroku) ----- if OPENCRE_STANDARD_NAME in standards: + if database.gap_analysis_exists(standards_hash): + cached = database.get_gap_analysis_result(cache_key=standards_hash) + if cached: + parsed = json.loads(cached) + if "result" in parsed: + return jsonify({"result": parsed.get("result")}) + if _is_heroku_deploy(): + abort(404, "No such Cache") direct_gap_analysis = _build_direct_cre_overlap_map_analysis( standards, standards_hash, database ) @@ -439,27 +451,13 @@ def map_analysis() -> Any: if "result" in parsed: return jsonify({"result": parsed.get("result")}) - # On Heroku/read-only deployments, verify standards before attempting - # Redis or graph-backed fallback work. - is_heroku = os.environ.get("DYNO") is not None - if is_heroku: - try: - existing_standards = database.standards() - if isinstance(existing_standards, (list, tuple, set)): - existing_lower = {str(s).lower() for s in existing_standards} - missing = [s for s in standards if str(s).lower() not in existing_lower] - if missing: - logger.info( - f"On Heroku: gap analysis request {standards_hash} references " - f"standards that do not exist: {', '.join(missing)}, returning 404" - ) - abort( - 404, f"One or more standards do not exist: {', '.join(missing)}" - ) - except HTTPException: - raise - except Exception as exc: - logger.warning(f"Could not verify standards existence on Heroku: {exc}") + # Heroku serves precomputed GA from Postgres only — never Redis, RQ, or Neo4j. + if _is_heroku_deploy(): + logger.info( + "On Heroku: gap analysis cache miss for %s, returning 404", + standards_hash, + ) + abort(404, "No such Cache") if os.environ.get("CRE_NO_CALCULATE_GAP_ANALYSIS"): logger.info( @@ -513,25 +511,11 @@ def map_analysis() -> Any: cache_key, exc, ) - # NEW: fallback — compute gap analysis directly in the database try: - db.gap_analysis( - neo_db=database.neo_db, - node_names=standards, - cache_key=cache_key, - ) - cached = database.get_gap_analysis_result(cache_key=cache_key) - if cached: - parsed = json.loads(cached) - if "result" in parsed: - return jsonify({"result": parsed["result"]}) - except Exception: - logger.exception( - "Database gap analysis fallback failed for %s", - cache_key, - ) - abort(503, "Database/graph backend unavailable") - abort(404, "Gap analysis result not found") + return jsonify(_compute_ga_without_redis(database, standards)) + except Exception as fallback_exc: + logger.exception("Synchronous GA fallback failed for %s", cache_key) + abort(503, f"Gap analysis unavailable: {fallback_exc}") @app.route("/rest/v1/map_analysis_weak_links", methods=["GET"]) diff --git a/scripts/monitor_ga_health.py b/scripts/monitor_ga_health.py new file mode 100644 index 000000000..de8911e7e --- /dev/null +++ b/scripts/monitor_ga_health.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +""" +Monitor production (or staging) gap-analysis health over HTTP. + +Alerts when map_analysis responses are incomplete, especially HTTP 503 which +indicates the Heroku Neo4j/Redis fallback regression path. + +Exit codes: + 0 — all directed GA pairs return 200 with material ``result`` + 1 — incomplete pairs and/or 503 responses detected + 2 — configuration or request failure +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import urllib.error +import urllib.parse +import urllib.request +from collections import Counter +from typing import Any + + +def _http_gap_result_is_material(result: Any) -> bool: + if result is None: + return False + if isinstance(result, dict): + return len(result) > 0 + if isinstance(result, list): + return len(result) > 0 + return bool(result) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--base-url", + default=os.environ.get("OPENCRE_GA_MONITOR_BASE_URL", "https://opencre.org"), + help="OpenCRE base URL (default: https://opencre.org)", + ) + parser.add_argument( + "--timeout-seconds", + type=int, + default=int(os.environ.get("OPENCRE_GA_MONITOR_TIMEOUT", "40")), + help="HTTP timeout in seconds (default: 40)", + ) + parser.add_argument( + "--output-json", + default="", + help="Optional output JSON report path", + ) + parser.add_argument( + "--max-failures-print", + type=int, + default=50, + help="Max individual incomplete pairs to print (default: 50)", + ) + parser.add_argument( + "--webhook-url", + default=os.environ.get("OPENCRE_GA_MONITOR_WEBHOOK_URL", ""), + help="Optional webhook URL for alert payload (JSON POST)", + ) + return parser.parse_args() + + +def _get_json(url: str, timeout: int) -> Any: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def _check_pair( + base_rest: str, sa: str, sb: str, timeout: int +) -> dict[str, Any] | None: + if sa == sb: + return None + query = urllib.parse.urlencode([("standard", sa), ("standard", sb)]) + url = f"{base_rest}/map_analysis?{query}" + try: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=timeout) as resp: + code = resp.status + body = resp.read().decode("utf-8", errors="replace") + except urllib.error.HTTPError as exc: + code = exc.code + body = (exc.read() or b"").decode("utf-8", errors="replace") + except Exception as exc: + return { + "pair": f"{sa}->{sb}", + "status_code": None, + "bucket": "request_exception", + "error": str(exc), + } + + body_preview = body.strip().replace("\n", " ")[:200] + if code != 200: + bucket = f"http_{code}" + if code == 503: + bucket = "http_503_regression" + return { + "pair": f"{sa}->{sb}", + "status_code": code, + "bucket": bucket, + "body_preview": body_preview, + } + + try: + payload = json.loads(body) + except json.JSONDecodeError: + return { + "pair": f"{sa}->{sb}", + "status_code": 200, + "bucket": "invalid_json_200", + "body_preview": body_preview, + } + + result = payload.get("result") + if _http_gap_result_is_material(result): + return None + if result is not None and not _http_gap_result_is_material(result): + return { + "pair": f"{sa}->{sb}", + "status_code": 200, + "bucket": "empty_result_200", + "body_preview": body_preview, + } + if payload.get("job_id"): + return { + "pair": f"{sa}->{sb}", + "status_code": 200, + "bucket": "job_id_only", + "job_id": payload.get("job_id"), + } + return { + "pair": f"{sa}->{sb}", + "status_code": 200, + "bucket": "missing_result", + "body_preview": body_preview, + } + + +def _post_webhook(webhook_url: str, payload: dict[str, Any]) -> None: + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + webhook_url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=30) as resp: + if resp.status >= 400: + raise RuntimeError(f"webhook returned HTTP {resp.status}") + + +def main() -> int: + args = _parse_args() + base_url = args.base_url.rstrip("/") + rest = f"{base_url}/rest/v1" + timeout = args.timeout_seconds + + try: + standards = _get_json(f"{rest}/ga_standards", timeout) + except Exception as exc: + print(f"Failed to fetch ga_standards from {base_url}: {exc}", file=sys.stderr) + return 2 + if not isinstance(standards, list): + print("ga_standards response is not a list", file=sys.stderr) + return 2 + + failures: list[dict[str, Any]] = [] + success_count = 0 + total_pairs = 0 + bucket_counts: Counter[str] = Counter() + + for sa in standards: + for sb in standards: + if sa == sb: + continue + total_pairs += 1 + item = _check_pair(rest, sa, sb, timeout) + if item is None: + success_count += 1 + bucket_counts["ok_result"] += 1 + else: + failures.append(item) + bucket_counts[item["bucket"]] += 1 + + report: dict[str, Any] = { + "base_url": base_url, + "ga_standards_count": len(standards), + "directed_pairs_tested": total_pairs, + "complete_pairs": success_count, + "incomplete_pairs": len(failures), + "buckets": dict(bucket_counts), + "incomplete_examples": failures[: args.max_failures_print], + "alert": len(failures) > 0, + "regression_503_count": bucket_counts.get("http_503_regression", 0), + } + + print( + f"GA health check for {base_url}: " + f"{success_count}/{total_pairs} complete, {len(failures)} incomplete" + ) + if bucket_counts.get("http_503_regression"): + print( + f"ALERT: {bucket_counts['http_503_regression']} pair(s) returned HTTP 503 " + "(Heroku Neo4j/Redis fallback regression)" + ) + if failures: + print("Incomplete buckets:") + for key, count in sorted(bucket_counts.items(), key=lambda kv: (-kv[1], kv[0])): + if key == "ok_result": + continue + print(f" - {key}: {count}") + print("Incomplete pair samples:") + for item in failures[: args.max_failures_print]: + print(f" - {item['pair']} [{item['bucket']}]") + + if args.output_json: + with open(args.output_json, "w", encoding="utf-8") as handle: + json.dump(report, handle, indent=2) + print(f"Wrote report: {args.output_json}") + + if args.webhook_url and failures: + try: + _post_webhook(args.webhook_url, report) + print(f"Posted alert to webhook ({len(failures)} incomplete pairs)") + except Exception as exc: + print(f"Webhook alert failed: {exc}", file=sys.stderr) + return 2 + + return 0 if not failures else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) From 4304a5a9f16dc0d52628406048db68c76567c77b Mon Sep 17 00:00:00 2001 From: Spyros Date: Wed, 10 Jun 2026 08:56:26 +0300 Subject: [PATCH 2/5] fix(scripts): send User-Agent in GA health monitor HTTP requests Cloudflare blocks anonymous urllib requests to ga_standards on production. Co-authored-by: Cursor --- scripts/monitor_ga_health.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scripts/monitor_ga_health.py b/scripts/monitor_ga_health.py index de8911e7e..c7fc8ae37 100644 --- a/scripts/monitor_ga_health.py +++ b/scripts/monitor_ga_health.py @@ -66,8 +66,14 @@ def _parse_args() -> argparse.Namespace: return parser.parse_args() +_DEFAULT_HEADERS = { + "Accept": "application/json", + "User-Agent": "OpenCRE-GA-Monitor/1.0 (+https://opencre.org)", +} + + def _get_json(url: str, timeout: int) -> Any: - req = urllib.request.Request(url, headers={"Accept": "application/json"}) + req = urllib.request.Request(url, headers=_DEFAULT_HEADERS) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) @@ -80,7 +86,7 @@ def _check_pair( query = urllib.parse.urlencode([("standard", sa), ("standard", sb)]) url = f"{base_rest}/map_analysis?{query}" try: - req = urllib.request.Request(url, headers={"Accept": "application/json"}) + req = urllib.request.Request(url, headers=_DEFAULT_HEADERS) with urllib.request.urlopen(req, timeout=timeout) as resp: code = resp.status body = resp.read().decode("utf-8", errors="replace") From 1e009d95ed6901460ff1a35aa4927e5faa136c91 Mon Sep 17 00:00:00 2001 From: Spyros Date: Wed, 10 Jun 2026 08:59:09 +0300 Subject: [PATCH 3/5] docs(agents): track AGENTS.md with production GA cache-only policy Allow AGENTS.md through the *.md gitignore exception and document that Heroku/opencreorg gap analysis is cache-only (no compute on production). Co-authored-by: Cursor --- .gitignore | 1 + AGENTS.md | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 AGENTS.md diff --git a/.gitignore b/.gitignore index 831738958..f660f1339 100644 --- a/.gitignore +++ b/.gitignore @@ -64,6 +64,7 @@ standards_cache.sqlite ### Docs *.md +!AGENTS.md ### Dev DBDumps *.sql diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..24b54bf16 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,41 @@ +# OpenCRE Agent Instructions + +Cursor agents working in this repo must follow the rules in `.cursor/rules/`. + +## Quick start + +1. **Requirements gate** — If goal, success criteria, `@` file refs, or constraints are missing, stop and ask (`requirements-gate.mdc`). +2. **Plan first** — Non-trivial or multi-file work requires a plan and user approval before coding (`plan-first-workflow.mdc`, `multi-agent-workflow.mdc`). +3. **Verify** — After code changes, run checks and iterate until green (`verifiable-goals.mdc`): + - `make lint` + - `make mypy` + - `make test` + - `make frontend` (if frontend touched) +4. **Review** — Substantive work needs independent judge/subagent review (`multi-agent-workflow.mdc`). +5. **Production gap analysis** — On Heroku/opencreorg, gap analysis is cache-only: serve precomputed rows from Postgres; do not compute GA on production (cache miss → 404). + +## Rule index + +| Rule | Purpose | +|------|---------| +| `requirements-gate.mdc` | Clarifying questions + requirements template | +| `complete-ticket.mdc` | Ticket gate for `.md`/`.txt` files; uses `requirements-gate` template + coding standards | +| `plan-first-workflow.mdc` | Plan Mode before non-trivial edits | +| `multi-agent-workflow.mdc` | Big changes, approval gates, builder ≠ judge | +| `verifiable-goals.mdc` | Lint, mypy, test, CI — show evidence | +| `never-assume.mdc` | No guessing; complete code; minimal scope | +| `tdd-workflow.mdc` | Test-first for new behavior and importers | +| `autonomous-workflow.mdc` | Execute after approval; no unsolicited commits | +| `context-management.mdc` | `/clear`, `@` refs, stale context recovery | +| `production-db-ops-safety.mdc` | Destructive prod DB confirmation | +| `alembic-deploy-guardrail.mdc` | Pre-deploy migration guardrail | + +## OpenCRE commands + +```bash +make lint # black + frontend prettier +make mypy # Python typecheck +make test # Python unittest suite +make frontend # yarn build (when TS/TSX changed) +make alembic-guardrail # before deploy/migration ops +``` From c7cc9ccfa31f0049dbe8a6b185527086a9fae0f3 Mon Sep 17 00:00:00 2001 From: Spyros Date: Wed, 10 Jun 2026 08:59:42 +0300 Subject: [PATCH 4/5] fix(ga): block empty primary cache writes and clobbering Guard add_gap_analysis_result so non-material {"result":{}} primary rows are not inserted and cannot overwrite material cache; subresource keys unchanged. Co-authored-by: Cursor --- application/database/db.py | 17 +++++ application/tests/db_test.py | 62 ++++++++++++++++++- .../tests/gap_analysis_pair_job_test.py | 19 ++++++ application/utils/gap_analysis.py | 19 ++++++ 4 files changed, 115 insertions(+), 2 deletions(-) diff --git a/application/database/db.py b/application/database/db.py index 398eff4ee..dad2b0318 100644 --- a/application/database/db.py +++ b/application/database/db.py @@ -47,6 +47,7 @@ make_resources_key, make_subresources_key, primary_gap_analysis_payload_is_material, + should_persist_primary_gap_analysis_cache, ) @@ -2428,6 +2429,22 @@ def add_gap_analysis_result(self, cache_key: str, ga_object: str): .filter(GapAnalysisResults.cache_key == cache_key) .first() ) + if gap_analysis_cache_key_is_primary(cache_key): + existing_payload = existing.ga_object if existing else None + if not should_persist_primary_gap_analysis_cache( + ga_object, existing_payload + ): + if existing is None: + logger.info( + "Skipping empty primary gap analysis cache insert for %s", + cache_key, + ) + else: + logger.warning( + "Refusing non-material primary gap analysis update for %s", + cache_key, + ) + return if existing: existing.ga_object = ga_object self.session.add(existing) diff --git a/application/tests/db_test.py b/application/tests/db_test.py index 88f2cdbaf..87a1ec8f4 100644 --- a/application/tests/db_test.py +++ b/application/tests/db_test.py @@ -1327,15 +1327,73 @@ def test_gap_analysis_paths_without_base_standard_nodes(self, gap_mock): collection.gap_analysis_exists(make_resources_key(["788-788", "788-789"])), ) + def test_add_gap_analysis_result_skips_empty_primary_insert(self): + collection = db.Node_collection() + key = make_resources_key(["788-788", "b"]) + collection.add_gap_analysis_result(key, '{"result": {}}') + self.assertIsNone( + collection.session.query(db.GapAnalysisResults) + .filter(db.GapAnalysisResults.cache_key == key) + .first() + ) + self.assertFalse(collection.gap_analysis_exists(key)) + + def test_add_gap_analysis_result_does_not_clobber_material_primary(self): + collection = db.Node_collection() + key = make_resources_key(["788-788", "b"]) + material = '{"result": {"111-111": {"paths": {}}}}' + collection.add_gap_analysis_result(key, material) + collection.add_gap_analysis_result(key, '{"result": {}}') + row = ( + collection.session.query(db.GapAnalysisResults) + .filter(db.GapAnalysisResults.cache_key == key) + .first() + ) + self.assertEqual(row.ga_object, material) + self.assertTrue(collection.gap_analysis_exists(key)) + + def test_add_gap_analysis_result_allows_material_over_empty_primary(self): + collection = db.Node_collection() + key = make_resources_key(["788-788", "b"]) + collection.session.add( + db.GapAnalysisResults(cache_key=key, ga_object='{"result": {}}') + ) + collection.session.commit() + material = '{"result": {"111-111": {"paths": {}}}}' + collection.add_gap_analysis_result(key, material) + row = ( + collection.session.query(db.GapAnalysisResults) + .filter(db.GapAnalysisResults.cache_key == key) + .first() + ) + self.assertEqual(row.ga_object, material) + + def test_add_gap_analysis_result_allows_empty_subresource(self): + collection = db.Node_collection() + sub = make_subresources_key(["788-788", "b"], "111-111") + collection.add_gap_analysis_result(sub, '{"result": {}}') + row = ( + collection.session.query(db.GapAnalysisResults) + .filter(db.GapAnalysisResults.cache_key == sub) + .first() + ) + self.assertIsNotNone(row) + self.assertEqual(row.ga_object, '{"result": {}}') + @patch.object(db.NEO_DB, "gap_analysis") def test_gap_analysis_removes_stale_empty_primary_when_neo_empty(self, gap_mock): """Placeholder ``{"result":{}}`` rows must not survive a recompute with no Neo paths.""" collection = db.Node_collection() collection.neo_db.connected = True key = make_resources_key(["788-788", "b"]) - collection.add_gap_analysis_result(key, '{"result": {}}') + collection.session.add( + db.GapAnalysisResults(cache_key=key, ga_object='{"result": {}}') + ) sub = make_subresources_key(["788-788", "b"], "111-111") - collection.add_gap_analysis_result(sub, '{"result": {}}') + collection.session.add( + db.GapAnalysisResults(cache_key=sub, ga_object='{"result": {}}') + ) + collection.session.commit() self.assertFalse(collection.gap_analysis_exists(key)) gap_mock.return_value = ([], []) diff --git a/application/tests/gap_analysis_pair_job_test.py b/application/tests/gap_analysis_pair_job_test.py index bf50cb797..eeeb86007 100644 --- a/application/tests/gap_analysis_pair_job_test.py +++ b/application/tests/gap_analysis_pair_job_test.py @@ -27,6 +27,25 @@ def get_gap_analysis_result(self, cache_key: str): class TestGapAnalysisPairJob(unittest.TestCase): + def test_should_persist_primary_gap_analysis_cache(self): + g = gap_analysis + self.assertFalse( + g.should_persist_primary_gap_analysis_cache('{"result":{}}', None) + ) + self.assertFalse( + g.should_persist_primary_gap_analysis_cache( + '{"result":{}}', '{"result":{"k":1}}' + ) + ) + self.assertTrue( + g.should_persist_primary_gap_analysis_cache( + '{"result":{"k":1}}', '{"result":{}}' + ) + ) + self.assertTrue( + g.should_persist_primary_gap_analysis_cache('{"result":{"k":1}}', None) + ) + def test_primary_gap_analysis_payload_is_material(self): g = gap_analysis self.assertFalse(g.primary_gap_analysis_payload_is_material(None)) diff --git a/application/utils/gap_analysis.py b/application/utils/gap_analysis.py index 8a320a462..dfde5578e 100644 --- a/application/utils/gap_analysis.py +++ b/application/utils/gap_analysis.py @@ -59,6 +59,25 @@ def primary_gap_analysis_payload_is_material(ga_object: Optional[str]) -> bool: return bool(res) +def should_persist_primary_gap_analysis_cache( + ga_object: str, + existing_ga_object: Optional[str] = None, +) -> bool: + """ + True when a primary GA SQL cache write should be applied. + + Non-material empty ``{"result": {}}`` payloads must not be inserted and must + not overwrite an existing material row. + """ + if primary_gap_analysis_payload_is_material(ga_object): + return True + if existing_ga_object is None: + return False + if primary_gap_analysis_payload_is_material(existing_ga_object): + return False + return False + + def get_path_score(path): score = 0 previous_id = path["start"].id From 947db5f65f7de9c94848e42fdb0bd11603a9be6f Mon Sep 17 00:00:00 2001 From: Spyros Date: Wed, 10 Jun 2026 19:43:17 +0300 Subject: [PATCH 5/5] Restore GA sync script with material-only merge upsert. Supports postgres-to-postgres sync via temp-table merge for prod tables without a unique index on cache_key. Co-authored-by: Cursor --- scripts/sync_gap_analysis_table.py | 181 +++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 scripts/sync_gap_analysis_table.py diff --git a/scripts/sync_gap_analysis_table.py b/scripts/sync_gap_analysis_table.py new file mode 100644 index 000000000..6a6661f38 --- /dev/null +++ b/scripts/sync_gap_analysis_table.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +"""Copy material ``gap_analysis_results`` rows between databases (upsert only).""" + +from __future__ import annotations + +import argparse +import json +import sqlite3 +import sys +import urllib.parse +from typing import Iterable, List, Optional, Sequence, Tuple + +import psycopg2 +from psycopg2 import extras + + +def _normalize_pg_url(url: str) -> str: + if url.startswith("postgres://"): + return "postgresql://" + url[len("postgres://") :] + return url + + +def _pg_host_is_loopback(url: str) -> bool: + p = urllib.parse.urlparse(_normalize_pg_url(url)) + h = (p.hostname or "").lower() + return h in ("127.0.0.1", "localhost", "::1", "0.0.0.0") or h == "" + + +def _payload_is_material(ga_object: Optional[str]) -> bool: + if not ga_object or not isinstance(ga_object, str): + return False + try: + parsed = json.loads(ga_object) + except json.JSONDecodeError: + return False + res = parsed.get("result") + if res is None: + return False + if isinstance(res, (dict, list)): + return len(res) > 0 + return bool(res) + + +def _fetch_sqlite_rows(path: str, material_only: bool) -> List[Tuple[str, Optional[str]]]: + conn = sqlite3.connect(path) + cur = conn.execute("SELECT cache_key, ga_object FROM gap_analysis_results") + rows: List[Tuple[str, Optional[str]]] = [] + for k, v in cur.fetchall(): + payload = None if v is None else str(v) + if material_only and not _payload_is_material(payload): + continue + rows.append((str(k), payload)) + conn.close() + return rows + + +def _fetch_postgres_rows(pg_url: str, material_only: bool) -> List[Tuple[str, Optional[str]]]: + conn = psycopg2.connect(_normalize_pg_url(pg_url)) + try: + cur = conn.cursor() + cur.execute("SELECT cache_key, ga_object FROM public.gap_analysis_results") + rows: List[Tuple[str, Optional[str]]] = [] + for cache_key, ga_object in cur.fetchall(): + payload = None if ga_object is None else str(ga_object) + if material_only and not _payload_is_material(payload): + continue + rows.append((str(cache_key), payload)) + cur.close() + return rows + finally: + conn.close() + + +def _merge_postgres_rows( + pg_url: str, rows: Sequence[Tuple[str, Optional[str]]] +) -> None: + """Update existing rows and insert missing keys (works without a unique index).""" + conn = psycopg2.connect(_normalize_pg_url(pg_url)) + conn.autocommit = False + batch_size = 500 + try: + cur = conn.cursor() + for i in range(0, len(rows), batch_size): + batch = list(rows[i : i + batch_size]) + cur.execute( + """ + CREATE TEMP TABLE ga_sync_stage ( + cache_key text PRIMARY KEY, + ga_object text + ) ON COMMIT DROP + """ + ) + extras.execute_batch( + cur, + "INSERT INTO ga_sync_stage (cache_key, ga_object) VALUES (%s, %s)", + batch, + page_size=200, + ) + cur.execute( + """ + UPDATE public.gap_analysis_results AS g + SET ga_object = s.ga_object + FROM ga_sync_stage AS s + WHERE g.cache_key = s.cache_key + """ + ) + cur.execute( + """ + INSERT INTO public.gap_analysis_results (cache_key, ga_object) + SELECT s.cache_key, s.ga_object + FROM ga_sync_stage AS s + LEFT JOIN public.gap_analysis_results AS g + ON g.cache_key = s.cache_key + WHERE g.cache_key IS NULL + """ + ) + conn.commit() + print(f"merged batch {i // batch_size + 1}: {len(batch)} row(s)", flush=True) + cur.close() + finally: + conn.close() + + +def _fetch_rows( + from_sqlite: Optional[str], + from_postgres: Optional[str], + material_only: bool, +) -> List[Tuple[str, Optional[str]]]: + if from_sqlite: + return _fetch_sqlite_rows(from_sqlite, material_only) + if from_postgres: + return _fetch_postgres_rows(from_postgres, material_only) + raise ValueError("one of --from-sqlite or --from-postgres is required") + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__) + src = p.add_mutually_exclusive_group(required=True) + src.add_argument("--from-sqlite", metavar="PATH") + src.add_argument("--from-postgres", metavar="URL") + p.add_argument("--to-postgres", required=True, metavar="URL") + p.add_argument( + "--material-only", + action="store_true", + default=True, + help="Sync only rows with non-empty result payloads (default: true)", + ) + p.add_argument( + "--include-non-material", + action="store_true", + help="Also sync empty/placeholder rows (overrides --material-only)", + ) + p.add_argument("--require-local-destination", action="store_true") + p.add_argument("--allow-nonloopback-destination", action="store_true") + args = p.parse_args() + + material_only = not args.include_non_material + + if args.require_local_destination and not _pg_host_is_loopback(args.to_postgres): + print("error: destination is not loopback", file=sys.stderr) + return 2 + if ( + not _pg_host_is_loopback(args.to_postgres) + and not args.allow_nonloopback_destination + ): + print( + "error: remote destination requires --allow-nonloopback-destination", + file=sys.stderr, + ) + return 2 + + rows = _fetch_rows(args.from_sqlite, args.from_postgres, material_only) + source = args.from_sqlite or args.from_postgres + print(f"read {len(rows)} row(s) from {source!r} (material_only={material_only})") + _merge_postgres_rows(args.to_postgres, rows) + print(f"merged {len(rows)} row(s) to postgres") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())