Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions adr/000-status-change/000-status-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# ADR 000. Event Bus -- Status change topic

## Decision

Create Kafka Topics for Pipeline Status change topics.

### Background
The pipeline runs topic has been introduced earlier. The main use-case behind that topic was to facilitate event-driven job executions in CurateApp, based on events emitted by IngestApp.

The purpose of the status change topic is monitoring of job executions. The data model should allow the monitoring of executions in progress and updates to the status of an execution. Furthermore, the data model should allow grouping and nesting of executions, and model retries. The data model should be expressive enough to model dependencies across multiple applications (IngestApp, CurateApp, ProcessApp)

While the runs topic is related, its schema assumes a run is finished. There is no support for status changes, to track a job execution while it is running.

Eventually, the status-change topic may provide all the functionality, which is needed to serve the event-driven execution use-case, and thus make the runs topic redundant. However, changing the runs topic to support status updates is hardly possible in a backwards compatible way. Therefore, the status-change should be created as a dedicated new topic.


### Status change topic

A status change event represents a point-in-time update about a single job execution. Instead of sending one final event at the end of a run, producers emit lifecycle events as the execution progresses.

The five lifecycle event types are:

- JobCreatedEvent: emitted when an execution is registered and potentially sent to a queue
- JobStartedEvent: emitted when processing actually begins.
- JobCreatedAndStartedEvent: emitted when an execution is started immediately. This replaces the previous two events.
- JobUpdatedEvent: emitted for in-flight updates that are relevant for monitoring (for example progress or contextual metadata enrichment).
- JobFinishedEvent: emitted when execution reaches a terminal state (SUCCEEDED, FAILED, or KILLED).

Events should be sent whenever the lifecycle changes or monitoring-relevant information changes. At minimum, producers should send one created JobCreatedEvent or JobCreatedAndStartedEvent and one JobFinishedEvent event, with started/updated events included when available.

Many fields are intentionally optional. In distributed execution environments, many values are only known eventually (for example platform IDs or enriched context). This schema allows progressive disclosure over time while still enforcing a minimal event envelope.

#### Job hierarchies

Nested jobs are modeled by setting `parent_job_id` to the parent `job_id`. To optimize analysis, it is advisable to additionally set the same `job_group_id` for all jobs in the hierarchy. It can be set to the root parent id, for example.

#### Retries

Retries can be modeled by setting the `initial_job_id` to the job id of the retried job and increasing the `attempt_number`. In case of retries of nested jobs, the `parent_job_id` should be set to the parent of the retried job, and the `initial_job_id` should be set to the job id of the retried job. In the job hierarchy, retried jobs should be siblings, and not have a parent-child relationship.

## Examples

Example `JobCreatedAndStartedEvent`:

```json
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "6f4d8f0f-c9e3-4f53-8d5f-c16ff41f0f90",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646400000,
"country": "za",
"job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0",
"job_name": "IngestApp Ingestion",
"platform": "aws.glue",
"platform_metadata": {
"numberOfWorkers": 8,
"glueVersion": "5.0"
},
"input_arguments": {
"pipeline_id": "1234",
"current_date": "2026-05-19"
},
"definition_id": "42",
"status_type": "RUNNING",
"additional_context": {
"pipeline_name": "MYPIPELINE"
}
}
```

Example `JobFinishedEvent` (FAILED):

```json
{
"event_type": "JobFinishedEvent",
"event_id": "f4c2f92d-b6fb-4ce4-a58c-7ba28632a73c",
"timestamp_event": 1747646705000,
"job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0",
"status_type": "FAILED",
"status_subtype": "NO_DATA",
"status_detail": "No data at the source"
}
```

More examples can be found in the appendix.


## Alternatives Considered
- Reusing the runs topic

Pros: The runs topic appears similar at first glance, and reusing it would avoid introducing a new topic.

