Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 94 additions & 14 deletions back-end/controllers/celery_controller/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import subprocess
import time
from datetime import datetime

from sqlalchemy.exc import SQLAlchemyError
Expand Down Expand Up @@ -30,6 +31,7 @@
)
from database.models import file, kml_data
from database.sessions import Session
from utils.config import Config
from utils.logger_config import logger
from utils.namingschemes import DATETIME_FORMAT, EXPORT_CSV_NAME_TEMPLATE
from utils.wireless_form2args import wireless_raster_file_format, wireless_vector_file_format
Expand Down Expand Up @@ -195,8 +197,42 @@ def async_delete_files(self, file_ids, editfile_ids):
session.close()


# --- edit flow: fast DB apply + debounced tile regeneration -------------------
# An edit used to be one task (editfile + kml_data changes + a full retile).
# It is now dispatched as a chain (see services/edit_service.py):
# apply_edit_changes — seconds; after it commits, kml_data (and therefore
# exports) is correct and only the tiles are stale;
# regenerate_tiles — the slow tippecanoe rebuild, single-flight per folder
# with a dirty flag so rapid edits coalesce.

TILES_DIRTY_KEY = "bdk:tiles-dirty:{folderid}"
TILES_LOCK_KEY = "bdk:tiles-lock:{folderid}"
TILES_LOCK_TTL = 3600 # safety expiry on the lock; no rebuild should take this long
TILES_LOCK_WAIT = 1800 # max time a regenerate waits behind another rebuild


def _tiles_redis():
"""Redis client for the tile dirty/lock flags, or None when redis isn't
reachable (e.g. the tests' in-memory broker). Callers must treat None as
'no debounce' and always rebuild — correct, just less efficient."""
try:
import redis

client = redis.Redis.from_url(Config.CELERY_BROKER_URL, socket_connect_timeout=2)
client.ping()
return client
except Exception:
return None


def _mark_tiles_dirty(folderid):
r = _tiles_redis()
if r is not None:
r.set(TILES_DIRTY_KEY.format(folderid=folderid), "1")


@celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def toggle_tiles(self, markers, folderid, polygonfeatures):
def apply_edit_changes(self, markers, folderid, polygonfeatures):
session = Session()
try:
user_folder = folder_ops.get_folder_with_id(folderid=folderid, session=session)
Expand Down Expand Up @@ -247,19 +283,8 @@ def toggle_tiles(self, markers, folderid, polygonfeatures):
else:
raise Exception("No folder for the user")

geojson_data = []
all_kmls = file_ops.get_files_with_postfix(user_folder.id, ".kml", session)
for kml_f in all_kmls:
geojson_data.append(vt_ops.read_kml(kml_f.id, session))

all_geojsons = file_ops.get_files_with_postfix(
folderid=user_folder.id, postfix=".geojson", session=session
)
for geojson_f in all_geojsons:
geojson_data.append(vt_ops.read_geojson(geojson_f.id, session))

