From 717fddb7d5dc909860dcbbdf48adc7b60f8f01e8 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 13 May 2026 14:57:58 +0200 Subject: [PATCH 01/16] First version --- conf/topic_schemas/status_change.json | 141 ++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 conf/topic_schemas/status_change.json diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json new file mode 100644 index 0000000..cfbec06 --- /dev/null +++ b/conf/topic_schemas/status_change.json @@ -0,0 +1,141 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID)" + }, + "tenant_id": { + "type": "string", + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "number", + "description": "Timestamp of the event in epoch milliseconds" + }, + "country": { + "type": "string", + "description": "The country the data is related to." + }, + "execution_id": { + "type": "string", + "format": "uuid", + "description": "Primary execution identifier (UUID)." + }, + "execution_parent_id": { + "type": [ + "string", + "null" + ], + "description": "Optional parent execution identifier (UUID)." + }, + "execution_group_id": { + "type": "string", + "format": "uuid", + "description": "Execution group identifier (UUID), may reference an execution id." + }, + "attempt_number": { + "type": "integer", + "minimum": 1, + "description": "Attempt number for this execution." + }, + "platform": { + "type": "string", + "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." 
+ }, + "platform_metadata": { + "type": "object", + "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." + }, + "input_arguments": { + "type": "object", + "description": "Arguments passed to the job." + }, + "definition_id": { + "type": "string", + "description": "Definition (Pipeline, Domain, Process) identifier." + }, + "definition_version": { + "type": [ + "string", + "null" + ], + "description": "Optional definition version." + }, + "definition_name": { + "type": "string", + "description": "Human-readable definition name." + }, + "status_type": { + "type": "string", + "enum": [ + "WAITING", + "RUNNING", + "SUCCEEDED", + "FAILED" + ], + "description": "High-level status type." + }, + "status_subtype": { + "type": [ + "string", + "null" + ], + "description": "Optional status subtype, e.g. NO_DATA or error code." + }, + "status_detail": { + "type": [ + "string", + "null" + ], + "description": "Optional human-readable status detail." + }, + "timestamp_created_at": { + "type": "number", + "description": "Timestamp when an execution was created in epoch milliseconds" + }, + "timestamp_started_at": { + "type": "number", + "description": "Timestamp when an execution was started in epoch milliseconds" + }, + "timestamp_finished_at": { + "type": "number", + "description": "Timestamp when an execution was finished in epoch milliseconds" + }, + "timestamp_last_updated_at": { + "type": "number", + "description": "Timestamp when an execution status was last updated in epoch milliseconds." + }, + "additional_context": { + "type": "object", + "description": "Additional context payload." 
+ } + }, + "required": [ + "execution_id", + "execution_group_id", + "attempt_number", + "source_app", + "source_app_version", + "environment", + "platform", + "definition_id", + "definition_name", + "status_type", + "created_at", + "started_at", + "last_updated_at" + ] +} \ No newline at end of file From f607b5882d87c5a0c3a701e32c8b8d19187311c8 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 13 May 2026 16:05:39 +0200 Subject: [PATCH 02/16] Add status_change schema --- conf/access.json | 3 +- conf/topic_schemas/status_change.json | 112 +++++++++++++--------- src/handlers/handler_topic.py | 4 +- src/utils/config_loader.py | 3 +- src/utils/constants.py | 1 + tests/unit/handlers/test_handler_topic.py | 5 +- tests/unit/utils/test_config_loader.py | 8 +- 7 files changed, 84 insertions(+), 52 deletions(-) diff --git a/conf/access.json b/conf/access.json index bb21ccc..e44a8f9 100644 --- a/conf/access.json +++ b/conf/access.json @@ -9,5 +9,6 @@ } }, "public.cps.za.dlchange": ["FooUser", "BarUser"], - "public.cps.za.test": ["TestUser"] + "public.cps.za.test": ["TestUser"], + "public.cps.za.status-change": ["TestUser"] } diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index cfbec06..8b5150f 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -1,9 +1,20 @@ { "type": "object", "properties": { + "event_type": { + "type": "string", + "enum": [ + "ExecutionCreatedEvent", + "ExecutionStartedEvent", + "ExecutionUpdatedEvent", + "ExecutionFinishedEvent" + ], + "description": "Lifecycle event type for execution status changes." + }, "event_id": { "type": "string", - "description": "Unique identifier for the event (GUID)" + "format": "uuid", + "description": "Unique identifier for the event (UUID)" }, "tenant_id": { "type": "string", @@ -27,7 +38,7 @@ }, "country": { "type": "string", - "description": "The country the data is related to." 
+ "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." }, "execution_id": { "type": "string", @@ -39,6 +50,7 @@ "string", "null" ], + "format": "uuid", "description": "Optional parent execution identifier (UUID)." }, "execution_group_id": { @@ -68,10 +80,7 @@ "description": "Definition (Pipeline, Domain, Process) identifier." }, "definition_version": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Optional definition version." }, "definition_name": { @@ -84,39 +93,18 @@ "WAITING", "RUNNING", "SUCCEEDED", - "FAILED" + "FAILED", + "KILLED" ], - "description": "High-level status type." + "description": "High-level status type for the current lifecycle event." }, "status_subtype": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Optional status subtype, e.g. NO_DATA or error code." }, "status_detail": { - "type": [ - "string", - "null" - ], - "description": "Optional human-readable status detail." - }, - "timestamp_created_at": { - "type": "number", - "description": "Timestamp when an execution was created in epoch milliseconds" - }, - "timestamp_started_at": { - "type": "number", - "description": "Timestamp when an execution was started in epoch milliseconds" - }, - "timestamp_finished_at": { - "type": "number", - "description": "Timestamp when an execution was finished in epoch milliseconds" - }, - "timestamp_last_updated_at": { - "type": "number", - "description": "Timestamp when an execution status was last updated in epoch milliseconds." + "type": "string", + "description": "Optional human-readable status detail, e.g. short error message." 
}, "additional_context": { "type": "object", @@ -124,18 +112,52 @@ } }, "required": [ + "event_type", + "event_id", "execution_id", - "execution_group_id", - "attempt_number", - "source_app", - "source_app_version", - "environment", - "platform", - "definition_id", - "definition_name", - "status_type", - "created_at", - "started_at", - "last_updated_at" + "timestamp_event" + ], + "allOf": [ + { + "if": { + "properties": { + "event_type": { + "const": "ExecutionCreatedEvent" + } + } + }, + "then": { + "required": [ + "source_app", + "source_app_version", + "environment", + "platform", + "input_arguments" + ] + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "ExecutionFinishedEvent" + } + } + }, + "then": { + "required": [ + "status_type" + ], + "properties": { + "status_type": { + "enum": [ + "SUCCEEDED", + "FAILED", + "KILLED" + ] + } + } + } + } ] } \ No newline at end of file diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 5426805..6b8cd5d 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -29,7 +29,7 @@ from src.handlers.handler_token import HandlerToken from src.utils.conf_path import CONF_DIR from src.utils.config_loader import TopicAccessMap, load_access_config -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST from src.utils.utils import build_error_response from src.writers.writer import WriteError, Writer @@ -75,6 +75,8 @@ def with_load_topic_schemas(self) -> "HandlerTopic": self.topics[TOPIC_DLCHANGE] = json.load(file) with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file: self.topics[TOPIC_TEST] = json.load(file) + with open(os.path.join(topic_schemas_dir, "status_change.json"), "r", encoding="utf-8") as file: + self.topics[TOPIC_STATUS_CHANGE] = json.load(file) logger.debug("Loaded topic schemas successfully.") return self 
diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index 673bcbf..f7c01b0 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -24,7 +24,7 @@ from boto3.resources.base import ServiceResource -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST logger = logging.getLogger(__name__) @@ -150,6 +150,7 @@ def load_topic_names(conf_dir: str) -> list[str]: "runs.json": TOPIC_RUNS, "dlchange.json": TOPIC_DLCHANGE, "test.json": TOPIC_TEST, + "status_change.json": TOPIC_STATUS_CHANGE, } schemas_dir = os.path.join(conf_dir, "topic_schemas") topics: list[str] = [] diff --git a/src/utils/constants.py b/src/utils/constants.py index 425b376..0110dc8 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -37,6 +37,7 @@ TOPIC_RUNS = "public.cps.za.runs" TOPIC_DLCHANGE = "public.cps.za.dlchange" TOPIC_TEST = "public.cps.za.test" +TOPIC_STATUS_CHANGE = "public.cps.za.status-change" SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST}) SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS}) diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index fc4a5b7..8fa292c 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -87,6 +87,7 @@ def test_load_topic_schemas_success(): "runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, "dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}}, "test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}}, + "status_change.json": {"type": "object", "properties": {"execution_id": {"type": "string"}}}, } def mock_open_side_effect(file_path, *_args, **_kwargs): @@ -99,10 +100,11 @@ def mock_open_side_effect(file_path, *_args, **_kwargs): result = 
handler.with_load_topic_schemas() assert result is handler - assert 3 == len(handler.topics) + assert 4 == len(handler.topics) assert "public.cps.za.runs" in handler.topics assert "public.cps.za.dlchange" in handler.topics assert "public.cps.za.test" in handler.topics + assert "public.cps.za.status-change" in handler.topics ## get_topics_list() @@ -112,6 +114,7 @@ def test_get_topics(event_gate_module, make_event): assert 200 == resp["statusCode"] body = json.loads(resp["body"]) assert "public.cps.za.test" in body + assert "public.cps.za.status-change" in body ## get_topic_schema() diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 82301f7..3bc2979 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -46,6 +46,7 @@ def conf_dir(tmp_path: Path) -> str: (schemas_dir / "runs.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "dlchange.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "test.json").write_text('{"type": "object"}', encoding="utf-8") + (schemas_dir / "status_change.json").write_text('{"type": "object"}', encoding="utf-8") return str(tmp_path) @@ -101,13 +102,14 @@ class TestLoadTopicNames: """Tests for load_topic_names().""" def test_discovers_all_topics(self, conf_dir: str) -> None: - """Test that all three topics are discovered from schema files.""" + """Test that all configured topics are discovered from schema files.""" result = load_topic_names(conf_dir) - assert 3 == len(result) + assert 4 == len(result) assert "public.cps.za.runs" in result assert "public.cps.za.dlchange" in result assert "public.cps.za.test" in result + assert "public.cps.za.status-change" in result def test_missing_schema_file_excluded(self, conf_dir: str) -> None: """Test that a missing schema file excludes that topic.""" @@ -115,7 +117,7 @@ def test_missing_schema_file_excluded(self, conf_dir: str) -> None: result = 
load_topic_names(conf_dir) - assert 2 == len(result) + assert 3 == len(result) assert "public.cps.za.test" not in result def test_empty_schemas_dir(self, tmp_path: Path) -> None: From 2093de907213d6688196d0a62b83592359e14268 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 15:29:35 +0200 Subject: [PATCH 03/16] wip --- conf/topic_schemas/status_change.json | 141 +++++++++++++++++++------- 1 file changed, 105 insertions(+), 36 deletions(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index 8b5150f..ef4f8d3 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -4,28 +4,45 @@ "event_type": { "type": "string", "enum": [ - "ExecutionCreatedEvent", - "ExecutionStartedEvent", - "ExecutionUpdatedEvent", - "ExecutionFinishedEvent" + "JobCreatedEvent", + "JobCreatedAndStartedEvent", + "JobStartedEvent", + "JobUpdatedEvent", + "JobFinishedEvent" ], - "description": "Lifecycle event type for execution status changes." + "description": "Lifecycle event type for job status changes." }, "event_id": { "type": "string", "format": "uuid", "description": "Unique identifier for the event (UUID)" }, + "job_ref": { + "type": [ + "string", + "null" + ], + "description": "Identifier of the job in it's respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
+ }, "tenant_id": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Application ID or ServiceNow identifier" }, "source_app": { - "type": "string", + "type": [ + "string", + "null" + ], "description": " Standardized source application name (aqueduct, unify, lum, etc)" }, "source_app_version": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Source application version (SemVer preferred)" }, "environment": { @@ -37,92 +54,144 @@ "description": "Timestamp of the event in epoch milliseconds" }, "country": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." }, - "execution_id": { + "job_id": { "type": "string", "format": "uuid", - "description": "Primary execution identifier (UUID)." + "description": "Primary job identifier (UUID)." }, - "execution_parent_id": { + "parent_job_id": { "type": [ "string", "null" ], "format": "uuid", - "description": "Optional parent execution identifier (UUID)." + "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." }, - "execution_group_id": { - "type": "string", + "initial_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." + }, + "job_group_id": { + "type": [ + "string", + "null" + ], "format": "uuid", - "description": "Execution group identifier (UUID), may reference an execution id." + "description": "Job group identifier (UUID), may or may not reference a job id." + }, + "job_name": { + "type": [ + "string", + "null" + ], + "description": "Human-readable job name." }, "attempt_number": { - "type": "integer", + "type": [ + "integer", + "null" + ], "minimum": 1, - "description": "Attempt number for this execution." + "description": "Attempt number for this job." 
}, "platform": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." }, "platform_metadata": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." }, "input_arguments": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Arguments passed to the job." }, "definition_id": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Definition (Pipeline, Domain, Process) identifier." }, "definition_version": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional definition version." }, - "definition_name": { - "type": "string", - "description": "Human-readable definition name." - }, "status_type": { - "type": "string", + "type": [ + "string", + "null" + ], "enum": [ "WAITING", "RUNNING", "SUCCEEDED", "FAILED", - "KILLED" + "KILLED", + null ], "description": "High-level status type for the current lifecycle event." }, "status_subtype": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional status subtype, e.g. NO_DATA or error code." }, "status_detail": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional human-readable status detail, e.g. short error message." }, "additional_context": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Additional context payload." 
} }, "required": [ "event_type", "event_id", - "execution_id", - "timestamp_event" + "job_id", + "timestamp_event", + "environment" ], "allOf": [ { "if": { "properties": { "event_type": { - "const": "ExecutionCreatedEvent" + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent" + ] } } }, @@ -140,7 +209,7 @@ "if": { "properties": { "event_type": { - "const": "ExecutionFinishedEvent" + "const": "JobFinishedEvent" } } }, From a6dc8b4e4b397f98be999aecf97083faca4e3178 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 15:40:25 +0200 Subject: [PATCH 04/16] newline --- conf/topic_schemas/status_change.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index ef4f8d3..3f9b283 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -229,4 +229,4 @@ } } ] -} \ No newline at end of file +} From 0c8deb23ba046ce665342c7ab422a4b5beab4a8f Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 16:49:55 +0200 Subject: [PATCH 05/16] Update conf/topic_schemas/status_change.json Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- conf/topic_schemas/status_change.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index 3f9b283..ee9fedc 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -50,7 +50,8 @@ "description": "Environment (dev, uat, pre-prod, prod, test or others)" }, "timestamp_event": { - "type": "number", + "type": "integer", + "minimum": 0, "description": "Timestamp of the event in epoch milliseconds" }, "country": { From c6e67502d377e3bba0a5769b5fb8b26bc6a959e2 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 21 May 2026 13:38:07 +0200 Subject: [PATCH 06/16] Make source_app mandatory --- 
conf/topic_schemas/status_change.json | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index ee9fedc..b31e685 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -32,17 +32,11 @@ "description": "Application ID or ServiceNow identifier" }, "source_app": { - "type": [ - "string", - "null" - ], + "type": "string", "description": " Standardized source application name (aqueduct, unify, lum, etc)" }, "source_app_version": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Source application version (SemVer preferred)" }, "environment": { @@ -181,6 +175,8 @@ "event_type", "event_id", "job_id", + "source_app", + "source_app_version", "timestamp_event", "environment" ], From bd315a36795f85a3814000471d992220f5ea9261 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 27 May 2026 23:57:53 +0200 Subject: [PATCH 07/16] Update src/utils/constants.py Co-authored-by: Oto Macenauer <112095903+oto-macenauer-absa@users.noreply.github.com> --- src/utils/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/constants.py b/src/utils/constants.py index 0110dc8..f0af65a 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -37,7 +37,7 @@ TOPIC_RUNS = "public.cps.za.runs" TOPIC_DLCHANGE = "public.cps.za.dlchange" TOPIC_TEST = "public.cps.za.test" -TOPIC_STATUS_CHANGE = "public.cps.za.status-change" +TOPIC_STATUS_CHANGE = "public.cps.za.status_change" SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST}) SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS}) From b3f9e9a8c2e271b34887c880658fdd87e14cfe6e Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 00:18:05 +0200 Subject: [PATCH 08/16] address some pr comments --- .vscode/settings.json | 7 ++ conf/access.json | 2 +- 
conf/topic_schemas/status_change.json | 92 ++++++++++++++++++----- tests/unit/handlers/test_handler_topic.py | 4 +- tests/unit/utils/test_config_loader.py | 2 +- 5 files changed, 86 insertions(+), 21 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9b38853 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file diff --git a/conf/access.json b/conf/access.json index e44a8f9..9d80b59 100644 --- a/conf/access.json +++ b/conf/access.json @@ -10,5 +10,5 @@ }, "public.cps.za.dlchange": ["FooUser", "BarUser"], "public.cps.za.test": ["TestUser"], - "public.cps.za.status-change": ["TestUser"] + "public.cps.za.status_change": ["TestUser"] } diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index b31e685..8d014a3 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -85,10 +85,7 @@ "description": "Job group identifier (UUID), may or may not reference a job id." }, "job_name": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Human-readable job name." }, "attempt_number": { @@ -121,10 +118,7 @@ "description": "Arguments passed to the job." }, "definition_id": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Definition (Pipeline, Domain, Process) identifier." 
}, "definition_version": { @@ -175,10 +169,13 @@ "event_type", "event_id", "job_id", + "job_name", "source_app", "source_app_version", "timestamp_event", - "environment" + "environment", + "definition_id", + "status_type" ], "allOf": [ { @@ -186,34 +183,95 @@ "properties": { "event_type": { "enum": [ - "JobCreatedEvent", - "JobCreatedAndStartedEvent" + "JobCreatedEvent" ] } } }, "then": { "required": [ - "source_app", - "source_app_version", - "environment", "platform", "input_arguments" - ] + ], + "properties": { + "status_type": { + "enum": [ + "WAITING" + ] + } + } } }, { "if": { "properties": { "event_type": { - "const": "JobFinishedEvent" + "enum": [ + "JobCreatedAndStartedEvent" + ] } } }, "then": { "required": [ - "status_type" + "platform", + "input_arguments" ], + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobStartedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobUpdatedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "WAITING", + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobFinishedEvent" + } + } + }, + "then": { "properties": { "status_type": { "enum": [ diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index 8fa292c..9d2fa32 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -104,7 +104,7 @@ def mock_open_side_effect(file_path, *_args, **_kwargs): assert "public.cps.za.runs" in handler.topics assert "public.cps.za.dlchange" in handler.topics assert "public.cps.za.test" in handler.topics - assert "public.cps.za.status-change" in handler.topics + assert "public.cps.za.status_change" in handler.topics ## get_topics_list() @@ -114,7 +114,7 @@ def 
test_get_topics(event_gate_module, make_event): assert 200 == resp["statusCode"] body = json.loads(resp["body"]) assert "public.cps.za.test" in body - assert "public.cps.za.status-change" in body + assert "public.cps.za.status_change" in body ## get_topic_schema() diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 3bc2979..361656b 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -109,7 +109,7 @@ def test_discovers_all_topics(self, conf_dir: str) -> None: assert "public.cps.za.runs" in result assert "public.cps.za.dlchange" in result assert "public.cps.za.test" in result - assert "public.cps.za.status-change" in result + assert "public.cps.za.status_change" in result def test_missing_schema_file_excluded(self, conf_dir: str) -> None: """Test that a missing schema file excludes that topic.""" From f2d7278da7d5c41d65a158c3f81f72402e7a86ed Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 01:00:22 +0200 Subject: [PATCH 09/16] Move required properties that are constant per job to event start types --- conf/topic_schemas/status_change.json | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index 8d014a3..bb4b7de 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -169,12 +169,6 @@ "event_type", "event_id", "job_id", - "job_name", - "source_app", - "source_app_version", - "timestamp_event", - "environment", - "definition_id", "status_type" ], "allOf": [ @@ -190,6 +184,12 @@ }, "then": { "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", "platform", "input_arguments" ], @@ -214,6 +214,12 @@ }, "then": { "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", 
"platform", "input_arguments" ], From 84f9b240688ee30fb3bf31a65a538fceab2b8498 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 13:29:22 +0200 Subject: [PATCH 10/16] Make status_type non-null --- conf/topic_schemas/status_change.json | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index bb4b7de..eaa6f4d 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -129,17 +129,13 @@ "description": "Optional definition version." }, "status_type": { - "type": [ - "string", - "null" - ], + "type": "string", "enum": [ "WAITING", "RUNNING", "SUCCEEDED", "FAILED", - "KILLED", - null + "KILLED" ], "description": "High-level status type for the current lifecycle event." }, From 12de355b3edf27f6baf989f3bd616edea6a0017b Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 13:49:17 +0200 Subject: [PATCH 11/16] Add topic keys configuration --- conf/config.json | 1 + conf/topic_keys.json | 3 + src/event_gate_lambda.py | 7 +- src/handlers/handler_topic.py | 37 +++++++++- src/utils/config_loader.py | 71 ++++++++++++++---- src/writers/writer.py | 3 +- src/writers/writer_eventbridge.py | 3 +- src/writers/writer_kafka.py | 5 +- src/writers/writer_postgres.py | 3 +- tests/unit/handlers/test_handler_topic.py | 90 +++++++++++++++++++++++ tests/unit/utils/test_config_loader.py | 82 ++++++++++++++++++++- tests/unit/writers/test_writer_kafka.py | 10 +++ 12 files changed, 292 insertions(+), 23 deletions(-) create mode 100644 conf/topic_keys.json diff --git a/conf/config.json b/conf/config.json index 9770b27..2265cdc 100644 --- a/conf/config.json +++ b/conf/config.json @@ -1,5 +1,6 @@ { "access_config": "s3:///access.json", + "topic_keys_config": "conf/topic_keys.json", "token_provider_url": "https://", "token_public_keys_url": "https://", "kafka_bootstrap_server": "localhost:9092", diff --git a/conf/topic_keys.json 
b/conf/topic_keys.json new file mode 100644 index 0000000..db0a690 --- /dev/null +++ b/conf/topic_keys.json @@ -0,0 +1,3 @@ +{ + "public.cps.za.status_change": "job_id" +} diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 9861496..7fbae8e 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -64,7 +64,12 @@ # Initialize EventGate handlers handler_token = HandlerToken(config).with_public_keys_queried() -handler_topic = HandlerTopic(config, aws_s3, handler_token, writers).with_load_access_config().with_load_topic_schemas() +handler_topic = ( + HandlerTopic(config, aws_s3, handler_token, writers) + .with_load_access_config() + .with_load_topic_keys_config() + .with_load_topic_schemas() +) handler_health = HandlerHealth(writers) handler_api = HandlerApi().with_api_definition_loaded() diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 6b8cd5d..34b0107 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -28,7 +28,7 @@ from src.handlers.handler_token import HandlerToken from src.utils.conf_path import CONF_DIR -from src.utils.config_loader import TopicAccessMap, load_access_config +from src.utils.config_loader import TopicAccessMap, TopicKeyMap, load_access_config, load_topic_keys_config from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST from src.utils.utils import build_error_response from src.writers.writer import WriteError, Writer @@ -51,6 +51,7 @@ def __init__( self.handler_token = handler_token self.writers = writers self.access_config: TopicAccessMap = {} + self.topic_keys: TopicKeyMap = {} self.topics: dict[str, dict[str, Any]] = {} def with_load_access_config(self) -> "HandlerTopic": @@ -81,6 +82,14 @@ def with_load_topic_schemas(self) -> "HandlerTopic": logger.debug("Loaded topic schemas successfully.") return self + def with_load_topic_keys_config(self) -> "HandlerTopic": + """Load topic key mapping configuration from S3 or 
local file. + Returns: + The current instance with loaded topic key config. + """ + self.topic_keys = load_topic_keys_config(self.config, self.aws_s3) + return self + def get_topics_list(self) -> dict[str, Any]: """Return the list of available topics. Returns: @@ -171,10 +180,11 @@ def _post_topic_message(self, topic_name: str, topic_message: dict[str, Any], to except ValidationError as exc: return build_error_response(400, "validation", exc.message) + message_key = self._resolve_message_key(topic_name, topic_message) errors = [] for writer_name, writer in self.writers.items(): try: - writer.write(topic_name, topic_message) + writer.write(topic_name, topic_message, message_key) except WriteError as exc: errors.append({"type": writer_name, "message": str(exc)}) @@ -217,3 +227,26 @@ def _validate_user_permissions( return False, f"Field '{restricted_field}' value not permitted for user '{user}'" return True, None + + def _resolve_message_key(self, topic_name: str, message: dict[str, Any]) -> str: + """Resolve topic key value from message using configured field mapping. + Args: + topic_name: Target topic name. + message: Topic payload. + Returns: + String key value for writers, or empty string when no key is configured/resolvable. 
+ """ + key_field = self.topic_keys.get(topic_name) + if not key_field: + return "" + + key_value = message.get(key_field) + if key_value is None: + logger.warning("Topic key field '%s' missing for topic '%s'.", key_field, topic_name) + return "" + + if isinstance(key_value, (dict, list)): + logger.warning("Topic key field '%s' for topic '%s' is not scalar.", key_field, topic_name) + return "" + + return str(key_value) diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index f7c01b0..6bc4adf 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -30,6 +30,7 @@ FieldPatterns = dict[str, list[re.Pattern[str]]] TopicAccessMap = dict[str, dict[str, FieldPatterns]] +TopicKeyMap = dict[str, str] def load_config(conf_dir: str) -> dict[str, Any]: @@ -46,6 +47,24 @@ def load_config(conf_dir: str) -> dict[str, Any]: return config +def _load_json_from_path(path: str, aws_s3: ServiceResource) -> dict[str, Any]: + """Load JSON data from either S3 (`s3://...`) or local file path. + Args: + path: JSON source path. + aws_s3: Boto3 S3 resource used for S3-backed paths. + Returns: + Parsed JSON object. 
+ """ + if path.startswith("s3://"): + name_parts = path.split("/") + bucket_name = name_parts[2] + bucket_object_key = "/".join(name_parts[3:]) + return json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8")) + + with open(path, "r", encoding="utf-8") as file: + return json.load(file) + + def _compile_topic_patterns( topic: str, user_constraints: dict[str, dict[str, list[str]]], @@ -121,24 +140,50 @@ def load_access_config(config: dict[str, Any], aws_s3: ServiceResource) -> Topic """ access_path: str = config["access_config"] logger.debug("Loading access configuration from %s.", access_path) - - access_data: dict[str, Any] = {} - - if access_path.startswith("s3://"): - name_parts = access_path.split("/") - bucket_name = name_parts[2] - bucket_object_key = "/".join(name_parts[3:]) - access_data = json.loads( - aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8") - ) - else: - with open(access_path, "r", encoding="utf-8") as file: - access_data = json.load(file) + access_data = _load_json_from_path(access_path, aws_s3) logger.debug("Loaded access configuration.") return _normalize_access_config(access_data) +def _validate_topic_keys_config(topic_key_data: dict[str, Any]) -> None: + """Validate topic key mapping config. + Args: + topic_key_data: Parsed JSON mapping of topic to event property name. + Returns: + None + Raises: + ValueError: If any topic key mapping is not a string. + """ + for topic, field_name in topic_key_data.items(): + if not isinstance(field_name, str): + raise ValueError( + f"Topic '{topic}': expected key field name as string, got {type(field_name).__name__}." + ) + return + + +def load_topic_keys_config(config: dict[str, Any], aws_s3: ServiceResource) -> TopicKeyMap: + """Load topic key configuration from S3 or a local file. + Args: + config: Main configuration dict. If `topic_keys_config` is absent, returns empty mapping. 
+ aws_s3: Boto3 S3 resource for loading from S3 paths. + Returns: + Mapping of topic name to message property used as Kafka key. + """ + topic_keys_path = config.get("topic_keys_config") + if not topic_keys_path: + logger.debug("No topic_keys_config configured. Using empty topic key mapping.") + return {} + + logger.debug("Loading topic key configuration from %s.", topic_keys_path) + topic_key_data = _load_json_from_path(topic_keys_path, aws_s3) + + logger.debug("Loaded topic key configuration.") + _validate_topic_keys_config(topic_key_data) + return topic_key_data + + def load_topic_names(conf_dir: str) -> list[str]: """Discover topic names from the topic_schemas directory. Args: diff --git a/src/writers/writer.py b/src/writers/writer.py index af5f7eb..0269714 100644 --- a/src/writers/writer.py +++ b/src/writers/writer.py @@ -38,11 +38,12 @@ def __init__(self, config: dict[str, Any]) -> None: self.config = config @abstractmethod - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to the target system. Args: topic_name: Target writer topic (destination) name. message: JSON-serializable payload to publish. + message_key: Optional transport key for systems that support keyed messages. Raises: WriteError: If publishing fails. """ diff --git a/src/writers/writer_eventbridge.py b/src/writers/writer_eventbridge.py index 5bcac47..076f007 100644 --- a/src/writers/writer_eventbridge.py +++ b/src/writers/writer_eventbridge.py @@ -49,11 +49,12 @@ def _format_failed_entries(self) -> str: failed = [e for e in self._entries if "ErrorCode" in e or "ErrorMessage" in e] return json.dumps(failed) if failed else "[]" - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to EventBridge. 
Args: topic_name: Target EventBridge writer topic (destination) name. message: JSON-serializable payload to publish. + message_key: Optional transport key (unused by EventBridge). Raises: WriteError: If publishing fails. """ diff --git a/src/writers/writer_kafka.py b/src/writers/writer_kafka.py index 831789a..58a9ad3 100644 --- a/src/writers/writer_kafka.py +++ b/src/writers/writer_kafka.py @@ -88,11 +88,12 @@ def _flush_with_timeout(self, timeout: float) -> int | None: except TypeError: return self._producer.flush() - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to Kafka. Args: topic_name: Kafka topic to publish to. message: JSON-serializable payload. + message_key: Optional Kafka key used for partitioning. Raises: WriteError: If publishing fails. """ @@ -115,7 +116,7 @@ def write(self, topic_name: str, message: dict[str, Any]) -> None: logger.debug("Sending to Kafka %s.", topic_name) self._producer.produce( topic_name, - key="", + key=message_key, value=json.dumps(message).encode("utf-8"), callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), ) diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index b405d77..825a230 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -158,11 +158,12 @@ def _insert_test(self, cursor: Any, message: dict[str, Any]) -> None: }, ) - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Dispatch insertion for a topic into the correct Postgres table(s). Args: topic_name: Incoming topic identifier. message: JSON-serializable payload. + message_key: Optional transport key (unused by Postgres writer). Raises: WriteError: If publishing fails. 
""" diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index 9d2fa32..a8123a8 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -71,6 +71,52 @@ def test_load_access_config_from_s3(): mock_aws_s3.Bucket.return_value.Object.assert_called_once_with("path/to/access.json") +## load_topic_keys_config() +def test_load_topic_keys_config_from_local_file(): + """Test loading topic key config from local file.""" + mock_handler_token = MagicMock() + mock_aws_s3 = MagicMock() + mock_writers = { + "kafka": MagicMock(), + "eventbridge": MagicMock(), + "postgres": MagicMock(), + } + config = {"topic_keys_config": "conf/topic_keys.json"} + handler = HandlerTopic(config, mock_aws_s3, mock_handler_token, mock_writers) + + topic_keys_data = {"public.cps.za.test": "event_id"} + with patch("builtins.open", mock_open(read_data=json.dumps(topic_keys_data))): + result = handler.with_load_topic_keys_config() + + assert result is handler + assert "event_id" == handler.topic_keys["public.cps.za.test"] + + +def test_load_topic_keys_config_from_s3(): + """Test loading topic key config from S3.""" + mock_handler_token = MagicMock() + mock_aws_s3 = MagicMock() + mock_writers = { + "kafka": MagicMock(), + "eventbridge": MagicMock(), + "postgres": MagicMock(), + } + config = {"topic_keys_config": "s3://my-bucket/path/to/topic_keys.json"} + handler = HandlerTopic(config, mock_aws_s3, mock_handler_token, mock_writers) + + topic_keys_data = {"public.cps.za.status_change": "job_id"} + mock_body = MagicMock() + mock_body.read.return_value = json.dumps(topic_keys_data).encode("utf-8") + mock_aws_s3.Bucket.return_value.Object.return_value.get.return_value = {"Body": mock_body} + + result = handler.with_load_topic_keys_config() + + assert result is handler + assert "job_id" == handler.topic_keys["public.cps.za.status_change"] + mock_aws_s3.Bucket.assert_called_once_with("my-bucket") + 
mock_aws_s3.Bucket.return_value.Object.assert_called_once_with("path/to/topic_keys.json") + + ## load_topic_schemas() def test_load_topic_schemas_success(): mock_handler_token = MagicMock() @@ -211,6 +257,50 @@ def test_post_success_all_writers(event_gate_module, make_event, valid_payload): assert 202 == body["statusCode"] +def test_post_passes_topic_key_to_writers(event_gate_module, make_event, valid_payload): + """Configured topic key field is extracted and passed to writer.write.""" + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.topic_keys["public.cps.za.test"] = "event_id" + kafka_writer = event_gate_module.handler_topic.writers["kafka"] + kafka_writer.write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["eventbridge"].write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=None) + + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert 202 == resp["statusCode"] + + kafka_writer.write.assert_called_once_with("public.cps.za.test", valid_payload, "e1") + + +def test_post_missing_topic_key_field_falls_back_to_empty_key(event_gate_module, make_event, valid_payload): + """Missing configured key field falls back to empty message key.""" + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.topic_keys["public.cps.za.test"] = "job_id" + kafka_writer = event_gate_module.handler_topic.writers["kafka"] + kafka_writer.write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["eventbridge"].write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=None) + + event = 
make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert 202 == resp["statusCode"] + + kafka_writer.write.assert_called_once_with("public.cps.za.test", valid_payload, "") + + def test_post_single_writer_failure(event_gate_module, make_event, valid_payload): with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): event_gate_module.handler_topic.writers["kafka"].write = MagicMock(side_effect=WriteError("Kafka boom")) diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 361656b..bf21ff6 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -24,14 +24,25 @@ import pytest -from src.utils.config_loader import _normalize_access_config, load_access_config, load_config, load_topic_names +from src.utils.config_loader import ( + _normalize_access_config, + _normalize_topic_keys_config, + load_access_config, + load_config, + load_topic_keys_config, + load_topic_names, +) @pytest.fixture def conf_dir(tmp_path: Path) -> str: """Create a temporary config directory with topic schemas.""" # Write config.json. - config_data = {"token_provider_url": "http://test", "access_config": str(tmp_path / "access.json")} + config_data = { + "token_provider_url": "http://test", + "access_config": str(tmp_path / "access.json"), + "topic_keys_config": str(tmp_path / "topic_keys.json"), + } (tmp_path / "config.json").write_text(json.dumps(config_data), encoding="utf-8") # Write access.json. @@ -40,6 +51,12 @@ def conf_dir(tmp_path: Path) -> str: encoding="utf-8", ) + # Write topic_keys.json. + (tmp_path / "topic_keys.json").write_text( + json.dumps({"public.cps.za.runs": "event_id", "public.cps.za.status_change": "job_id"}), + encoding="utf-8", + ) + # Write topic_schemas. 
schemas_dir = tmp_path / "topic_schemas" schemas_dir.mkdir() @@ -129,6 +146,42 @@ def test_empty_schemas_dir(self, tmp_path: Path) -> None: assert [] == result +class TestLoadTopicKeysConfig: + """Tests for load_topic_keys_config().""" + + def test_loads_from_local_file(self, conf_dir: str) -> None: + """Test loading topic key config from local file path.""" + config = load_config(conf_dir) + aws_s3 = MagicMock() + + result = load_topic_keys_config(config, aws_s3) + + assert "job_id" == result["public.cps.za.status_change"] + aws_s3.Bucket.assert_not_called() + + def test_loads_from_s3(self) -> None: + """Test loading topic key config from S3 path.""" + config = {"topic_keys_config": "s3://my-bucket/conf/topic_keys.json"} + key_data = {"public.cps.za.test": "event_id"} + + mock_body = MagicMock() + mock_body.read.return_value = json.dumps(key_data).encode("utf-8") + + mock_s3 = MagicMock() + mock_s3.Bucket.return_value.Object.return_value.get.return_value = {"Body": mock_body} + + result = load_topic_keys_config(config, mock_s3) + + assert "event_id" == result["public.cps.za.test"] + mock_s3.Bucket.assert_called_once_with("my-bucket") + mock_s3.Bucket.return_value.Object.assert_called_once_with("conf/topic_keys.json") + + def test_missing_config_returns_empty_mapping(self) -> None: + """Test missing topic_keys_config returns empty mapping.""" + result = load_topic_keys_config({}, MagicMock()) + assert {} == result + + class TestNormalizeAccessConfig: """Tests for _normalize_access_config().""" @@ -178,3 +231,28 @@ def test_normalize_rejects_malformed_config(self, raw: dict, error_fragment: str """Test that malformed access config raises ValueError.""" with pytest.raises(ValueError, match=error_fragment): _normalize_access_config(raw) + + +class TestNormalizeTopicKeysConfig: + """Tests for _normalize_topic_keys_config().""" + + def test_normalize_valid_topic_keys(self) -> None: + """Test valid topic key mappings are preserved.""" + raw = {"public.cps.za.runs": 
"event_id", "public.cps.za.status_change": "job_id"} + + result = _normalize_topic_keys_config(raw) + + assert raw == result + + @pytest.mark.parametrize( + "raw,error_fragment", + [ + ({"topic": 123}, "expected key field name as string"), + ({"topic": " "}, "must be a non-empty string"), + ], + ids=["invalid-type", "empty-string"], + ) + def test_normalize_rejects_invalid_topic_keys(self, raw: dict[str, Any], error_fragment: str) -> None: + """Test malformed topic key config raises ValueError.""" + with pytest.raises(ValueError, match=error_fragment): + _normalize_topic_keys_config(raw) diff --git a/tests/unit/writers/test_writer_kafka.py b/tests/unit/writers/test_writer_kafka.py index b68e834..bce2baf 100644 --- a/tests/unit/writers/test_writer_kafka.py +++ b/tests/unit/writers/test_writer_kafka.py @@ -95,6 +95,16 @@ def test_write_success(): writer.write("topic", {"b": 2}) +def test_write_uses_message_key(): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + producer = FakeProducerSuccess() + writer._producer = producer + + writer.write("topic", {"b": 2}, message_key="job-1") + + assert [("topic", "job-1", b'{"b": 2}')] == producer.produced + + def test_write_async_error(): writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) writer._producer = FakeProducerError() From 1a0c431d08a56f0bcdc9a6d1615908e0d9628f4a Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 15:07:04 +0200 Subject: [PATCH 12/16] fix PR check issues --- src/utils/config_loader.py | 4 +--- tests/unit/utils/test_config_loader.py | 26 -------------------------- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index 6bc4adf..3093da4 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -157,9 +157,7 @@ def _validate_topic_keys_config(topic_key_data: dict[str, Any]) -> None: """ for topic, field_name in topic_key_data.items(): if not isinstance(field_name, 
str): - raise ValueError( - f"Topic '{topic}': expected key field name as string, got {type(field_name).__name__}." - ) + raise ValueError(f"Topic '{topic}': expected key field name as string, got {type(field_name).__name__}.") return diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index bf21ff6..f48cba5 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -26,7 +26,6 @@ from src.utils.config_loader import ( _normalize_access_config, - _normalize_topic_keys_config, load_access_config, load_config, load_topic_keys_config, @@ -231,28 +230,3 @@ def test_normalize_rejects_malformed_config(self, raw: dict, error_fragment: str """Test that malformed access config raises ValueError.""" with pytest.raises(ValueError, match=error_fragment): _normalize_access_config(raw) - - -class TestNormalizeTopicKeysConfig: - """Tests for _normalize_topic_keys_config().""" - - def test_normalize_valid_topic_keys(self) -> None: - """Test valid topic key mappings are preserved.""" - raw = {"public.cps.za.runs": "event_id", "public.cps.za.status_change": "job_id"} - - result = _normalize_topic_keys_config(raw) - - assert raw == result - - @pytest.mark.parametrize( - "raw,error_fragment", - [ - ({"topic": 123}, "expected key field name as string"), - ({"topic": " "}, "must be a non-empty string"), - ], - ids=["invalid-type", "empty-string"], - ) - def test_normalize_rejects_invalid_topic_keys(self, raw: dict[str, Any], error_fragment: str) -> None: - """Test malformed topic key config raises ValueError.""" - with pytest.raises(ValueError, match=error_fragment): - _normalize_topic_keys_config(raw) From c39269c768f565011c9b7a5fe29c07f8546b0343 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 28 May 2026 15:30:08 +0200 Subject: [PATCH 13/16] Remove settings.json again --- .vscode/settings.json | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .vscode/settings.json diff --git 
a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 9b38853..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "python.testing.pytestArgs": [ - "tests" - ], - "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true -} \ No newline at end of file From c215254811a157b594e877af0cb8ad413a63c765 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Fri, 29 May 2026 17:13:40 +0200 Subject: [PATCH 14/16] add adr --- adr/000-status-change.md | 722 ++++++++++++++++++++++++++ conf/topic_schemas/status_change.json | 6 +- 2 files changed, 724 insertions(+), 4 deletions(-) create mode 100644 adr/000-status-change.md diff --git a/adr/000-status-change.md b/adr/000-status-change.md new file mode 100644 index 0000000..cc83f42 --- /dev/null +++ b/adr/000-status-change.md @@ -0,0 +1,722 @@ +# ADR 000. Event Bus -- Status change topic + +## Decision + +Create Kafka topics for pipeline status-change events. + +### Background +The pipeline runs topic has been introduced earlier. The main use-case behind that topic was to facilitate event-driven job executions in CurateApp, based on events emitted by IngestApp. + +The purpose of the status change topic is monitoring of job executions. The data model should allow the monitoring of executions in progress and updates to the status of an execution. Furthermore, the data model should allow grouping and nesting of executions, and model retries. The data model should be expressive enough to model dependencies across multiple applications (IngestApp, CurateApp, ProcessApp). + +While the runs topic is related, its schema assumes a run is finished. It has no support for status changes that would allow tracking a job execution while it is running. + +Eventually, the status-change topic may provide all the functionality needed to serve the event-driven execution use-case, and thus make the runs topic redundant. 
However, changing the runs topic to support status updates is hardly possible in a backwards-compatible way. Therefore, the status-change topic should be created as a dedicated new topic. + + +### Status change topic + +A status change event represents a point-in-time update about a single job execution. Instead of sending one final event at the end of a run, producers emit lifecycle events as the execution progresses. + +The five lifecycle event types are: + +- JobCreatedEvent: emitted when an execution is registered and potentially sent to a queue. +- JobStartedEvent: emitted when processing actually begins. +- JobCreatedAndStartedEvent: emitted when an execution is started immediately. This single event replaces the separate JobCreatedEvent and JobStartedEvent. +- JobUpdatedEvent: emitted for in-flight updates that are relevant for monitoring (for example progress or contextual metadata enrichment). +- JobFinishedEvent: emitted when execution reaches a terminal state (SUCCEEDED, FAILED, or KILLED). + +Events should be sent whenever the lifecycle changes or monitoring-relevant information changes. At minimum, producers should send one JobCreatedEvent or JobCreatedAndStartedEvent and one JobFinishedEvent, with started/updated events included when available. + +Many fields are intentionally optional. In distributed execution environments, many values are only known eventually (for example platform IDs or enriched context). This schema allows progressive disclosure over time while still enforcing a minimal event envelope. + +#### Job hierarchies + +Nested jobs are modeled by setting `parent_job_id` to the parent `job_id`. To optimize analysis, it is advisable to additionally set the same `job_group_id` for all jobs in the hierarchy. It can be set to the root parent id, for example. + +#### Retries + +Retries can be modeled by setting the `initial_job_id` to the job id of the retried job and increasing the `attempt_number`. 
In case of retries of nested jobs, the `parent_job_id` should be set to the parent of the retried job, and the `initial_job_id` should be set to the job id of the retried job. In the job hierarchy, retried jobs should be siblings, and not have a parent-child relationship. + +## Examples + +Example `JobCreatedAndStartedEvent`: + +```json +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "6f4d8f0f-c9e3-4f53-8d5f-c16ff41f0f90", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646400000, + "country": "za", + "job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0", + "job_name": "IngestApp Ingestion", + "platform": "aws.glue", + "platform_metadata": { + "numberOfWorkers": 8, + "glueVersion": "5.0" + }, + "input_arguments": { + "pipeline_id": "1234", + "current_date": "2026-05-19" + }, + "definition_id": "42", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } +} +``` + +Example `JobFinishedEvent` (FAILED): + +```json +{ + "event_type": "JobFinishedEvent", + "event_id": "f4c2f92d-b6fb-4ce4-a58c-7ba28632a73c", + "timestamp_event": 1747646705000, + "job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0", + "status_type": "FAILED", + "status_subtype": "NO_DATA", + "status_detail": "No data at the source" +} +``` + +More examples can be found in the appendix. + + +## Alternatives Considered +- Reusing the runs topic + + Pros: The runs topic appears similar at first glance, and reusing it would avoid introducing a new topic. + + Cons: The runs schema assumes a completed run, while status-change events need to support live tracking. It also has limited support for job hierarchies. Finally, the primary concern differs: runs events are aimed at downstream triggering, while status-change events focus on detailed monitoring. 
+ +- Separate schema for job steps versus nested jobs + + Pros: A dedicated step schema could reduce repeated fields such as tenant and country. + + Cons: Splitting hierarchy data across separate entities makes traversal and analysis harder. It also increases operational complexity by adding another schema/topic to manage, and introduces ambiguity in deciding whether something should be modeled as a step or as a nested job. + +## Limitations +- A job can only be assigned to one job group id. With this model, it's not possible to assign a job to multiple groups or to define group hierarchies. This may be a limitation when linking jobs across IngestApp, CurateApp and ProcessApp. + +## Future Consideration +- Merging the runs topic into the status-change topic. Whether this is desirable and feasible depends on +the concrete use-cases of the runs topic. However, the status-change topic deliberately uses many of the same fields +that the runs topic uses. + +- Linking jobs across IngestApp, CurateApp and ProcessApp. The model allows this in principle, either as a parent-child relationship or as a sibling relationship. For example, CurateApp jobs could reference an IngestApp job as a parent job id, or an artificial common parent job could be defined. In any case, the CurateApp job would need to know the parent job id. + +## Appendix A: JSON schema +```json +{ + "type": "object", + "properties": { + "event_type": { + "type": "string", + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent", + "JobStartedEvent", + "JobUpdatedEvent", + "JobFinishedEvent" + ], + "description": "Lifecycle event type for job status changes." + }, + "event_id": { + "type": "string", + "format": "uuid", + "description": "Unique identifier for the event (UUID)" + }, + "job_ref": { + "type": [ + "string", + "null" + ], + "description": "Identifier of the job in its respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
+ }, + "tenant_id": { + "type": [ + "string", + "null" + ], + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "integer", + "minimum": 0, + "description": "Timestamp of the event in epoch milliseconds" + }, + "country": { + "type": [ + "string", + "null" + ], + "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." + }, + "job_id": { + "type": "string", + "format": "uuid", + "description": "Primary job identifier (UUID)." + }, + "parent_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." + }, + "initial_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." + }, + "job_group_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Job group identifier (UUID), may or may not reference a job id." + }, + "job_name": { + "type": "string", + "description": "Human-readable job name." + }, + "attempt_number": { + "type": [ + "integer", + "null" + ], + "minimum": 1, + "description": "Attempt number for this job." + }, + "platform": { + "type": [ + "string", + "null" + ], + "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." + }, + "platform_metadata": { + "type": [ + "object", + "null" + ], + "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." + }, + "input_arguments": { + "type": [ + "object", + "null" + ], + "description": "Arguments passed to the job." 
+ }, + "definition_id": { + "type": "string", + "description": "Definition (Pipeline, Domain, Process) identifier." + }, + "definition_version": { + "type": [ + "string", + "null" + ], + "description": "Optional definition version." + }, + "status_type": { + "type": "string", + "enum": [ + "WAITING", + "RUNNING", + "SUCCEEDED", + "FAILED", + "KILLED" + ], + "description": "High-level status type for the current lifecycle event." + }, + "status_subtype": { + "type": [ + "string", + "null" + ], + "description": "Optional status subtype, e.g. NO_DATA or error code." + }, + "status_detail": { + "type": [ + "string", + "null" + ], + "description": "Optional human-readable status detail, e.g. short error message." + }, + "additional_context": { + "type": [ + "object", + "null" + ], + "description": "Additional context payload." + } + }, + "required": [ + "event_type", + "event_id", + "job_id", + "status_type" + ], + "allOf": [ + { + "if": { + "properties": { + "event_type": { + "enum": [ + "JobCreatedEvent" + ] + } + } + }, + "then": { + "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", + "platform" + ], + "properties": { + "status_type": { + "enum": [ + "WAITING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "enum": [ + "JobCreatedAndStartedEvent" + ] + } + } + }, + "then": { + "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", + "platform" + ], + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobStartedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobUpdatedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "WAITING", + "RUNNING" + ] + } + } + 
} + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobFinishedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "SUCCEEDED", + "FAILED", + "KILLED" + ] + } + } + } + } + ] +} +``` + +## Appendix B: The list of allowed country codes + +If a producer sends a code which is not on the list, it should be a +warning, not an error. This is because this list is not final and is +going to be updated. + +| Code | Country | +| --- | --- | +| \* | Multiple countries | +| bw | Botswana | +| gh | Ghana | +| ke | Kenya | +| mu | Mauritius (legacy, prefer offshre/onshore when possible) | +| of-mu | Mauritius (offshore) | +| on-mu | Mauritius (onshore) | +| mz | Mozambique | +| sc | Seychelles | +| tz | Tanzania (legacy, prefer bank-specific versions below when possible) | +| abt-tz | Tanzania (Absa Bank Tanzania) | +| nbc-tz | Tanzania (National Bank of Commerce) | +| ug | Uganda | +| za | South African Republic | +| zm | Zambia | + +This list is not complete, EventGate should allow other values. 
+ +## Appendix C: Example nested jobs + +Example hierarchy: + +- IngestApp Ingestion + - Create Pramen Config Lambda + - Pramen Glue Job + - Land + - Standardize + - Publish to Hive + - Add control metrics + +Example event sequence (same `job_group_id` used for the full hierarchy): + +```json +[ + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ea4ef752-2a2b-4f6c-93ee-f31a2f9b7d10", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646400000, + "country": "za", + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "platform": "aws.stepfunctions", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19", + "triggerType": "SCHEDULE" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ad766149-2d27-4c80-833d-b2f5d7f8108f", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646410000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "ff1714bf-a3f6-4b0d-9b4b-5662b8e9c42e", + "tenant_id": "abcd", + "timestamp_event": 1747646445000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + }, + { + "event_type": "JobUpdatedEvent", + "event_id": "1f2456d2-f2bf-40f7-a8be-e46f3dc9b4b3", + "timestamp_event": 1747646447000, + "job_id": 
"0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646450000, + "country": "za", + "job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "4a4c2f5c-8be8-4b26-a5ff-60f6fcb32f43", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646490000, + "country": "za", + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Land", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "5f5f5f5f-5a5a-4f5f-8a5a-5f5f5f5f5f5f", + "job_ref": "jr_5ecc2b7609d0e9c665abdcb8af8059489fcd2e60c916bd52476dcfa41e1417af", + "timestamp_event": 1747646520000, + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "6b885f6b-ac7f-4abf-8d20-8bb58f9bd12a", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646602000, + "country": "za", + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": 
"0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "7f7f7f7f-7c7c-4f7f-8c7c-7f7f7f7f7f7f", + "timestamp_event": 1747646635000, + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." + }, + { + "event_type": "JobFinishedEvent", + "event_id": "7baea31d-b630-451f-a59e-699e0c4f53b3", + "timestamp_event": 1747646705000, + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." + } +] +``` + +### Appendix D: Example job retry +This example builds upon the example in appendix C. In this scenario the failed job is retried with special configuration that only executes the Publish to Hive step in the Pramen job. + +The retried job is referenced using the `initial_job_id`, and the `attempt_number` is increased to 2. 
+ +Example retry event sequence: + +```json +[ + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "9b7d2a40-6f12-4f9d-9c5e-8f2c0f524e11", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646800000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "initial_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "attempt_number": 2, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "f2f0cf47-a738-489c-b900-671f74c6f58b", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646810000, + "country": "za", + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "4b77d3f1-62d1-42b4-a7f5-83f36fd24818", + "timestamp_event": 1747646824000, + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646825000, + "country": "za", + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": 
"0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobUpdatedEvent", + "event_id": "9dbf71c1-5f71-4a92-8747-3db90d6904e4", + "tenant_id": "abcd", + "timestamp_event": 1747646826000, + "country": "za", + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } + }, + { + "event_type": "JobCreatedAndStartedEvent", + "event_id": "2e6f31c8-76b8-4bcf-9066-b27cbd5564e1", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646830000, + "country": "za", + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "attempt_number": 1, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", + "timestamp_event": 1747646865000, + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", + "timestamp_event": 1747646865000, + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + }, + { + "event_type": "JobFinishedEvent", + "event_id": "2fe09e1f-25d0-4d89-911e-f5fbf699d1f5", + "timestamp_event": 1747646870000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" + } +] +``` \ No newline at end of file diff --git a/conf/topic_schemas/status_change.json 
b/conf/topic_schemas/status_change.json index eaa6f4d..fe9ecfa 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -186,8 +186,7 @@ "timestamp_event", "environment", "definition_id", - "platform", - "input_arguments" + "platform" ], "properties": { "status_type": { @@ -216,8 +215,7 @@ "timestamp_event", "environment", "definition_id", - "platform", - "input_arguments" + "platform" ], "properties": { "status_type": { From 2660b825a9866de25dca74896ae5448d1f0b1c87 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Fri, 29 May 2026 17:20:06 +0200 Subject: [PATCH 15/16] Move adr to subfolder --- adr/{ => 000-status-change}/000-status-change.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename adr/{ => 000-status-change}/000-status-change.md (100%) diff --git a/adr/000-status-change.md b/adr/000-status-change/000-status-change.md similarity index 100% rename from adr/000-status-change.md rename to adr/000-status-change/000-status-change.md From eb463904695f89d3d9122f90fd05cef936274628 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Fri, 29 May 2026 17:32:14 +0200 Subject: [PATCH 16/16] Move json event examples to separate files for better use by AI --- adr/000-status-change/000-status-change.md | 583 +----------------- .../01-job-created-and-started-ingestion.json | 21 + ...ated-and-started-create-pramen-config.json | 21 + ...nished-create-pramen-config-succeeded.json | 9 + .../04-job-updated-ingestion-running.json | 10 + .../05-job-created-and-started-pramen.json | 17 + .../06-job-created-and-started-land.json | 17 + .../07-job-finished-land-succeeded.json | 8 + ...b-created-and-started-publish-to-hive.json | 17 + ...9-job-finished-publish-to-hive-failed.json | 9 + .../10-job-finished-ingestion-failed.json | 9 + ...b-created-and-started-ingestion-retry.json | 20 + ...ated-and-started-create-pramen-config.json | 21 + ...nished-create-pramen-config-succeeded.json | 7 + .../04-job-created-and-started-pramen.json 
| 21 + .../05-job-updated-ingestion-running.json | 12 + ...b-created-and-started-publish-to-hive.json | 22 + ...ob-finished-publish-to-hive-succeeded.json | 7 + .../08-job-finished-pramen-succeeded.json | 7 + .../09-job-finished-ingestion-succeeded.json | 7 + 20 files changed, 286 insertions(+), 559 deletions(-) create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json create mode 100644 adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json create mode 100644 
adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json create mode 100644 adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json diff --git a/adr/000-status-change/000-status-change.md b/adr/000-status-change/000-status-change.md index cc83f42..b11b3fe 100644 --- a/adr/000-status-change/000-status-change.md +++ b/adr/000-status-change/000-status-change.md @@ -112,295 +112,7 @@ that the runs topic uses. - Linking jobs across IngestApp, CurateApp and ProcessApp. The model allows this in principle, both as a parent-child relationship or a sibling relationship. For example, CurateApp jobs could reference an IngestApp job as a parent job id, or an artificial common parent job could be defined. In any case, the CurateApp job would need to know the parent job id. ## Appendix A: Json schema -```json -{ - "type": "object", - "properties": { - "event_type": { - "type": "string", - "enum": [ - "JobCreatedEvent", - "JobCreatedAndStartedEvent", - "JobStartedEvent", - "JobUpdatedEvent", - "JobFinishedEvent" - ], - "description": "Lifecycle event type for job status changes." - }, - "event_id": { - "type": "string", - "format": "uuid", - "description": "Unique identifier for the event (UUID)" - }, - "job_ref": { - "type": [ - "string", - "null" - ], - "description": "Identifier of the job in it's respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
- }, - "tenant_id": { - "type": [ - "string", - "null" - ], - "description": "Application ID or ServiceNow identifier" - }, - "source_app": { - "type": "string", - "description": " Standardized source application name (aqueduct, unify, lum, etc)" - }, - "source_app_version": { - "type": "string", - "description": "Source application version (SemVer preferred)" - }, - "environment": { - "type": "string", - "description": "Environment (dev, uat, pre-prod, prod, test or others)" - }, - "timestamp_event": { - "type": "integer", - "minimum": 0, - "description": "Timestamp of the event in epoch milliseconds" - }, - "country": { - "type": [ - "string", - "null" - ], - "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." - }, - "job_id": { - "type": "string", - "format": "uuid", - "description": "Primary job identifier (UUID)." - }, - "parent_job_id": { - "type": [ - "string", - "null" - ], - "format": "uuid", - "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." - }, - "initial_job_id": { - "type": [ - "string", - "null" - ], - "format": "uuid", - "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." - }, - "job_group_id": { - "type": [ - "string", - "null" - ], - "format": "uuid", - "description": "Job group identifier (UUID), may or may not reference a job id." - }, - "job_name": { - "type": "string", - "description": "Human-readable job name." - }, - "attempt_number": { - "type": [ - "integer", - "null" - ], - "minimum": 1, - "description": "Attempt number for this job." - }, - "platform": { - "type": [ - "string", - "null" - ], - "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." - }, - "platform_metadata": { - "type": [ - "object", - "null" - ], - "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." - }, - "input_arguments": { - "type": [ - "object", - "null" - ], - "description": "Arguments passed to the job." 
- }, - "definition_id": { - "type": "string", - "description": "Definition (Pipeline, Domain, Process) identifier." - }, - "definition_version": { - "type": [ - "string", - "null" - ], - "description": "Optional definition version." - }, - "status_type": { - "type": "string", - "enum": [ - "WAITING", - "RUNNING", - "SUCCEEDED", - "FAILED", - "KILLED" - ], - "description": "High-level status type for the current lifecycle event." - }, - "status_subtype": { - "type": [ - "string", - "null" - ], - "description": "Optional status subtype, e.g. NO_DATA or error code." - }, - "status_detail": { - "type": [ - "string", - "null" - ], - "description": "Optional human-readable status detail, e.g. short error message." - }, - "additional_context": { - "type": [ - "object", - "null" - ], - "description": "Additional context payload." - } - }, - "required": [ - "event_type", - "event_id", - "job_id", - "status_type" - ], - "allOf": [ - { - "if": { - "properties": { - "event_type": { - "enum": [ - "JobCreatedEvent" - ] - } - } - }, - "then": { - "required": [ - "job_name", - "source_app", - "source_app_version", - "timestamp_event", - "environment", - "definition_id", - "platform" - ], - "properties": { - "status_type": { - "enum": [ - "WAITING" - ] - } - } - } - }, - { - "if": { - "properties": { - "event_type": { - "enum": [ - "JobCreatedAndStartedEvent" - ] - } - } - }, - "then": { - "required": [ - "job_name", - "source_app", - "source_app_version", - "timestamp_event", - "environment", - "definition_id", - "platform" - ], - "properties": { - "status_type": { - "enum": [ - "RUNNING" - ] - } - } - } - }, - { - "if": { - "properties": { - "event_type": { - "const": "JobStartedEvent" - } - } - }, - "then": { - "properties": { - "status_type": { - "enum": [ - "RUNNING" - ] - } - } - } - }, - { - "if": { - "properties": { - "event_type": { - "const": "JobUpdatedEvent" - } - } - }, - "then": { - "properties": { - "status_type": { - "enum": [ - "WAITING", - "RUNNING" - ] - } - } - 
} - }, - { - "if": { - "properties": { - "event_type": { - "const": "JobFinishedEvent" - } - } - }, - "then": { - "properties": { - "status_type": { - "enum": [ - "SUCCEEDED", - "FAILED", - "KILLED" - ] - } - } - } - } - ] -} -``` +The JSON schema is located here: [status_change.json](../../conf/topic_schemas/status_change.json) ## Appendix B: The list of allowed country codes @@ -442,148 +154,18 @@ Example hierarchy: Example event sequence (same `job_group_id` used for the full hierarchy): -```json -[ - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "ea4ef752-2a2b-4f6c-93ee-f31a2f9b7d10", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646400000, - "country": "za", - "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "IngestApp Ingestion", - "platform": "aws.stepfunctions", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19", - "triggerType": "SCHEDULE" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "ad766149-2d27-4c80-833d-b2f5d7f8108f", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646410000, - "country": "za", - "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Create Pramen Config", - "platform": "aws.lambda", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "ff1714bf-a3f6-4b0d-9b4b-5662b8e9c42e", - "tenant_id": "abcd", - "timestamp_event": 1747646445000, - "country": "za", - "job_id": 
"0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - }, - { - "event_type": "JobUpdatedEvent", - "event_id": "1f2456d2-f2bf-40f7-a8be-e46f3dc9b4b3", - "timestamp_event": 1747646447000, - "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "RUNNING", - "additional_context": { - "pipeline_name": "MYPIPELINE" - } - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646450000, - "country": "za", - "job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Pramen", - "platform": "aws.glue", - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "4a4c2f5c-8be8-4b26-a5ff-60f6fcb32f43", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646490000, - "country": "za", - "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Land", - "platform": "aws.glue", - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "5f5f5f5f-5a5a-4f5f-8a5a-5f5f5f5f5f5f", - "job_ref": "jr_5ecc2b7609d0e9c665abdcb8af8059489fcd2e60c916bd52476dcfa41e1417af", - "timestamp_event": 1747646520000, - "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "6b885f6b-ac7f-4abf-8d20-8bb58f9bd12a", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - 
"timestamp_event": 1747646602000, - "country": "za", - "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Publish to Hive", - "platform": "aws.glue", - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "7f7f7f7f-7c7c-4f7f-8c7c-7f7f7f7f7f7f", - "timestamp_event": 1747646635000, - "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "FAILED", - "status_subtype": "HIVE_TABLE_UPDATE_FAILED", - "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." - }, - { - "event_type": "JobFinishedEvent", - "event_id": "7baea31d-b630-451f-a59e-699e0c4f53b3", - "timestamp_event": 1747646705000, - "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "FAILED", - "status_subtype": "HIVE_TABLE_UPDATE_FAILED", - "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." 
- } -] -``` +The events are stored in separate files under `adr/000-status-change/examples/appendix-c-nested-jobs/`: + +- [01-job-created-and-started-ingestion.json](examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json) +- [02-job-created-and-started-create-pramen-config.json](examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json) +- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json) +- [04-job-updated-ingestion-running.json](examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json) +- [05-job-created-and-started-pramen.json](examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json) +- [06-job-created-and-started-land.json](examples/appendix-c-nested-jobs/06-job-created-and-started-land.json) +- [07-job-finished-land-succeeded.json](examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json) +- [08-job-created-and-started-publish-to-hive.json](examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json) +- [09-job-finished-publish-to-hive-failed.json](examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json) +- [10-job-finished-ingestion-failed.json](examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json) ### Appendix D: Example job retry This example builds upon the example in appendix C. In this scenario the failed job is retried with special configuration that only executes the Publish to Hive step in the Pramen job. 
@@ -592,131 +174,14 @@ The retried job is referenced using the `initial_job_id`, and the `attempt_numbe Example retry event sequence: -```json -[ - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "9b7d2a40-6f12-4f9d-9c5e-8f2c0f524e11", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646800000, - "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "initial_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "IngestApp Ingestion", - "attempt_number": 2, - "platform": "aws.glue", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "f2f0cf47-a738-489c-b900-671f74c6f58b", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646810000, - "country": "za", - "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Create Pramen Config", - "platform": "aws.lambda", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "4b77d3f1-62d1-42b4-a7f5-83f36fd24818", - "timestamp_event": 1747646824000, - "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646825000, - "country": "za", - "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": 
"0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Pramen", - "platform": "aws.glue", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobUpdatedEvent", - "event_id": "9dbf71c1-5f71-4a92-8747-3db90d6904e4", - "tenant_id": "abcd", - "timestamp_event": 1747646826000, - "country": "za", - "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "RUNNING", - "additional_context": { - "pipeline_name": "MYPIPELINE" - } - }, - { - "event_type": "JobCreatedAndStartedEvent", - "event_id": "2e6f31c8-76b8-4bcf-9066-b27cbd5564e1", - "tenant_id": "abcd", - "source_app": "ingestapp", - "source_app_version": "2.14.0", - "environment": "dev", - "timestamp_event": 1747646830000, - "country": "za", - "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "job_name": "Publish to Hive", - "attempt_number": 1, - "platform": "aws.glue", - "input_arguments": { - "pipelineId": 1234, - "currentDate": "2026-05-19" - }, - "definition_id": "1234", - "status_type": "RUNNING" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", - "timestamp_event": 1747646865000, - "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", - "timestamp_event": 1747646865000, - "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - }, - { - "event_type": "JobFinishedEvent", - "event_id": "2fe09e1f-25d0-4d89-911e-f5fbf699d1f5", - "timestamp_event": 1747646870000, - "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", - "status_type": "SUCCEEDED" - } -] -``` \ No newline at end of file +The events are 
stored in separate files under `adr/000-status-change/examples/appendix-d-job-retry/`: + +- [01-job-created-and-started-ingestion-retry.json](examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json) +- [02-job-created-and-started-create-pramen-config.json](examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json) +- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json) +- [04-job-created-and-started-pramen.json](examples/appendix-d-job-retry/04-job-created-and-started-pramen.json) +- [05-job-updated-ingestion-running.json](examples/appendix-d-job-retry/05-job-updated-ingestion-running.json) +- [06-job-created-and-started-publish-to-hive.json](examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json) +- [07-job-finished-publish-to-hive-succeeded.json](examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json) +- [08-job-finished-pramen-succeeded.json](examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json) +- [09-job-finished-ingestion-succeeded.json](examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json) \ No newline at end of file diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json b/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json new file mode 100644 index 0000000..0d3ace2 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ea4ef752-2a2b-4f6c-93ee-f31a2f9b7d10", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646400000, + "country": "za", + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": 
"0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "platform": "aws.stepfunctions", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19", + "triggerType": "SCHEDULE" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json b/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json new file mode 100644 index 0000000..ba1348b --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ad766149-2d27-4c80-833d-b2f5d7f8108f", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646410000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json b/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json new file mode 100644 index 0000000..8cf5857 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "ff1714bf-a3f6-4b0d-9b4b-5662b8e9c42e", + "tenant_id": "abcd", + "timestamp_event": 1747646445000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + 
"status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json b/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json new file mode 100644 index 0000000..c9089c4 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json @@ -0,0 +1,10 @@ +{ + "event_type": "JobUpdatedEvent", + "event_id": "1f2456d2-f2bf-40f7-a8be-e46f3dc9b4b3", + "timestamp_event": 1747646447000, + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json b/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json new file mode 100644 index 0000000..2d8cc4e --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json @@ -0,0 +1,17 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646450000, + "country": "za", + "job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json b/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json new file mode 100644 index 0000000..f64866c --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json @@ -0,0 +1,17 @@ +{ + "event_type": 
"JobCreatedAndStartedEvent", + "event_id": "4a4c2f5c-8be8-4b26-a5ff-60f6fcb32f43", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646490000, + "country": "za", + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Land", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json b/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json new file mode 100644 index 0000000..4c18225 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json @@ -0,0 +1,8 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "5f5f5f5f-5a5a-4f5f-8a5a-5f5f5f5f5f5f", + "job_ref": "jr_5ecc2b7609d0e9c665abdcb8af8059489fcd2e60c916bd52476dcfa41e1417af", + "timestamp_event": 1747646520000, + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json b/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json new file mode 100644 index 0000000..647fd5a --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json @@ -0,0 +1,17 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "6b885f6b-ac7f-4abf-8d20-8bb58f9bd12a", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646602000, + "country": "za", + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + 
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json b/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json new file mode 100644 index 0000000..d81050a --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "7f7f7f7f-7c7c-4f7f-8c7c-7f7f7f7f7f7f", + "timestamp_event": 1747646635000, + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json b/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json new file mode 100644 index 0000000..ab9e241 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "7baea31d-b630-451f-a59e-699e0c4f53b3", + "timestamp_event": 1747646705000, + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." 
+} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json b/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json new file mode 100644 index 0000000..f1cbd73 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json @@ -0,0 +1,20 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "9b7d2a40-6f12-4f9d-9c5e-8f2c0f524e11", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646800000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "initial_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "attempt_number": 2, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json b/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json new file mode 100644 index 0000000..f0bc2c8 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "f2f0cf47-a738-489c-b900-671f74c6f58b", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646810000, + "country": "za", + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, 
+ "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json new file mode 100644 index 0000000..f2c42e2 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "4b77d3f1-62d1-42b4-a7f5-83f36fd24818", + "timestamp_event": 1747646824000, + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json b/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json new file mode 100644 index 0000000..f7a2acd --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "8d3e5b7a-2c4f-4e19-b6a0-5f1c9d2e7a43", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646825000, + "country": "za", + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json b/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json new file mode 100644 index 0000000..5512a7c --- /dev/null +++
b/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json @@ -0,0 +1,12 @@ +{ + "event_type": "JobUpdatedEvent", + "event_id": "9dbf71c1-5f71-4a92-8747-3db90d6904e4", + "tenant_id": "abcd", + "timestamp_event": 1747646826000, + "country": "za", + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json b/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json new file mode 100644 index 0000000..fd8258d --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json @@ -0,0 +1,22 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "2e6f31c8-76b8-4bcf-9066-b27cbd5564e1", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646830000, + "country": "za", + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "attempt_number": 1, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json new file mode 100644 index 0000000..b43d583 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", + "timestamp_event": 
1747646865000, + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json new file mode 100644 index 0000000..39705f4 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "c4a1d9e2-7b35-4f60-9e1a-3d8b2f6a7c05", + "timestamp_event": 1747646865000, + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json new file mode 100644 index 0000000..e2c5909 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "2fe09e1f-25d0-4d89-911e-f5fbf699d1f5", + "timestamp_event": 1747646870000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +}