diff --git a/back-end/controllers/celery_controller/celery_tasks.py b/back-end/controllers/celery_controller/celery_tasks.py index 2b4c728..d341eca 100755 --- a/back-end/controllers/celery_controller/celery_tasks.py +++ b/back-end/controllers/celery_controller/celery_tasks.py @@ -2,6 +2,7 @@ import json import os import subprocess +import time from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -30,6 +31,7 @@ ) from database.models import file, kml_data from database.sessions import Session +from utils.config import Config from utils.logger_config import logger from utils.namingschemes import DATETIME_FORMAT, EXPORT_CSV_NAME_TEMPLATE from utils.wireless_form2args import wireless_raster_file_format, wireless_vector_file_format @@ -195,8 +197,42 @@ def async_delete_files(self, file_ids, editfile_ids): session.close() +# --- edit flow: fast DB apply + debounced tile regeneration ------------------- +# An edit used to be one task (editfile + kml_data changes + a full retile). +# It is now dispatched as a chain (see services/edit_service.py): +# apply_edit_changes — seconds; after it commits, kml_data (and therefore +# exports) is correct and only the tiles are stale; +# regenerate_tiles — the slow tippecanoe rebuild, single-flight per folder +# with a dirty flag so rapid edits coalesce. + +TILES_DIRTY_KEY = "bdk:tiles-dirty:{folderid}" +TILES_LOCK_KEY = "bdk:tiles-lock:{folderid}" +TILES_LOCK_TTL = 3600 # safety expiry on the lock; no rebuild should take this long +TILES_LOCK_WAIT = 1800 # max time a regenerate waits behind another rebuild + + +def _tiles_redis(): + """Redis client for the tile dirty/lock flags, or None when redis isn't + reachable (e.g. the tests' in-memory broker). Callers must treat None as + 'no debounce' and always rebuild — correct, just less efficient.""" + try: + import redis + + client = redis.Redis.from_url(Config.CELERY_BROKER_URL, socket_connect_timeout=2) + client.ping() + return client + except Exception: + return None + + +def _mark_tiles_dirty(folderid): + r = _tiles_redis() + if r is not None: + r.set(TILES_DIRTY_KEY.format(folderid=folderid), "1") + + @celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) -def toggle_tiles(self, markers, folderid, polygonfeatures): +def apply_edit_changes(self, markers, folderid, polygonfeatures): session = Session() try: user_folder = folder_ops.get_folder_with_id(folderid=folderid, session=session) @@ -247,19 +283,8 @@ def toggle_tiles(self, markers, folderid, polygonfeatures): else: raise Exception("No folder for the user") - geojson_data = [] - all_kmls = file_ops.get_files_with_postfix(user_folder.id, ".kml", session) - for kml_f in all_kmls: - geojson_data.append(vt_ops.read_kml(kml_f.id, session)) - - all_geojsons = file_ops.get_files_with_postfix( - folderid=user_folder.id, postfix=".geojson", session=session - ) - for geojson_f in all_geojsons: - geojson_data.append(vt_ops.read_geojson(geojson_f.id, session)) - - mbtiles_ops.delete_mbtiles(user_folder.id, session) - vt_ops.create_tiles(geojson_data, user_folder.id, session) + # The availability CSV derives from kml_data, so it can refresh now — + # exports are correct without waiting for the retile. if user_folder.type == "export": existing_csvs = file_ops.get_files_by_type( folderid=user_folder.id, filetype="export", session=session @@ -291,12 +316,67 @@ def toggle_tiles(self, markers, folderid, polygonfeatures): session.add(new_csv_file) except Exception: + logger.exception(f"apply_edit_changes failed for folder {folderid}") session.rollback() # rollback transaction on error finally: session.commit() session.close() + _mark_tiles_dirty(folderid) + + +def _rebuild_folder_tiles(folderid): + """Rebuild a folder's vector tiles from current DB truth (the slow part).""" + session = Session() + try: + geojson_data = [] + all_kmls = file_ops.get_files_with_postfix(folderid, ".kml", session) + for kml_f in all_kmls: + geojson_data.append(vt_ops.read_kml(kml_f.id, session)) + + all_geojsons = file_ops.get_files_with_postfix( + folderid=folderid, postfix=".geojson", session=session + ) + for geojson_f in all_geojsons: + geojson_data.append(vt_ops.read_geojson(geojson_f.id, session)) + + mbtiles_ops.delete_mbtiles(folderid, session) + vt_ops.create_tiles(geojson_data, folderid, session) + finally: + session.commit() + session.close() + + +@celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) +def regenerate_tiles(self, folderid): + """Rebuild the folder's tiles, single-flight + coalescing. Every edit's + apply phase sets the dirty flag; the rebuild that runs after it clears it. + A queued regenerate that finds the folder clean no-ops (an earlier rebuild + already covered its edit), so N rapid edits cost ~1 rebuild, while 'task + SUCCESS' still always means 'the tiles include this chain's edit'.""" + r = _tiles_redis() + if r is None: + _rebuild_folder_tiles(folderid) + return "tiles rebuilt" + + lock_key = TILES_LOCK_KEY.format(folderid=folderid) + dirty_key = TILES_DIRTY_KEY.format(folderid=folderid) + waited = 0 + while not r.set(lock_key, str(self.request.id), nx=True, ex=TILES_LOCK_TTL): + if waited >= TILES_LOCK_WAIT: + raise Exception(f"timed out waiting for the tile-rebuild lock on folder {folderid}") + time.sleep(5) + waited += 5 + try: + if not r.get(dirty_key): + return "tiles fresh (an earlier rebuild covered this edit)" + r.delete(dirty_key) + _rebuild_folder_tiles(folderid) + return "tiles rebuilt" + finally: + r.delete(lock_key) + @celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True) def async_folder_copy_for_export(self, folderid, serialized_csv, brandname, deadline): diff --git a/back-end/services/edit_service.py b/back-end/services/edit_service.py index 49320e0..71326f6 100644 --- a/back-end/services/edit_service.py +++ b/back-end/services/edit_service.py @@ -1,9 +1,14 @@ """Edit-apply flow: apply marker/polygon edits to a filing and retile. -Extracted verbatim (behavior-preserving) from routes.edit.toggle_markers. +Dispatched as a two-task chain: a fast DB apply (editfiles + kml_data, after +which exports are already correct) followed by the slow, debounced tile +rebuild. The recorded task id is the chain's final task, so a task reaching +SUCCESS still means "the map tiles include this edit". """ -from controllers.celery_controller.celery_tasks import toggle_tiles +from celery import chain + +from controllers.celery_controller.celery_tasks import apply_edit_changes, regenerate_tiles from controllers.database_controller import celerytaskinfo_ops, folder_ops, user_ops from services.exceptions import ServiceError from utils.logger_config import logger @@ -48,7 +53,10 @@ def apply_edit(user_id, folderid, markers, polygonfeatures, session): all_filenames.update(point["editedFile"]) concatenated_filenames = ", ".join(sorted(all_filenames)) logger.debug(polygonfeatures) - result = toggle_tiles.apply_async(args=[filtered_markers, folderid, polygonfeatures]) + result = chain( + apply_edit_changes.s(filtered_markers, folderid, polygonfeatures), + regenerate_tiles.si(folderid), + ).apply_async() celerytaskinfo_ops.create_celery_taskinfo( task_id=result.task_id, diff --git a/back-end/tests/test_services_edit.py b/back-end/tests/test_services_edit.py index 3cbd277..4fc4233 100644 --- a/back-end/tests/test_services_edit.py +++ b/back-end/tests/test_services_edit.py @@ -1,6 +1,7 @@ -"""P1.4 — unit tests for the edit-apply service (no Flask). Pin behavior before -the toggle_markers handler logic is extracted. toggle_tiles (the heavy retile -pipeline) is stubbed so we test orchestration, not tippecanoe.""" +"""Unit tests for the edit-apply service (no Flask). The dispatched celery +chain (DB apply + the heavy retile pipeline) is stubbed so we test the +service's validation/orchestration, not tippecanoe; the chain itself is +covered in test_tiles_split.py.""" import pytest @@ -11,11 +12,16 @@ class _FakeResult: task_id = "fake-edit-task" +class _FakeChain: + def apply_async(self, *a, **k): + return _FakeResult() + + @pytest.fixture() def edit_service(monkeypatch): from services import edit_service as mod - monkeypatch.setattr(mod.toggle_tiles, "apply_async", lambda *a, **k: _FakeResult()) + monkeypatch.setattr(mod, "chain", lambda *sigs: _FakeChain()) return mod diff --git a/back-end/tests/test_tiles_split.py b/back-end/tests/test_tiles_split.py new file mode 100644 index 0000000..f972aca --- /dev/null +++ b/back-end/tests/test_tiles_split.py @@ -0,0 +1,162 @@ +"""Tests for the edit-apply / tile-regeneration split. + +An edit used to be one monolithic celery task (editfile + kml_data changes + +a full tippecanoe retile). It is now a chain: `apply_edit_changes` (fast — +after it, DB truth is correct and exports are right) followed by +`regenerate_tiles` (slow — rebuilds the folder's vector tiles, debounced via +redis dirty/lock flags so rapid edits coalesce into one rebuild). + +The heavy tile pipeline (tippecanoe) is stubbed; we test orchestration. +""" + +import json + +import pytest + +from tests import conftest_helpers as H + +POLYGON = { + "type": "Feature", + "properties": {}, + "geometry": { + "type": "Polygon", + "coordinates": [ + [[-80.01, 37.27], [-80.0, 37.27], [-80.0, 37.28], [-80.01, 37.28], [-80.01, 37.27]] + ], + }, +} + + +@pytest.fixture() +def tasks(monkeypatch): + """celery_tasks with the tile pipeline stubbed; records rebuild calls.""" + from controllers.celery_controller import celery_tasks as ct + + calls = {"create_tiles": [], "delete_mbtiles": []} + monkeypatch.setattr( + ct.vt_ops, "create_tiles", lambda gj, fid, s: calls["create_tiles"].append(fid) + ) + monkeypatch.setattr( + ct.mbtiles_ops, "delete_mbtiles", lambda fid, s: calls["delete_mbtiles"].append(fid) + ) + empty = {"type": "FeatureCollection", "features": []} + monkeypatch.setattr(ct.vt_ops, "read_kml", lambda fid, s: empty) + monkeypatch.setattr(ct.vt_ops, "read_geojson", lambda fid, s: empty) + return ct, calls + + +class _StubRedis: + """Just enough of the redis interface for the dirty/lock flags.""" + + def __init__(self): + self.store = {} + + def set(self, key, value, nx=False, ex=None): + if nx and key in self.store: + return False + self.store[key] = value + return True + + def get(self, key): + return self.store.get(key) + + def delete(self, key): + self.store.pop(key, None) + + +def _seed_edit_fixture(s): + """Org + folder + one coverage file with one served kml_data location.""" + from database.models import kml_data + + org = H.make_org(s) + user = H.make_user(s, org_id=org.id, email="editor@example.com") + folder = H.make_folder(s, org.id) + cov = H.seed_coverage(s, folder.id, b"stub", "cov.kml", "wired", 50) + s.add( + kml_data(location_id=123, served=True, file_id=cov.id, longitude=-80.005, latitude=37.275) + ) + s.commit() + return org, user, folder, cov + + +def test_apply_phase_updates_db_without_retiling(db_session, tasks): + ct, calls = tasks + s = db_session + _, _, folder, cov = _seed_edit_fixture(s) + from database.models import editfile, file_editfile_link, kml_data + + markers = [[{"id": 123, "editedFile": ["cov.kml"]}]] + ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get() + + s.expire_all() + assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0 + efs = s.query(editfile).filter(editfile.folder_id == folder.id).all() + assert len(efs) == 1 + assert json.loads(bytes(efs[0].data).decode("utf-8")) == POLYGON + links = s.query(file_editfile_link).filter(file_editfile_link.editfile_id == efs[0].id).all() + assert [ln.file_id for ln in links] == [cov.id] + # the whole point of the split: applying the edit does NOT rebuild tiles + assert calls["create_tiles"] == [] + assert calls["delete_mbtiles"] == [] + + +def test_regenerate_tiles_rebuilds(db_session, tasks): + ct, calls = tasks + s = db_session + _, _, folder, _ = _seed_edit_fixture(s) + + ct.regenerate_tiles.apply_async(args=[folder.id]).get() + + assert calls["delete_mbtiles"] == [folder.id] + assert calls["create_tiles"] == [folder.id] + + +def test_regenerate_tiles_coalesces_when_fresh(db_session, tasks, monkeypatch): + """With redis flags: a rebuild only runs if the folder is dirty, so a + queued regenerate whose edits were covered by an earlier rebuild no-ops.""" + ct, calls = tasks + s = db_session + _, _, folder, _ = _seed_edit_fixture(s) + stub = _StubRedis() + monkeypatch.setattr(ct, "_tiles_redis", lambda: stub) + + # not dirty -> tiles already cover current truth -> no rebuild + res = ct.regenerate_tiles.apply_async(args=[folder.id]).get() + assert calls["create_tiles"] == [] + assert "fresh" in res + + # the apply phase marks the folder dirty -> next regenerate rebuilds + clears + markers = [[{"id": 123, "editedFile": ["cov.kml"]}]] + ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get() + assert stub.get(f"bdk:tiles-dirty:{folder.id}") is not None + res = ct.regenerate_tiles.apply_async(args=[folder.id]).get() + assert calls["create_tiles"] == [folder.id] + assert "rebuilt" in res + assert stub.get(f"bdk:tiles-dirty:{folder.id}") is None + assert stub.get(f"bdk:tiles-lock:{folder.id}") is None # lock released + + +def test_apply_edit_service_runs_full_chain(db_session, tasks): + """End-to-end through the service (eager celery): one call applies the DB + edit AND regenerates tiles, and the recorded task id tracks the chain's + final (tile) task so 'task done' still means 'tiles done'.""" + ct, calls = tasks + s = db_session + _, user, folder, _ = _seed_edit_fixture(s) + from database.models import celerytaskinfo, kml_data + from services import edit_service + + task_id = edit_service.apply_edit( + user_id=user.id, + folderid=folder.id, + markers=[[{"id": 123, "editedFile": ["cov.kml"]}]], + polygonfeatures=[POLYGON], + session=s, + ) + + s.expire_all() + assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0 + assert calls["create_tiles"] == [folder.id] + info = s.query(celerytaskinfo).filter(celerytaskinfo.task_id == task_id).one() + assert info.operation_type == "Edit" + assert info.files_changed == "cov.kml"