Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4a4f1d1
convert files: setup.py -> pyproject.toml
MehmedGIT Dec 22, 2025
6620d35
update ocrd dependency to 3.9.1
MehmedGIT Dec 22, 2025
bfb53af
remove: requirements.txt from docker files
MehmedGIT Dec 22, 2025
7863b88
update: docker files of broker and server
MehmedGIT Dec 22, 2025
0cd2a4b
fix: use proper package import
MehmedGIT Dec 22, 2025
b5d32b3
set: beanie and pydantic versions to >=2
MehmedGIT Dec 22, 2025
2c89579
fix: pydantic warning
MehmedGIT Dec 22, 2025
cfe45a0
replace: motor with pymongo async
MehmedGIT Dec 22, 2025
44d84ed
resolve another deprecation
MehmedGIT Dec 22, 2025
4222ede
refactor: remove class Config
MehmedGIT Dec 22, 2025
9ce2dd0
add: async versions of server tests
MehmedGIT Dec 22, 2025
a4347d0
activate all workflow tests again
MehmedGIT Dec 22, 2025
b4d12e4
revert back to motor
MehmedGIT Dec 22, 2025
df5d919
remove: motor
MehmedGIT Dec 22, 2025
d04e4e5
remove: pytest.async
MehmedGIT Dec 22, 2025
61356d4
remove: Py 3.8 and 3.9, add Py 3.13, 3.14, 3.15
MehmedGIT Dec 22, 2025
c997c1b
remove: Py 3.15
MehmedGIT Dec 22, 2025
e153dbb
ci-cd: fast fail - false
MehmedGIT Dec 22, 2025
858a76f
set requirement: core v3.10.1
MehmedGIT Jan 23, 2026
e375cdb
set beanie back to v2.0.0
MehmedGIT Jan 23, 2026
a879502
release: v2.24.0
MehmedGIT Jan 23, 2026
a3b8a8e
set: beanie v2.0.1
MehmedGIT Jan 23, 2026
8c96afe
remove py3.14 from ci/cd due to beanie v2.0.1 restriction that requir…
MehmedGIT Jan 23, 2026
97bcda1
try: add oton parent folder to resource
MehmedGIT Jan 23, 2026
d19e421
fix: add hpc parent folder to resources
MehmedGIT Jan 23, 2026
ba1e5e4
fix: remove async from broker fixture
MehmedGIT Jan 23, 2026
7acdbde
add: model default values to Optional parameters
MehmedGIT Jan 23, 2026
2b6f813
remove: obsolete version label
MehmedGIT Jan 23, 2026
1fdf39e
update: rmq to version 4.2
MehmedGIT Jan 23, 2026
cd07b36
fix: use the proper status and do check job status after data download
MehmedGIT Jan 23, 2026
c27f92a
add: rabbitmq restart streams flag
MehmedGIT Jan 23, 2026
8d4f23a
fix: rabbitmq start
MehmedGIT Jan 23, 2026
904dcc4
improve: timeouts
MehmedGIT Jan 26, 2026
0e1454f
set: ci/cd integration test timeout to 12h
MehmedGIT Jan 27, 2026
5fcce18
fix: Makefile call for harvester dummy
MehmedGIT Jan 28, 2026
6f396fc
greatly increase timeout values
MehmedGIT Jan 28, 2026
b09a3d4
Makefile: enable -s flag for tests
MehmedGIT Jan 28, 2026
b69709a
add extra prints to stdout from HPC fixtures
MehmedGIT Jan 28, 2026
f4be98a
fix: job status parsing in workers
MehmedGIT Jan 29, 2026
4f3b43f
optimize: transfer SIF only when needed in batch script
MehmedGIT Jan 29, 2026
39d340b
add: extra parsing in edge cases when the job is still PENDING in slurm
MehmedGIT Jan 29, 2026
a9fda4b
integration_test: reduce testing timeouts
MehmedGIT Jan 29, 2026
a3d27e1
update: test requirements to latest versions
MehmedGIT Jan 29, 2026
b38e4c4
use core v3.11.0
MehmedGIT Jan 29, 2026
d271de6
adapt requirements
MehmedGIT Jan 29, 2026
a5eff57
fix: test_3_hpc_test_0
MehmedGIT Jan 29, 2026
53ec823
reduce: noise from tests
MehmedGIT Jan 29, 2026
6c1655d
fix: typo in test requirements
MehmedGIT Jan 29, 2026
6c94e5d
slow down checks of job status test to 1 min
MehmedGIT Jan 29, 2026
982eaa9
remove: feature flags from rabbitmq docker compose
MehmedGIT Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci_cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ jobs:
build-native:
name: Native build of Operandi modules
strategy:
fail-fast: true
fail-fast: false
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
python-version: [ "3.10", "3.11", "3.12", "3.13" ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -74,7 +74,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ "3.8" ]
python-version: [ "3.12" ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ start-server-native:

start-harvester-dummy:
export $(shell sed 's/=.*//' ./.env)
operandi-harvester start-dummy --address http://localhost
operandi-harvester start-dummy --address http://localhost:8000

run-tests: run-tests-utils run-tests-broker run-tests-server run-tests-harvester run-tests-integration

Expand All @@ -119,7 +119,7 @@ run-tests-utils:

run-tests-broker:
export $(shell sed 's/=.*//' ./tests/.env)
pytest tests/tests_broker/test_*.py -s -v
pytest tests/tests_broker/test_*.py -v

run-tests-harvester:
export $(shell sed 's/=.*//' ./tests/.env)
Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.8'

networks:
operandi:
name: operandi
Expand Down Expand Up @@ -28,9 +26,13 @@ services:
- operandi

operandi-rabbitmq:
image: "rabbitmq:3.12-management"
image: "rabbitmq:4.2-management"
container_name: operandi-rabbitmq
hostname: rabbit-mq-host
ulimits:
nofile:
soft: 65536
hard: 65536
ports:
- "5672:5672"
- "15672:15672"
Expand All @@ -45,7 +47,7 @@ services:
restart: on-failure
environment:
- RABBITMQ_SERVER_ERL_ARGS=-rabbitmq_management load_definitions "/rmq_definitions.json"
- RABBITMQ_FEATURE_FLAGS=quorum_queue,implicit_default_bindings,classic_mirrored_queue_version
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit peer_discovery_backend none
networks:
- operandi
healthcheck:
Expand Down
12 changes: 7 additions & 5 deletions docker-compose_image_based.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.8'

networks:
operandi:
name: operandi
Expand Down Expand Up @@ -28,9 +26,13 @@ services:
- operandi

operandi-rabbitmq:
image: "rabbitmq:3.12-management"
image: "rabbitmq:4.2-management"
container_name: operandi-rabbitmq
hostname: rabbit-mq-host
ulimits:
nofile:
soft: 65536
hard: 65536
ports:
- "5672:5672"
- "15672:15672"
Expand All @@ -44,8 +46,8 @@ services:
target: /rmq_definitions.json
restart: on-failure
environment:
- RABBITMQ_SERVER_ERL_ARGS=-rabbitmq_management load_definitions "/rmq_definitions.json"
- RABBITMQ_FEATURE_FLAGS=quorum_queue,implicit_default_bindings,classic_mirrored_queue_version
- RABBITMQ_SERVER_ERL_ARGS='-rabbitmq_management load_definitions "/rmq_definitions.json"'
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbit peer_discovery_backend none'
networks:
- operandi
healthcheck:
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
89 changes: 57 additions & 32 deletions src/Dockerfile_broker
Original file line number Diff line number Diff line change
@@ -1,38 +1,63 @@
FROM ubuntu:22.04
# ================================
# Builder stage: build all Python packages
# ================================
FROM python:3.11-slim as builder

MAINTAINER OPERANDI
ENV DEBIAN_FRONTEND noninteractive
ENV PYTHONIOENCODING utf8
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
LABEL maintainer="OPERANDI"

COPY broker/ /usr/src/broker
COPY utils/ /usr/src/utils
ENV PYTHONIOENCODING=utf8 \
LC_ALL=C.UTF-8 \
LANG=C.UTF-8 \
DEBIAN_FRONTEND=noninteractive

WORKDIR /usr/src/broker/operandi_broker
# Install system build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
git \
curl \
make \
wget \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /usr/src

# Copy and install utils first (for caching)
COPY utils/ ./utils
RUN python3 -m pip install --upgrade pip setuptools wheel \
&& pip wheel --no-deps --wheel-dir /wheels ./utils

# Copy and install broker
COPY broker/ ./broker
RUN pip wheel --no-deps --wheel-dir /wheels ./broker

# ================================
# Final stage: minimal runtime
# ================================
FROM python:3.11-slim

ENV PYTHONIOENCODING=utf8 \
LC_ALL=C.UTF-8 \
LANG=C.UTF-8

# install dependencies
RUN apt-get update && apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
git \
gnupg-agent \
make \
python3 \
python3-dev \
python3-pip \
python3-venv \
software-properties-common \
sudo \
time \
wget

RUN python3 -m pip install --upgrade pip setuptools
RUN pip3 install -U pip wheel
RUN python3 -m pip install -r /usr/src/utils/requirements.txt --ignore-installed
RUN pip3 install /usr/src/utils
RUN python3 -m pip install -r /usr/src/broker/requirements.txt --ignore-installed
RUN pip3 install /usr/src/broker
WORKDIR /usr/src

# Install runtime dependencies only (no build tools needed)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
curl \
&& rm -rf /var/lib/apt/lists/*

# Copy wheels from builder and install
COPY --from=builder /wheels /wheels
RUN python3 -m pip install --upgrade pip setuptools wheel \
&& pip install /wheels/*

# Copy broker code (optional if entry points are installed)
COPY broker/ ./broker
WORKDIR /usr/src/broker/operandi_broker

# Build success indicator
RUN echo "Operandi broker build success"

# Optional default command
CMD ["python3", "-m", "operandi_broker", "--version"]
86 changes: 57 additions & 29 deletions src/Dockerfile_server
Original file line number Diff line number Diff line change
@@ -1,35 +1,63 @@
FROM ubuntu:22.04
# ================================
# Builder stage: build all Python packages
# ================================
FROM python:3.11-slim as builder

MAINTAINER OPERANDI
ENV DEBIAN_FRONTEND noninteractive
ENV PYTHONIOENCODING utf8
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
LABEL maintainer="OPERANDI"

COPY server/ /usr/src/server
COPY utils/ /usr/src/utils
ENV PYTHONIOENCODING=utf8 \
LC_ALL=C.UTF-8 \
LANG=C.UTF-8 \
DEBIAN_FRONTEND=noninteractive

WORKDIR /usr/src/server/operandi_server
# Install system build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
git \
curl \
make \
wget \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /usr/src

# Copy and install utils first (for caching)
COPY utils/ ./utils
RUN python3 -m pip install --upgrade pip setuptools wheel \
&& pip wheel --no-deps --wheel-dir /wheels ./utils

# Copy and install server
COPY server/ ./server
RUN pip wheel --no-deps --wheel-dir /wheels ./server

# ================================
# Final stage: minimal runtime
# ================================
FROM python:3.11-slim

ENV PYTHONIOENCODING=utf8 \
LC_ALL=C.UTF-8 \
LANG=C.UTF-8

# install dependencies
RUN apt-get update && apt-get install -y \
curl \
git \
make \
python3 \
python3-dev \
python3-pip \
python3-venv \
software-properties-common \
sudo \
time \
wget

RUN python3 -m pip install --upgrade pip setuptools
RUN pip3 install -U pip wheel
RUN python3 -m pip install -r /usr/src/utils/requirements.txt --ignore-installed
RUN pip3 install /usr/src/utils
RUN python3 -m pip install -r /usr/src/server/requirements.txt --ignore-installed
RUN pip3 install /usr/src/server
WORKDIR /usr/src

# Install runtime dependencies only (no build tools needed)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
curl \
&& rm -rf /var/lib/apt/lists/*

# Copy wheels from builder and install
COPY --from=builder /wheels /wheels
RUN python3 -m pip install --upgrade pip setuptools wheel \
&& pip install /wheels/*

# Copy server code (optional if entry points are installed)
COPY server/ ./server
WORKDIR /usr/src/server/operandi_server

# Build success indicator
RUN echo "Operandi server build success"

# Optional default command
CMD ["python3", "-m", "operandi_server", "--version"]
17 changes: 13 additions & 4 deletions src/broker/operandi_broker/job_worker_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _consumed_msg_callback(self, ch, method, properties, body):
consumed_message = loads(body)
self.log.info(f"Consumed message: {consumed_message}")
self.current_message_job_id = consumed_message["job_id"]
previous_job_state = consumed_message["previous_job_state"]
previous_job_state: StateJob = StateJob(consumed_message["previous_job_state"])
except Exception as error:
self.log.warning(f"Parsing the consumed message has failed: {error}")
self._handle_msg_failure(interruption=False)
Expand Down Expand Up @@ -63,8 +63,11 @@ def _consumed_msg_callback(self, ch, method, properties, body):

try:
# TODO: Refactor this block of code since nothing is downloaded from the HPC when job fails.
self.log.info(f"Previous job state: {previous_job_state}")
if previous_job_state == StateJob.HPC_SUCCESS:
self.log.info(f"Downloading slurm job log file of succeeded: {slurm_job_id}")
self.hpc_io_transfer.download_slurm_job_log_file(slurm_job_id, job_dir)
self.log.info(f"Downloading results of succeeded workflow job: {job_dir}")
self.__download_results_from_hpc(job_dir=job_dir, workspace_dir=ws_dir)
self.log.info(f"Setting new workspace state `{StateWorkspace.READY}` of workspace_id: {workspace_id}")
updated_file_groups = self.__extract_updated_file_groups(db_workspace=db_workspace)
Expand All @@ -82,8 +85,10 @@ def _consumed_msg_callback(self, ch, method, properties, body):
sync_db_update_workflow_job(find_job_id=self.current_message_job_id, job_state=StateJob.SUCCESS)
self.log.info(f"Setting new workflow job state `{StateJob.SUCCESS}`"
f" of job_id: {self.current_message_job_id}")
if previous_job_state == StateJob.HPC_FAILED:
elif previous_job_state == StateJob.HPC_FAILED:
self.log.info(f"Downloading slurm job log file of failed: {slurm_job_id}")
self.hpc_io_transfer.download_slurm_job_log_file(slurm_job_id, job_dir)
self.log.info(f"Skipping downloading results of failed workflow job: {job_dir}")
self.log.info(f"Setting new workspace state `{StateWorkspace.READY}` of workspace_id: {workspace_id}")
db_workspace: DBWorkspace = sync_db_update_workspace(
find_workspace_id=workspace_id, state=StateWorkspace.READY)
Expand All @@ -99,13 +104,17 @@ def _consumed_msg_callback(self, ch, method, properties, body):
sync_db_update_workflow_job(find_job_id=self.current_message_job_id, job_state=StateJob.FAILED)
self.log.info(f"Setting new workflow job state `{StateJob.FAILED}`"
f" of job_id: {self.current_message_job_id}")
elif previous_job_state == StateJob.TRANSFERRING_FROM_HPC:
self.log.warning("Another worker instance is already downloading or has downloaded")
else:
self.log.warning(f"State not processable: {previous_job_state}")
except Exception as error:
self.log.warning(f"{error}")
self._handle_msg_failure(interruption=False)
return

self.has_consumed_message = False
self.log.debug(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.log.info(f"Ack delivery tag: {self.current_message_delivery_tag}")
ch.basic_ack(delivery_tag=method.delivery_tag)

@override
Expand All @@ -119,7 +128,7 @@ def _handle_msg_failure(self, interruption: bool):
self.rmq_consumer.ack_message(delivery_tag=self.current_message_delivery_tag)
return

self.log.debug(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.log.info(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.rmq_consumer.ack_message(delivery_tag=self.current_message_delivery_tag)

# Reset the current message related parameters
Expand Down
10 changes: 5 additions & 5 deletions src/broker/operandi_broker/job_worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def __init__(self, db_url, rabbitmq_url, queue_name):

@override
def _consumed_msg_callback(self, ch, method, properties, body):
self.log.debug(f"ch: {ch}, method: {method}, properties: {properties}, body: {body}")
self.log.debug(f"Consumed message: {body}")
self.log.info(f"ch: {ch}, method: {method}, properties: {properties}, body: {body}")
self.log.info(f"Consumed message: {body}")
self.current_message_delivery_tag = method.delivery_tag
self.has_consumed_message = True

Expand Down Expand Up @@ -55,7 +55,7 @@ def _consumed_msg_callback(self, ch, method, properties, body):
return

self.has_consumed_message = False
self.log.debug(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.log.info(f"Ack delivery tag: {self.current_message_delivery_tag}")
ch.basic_ack(delivery_tag=method.delivery_tag)

@override
Expand All @@ -69,7 +69,7 @@ def _handle_msg_failure(self, interruption: bool):
self.rmq_consumer.ack_message(delivery_tag=self.current_message_delivery_tag)
return

self.log.debug(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.log.info(f"Ack delivery tag: {self.current_message_delivery_tag}")
self.rmq_consumer.ack_message(delivery_tag=self.current_message_delivery_tag)

# Reset the current message related parameters
Expand Down Expand Up @@ -108,7 +108,7 @@ def __handle_hpc_and_workflow_states(
if new_job_state == StateJob.HPC_SUCCESS or new_job_state == StateJob.HPC_FAILED:
sync_db_update_workspace(find_workspace_id=workspace_id, state=StateWorkspace.TRANSFERRING_FROM_HPC)
sync_db_update_workflow_job(find_job_id=job_id, job_state=StateJob.TRANSFERRING_FROM_HPC)
result_download_message = {"job_id": f"{job_id}", "previous_job_state": f"{new_job_state}"}
result_download_message = {"job_id": f"{job_id}", "previous_job_state": f"{new_job_state.value}"}
self.log.info(f"Encoding the result download RabbitMQ message: {result_download_message}")
encoded_result_download_message = dumps(result_download_message).encode(encoding="utf-8")
self.rmq_publisher.publish_to_queue(
Expand Down
Loading
Loading