Make the Tamanu task queue resilient to failing tasks (#157)

Review notes (text before the first "diff --git" header is ignored by
patch tools):

* scripts/exec_tamanu_tasks.py: the commits that persist the queue
  bookkeeping (dropping an unresolvable task, recording a failed attempt)
  are now guarded against ConflictError as well. On write contention the
  transaction is aborted and the batch stops cleanly; the task is still
  queued and is handled by the next run.
* tasks/queue.py: docstrings state that put() and fail() both store dict
  values, and that get() also returns (task_id, None) when no adapter is
  found for the task name.
* Line structure of this patch was restored from a whitespace-collapsed
  copy. Blank context lines were reconstructed from the hunk counts and
  should be checked against the target files before applying.

diff --git a/CHANGES.rst b/CHANGES.rst
index b222425..d81b34f 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -4,6 +4,7 @@ Changelog
 1.0.0
 -----
 
+- #157 Make the Tamanu task queue resilient to failing tasks
 - #159 Use FHIR comparator for out-of-detection-limit results sent to Tamanu
 - #155 Disable notification for sample status transition to 'to_be_verified'
 - #144 Adding reference results to Tamanu FHIR Observations
diff --git a/scripts/exec_tamanu_tasks.py b/scripts/exec_tamanu_tasks.py
index 38b1137..cf8958d 100644
--- a/scripts/exec_tamanu_tasks.py
+++ b/scripts/exec_tamanu_tasks.py
@@ -10,6 +10,7 @@
 from bes.lims.tamanu import logger
 from bes.lims.tamanu.tasks import queue
 from bika.lims import api
+from ZODB.POSException import ConflictError
 
 __doc__ = """
 
@@ -82,19 +83,59 @@ def main(app):
     # max number of tasks to process
     max_tasks = int(args.max_tasks)
     for num in range(0, max_tasks):
-        task = queue.get()
-        if not task:
+        task_id, task = queue.get()
+
+        # no more tasks ready to be processed
+        if not task_id:
             break
 
+        # head task popped but unresolvable (invalid id, context gone or no
+        # adapter registered); commit the removal so it stops blocking the
+        # queue and move on
+        if task is None:
+            logger.warning("Dropping unresolvable task: %s" % task_id)
+            try:
+                transaction.commit()
+            except ConflictError:
+                # the queue was written concurrently: roll back and stop
+                # cleanly, the next run drops the task
+                transaction.abort()
+                break
+            continue
+
         task_name = task.__class__.__name__
         logger.info("%s: %s ..." % (task_name, api.get_path(task.context)))
 
-        # try to process the task
-        task.process()
-        # do a transaction savepoint
-        transaction.savepoint(optimistic=True)
+        try:
+            # process the task and persist its removal + side effects together
+            task.process()
+            transaction.commit()
+        except ConflictError:
+            # transient write contention: leave the task queued untouched and
+            # let the next run retry it. Do not penalise the attempt count.
+            transaction.abort()
+            logger.warning(
+                "ConflictError on task %s; will retry next run" % task_id)
+            break
+        except Exception as e:
+            # a failing task must not abort the whole batch nor stay at the
+            # head forever. Discard its partial writes, then retry-with-backoff
+            # or dead-letter it in a fresh transaction.
+            logger.exception("Error processing task %s" % task_id)
+            transaction.abort()
+            try:
+                queue.fail(task_id, error=repr(e))
+                transaction.commit()
+            except ConflictError:
+                # the retry/dead-letter bookkeeping hit write contention
+                # too: roll it back and stop cleanly. The task is still
+                # queued, so the next run retries it.
+                transaction.abort()
+                logger.warning(
+                    "ConflictError recording failure of task %s; "
+                    "will retry next run" % task_id)
+                break
 
-    transaction.commit()
     logger.info("Executing Tamanu-specific tasks [DONE]")
     logger.info("-" * 79)
 
diff --git a/src/bes/lims/tamanu/config.py b/src/bes/lims/tamanu/config.py
index b9d9032..db57cfa 100644
--- a/src/bes/lims/tamanu/config.py
+++ b/src/bes/lims/tamanu/config.py
@@ -24,6 +24,12 @@
 
 TAMANU_TASKS_QUEUE = "senaite.tamanu.queue.storage"
 
+# Dead-letter store for tasks that exhausted their retries. Keeps the failed
+# task id, last error and attempt count so the failure is visible (alerting)
+# and the task can be inspected/re-injected after the root cause is fixed,
+# instead of silently blocking the queue head forever.
+TAMANU_TASKS_DEADLETTER = "senaite.tamanu.queue.deadletter"
+
 TAMANU_SEXES = (
     ("male", "m"),
     ("female", "f"),
diff --git a/src/bes/lims/tamanu/tasks/queue.py b/src/bes/lims/tamanu/tasks/queue.py
index 87b47fc..3bd5d2a 100644
--- a/src/bes/lims/tamanu/tasks/queue.py
+++ b/src/bes/lims/tamanu/tasks/queue.py
@@ -4,6 +4,7 @@
 from BTrees.OOBTree import OOBTree
 
 from bes.lims.tamanu import logger
+from bes.lims.tamanu.config import TAMANU_TASKS_DEADLETTER
 from bes.lims.tamanu.config import TAMANU_TASKS_QUEUE
 from bes.lims.tamanu.interfaces import ITamanuTask
 from bika.lims import api
@@ -11,10 +12,26 @@
 from zope.annotation.interfaces import IAnnotations
 from zope.component import queryAdapter
 
+# Maximum number of times a task is retried before it is moved to the
+# dead-letter store. Transient failures (Tamanu briefly unreachable) recover
+# within these retries; a persistently failing task (e.g. a bad record Tamanu
+# keeps rejecting) is parked instead of blocking the queue head forever.
+MAX_ATTEMPTS = 5
+
+# Base back-off in seconds between retries. The delay grows with the attempt
+# count (attempt * RETRY_BACKOFF) so a failing task moves out of the head and
+# does not get hammered every cron cycle.
+RETRY_BACKOFF = 300
+
 
 def _get_tasks():
     """Returns an OOBTree of pending Tamanu tasks, keyed by task_id
-    (``"-"``) with the scheduled-on epoch timestamp as value.
+    (``"-"``).
+
+    The value carries the scheduled-on epoch timestamp. For backward
+    compatibility it may be a bare int (legacy entries) or a dict with
+    ``when`` and ``attempts`` keys (entries written by :func:`put` and
+    :func:`fail`); use :func:`_when` / :func:`_attempts` to read it.
 
     OOBTree is used (rather than a flat ``persistent.list.PersistentList``) so
     concurrent producers inserting distinct keys do not generate
@@ -27,9 +44,41 @@ def _get_tasks():
     return annotation[TAMANU_TASKS_QUEUE]
 
 
+def _get_deadletter():
+    """Returns an OOBTree of tasks that exhausted their retries, keyed by
+    task_id with a dict value (error, attempts, failed_on epoch).
+    """
+    portal = api.get_portal()
+    annotation = IAnnotations(portal)
+    if annotation.get(TAMANU_TASKS_DEADLETTER) is None:
+        annotation[TAMANU_TASKS_DEADLETTER] = OOBTree()
+    return annotation[TAMANU_TASKS_DEADLETTER]
+
+
+def _when(value):
+    """Returns the scheduled-on epoch from a queue value (int or dict)
+    """
+    if isinstance(value, dict):
+        return value.get("when", 0)
+    return value
+
+
+def _attempts(value):
+    """Returns the attempt count from a queue value (int or dict)
+    """
+    if isinstance(value, dict):
+        return value.get("attempts", 0)
+    return 0
+
+
 @synchronized(max_connections=1)
 def get():
-    """Pops the next task whose scheduled time has elapsed
+    """Pops the next task whose scheduled time has elapsed.
+
+    :returns: a ``(task_id, task)`` tuple. ``(None, None)`` when no task is
+    ready. ``(task_id, None)`` when the head task was popped but cannot be
+    resolved (invalid id, missing context or no adapter registered) and
+    should be dropped by the caller.
     """
     # get the tasks
     tasks = _get_tasks()
@@ -38,13 +87,13 @@ def get():
     now = int(time.time())
 
     task_id = None
-    for tid, when in tasks.items():
-        if when <= now:
+    for tid, value in tasks.items():
+        if _when(value) <= now:
             task_id = tid
             break
 
     if not task_id:
-        return None
+        return None, None
 
     del tasks[task_id]
 
@@ -56,16 +105,16 @@ def get():
     # validate the task id
     if not all([name, api.is_uid(uid)]):
         logger.error("Not a valid task: %s" % task_id)
-        return None
+        return task_id, None
 
     # get the context
     obj = api.get_object_by_uid(uid, None)
     if not obj:
         logger.error("No object found for UID %s" % uid)
-        return None
+        return task_id, None
 
     # find an adapter for the given name
-    return queryAdapter(obj, ITamanuTask, name=name)
+    return task_id, queryAdapter(obj, ITamanuTask, name=name)
 
 
 @synchronized(max_connections=1)
@@ -90,5 +139,51 @@ def put(name, context, delay=120):
 
     # add the task
     logger.info("Task %s [scheduled on %s]" % (task_id, when))
-    tasks[task_id] = when
+    tasks[task_id] = {"when": when, "attempts": 0}
     return True
+
+
+@synchronized(max_connections=1)
+def fail(task_id, error=""):
+    """Records a failed processing attempt for the given task.
+
+    Increments the attempt counter and either re-schedules the task with a
+    back-off (so it leaves the queue head and is retried later) or, once
+    ``MAX_ATTEMPTS`` is reached, moves it to the dead-letter store so it stops
+    blocking the queue while remaining visible for inspection/re-injection.
+
+    Must be called in a fresh transaction (i.e. after aborting the failed
+    processing transaction), so the requeue/dead-letter is durable regardless
+    of what the failed task did.
+
+    :returns: True if the task was re-scheduled, False if it was dead-lettered
+    """
+    tasks = _get_tasks()
+    value = tasks.get(task_id)
+    if value is None:
+        # the task is no longer in the queue (already handled elsewhere)
+        return False
+
+    attempts = _attempts(value) + 1
+    now = int(time.time())
+
+    if attempts < MAX_ATTEMPTS:
+        when = now + attempts * RETRY_BACKOFF
+        tasks[task_id] = {"when": when, "attempts": attempts}
+        logger.warning(
+            "Task %s failed (attempt %s/%s), retry on %s: %s"
+            % (task_id, attempts, MAX_ATTEMPTS, when, error))
+        return True
+
+    # exhausted retries: park in the dead-letter store
+    del tasks[task_id]
+    deadletter = _get_deadletter()
+    deadletter[task_id] = {
+        "error": error,
+        "attempts": attempts,
+        "failed_on": now,
+    }
+    logger.error(
+        "Task %s dead-lettered after %s attempts: %s"
+        % (task_id, attempts, error))
+    return False