mbtiles_ops.delete_mbtiles(user_folder.id, session)
vt_ops.create_tiles(geojson_data, user_folder.id, session)
# The availability CSV derives from kml_data, so it can refresh now —
# exports are correct without waiting for the retile.
if user_folder.type == "export":
existing_csvs = file_ops.get_files_by_type(
folderid=user_folder.id, filetype="export", session=session
Expand Down Expand Up @@ -291,12 +316,67 @@ def toggle_tiles(self, markers, folderid, polygonfeatures):
session.add(new_csv_file)

except Exception:
logger.exception(f"apply_edit_changes failed for folder {folderid}")
session.rollback() # rollback transaction on error

finally:
session.commit()
session.close()

_mark_tiles_dirty(folderid)


def _rebuild_folder_tiles(folderid):
"""Rebuild a folder's vector tiles from current DB truth (the slow part)."""
session = Session()
try:
geojson_data = []
all_kmls = file_ops.get_files_with_postfix(folderid, ".kml", session)
for kml_f in all_kmls:
geojson_data.append(vt_ops.read_kml(kml_f.id, session))

all_geojsons = file_ops.get_files_with_postfix(
folderid=folderid, postfix=".geojson", session=session
)
for geojson_f in all_geojsons:
geojson_data.append(vt_ops.read_geojson(geojson_f.id, session))

mbtiles_ops.delete_mbtiles(folderid, session)
vt_ops.create_tiles(geojson_data, folderid, session)
finally:
session.commit()
session.close()


@celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def regenerate_tiles(self, folderid):
"""Rebuild the folder's tiles, single-flight + coalescing. Every edit's
apply phase sets the dirty flag; the rebuild that runs after it clears it.
A queued regenerate that finds the folder clean no-ops (an earlier rebuild
already covered its edit), so N rapid edits cost ~1 rebuild, while 'task
SUCCESS' still always means 'the tiles include this chain's edit'."""
r = _tiles_redis()
if r is None:
_rebuild_folder_tiles(folderid)
return "tiles rebuilt"

lock_key = TILES_LOCK_KEY.format(folderid=folderid)
dirty_key = TILES_DIRTY_KEY.format(folderid=folderid)
waited = 0
while not r.set(lock_key, str(self.request.id), nx=True, ex=TILES_LOCK_TTL):
if waited >= TILES_LOCK_WAIT:
raise Exception(f"timed out waiting for the tile-rebuild lock on folder {folderid}")
time.sleep(5)
waited += 5
try:
if not r.get(dirty_key):
return "tiles fresh (an earlier rebuild covered this edit)"
r.delete(dirty_key)
_rebuild_folder_tiles(folderid)
return "tiles rebuilt"
finally:
r.delete(lock_key)


@celery.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def async_folder_copy_for_export(self, folderid, serialized_csv, brandname, deadline):
Expand Down
14 changes: 11 additions & 3 deletions back-end/services/edit_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Edit-apply flow: apply marker/polygon edits to a filing and retile.

Extracted verbatim (behavior-preserving) from routes.edit.toggle_markers.
Dispatched as a two-task chain: a fast DB apply (editfiles + kml_data, after
which exports are already correct) followed by the slow, debounced tile
rebuild. The recorded task id is the chain's final task, so a task reaching
SUCCESS still means "the map tiles include this edit".
"""

from controllers.celery_controller.celery_tasks import toggle_tiles
from celery import chain

from controllers.celery_controller.celery_tasks import apply_edit_changes, regenerate_tiles
from controllers.database_controller import celerytaskinfo_ops, folder_ops, user_ops
from services.exceptions import ServiceError
from utils.logger_config import logger
Expand Down Expand Up @@ -48,7 +53,10 @@ def apply_edit(user_id, folderid, markers, polygonfeatures, session):
all_filenames.update(point["editedFile"])
concatenated_filenames = ", ".join(sorted(all_filenames))
logger.debug(polygonfeatures)
result = toggle_tiles.apply_async(args=[filtered_markers, folderid, polygonfeatures])
result = chain(
apply_edit_changes.s(filtered_markers, folderid, polygonfeatures),
regenerate_tiles.si(folderid),
).apply_async()

celerytaskinfo_ops.create_celery_taskinfo(
task_id=result.task_id,
Expand Down
14 changes: 10 additions & 4 deletions back-end/tests/test_services_edit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""P1.4 — unit tests for the edit-apply service (no Flask). Pin behavior before
the toggle_markers handler logic is extracted. toggle_tiles (the heavy retile
pipeline) is stubbed so we test orchestration, not tippecanoe."""
"""Unit tests for the edit-apply service (no Flask). The dispatched celery
chain (DB apply + the heavy retile pipeline) is stubbed so we test the
service's validation/orchestration, not tippecanoe; the chain itself is
covered in test_tiles_split.py."""

import pytest

Expand All @@ -11,11 +12,16 @@ class _FakeResult:
task_id = "fake-edit-task"


class _FakeChain:
def apply_async(self, *a, **k):
return _FakeResult()


@pytest.fixture()
def edit_service(monkeypatch):
from services import edit_service as mod

monkeypatch.setattr(mod.toggle_tiles, "apply_async", lambda *a, **k: _FakeResult())
monkeypatch.setattr(mod, "chain", lambda *sigs: _FakeChain())
return mod


Expand Down
162 changes: 162 additions & 0 deletions back-end/tests/test_tiles_split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Tests for the edit-apply / tile-regeneration split.

An edit used to be one monolithic celery task (editfile + kml_data changes +
a full tippecanoe retile). It is now a chain: `apply_edit_changes` (fast —
after it, DB truth is correct and exports are right) followed by
`regenerate_tiles` (slow — rebuilds the folder's vector tiles, debounced via
redis dirty/lock flags so rapid edits coalesce into one rebuild).

The heavy tile pipeline (tippecanoe) is stubbed; we test orchestration.
"""

import json

import pytest

from tests import conftest_helpers as H

POLYGON = {
"type": "Feature",
"properties": {},
"geometry": {
"type": "Polygon",
"coordinates": [
[[-80.01, 37.27], [-80.0, 37.27], [-80.0, 37.28], [-80.01, 37.28], [-80.01, 37.27]]
],
},
}


@pytest.fixture()
def tasks(monkeypatch):
"""celery_tasks with the tile pipeline stubbed; records rebuild calls."""
from controllers.celery_controller import celery_tasks as ct

calls = {"create_tiles": [], "delete_mbtiles": []}
monkeypatch.setattr(
ct.vt_ops, "create_tiles", lambda gj, fid, s: calls["create_tiles"].append(fid)
)
monkeypatch.setattr(
ct.mbtiles_ops, "delete_mbtiles", lambda fid, s: calls["delete_mbtiles"].append(fid)
)
empty = {"type": "FeatureCollection", "features": []}
monkeypatch.setattr(ct.vt_ops, "read_kml", lambda fid, s: empty)
monkeypatch.setattr(ct.vt_ops, "read_geojson", lambda fid, s: empty)
return ct, calls


class _StubRedis:
"""Just enough of the redis interface for the dirty/lock flags."""

def __init__(self):
self.store = {}

def set(self, key, value, nx=False, ex=None):
if nx and key in self.store:
return False
self.store[key] = value
return True

def get(self, key):
return self.store.get(key)

def delete(self, key):
self.store.pop(key, None)


def _seed_edit_fixture(s):
"""Org + folder + one coverage file with one served kml_data location."""
from database.models import kml_data

org = H.make_org(s)
user = H.make_user(s, org_id=org.id, email="editor@example.com")
folder = H.make_folder(s, org.id)
cov = H.seed_coverage(s, folder.id, b"stub", "cov.kml", "wired", 50)
s.add(
kml_data(location_id=123, served=True, file_id=cov.id, longitude=-80.005, latitude=37.275)
)
s.commit()
return org, user, folder, cov


def test_apply_phase_updates_db_without_retiling(db_session, tasks):
ct, calls = tasks
s = db_session
_, _, folder, cov = _seed_edit_fixture(s)
from database.models import editfile, file_editfile_link, kml_data

markers = [[{"id": 123, "editedFile": ["cov.kml"]}]]
ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get()

s.expire_all()
assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0
efs = s.query(editfile).filter(editfile.folder_id == folder.id).all()
assert len(efs) == 1
assert json.loads(bytes(efs[0].data).decode("utf-8")) == POLYGON
links = s.query(file_editfile_link).filter(file_editfile_link.editfile_id == efs[0].id).all()
assert [ln.file_id for ln in links] == [cov.id]
# the whole point of the split: applying the edit does NOT rebuild tiles
assert calls["create_tiles"] == []
assert calls["delete_mbtiles"] == []


def test_regenerate_tiles_rebuilds(db_session, tasks):
ct, calls = tasks
s = db_session
_, _, folder, _ = _seed_edit_fixture(s)

ct.regenerate_tiles.apply_async(args=[folder.id]).get()

assert calls["delete_mbtiles"] == [folder.id]
assert calls["create_tiles"] == [folder.id]


def test_regenerate_tiles_coalesces_when_fresh(db_session, tasks, monkeypatch):
"""With redis flags: a rebuild only runs if the folder is dirty, so a
queued regenerate whose edits were covered by an earlier rebuild no-ops."""
ct, calls = tasks
s = db_session
_, _, folder, _ = _seed_edit_fixture(s)
stub = _StubRedis()
monkeypatch.setattr(ct, "_tiles_redis", lambda: stub)

# not dirty -> tiles already cover current truth -> no rebuild
res = ct.regenerate_tiles.apply_async(args=[folder.id]).get()
assert calls["create_tiles"] == []
assert "fresh" in res

# the apply phase marks the folder dirty -> next regenerate rebuilds + clears
markers = [[{"id": 123, "editedFile": ["cov.kml"]}]]
ct.apply_edit_changes.apply_async(args=[markers, folder.id, [POLYGON]]).get()
assert stub.get(f"bdk:tiles-dirty:{folder.id}") is not None
res = ct.regenerate_tiles.apply_async(args=[folder.id]).get()
assert calls["create_tiles"] == [folder.id]
assert "rebuilt" in res
assert stub.get(f"bdk:tiles-dirty:{folder.id}") is None
assert stub.get(f"bdk:tiles-lock:{folder.id}") is None # lock released


def test_apply_edit_service_runs_full_chain(db_session, tasks):
"""End-to-end through the service (eager celery): one call applies the DB
edit AND regenerates tiles, and the recorded task id tracks the chain's
final (tile) task so 'task done' still means 'tiles done'."""
ct, calls = tasks
s = db_session
_, user, folder, _ = _seed_edit_fixture(s)
from database.models import celerytaskinfo, kml_data
from services import edit_service

task_id = edit_service.apply_edit(
user_id=user.id,
folderid=folder.id,
markers=[[{"id": 123, "editedFile": ["cov.kml"]}]],
polygonfeatures=[POLYGON],
session=s,
)

s.expire_all()
assert s.query(kml_data).filter(kml_data.location_id == 123).count() == 0
assert calls["create_tiles"] == [folder.id]
info = s.query(celerytaskinfo).filter(celerytaskinfo.task_id == task_id).one()
assert info.operation_type == "Edit"
assert info.files_changed == "cov.kml"
Loading