From fead0ca2b55ef407a3946de462a2533d6568ff81 Mon Sep 17 00:00:00 2001 From: Shaddi Hasan Date: Tue, 9 Jun 2026 20:53:55 -0400 Subject: [PATCH] Split filing edits into a fast DB apply and a debounced tile rebuild Applying an edit used to be one celery task that updated editfiles and kml_data and then rebuilt the folder's entire vector-tile set, so every edit cost a full tippecanoe run before it was reported done. The task is now a chain of two: - apply_edit_changes: persists the drawn polygons as editfiles, deletes the excluded kml_data rows, and refreshes the availability CSV for export filings. After this commits (seconds), database truth - and therefore exports - is correct; only the map tiles are stale. - regenerate_tiles: the slow tile rebuild, made single-flight per folder with a redis dirty flag so rapid consecutive edits coalesce instead of each paying a full rebuild. A queued rebuild that finds its folder clean no-ops, but a chain only reports SUCCESS once the tiles actually include its edit, so task completion keeps its meaning for clients. Without redis (e.g. the eager test broker) the debounce degrades to always rebuilding, which is correct, just less efficient. Tile-rebuild failures now log and surface through celery retries instead of being silently swallowed. --- .../celery_controller/celery_tasks.py | 108 ++++++++++-- back-end/services/edit_service.py | 14 +- back-end/tests/test_services_edit.py | 14 +- back-end/tests/test_tiles_split.py | 162 ++++++++++++++++++ 4 files changed, 277 insertions(+), 21 deletions(-) create mode 100644 back-end/tests/test_tiles_split.py diff --git a/back-end/controllers/celery_controller/celery_tasks.py b/back-end/controllers/celery_controller/celery_tasks.py index 2b4c728..d341eca 100755 --- a/back-end/controllers/celery_controller/celery_tasks.py +++ b/back-end/controllers/celery_controller/celery_tasks.py @@ -2,6 +2,7 @@ import json import os import subprocess +import time from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -30,6 +31,7 @@ ) from database.models import file, kml_data from database.sessions import Session +from utils.config import Config from utils.logger_config import logger from utils.namingschemes import DATETIME_FORMAT, EXPORT_CSV_NAME_TEMPLATE from utils.wireless_form2args import wireless_raster_file_format, wireless_vector_file_format @@ -195,8 +197,42 @@ def async_delete_files(self, file_ids, editfile_ids): session.close() +# --- edit flow: fast DB apply + debounced tile regeneration ------------------- +# An edit used to be one task (editfile + kml_data changes + a full retile). +# It is now dispatched as a chain (see services/edit_service.py): +# apply_edit_changes — seconds; after it commits, kml_data (and therefore +# exports) is correct and only the tiles are stale; +# regenerate_tiles — the slow tippecanoe rebuild, single-flight per folder +# with a dirty flag so rapid edits coalesce. + +TILES_DIRTY_KEY = "bdk:tiles-dirty:{folderid}" +TILES_LOCK_KEY = "bdk:tiles-lock:{folderid}" +TILES_LOCK_TTL = 3600 # safety expiry on the lock; no rebuild should take this long +TILES_LOCK_WAIT = 1800 # max time a regenerate waits behind another rebuild + + +def _tiles_redis(): + """Redis client for the tile dirty/lock flags, or None when redis isn't + reachable (e.g. the tests' in-memory broker). Callers must treat None as + 'no debounce' and always rebuild — correct, just less efficient.""" + try: + import redis + + client = redis.Redis.from_url(Config.CELERY_BROKER_URL, socket_connect_timeout=2) + client.ping() + return client + except Exception: + return None + + +def _mark_tiles_dirty(folderid): + r = _tiles_redis() + if r is not None: + r.set(TILES_DIRTY_KEY.format(folderid=folderid), "1") + + @celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) -def toggle_tiles(self, markers, folderid, polygonfeatures): +def apply_edit_changes(self, markers, folderid, polygonfeatures): session = Session() try: user_folder = folder_ops.get_folder_with_id(folderid=folderid, session=session) @@ -247,19 +283,8 @@ def toggle_tiles(self, markers, folderid, polygonfeatures): else: raise Exception("No folder for the user") - geojson_data = [] - all_kmls = file_ops.get_files_with_postfix(user_folder.id, ".kml", session) - for kml_f in all_kmls: - geojson_data.append(vt_ops.read_kml(kml_f.id, session)) - - all_geojsons = file_ops.get_files_with_postfix( - folderid=user_folder.id, postfix=".geojson", session=session - ) - for geojson_f in all_geojsons: - geojson_data.append(vt_ops.read_geojson(geojson_f.id, session)) - - mbtiles_ops.delete_mbtiles(user_folder.id, session) - vt_ops.create_tiles(geojson_data, user_folder.id, session) + # The availability CSV derives from kml_data, so it can refresh now — + # exports are correct without waiting for the retile. if user_folder.type == "export": existing_csvs = file_ops.get_files_by_type( folderid=user_folder.id, filetype="export", session=session @@ -291,12 +316,67 @@ def toggle_tiles(self, markers, folderid, polygonfeatures): session.add(new_csv_file) except Exception: + logger.exception(f"apply_edit_changes failed for folder {folderid}") session.rollback() # rollback transaction on error finally: session.commit() session.close() + _mark_tiles_dirty(folderid) + + +def _rebuild_folder_tiles(folderid): + """Rebuild a folder's vector tiles from current DB truth (the slow part).""" + session = Session() + try: + geojson_data = [] + all_kmls = file_ops.get_files_with_postfix(folderid, ".kml", session) + for kml_f in all_kmls: + geojson_data.append(vt_ops.read_kml(kml_f.id, session)) + + all_geojsons = file_ops.get_files_with_postfix( + folderid=folderid, postfix=".geojson", session=session + ) + for geojson_f in all_geojsons: + geojson_data.append(vt_ops.read_geojson(geojson_f.id, session)) + + mbtiles_ops.delete_mbtiles(folderid, session) + vt_ops.create_tiles(geojson_data, folderid, session) + finally: + session.commit() + session.close() + + +@celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) +def regenerate_tiles(self, folderid): + """Rebuild the folder's tiles, single-flight + coalescing. Every edit's + apply phase sets the dirty flag; the rebuild that runs after it clears it. + A queued regenerate that finds the folder clean no-ops (an earlier rebuild + already covered its edit), so N rapid edits cost ~1 rebuild, while 'task + SUCCESS' still always means 'the tiles include this chain's edit'.""" + r = _tiles_redis() + if r is None: + _rebuild_folder_tiles(folderid) + return "tiles rebuilt" + + lock_key = TILES_LOCK_KEY.format(folderid=folderid) + dirty_key = TILES_DIRTY_KEY.format(folderid=folderid) + waited = 0 + while not r.set(lock_key, str(self.request.id), nx=True, ex=TILES_LOCK_TTL): + if waited >= TILES_LOCK_WAIT: + raise Exception(f"timed out waiting for the tile-rebuild lock on folder {folderid}") + time.sleep(5) + waited += 5 + try: + if not r.get(dirty_key): + return "tiles fresh (an earlier rebuild covered this edit)" + r.delete(dirty_key) + _rebuild_folder_tiles(folderid) + return "tiles rebuilt" + finally: + r.delete(lock_key) + @celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) def async_folder_copy_for_export(self, folderid, serialized_csv, brandname, deadline): diff --git a/back-end/services/edit_service.py b/back-end/services/edit_service.py index 49320e0..71326f6 100644 --- a/back-end/services/edit_service.py +++ b/back-end/services/edit_service.py @@ -1,9 +1,14 @@ """Edit-apply flow: apply marker/polygon edits to a filing and retile. -Extracted verbatim (behavior-preserving) from routes.edit.toggle_markers. +Dispatched as a two-task chain: a fast DB apply (editfiles + kml_data, after +which exports are already correct) followed by the slow, debounced tile +rebuild. The recorded task id is the chain's final task, so a task reaching +SUCCESS still means "the map tiles include this edit". """ -from controllers.celery_controller.celery_tasks import toggle_tiles +from celery import chain + +from controllers.celery_controller.celery_tasks import apply_edit_changes, regenerate_tiles from controllers.database_controller import celerytaskinfo_ops, folder_ops, user_ops from services.exceptions import ServiceError from utils.logger_config import logger @@ -48,7 +53,10 @@ def apply_edit(user_id, folderid, markers, polygonfeatures, session): all_filenames.update(point["editedFile"]) concatenated_filenames = ", ".join(sorted(all_filenames)) logger.debug(polygonfeatures) - result = toggle_tiles.apply_async(args=[filtered_markers, folderid, polygonfeatures]) + result = chain( + apply_edit_changes.s(filtered_markers, folderid, polygonfeatures), + regenerate_tiles.si(folderid), + ).apply_async() celerytaskinfo_ops.create_celery_taskinfo( task_id=result.task_id, diff --git a/back-end/tests/test_services_edit.py b/back-end/tests/test_services_edit.py index 3cbd277..4fc4233 100644 --- a/back-end/tests/test_services_edit.py +++ b/back-end/tests/test_services_edit.py @@ -1,6 +1,7 @@ -"""P1.4 — unit tests for the edit-apply service (no Flask). Pin behavior before -the toggle_markers handler logic is extracted. toggle_tiles (the heavy retile -pipeline) is stubbed so we test orchestration, not tippecanoe.""" +"""Unit tests for the edit-apply service (no Flask). The dispatched celery +chain (DB apply + the heavy retile pipeline) is stubbed so we test the +service's validation/orchestration, not tippecanoe; the chain itself is +covered in test_tiles_split.py.""" import pytest @@ -11,11 +12,16 @@ class _FakeResult: task_id = "fake-edit-task" +class _FakeChain: + def apply_async(self, *a, **k): + return _FakeResult() + + @pytest.fixture() def edit_service(monkeypatch): from services import edit_service as mod - monkeypatch.setattr(mod.toggle_tiles, "apply_async", lambda *a, **k: _FakeResult()) + monkeypatch.setattr(mod, "chain", lambda *sigs: _FakeChain()) return mod diff --git a/back-end/tests/test_tiles_split.py b/back-end/tests/test_tiles_split.py new file mode 100644 index 0000000..f972aca --- /dev/null +++ b/back-end/tests/test_tiles_split.py @@ -0,0 +1,162 @@ +"""Tests for the edit-apply / tile-regeneration split. + +An edit used to be one monolithic celery task (editfile + kml_data changes + +a full tippecanoe retile). It is now a chain: `apply_edit_changes` (fast — +after it, DB truth is correct and exports are right) followed by +`regenerate_tiles` (slow — rebuilds the folder's vector tiles, debounced via +redis dirty/lock flags so rapid edits coalesce into one rebuild). + +The heavy tile pipeline (tippecanoe) is stubbed; we test orchestration. +""" + +import json + +import pytest + +from tests import conftest_helpers as H + +POLYGON = { + "type": "Feature", + "properties": {}, + "geometry": { + "type": "Polygon", + "coordinates": [ + [[-80.01, 37.27], [-80.0, 37.27], [-80.0, 37.28], [-80.01, 37.28], [-80.01, 37.27]] + ], + }, +} + + +@pytest.fixture() +def tasks(monkeypatch): + """celery_tasks with the tile pipeline stubbed; records rebuild calls.""" + from controllers.celery_controller import celery_tasks as ct + + calls = {"create_tiles": [], "delete_mbtiles": []} + monkeypatch.setattr( + ct.vt_ops, "create_tiles", lambda gj, fid, s: calls["create_tiles"].append(fid) + ) + monkeypatch.setattr( + ct.mbtiles_ops, "delete_mbtiles", lambda fid, s: calls["delete_mbtiles"].append(fid) + ) + empty = {"type": "FeatureCollection", "features": []} + monkeypatch.setattr(ct.vt_ops, "read_kml", lambda fid, s: empty) + monkeypatch.setattr(ct.vt_ops, "read_geojson", lambda fid, s: empty) + return ct, calls + + +class _StubRedis: + """Just enough of the redis interface for the dirty/lock flags.""" + + def __init__(self): + self.store = {} + + def set(self, key, value, nx=False, ex=None): + if nx and key in self.store: + return False + self.store[key] = value + return True + + def get(self, key): + return self.store.get(key) + + def delete(self, key): + self.store.pop(key, None) + + +def _seed_edit_fixture(s): + """Org + folder + one coverage file with one served kml_data location.""" + from database.models import kml_data + + org = H.make_org(s) + user = H.make_user(s, org_id=org.id, email="editor@example.com") + folder = H.make_folder(s, org.id) + cov = H.seed_coverage(s, folder.id, b"stub", "cov.kml", "wired", 50) + s.add( + kml_data(location_id=123, served=True, file_id=cov.id, longitude=-80.005, latitude=37.275) + ) + s.commit() + return org, user, folder, cov + + +def test_apply_phase_updates_db_without_retiling(db_session, tasks): + ct, calls = tasks + s = db_session + _, _, folder, cov = _seed_edit_fixture(s) + from database.models import editfile, file_editfile_link, kml_data + + markers = [[{"id": 123, "editedFile": ["cov.kml"]}]] + ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get() + + s.expire_all() + assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0 + efs = s.query(editfile).filter(editfile.folder_id == folder.id).all() + assert len(efs) == 1 + assert json.loads(bytes(efs[0].data).decode("utf-8")) == POLYGON + links = s.query(file_editfile_link).filter(file_editfile_link.editfile_id == efs[0].id).all() + assert [ln.file_id for ln in links] == [cov.id] + # the whole point of the split: applying the edit does NOT rebuild tiles + assert calls["create_tiles"] == [] + assert calls["delete_mbtiles"] == [] + + +def test_regenerate_tiles_rebuilds(db_session, tasks): + ct, calls = tasks + s = db_session + _, _, folder, _ = _seed_edit_fixture(s) + + ct.regenerate_tiles.apply_async(args=[folder.id]).get() + + assert calls["delete_mbtiles"] == [folder.id] + assert calls["create_tiles"] == [folder.id] + + +def test_regenerate_tiles_coalesces_when_fresh(db_session, tasks, monkeypatch): + """With redis flags: a rebuild only runs if the folder is dirty, so a + queued regenerate whose edits were covered by an earlier rebuild no-ops.""" + ct, calls = tasks + s = db_session + _, _, folder, _ = _seed_edit_fixture(s) + stub = _StubRedis() + monkeypatch.setattr(ct, "_tiles_redis", lambda: stub) + + # not dirty -> tiles already cover current truth -> no rebuild + res = ct.regenerate_tiles.apply_async(args=[folder.id]).get() + assert calls["create_tiles"] == [] + assert "fresh" in res + + # the apply phase marks the folder dirty -> next regenerate rebuilds + clears + markers = [[{"id": 123, "editedFile": ["cov.kml"]}]] + ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get() + assert stub.get(f"bdk:tiles-dirty:{folder.id}") is not None + res = ct.regenerate_tiles.apply_async(args=[folder.id]).get() + assert calls["create_tiles"] == [folder.id] + assert "rebuilt" in res + assert stub.get(f"bdk:tiles-dirty:{folder.id}") is None + assert stub.get(f"bdk:tiles-lock:{folder.id}") is None # lock released + + +def test_apply_edit_service_runs_full_chain(db_session, tasks): + """End-to-end through the service (eager celery): one call applies the DB + edit AND regenerates tiles, and the recorded task id tracks the chain's + final (tile) task so 'task done' still means 'tiles done'.""" + ct, calls = tasks + s = db_session + _, user, folder, _ = _seed_edit_fixture(s) + from database.models import celerytaskinfo, kml_data + from services import edit_service + + task_id = edit_service.apply_edit( + user_id=user.id, + folderid=folder.id, + markers=[[{"id": 123, "editedFile": ["cov.kml"]}]], + polygonfeatures=[POLYGON], + session=s, + ) + + s.expire_all() + assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0 + assert calls["create_tiles"] == [folder.id] + info = s.query(celerytaskinfo).filter(celerytaskinfo.task_id == task_id).one() + assert info.operation_type == "Edit" + assert info.files_changed == "cov.kml"