Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog
1.0.0
-----

- #157 Make the Tamanu task queue resilient to failing tasks
- #159 Use FHIR comparator for out-of-detection-limit results sent to Tamanu
- #155 Disable notification for sample status transition to 'to_be_verified'
- #144 Adding reference results to Tamanu FHIR Observations
Expand Down
38 changes: 31 additions & 7 deletions scripts/exec_tamanu_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bes.lims.tamanu import logger
from bes.lims.tamanu.tasks import queue
from bika.lims import api
from ZODB.POSException import ConflictError


__doc__ = """
Expand Down Expand Up @@ -82,19 +83,42 @@ def main(app):
# max number of tasks to process
max_tasks = int(args.max_tasks)
for num in range(0, max_tasks):
task = queue.get()
if not task:
task_id, task = queue.get()

# no more tasks ready to be processed
if not task_id:
break

# head task popped but unresolvable (invalid id or context gone);
# commit the removal so it stops blocking the queue and move on
if task is None:
logger.warning("Dropping unresolvable task: %s" % task_id)
transaction.commit()
continue

task_name = task.__class__.__name__
logger.info("%s: %s ..." % (task_name, api.get_path(task.context)))

# try to process the task
task.process()
# do a transaction savepoint
transaction.savepoint(optimistic=True)
try:
# process the task and persist its removal + side effects together
task.process()
transaction.commit()
except ConflictError:
# transient write contention: leave the task queued untouched and
# let the next run retry it. Do not penalise the attempt count.
transaction.abort()
logger.warning(
"ConflictError on task %s; will retry next run" % task_id)
break
except Exception as e:
# a failing task must not abort the whole batch nor stay at the
# head forever. Discard its partial writes, then retry-with-backoff
# or dead-letter it in a fresh transaction.
logger.exception("Error processing task %s" % task_id)
transaction.abort()
queue.fail(task_id, error=repr(e))
transaction.commit()

transaction.commit()
logger.info("Executing Tamanu-specific tasks [DONE]")
logger.info("-" * 79)

Expand Down
6 changes: 6 additions & 0 deletions src/bes/lims/tamanu/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

TAMANU_TASKS_QUEUE = "senaite.tamanu.queue.storage"

# Dead-letter store for tasks that exhausted their retries. Keeps the failed
# task id, last error and attempt count so the failure is visible (alerting)
# and the task can be inspected/re-injected after the root cause is fixed,
# instead of silently blocking the queue head forever.
TAMANU_TASKS_DEADLETTER = "senaite.tamanu.queue.deadletter"

TAMANU_SEXES = (
("male", "m"),
("female", "f"),
Expand Down
113 changes: 104 additions & 9 deletions src/bes/lims/tamanu/tasks/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,34 @@

from BTrees.OOBTree import OOBTree
from bes.lims.tamanu import logger
from bes.lims.tamanu.config import TAMANU_TASKS_DEADLETTER
from bes.lims.tamanu.config import TAMANU_TASKS_QUEUE
from bes.lims.tamanu.interfaces import ITamanuTask
from bika.lims import api
from bika.lims.decorators import synchronized
from zope.annotation.interfaces import IAnnotations
from zope.component import queryAdapter

# Maximum number of processing attempts for a task. fail() re-schedules the
# task while its attempt count stays below this value; once the count reaches
# it, the task is moved to the dead-letter store. Transient failures (Tamanu
# briefly unreachable) recover within the earlier attempts; a persistently
# failing task (e.g. a bad record Tamanu keeps rejecting) is parked instead
# of blocking the queue head forever.
MAX_ATTEMPTS = 5

# Base back-off in seconds between retries. The delay grows linearly with the
# attempt count (attempts * RETRY_BACKOFF, i.e. 300s, 600s, 900s, ...) so a
# failing task moves out of the head and does not get hammered every cron
# cycle.
RETRY_BACKOFF = 300


def _get_tasks():
"""Returns an OOBTree of pending Tamanu tasks, keyed by task_id
(``"<uid>-<name>"``) with the scheduled-on epoch timestamp as value.
(``"<uid>-<name>"``).

The value is the scheduled-on epoch timestamp. For backward compatibility
the value may be a bare int (legacy entries) or a dict with ``when`` and
``attempts`` keys (entries that have been retried at least once); use
:func:`_when` / :func:`_attempts` to read it regardless of shape.