Cons: The runs schema assumes a completed run, while status-change events need to support live tracking. It also has limited support for job hierarchies. Finally, the primary concern differs: runs events are aimed at downstream triggering, while status-change events focus on detailed monitoring.

- Separate schema for job steps versus nested jobs

Pros: A dedicated step schema could reduce repeated fields such as tenant and country.

Cons: Splitting hierarchy data across separate entities makes traversal and analysis harder. It also increases operational complexity by adding another schema/topic to manage, and introduces ambiguity in deciding whether something should be modeled as a step or as a nested job.

## Limitations
- A job can only be assigned to one job group id. With this model, it's not possible to assign a job to multiple groups or to define group hierarchies. This may be a limitation when linking jobs across IngestApp, CurateApp and ProcessApp.

## Future Consideration
- Merging runs topic into status-change topic. It depends on the concrete use-cases of the runs topic,
whether this is desirable and feasible. However, the status-change deliberately uses many of the same fields
that the runs topic uses.

- Linking jobs across IngestApp, CurateApp and ProcessApp. The model allows this in principle, both as a parent-child relationship or a sibling relationship. For example, CurateApp jobs could reference an IngestApp job as a parent job id, or an artificial common parent job could be defined. In any case, the CurateApp job would need to know the parent job id.

## Appendix A: Json schema
The json schema is located here: [status_change.json](../../conf/topic_schemas/status_change.json/)

## Appendix B: The list of allowed country codes

If a producer sends a code which is not on the list, it should be a
warning, not an error. This is because this list is not final and is
going to be updated.

| Code | Country |
| --- | --- |
| \* | Multiple countries |
| bw | Botswana |
| gh | Ghana |
| ke | Kenya |
| mu | Mauritius (legacy, prefer offshre/onshore when possible) |
| of-mu | Mauritius (offshore) |
| on-mu | Mauritius (onshore) |
| mz | Mozambique |
| sc | Seychelles |
| tz | Tanzania (legacy, prefer bank-specific versions below when possible) |
| abt-tz | Tanzania (Absa Bank Tanzania) |
| nbc-tz | Tanzania (National Bank of Commerce) |
| ug | Uganda |
| za | South African Republic |
| zm | Zambia |

This list is not complete, EventGate should allow other values.

## Appendix C: Example nested jobs

Example hierarchy:

- IngestApp Ingestion
- Create Pramen Config Lambda
- Pramen Glue Job
- Land
- Standardize
- Publish to Hive
- Add control metrics

Example event sequence (same `job_group_id` used for the full hierarchy):

The events are stored in separate files under `adr/000-status-change/examples/appendix-c-nested-jobs/`:

- [01-job-created-and-started-ingestion.json](examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json)
- [02-job-created-and-started-create-pramen-config.json](examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json)
- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json)
- [04-job-updated-ingestion-running.json](examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json)
- [05-job-created-and-started-pramen.json](examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json)
- [06-job-created-and-started-land.json](examples/appendix-c-nested-jobs/06-job-created-and-started-land.json)
- [07-job-finished-land-succeeded.json](examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json)
- [08-job-created-and-started-publish-to-hive.json](examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json)
- [09-job-finished-publish-to-hive-failed.json](examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json)
- [10-job-finished-ingestion-failed.json](examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json)

### Appendix D: Example job retry
This example builds upon the example in appendix C. In this scenario the failed job is retried with special configuration that only executes the Publish to Hive step in the Pramen job.

The retried job is referenced using the `initial_job_id`, and the `attempt_number` is increased to 2.

Example retry event sequence:

The events are stored in separate files under `adr/000-status-change/examples/appendix-d-job-retry/`:

- [01-job-created-and-started-ingestion-retry.json](examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json)
- [02-job-created-and-started-create-pramen-config.json](examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json)
- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json)
- [04-job-created-and-started-pramen.json](examples/appendix-d-job-retry/04-job-created-and-started-pramen.json)
- [05-job-updated-ingestion-running.json](examples/appendix-d-job-retry/05-job-updated-ingestion-running.json)
- [06-job-created-and-started-publish-to-hive.json](examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json)
- [07-job-finished-publish-to-hive-succeeded.json](examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json)
- [08-job-finished-pramen-succeeded.json](examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json)
- [09-job-finished-ingestion-succeeded.json](examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "ea4ef752-2a2b-4f6c-93ee-f31a2f9b7d10",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646400000,
"country": "za",
"job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "IngestApp Ingestion",
"platform": "aws.stepfunctions",
"input_arguments": {
"pipelineId": 1234,
"currentDate": "2026-05-19",
"triggerType": "SCHEDULE"
},
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "ad766149-2d27-4c80-833d-b2f5d7f8108f",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646410000,
"country": "za",
"job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "Create Pramen Config",
"platform": "aws.lambda",
"input_arguments": {
"pipelineId": 1234,
"currentDate": "2026-05-19"
},
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"event_type": "JobFinishedEvent",
"event_id": "ff1714bf-a3f6-4b0d-9b4b-5662b8e9c42e",
"tenant_id": "abcd",
"timestamp_event": 1747646445000,
"country": "za",
"job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "SUCCEEDED"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"event_type": "JobUpdatedEvent",
"event_id": "1f2456d2-f2bf-40f7-a8be-e46f3dc9b4b3",
"timestamp_event": 1747646447000,
"job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "RUNNING",
"additional_context": {
"pipeline_name": "MYPIPELINE"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646450000,
"country": "za",
"job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "Pramen",
"platform": "aws.glue",
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "4a4c2f5c-8be8-4b26-a5ff-60f6fcb32f43",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646490000,
"country": "za",
"job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "Land",
"platform": "aws.glue",
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"event_type": "JobFinishedEvent",
"event_id": "5f5f5f5f-5a5a-4f5f-8a5a-5f5f5f5f5f5f",
"job_ref": "jr_5ecc2b7609d0e9c665abdcb8af8059489fcd2e60c916bd52476dcfa41e1417af",
"timestamp_event": 1747646520000,
"job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "SUCCEEDED"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "6b885f6b-ac7f-4abf-8d20-8bb58f9bd12a",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646602000,
"country": "za",
"job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "Publish to Hive",
"platform": "aws.glue",
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"event_type": "JobFinishedEvent",
"event_id": "7f7f7f7f-7c7c-4f7f-8c7c-7f7f7f7f7f7f",
"timestamp_event": 1747646635000,
"job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "FAILED",
"status_subtype": "HIVE_TABLE_UPDATE_FAILED",
"status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"event_type": "JobFinishedEvent",
"event_id": "7baea31d-b630-451f-a59e-699e0c4f53b3",
"timestamp_event": 1747646705000,
"job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "FAILED",
"status_subtype": "HIVE_TABLE_UPDATE_FAILED",
"status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "9b7d2a40-6f12-4f9d-9c5e-8f2c0f524e11",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646800000,
"job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"initial_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "IngestApp Ingestion",
"attempt_number": 2,
"platform": "aws.glue",
"input_arguments": {
"pipelineId": 1234,
"currentDate": "2026-05-19"
},
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"event_type": "JobCreatedAndStartedEvent",
"event_id": "f2f0cf47-a738-489c-b900-671f74c6f58b",
"tenant_id": "abcd",
"source_app": "ingestapp",
"source_app_version": "2.14.0",
"environment": "dev",
"timestamp_event": 1747646810000,
"country": "za",
"job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"job_name": "Create Pramen Config",
"platform": "aws.lambda",
"input_arguments": {
"pipelineId": 1234,
"currentDate": "2026-05-19"
},
"definition_id": "1234",
"status_type": "RUNNING"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"event_type": "JobFinishedEvent",
"event_id": "4b77d3f1-62d1-42b4-a7f5-83f36fd24818",
"timestamp_event": 1747646824000,
"job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa",
"status_type": "SUCCEEDED"
}
Loading