OOBTree is used (rather than a flat ``persistent.list.PersistentList``)
so concurrent producers inserting distinct keys do not generate
Expand All @@ -27,9 +44,41 @@ def _get_tasks():
return annotation[TAMANU_TASKS_QUEUE]


def _get_deadletter():
    """Returns the dead-letter storage of the Tamanu tasks queue

    The storage is an OOBTree kept in the portal annotations under the
    ``TAMANU_TASKS_DEADLETTER`` key. It is created the first time it is
    requested.
    """
    annotations = IAnnotations(api.get_portal())
    storage = annotations.get(TAMANU_TASKS_DEADLETTER)
    if storage is None:
        # first access: initialize an empty storage
        storage = OOBTree()
        annotations[TAMANU_TASKS_DEADLETTER] = storage
    return storage


def _when(value):
"""Returns the scheduled-on epoch from a queue value (int or dict)
"""
if isinstance(value, dict):
return value.get("when", 0)
return value


def _attempts(value):
"""Returns the attempt count from a queue value (int or dict)
"""
if isinstance(value, dict):
return value.get("attempts", 0)
return 0


@synchronized(max_connections=1)
def get():
"""Pops the next task whose scheduled time has elapsed
"""Pops the next task whose scheduled time has elapsed.

:returns: a ``(task_id, task)`` tuple. ``(None, None)`` when no task is
ready. ``(task_id, None)`` when the head task was popped but cannot be
resolved (invalid id or its context no longer exists) and should be
dropped by the caller.
"""
# get the tasks
tasks = _get_tasks()
Expand All @@ -38,13 +87,13 @@ def get():
now = int(time.time())

task_id = None
for tid, when in tasks.items():
if when <= now:
for tid, value in tasks.items():
if _when(value) <= now:
task_id = tid
break

if not task_id:
return None
return None, None

del tasks[task_id]

Expand All @@ -56,16 +105,16 @@ def get():
# validate the task id
if not all([name, api.is_uid(uid)]):
logger.error("Not a valid task: %s" % task_id)
return None
return task_id, None

# get the context
obj = api.get_object_by_uid(uid, None)
if not obj:
logger.error("No object found for UID %s" % uid)
return None
return task_id, None

# find an adapter for the given name
return queryAdapter(obj, ITamanuTask, name=name)
return task_id, queryAdapter(obj, ITamanuTask, name=name)


@synchronized(max_connections=1)
Expand All @@ -90,5 +139,51 @@ def put(name, context, delay=120):

# add the task
logger.info("Task %s [scheduled on %s]" % (task_id, when))
tasks[task_id] = when
tasks[task_id] = {"when": when, "attempts": 0}
return True


@synchronized(max_connections=1)
def fail(task_id, error=""):
    """Registers a failed processing attempt of the task with the given id.

    The attempt counter of the task is increased by one. While the counter
    stays below ``MAX_ATTEMPTS`` the task is kept in the queue, re-scheduled
    to ``now + attempts * RETRY_BACKOFF`` seconds. Once the counter reaches
    ``MAX_ATTEMPTS`` the task is removed from the queue and recorded in the
    dead-letter store together with the error, the attempt count and the
    failure timestamp.

    This function is meant to be called in a fresh transaction (i.e. after
    the transaction of the failed processing has been aborted), so that the
    changes done here are not tied to whatever the failed task did.

    :param task_id: id of the task that failed
    :param error: textual description of the failure
    :returns: True if the task was re-scheduled. False if the task was
        dead-lettered or if it is not present in the queue
    """
    queue = _get_tasks()
    current = queue.get(task_id)
    if current is None:
        # nothing to do, the task is not in the queue anymore
        return False

    count = _attempts(current) + 1
    timestamp = int(time.time())

    if count >= MAX_ATTEMPTS:
        # no retries left: move the task to the dead-letter store
        del queue[task_id]
        parked = _get_deadletter()
        parked[task_id] = {
            "error": error,
            "attempts": count,
            "failed_on": timestamp,
        }
        logger.error(
            "Task %s dead-lettered after %s attempts: %s"
            % (task_id, count, error))
        return False

    # keep the task queued, but postpone it proportionally to the attempts
    retry_on = timestamp + count * RETRY_BACKOFF
    queue[task_id] = {"when": retry_on, "attempts": count}
    logger.warning(
        "Task %s failed (attempt %s/%s), retry on %s: %s"
        % (task_id, count, MAX_ATTEMPTS, retry_on, error))
    return True