diff --git a/adr/000-status-change/000-status-change.md b/adr/000-status-change/000-status-change.md
new file mode 100644
index 0000000..b11b3fe
--- /dev/null
+++ b/adr/000-status-change/000-status-change.md
@@ -0,0 +1,187 @@
+# ADR 000. Event Bus -- Status change topic
+
+## Decision
+
+Create a dedicated Kafka topic for pipeline status-change events.
+
+### Background
+The pipeline runs topic was introduced earlier. Its main use case is to facilitate event-driven job executions in CurateApp, based on events emitted by IngestApp.
+
+The purpose of the status-change topic is to monitor job executions. The data model should allow monitoring of executions in progress and of updates to the status of an execution. Furthermore, it should allow grouping and nesting of executions, and model retries. It should be expressive enough to model dependencies across multiple applications (IngestApp, CurateApp, ProcessApp).
+
+While the runs topic is related, its schema assumes that a run is finished. It has no support for status changes that track a job execution while it is running.
+
+Eventually, the status-change topic may provide all the functionality needed to serve the event-driven execution use case, and thus make the runs topic redundant. However, changing the runs topic to support status updates is hardly possible in a backwards-compatible way. Therefore, the status-change topic should be created as a dedicated new topic.
+
+
+### Status change topic
+
+A status change event represents a point-in-time update about a single job execution. Instead of sending one final event at the end of a run, producers emit lifecycle events as the execution progresses.
+
+The five lifecycle event types are:
+
+- JobCreatedEvent: emitted when an execution is registered and potentially sent to a queue.
+- JobStartedEvent: emitted when processing actually begins.
+- JobCreatedAndStartedEvent: emitted when an execution is started immediately. It replaces the separate JobCreatedEvent and JobStartedEvent.
+- JobUpdatedEvent: emitted for in-flight updates that are relevant for monitoring (for example progress or contextual metadata enrichment).
+- JobFinishedEvent: emitted when execution reaches a terminal state (SUCCEEDED, FAILED, or KILLED).
+
+Events should be sent whenever the lifecycle changes or monitoring-relevant information changes. At minimum, producers should send one JobCreatedEvent or JobCreatedAndStartedEvent and one JobFinishedEvent, with JobStartedEvent and JobUpdatedEvent included when available.
+
+Many fields are intentionally optional. In distributed execution environments, many values are only known eventually (for example platform IDs or enriched context). This schema allows progressive disclosure over time while still enforcing a minimal event envelope (`event_type`, `event_id`, `job_id`, and `status_type`).
+
+#### Job hierarchies
+
+Nested jobs are modeled by setting `parent_job_id` to the parent's `job_id`. To simplify analysis, it is advisable to additionally set the same `job_group_id` for all jobs in the hierarchy. It can be set to the `job_id` of the root job, for example.
+
+#### Retries
+
+Retries can be modeled by setting `initial_job_id` to the `job_id` of the retried job and increasing `attempt_number`. When a nested job is retried, `parent_job_id` should be set to the parent of the retried job, and `initial_job_id` should be set to the `job_id` of the retried job. In the job hierarchy, a retry and the job it retries should be siblings, not parent and child.
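The hierarchy and retry rules above can be sketched in Python as follows. This is a minimal illustration, not part of EventGate: the helper `created_and_started` and its parameters are hypothetical, the constant field values are copied from the examples in this ADR, and the sketch assumes that a retried root job starts a new job group (as in Appendix D) and that `initial_job_id` points at the job being retried.

```python
import time
import uuid
from typing import Any, Optional


def created_and_started(
    job_name: str,
    parent: Optional[dict[str, Any]] = None,
    retry_of: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build a JobCreatedAndStartedEvent payload (hypothetical helper)."""
    job_id = str(uuid.uuid4())
    event: dict[str, Any] = {
        "event_type": "JobCreatedAndStartedEvent",
        "event_id": str(uuid.uuid4()),
        "job_id": job_id,
        "job_name": job_name,
        "source_app": "ingestapp",
        "source_app_version": "2.14.0",
        "environment": "dev",
        "timestamp_event": int(time.time() * 1000),  # epoch milliseconds
        "definition_id": "1234",
        "platform": "aws.glue",
        "status_type": "RUNNING",
        "attempt_number": 1,
    }
    parent_id = None
    group_id = job_id  # a root job uses its own id as the group id
    if retry_of is not None:
        # Assumption: initial_job_id references the job being retried.
        event["initial_job_id"] = retry_of["job_id"]
        event["attempt_number"] = retry_of["attempt_number"] + 1
        if retry_of.get("parent_job_id"):
            # A nested retry is a sibling: it keeps the retried job's parent and group.
            parent_id = retry_of["parent_job_id"]
            group_id = retry_of["job_group_id"]
    elif parent is not None:
        parent_id = parent["job_id"]
        group_id = parent["job_group_id"]
    event["job_group_id"] = group_id
    if parent_id is not None:
        event["parent_job_id"] = parent_id
    return event


root = created_and_started("IngestApp Ingestion")
child = created_and_started("Publish to Hive", parent=root)
retry = created_and_started("Publish to Hive", retry_of=child)
```

Here `retry` and `child` share the same `parent_job_id` and `job_group_id`, so they are siblings in the hierarchy, while `retry["initial_job_id"]` links the second attempt back to the first.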
+
+## Examples
+
+Example `JobCreatedAndStartedEvent`:
+
+```json
+{
+  "event_type": "JobCreatedAndStartedEvent",
+  "event_id": "6f4d8f0f-c9e3-4f53-8d5f-c16ff41f0f90",
+  "tenant_id": "abcd",
+  "source_app": "ingestapp",
+  "source_app_version": "2.14.0",
+  "environment": "dev",
+  "timestamp_event": 1747646400000,
+  "country": "za",
+  "job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0",
+  "job_name": "IngestApp Ingestion",
+  "platform": "aws.glue",
+  "platform_metadata": {
+    "numberOfWorkers": 8,
+    "glueVersion": "5.0"
+  },
+  "input_arguments": {
+    "pipeline_id": "1234",
+    "current_date": "2026-05-19"
+  },
+  "definition_id": "42",
+  "status_type": "RUNNING",
+  "additional_context": {
+    "pipeline_name": "MYPIPELINE"
+  }
+}
+```
+
+Example `JobFinishedEvent` (FAILED):
+
+```json
+{
+  "event_type": "JobFinishedEvent",
+  "event_id": "f4c2f92d-b6fb-4ce4-a58c-7ba28632a73c",
+  "timestamp_event": 1747646705000,
+  "job_id": "e5b8f1cc-e40a-4923-8a77-9e36b1a7bdb0",
+  "status_type": "FAILED",
+  "status_subtype": "NO_DATA",
+  "status_detail": "No data at the source"
+}
+```
+
+More examples can be found in Appendices C and D.
+
+
+## Alternatives Considered
+- Reusing the runs topic
+
+  Pros: The runs topic appears similar at first glance, and reusing it would avoid introducing a new topic.
+
+  Cons: The runs schema assumes a completed run, while status-change events need to support live tracking. It also has limited support for job hierarchies. Finally, the primary concern differs: runs events are aimed at downstream triggering, while status-change events focus on detailed monitoring.
+
+- Separate schema for job steps versus nested jobs
+
+  Pros: A dedicated step schema could reduce repeated fields such as tenant and country.
+
+  Cons: Splitting hierarchy data across separate entities makes traversal and analysis harder. It also increases operational complexity by adding another schema/topic to manage, and introduces ambiguity in deciding whether something should be modeled as a step or as a nested job.
+
+## Limitations
+- A job can only be assigned to one `job_group_id`. With this model, it is not possible to assign a job to multiple groups or to define group hierarchies. This may be a limitation when linking jobs across IngestApp, CurateApp and ProcessApp.
+
+## Future Considerations
+- Merging the runs topic into the status-change topic. Whether this is desirable and feasible depends on the
+concrete use cases of the runs topic. However, the status-change schema deliberately uses many of the same fields
+that the runs topic uses.
+
+- Linking jobs across IngestApp, CurateApp and ProcessApp. The model allows this in principle, either as a parent-child relationship or as a sibling relationship. For example, CurateApp jobs could reference an IngestApp job via `parent_job_id`, or an artificial common parent job could be defined. In either case, the CurateApp job would need to know the parent job id.
+
+## Appendix A: JSON schema
+The JSON schema is located here: [status_change.json](../../conf/topic_schemas/status_change.json)
+
+## Appendix B: The list of allowed country codes
+
+If a producer sends a code that is not on the list, this should result in a
+warning, not an error, because the list is not final and is
+going to be updated.
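The warn-but-accept rule for country codes can be sketched as follows. This is only an illustration under stated assumptions: `check_country` and `KNOWN_COUNTRY_CODES` are hypothetical names, the codes are copied from the table in this appendix, and how EventGate actually surfaces the warning is not specified in this ADR.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Codes copied from the table in Appendix B; the list is expected to grow.
KNOWN_COUNTRY_CODES = frozenset(
    {
        "*", "bw", "gh", "ke", "mu", "of-mu", "on-mu", "mz",
        "sc", "tz", "abt-tz", "nbc-tz", "ug", "za", "zm",
    }
)


def check_country(code: Optional[str]) -> bool:
    """Return True for a known or absent code; log a warning otherwise.

    The function never raises, so an unknown code does not reject the event.
    """
    if code is None or code in KNOWN_COUNTRY_CODES:
        return True
    logger.warning("Unknown country code %r; the event is accepted anyway", code)
    return False
```

Returning a flag instead of raising keeps the decision with the caller, which can attach the warning to its response while still publishing the event.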
+
+| Code | Country |
+| --- | --- |
+| \* | Multiple countries |
+| bw | Botswana |
+| gh | Ghana |
+| ke | Kenya |
+| mu | Mauritius (legacy, prefer offshore/onshore when possible) |
+| of-mu | Mauritius (offshore) |
+| on-mu | Mauritius (onshore) |
+| mz | Mozambique |
+| sc | Seychelles |
+| tz | Tanzania (legacy, prefer bank-specific versions below when possible) |
+| abt-tz | Tanzania (Absa Bank Tanzania) |
+| nbc-tz | Tanzania (National Bank of Commerce) |
+| ug | Uganda |
+| za | South Africa |
+| zm | Zambia |
+
+This list is not complete; EventGate should allow other values.
+
+## Appendix C: Example nested jobs
+
+Example hierarchy:
+
+- IngestApp Ingestion
+  - Create Pramen Config Lambda
+  - Pramen Glue Job
+    - Land
+    - Standardize
+    - Publish to Hive
+    - Add control metrics
+
+Example event sequence (same `job_group_id` used for the full hierarchy):
+
+The events are stored in separate files under `adr/000-status-change/examples/appendix-c-nested-jobs/`:
+
+- [01-job-created-and-started-ingestion.json](examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json)
+- [02-job-created-and-started-create-pramen-config.json](examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json)
+- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json)
+- [04-job-updated-ingestion-running.json](examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json)
+- [05-job-created-and-started-pramen.json](examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json)
+- [06-job-created-and-started-land.json](examples/appendix-c-nested-jobs/06-job-created-and-started-land.json)
+- [07-job-finished-land-succeeded.json](examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json)
+- [08-job-created-and-started-publish-to-hive.json](examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json)
+- [09-job-finished-publish-to-hive-failed.json](examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json)
+- [10-job-finished-ingestion-failed.json](examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json)
+
+## Appendix D: Example job retry
+This example builds upon the example in Appendix C. In this scenario, the failed job is retried with a special configuration that only executes the Publish to Hive step in the Pramen job.
+
+The retried job is referenced using `initial_job_id`, and `attempt_number` is increased to 2.
+
+Example retry event sequence:
+
+The events are stored in separate files under `adr/000-status-change/examples/appendix-d-job-retry/`:
+
+- [01-job-created-and-started-ingestion-retry.json](examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json)
+- [02-job-created-and-started-create-pramen-config.json](examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json)
+- [03-job-finished-create-pramen-config-succeeded.json](examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json)
+- [04-job-created-and-started-pramen.json](examples/appendix-d-job-retry/04-job-created-and-started-pramen.json)
+- [05-job-updated-ingestion-running.json](examples/appendix-d-job-retry/05-job-updated-ingestion-running.json)
+- [06-job-created-and-started-publish-to-hive.json](examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json)
+- [07-job-finished-publish-to-hive-succeeded.json](examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json)
+- [08-job-finished-pramen-succeeded.json](examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json)
+- [09-job-finished-ingestion-succeeded.json](examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json)
\ No newline at end of file
diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json
b/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json new file mode 100644 index 0000000..0d3ace2 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/01-job-created-and-started-ingestion.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ea4ef752-2a2b-4f6c-93ee-f31a2f9b7d10", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646400000, + "country": "za", + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "platform": "aws.stepfunctions", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19", + "triggerType": "SCHEDULE" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json b/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json new file mode 100644 index 0000000..ba1348b --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/02-job-created-and-started-create-pramen-config.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ad766149-2d27-4c80-833d-b2f5d7f8108f", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646410000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git 
a/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json b/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json new file mode 100644 index 0000000..8cf5857 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/03-job-finished-create-pramen-config-succeeded.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "ff1714bf-a3f6-4b0d-9b4b-5662b8e9c42e", + "tenant_id": "abcd", + "timestamp_event": 1747646445000, + "country": "za", + "job_id": "0002aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json b/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json new file mode 100644 index 0000000..c9089c4 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/04-job-updated-ingestion-running.json @@ -0,0 +1,10 @@ +{ + "event_type": "JobUpdatedEvent", + "event_id": "1f2456d2-f2bf-40f7-a8be-e46f3dc9b4b3", + "timestamp_event": 1747646447000, + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json b/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json new file mode 100644 index 0000000..2d8cc4e --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/05-job-created-and-started-pramen.json @@ -0,0 +1,17 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646450000, + "country": "za", + "job_id": 
"0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json b/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json new file mode 100644 index 0000000..f64866c --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/06-job-created-and-started-land.json @@ -0,0 +1,17 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "4a4c2f5c-8be8-4b26-a5ff-60f6fcb32f43", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646490000, + "country": "za", + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Land", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json b/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json new file mode 100644 index 0000000..4c18225 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/07-job-finished-land-succeeded.json @@ -0,0 +1,8 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "5f5f5f5f-5a5a-4f5f-8a5a-5f5f5f5f5f5f", + "job_ref": "jr_5ecc2b7609d0e9c665abdcb8af8059489fcd2e60c916bd52476dcfa41e1417af", + "timestamp_event": 1747646520000, + "job_id": "0004aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json 
b/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json new file mode 100644 index 0000000..647fd5a --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/08-job-created-and-started-publish-to-hive.json @@ -0,0 +1,17 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "6b885f6b-ac7f-4abf-8d20-8bb58f9bd12a", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646602000, + "country": "za", + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0003aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "platform": "aws.glue", + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json b/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json new file mode 100644 index 0000000..d81050a --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/09-job-finished-publish-to-hive-failed.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "7f7f7f7f-7c7c-4f7f-8c7c-7f7f7f7f7f7f", + "timestamp_event": 1747646635000, + "job_id": "0006aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." 
+} diff --git a/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json b/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json new file mode 100644 index 0000000..ab9e241 --- /dev/null +++ b/adr/000-status-change/examples/appendix-c-nested-jobs/10-job-finished-ingestion-failed.json @@ -0,0 +1,9 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "7baea31d-b630-451f-a59e-699e0c4f53b3", + "timestamp_event": 1747646705000, + "job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "FAILED", + "status_subtype": "HIVE_TABLE_UPDATE_FAILED", + "status_detail": "sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target: Unable to execute HTTP request: PKIX path building failed." +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json b/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json new file mode 100644 index 0000000..f1cbd73 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/01-job-created-and-started-ingestion-retry.json @@ -0,0 +1,20 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "9b7d2a40-6f12-4f9d-9c5e-8f2c0f524e11", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646800000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "initial_job_id": "0001aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "IngestApp Ingestion", + "attempt_number": 2, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json 
b/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json new file mode 100644 index 0000000..f0bc2c8 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/02-job-created-and-started-create-pramen-config.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "f2f0cf47-a738-489c-b900-671f74c6f58b", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646810000, + "country": "za", + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Create Pramen Config", + "platform": "aws.lambda", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json new file mode 100644 index 0000000..f2c42e2 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/03-job-finished-create-pramen-config-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "4b77d3f1-62d1-42b4-a7f5-83f36fd24818", + "timestamp_event": 1747646824000, + "job_id": "0012aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json b/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json new file mode 100644 index 0000000..f7a2acd --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/04-job-created-and-started-pramen.json @@ -0,0 +1,21 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": 
"ec36ab3b-0f97-4e81-8f4d-b88f493f6ca2", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646825000, + "country": "za", + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Pramen", + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json b/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json new file mode 100644 index 0000000..5512a7c --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/05-job-updated-ingestion-running.json @@ -0,0 +1,12 @@ +{ + "event_type": "JobUpdatedEvent", + "event_id": "9dbf71c1-5f71-4a92-8747-3db90d6904e4", + "tenant_id": "abcd", + "timestamp_event": 1747646826000, + "country": "za", + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "RUNNING", + "additional_context": { + "pipeline_name": "MYPIPELINE" + } +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json b/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json new file mode 100644 index 0000000..fd8258d --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/06-job-created-and-started-publish-to-hive.json @@ -0,0 +1,22 @@ +{ + "event_type": "JobCreatedAndStartedEvent", + "event_id": "2e6f31c8-76b8-4bcf-9066-b27cbd5564e1", + "tenant_id": "abcd", + "source_app": "ingestapp", + "source_app_version": "2.14.0", + "environment": "dev", + "timestamp_event": 1747646830000, + "country": "za", + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "parent_job_id": 
"0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_group_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "job_name": "Publish to Hive", + "attempt_number": 1, + "platform": "aws.glue", + "input_arguments": { + "pipelineId": 1234, + "currentDate": "2026-05-19" + }, + "definition_id": "1234", + "status_type": "RUNNING" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json new file mode 100644 index 0000000..b43d583 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/07-job-finished-publish-to-hive-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", + "timestamp_event": 1747646865000, + "job_id": "0014aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json new file mode 100644 index 0000000..39705f4 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/08-job-finished-pramen-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "e6207f4e-f2cd-4d9c-b2f0-f6490c15f89f", + "timestamp_event": 1747646865000, + "job_id": "0013aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json b/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json new file mode 100644 index 0000000..e2c5909 --- /dev/null +++ b/adr/000-status-change/examples/appendix-d-job-retry/09-job-finished-ingestion-succeeded.json @@ -0,0 +1,7 @@ +{ + "event_type": "JobFinishedEvent", + "event_id": "2fe09e1f-25d0-4d89-911e-f5fbf699d1f5", + 
"timestamp_event": 1747646870000, + "job_id": "0011aaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaaaaaa", + "status_type": "SUCCEEDED" +} diff --git a/conf/access.json b/conf/access.json index bb21ccc..9d80b59 100644 --- a/conf/access.json +++ b/conf/access.json @@ -9,5 +9,6 @@ } }, "public.cps.za.dlchange": ["FooUser", "BarUser"], - "public.cps.za.test": ["TestUser"] + "public.cps.za.test": ["TestUser"], + "public.cps.za.status_change": ["TestUser"] } diff --git a/conf/config.json b/conf/config.json index 9770b27..2265cdc 100644 --- a/conf/config.json +++ b/conf/config.json @@ -1,5 +1,6 @@ { "access_config": "s3:///access.json", + "topic_keys_config": "conf/topic_keys.json", "token_provider_url": "https://", "token_public_keys_url": "https://", "kafka_bootstrap_server": "localhost:9092", diff --git a/conf/topic_keys.json b/conf/topic_keys.json new file mode 100644 index 0000000..db0a690 --- /dev/null +++ b/conf/topic_keys.json @@ -0,0 +1,3 @@ +{ + "public.cps.za.status_change": "job_id" +} diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json new file mode 100644 index 0000000..fe9ecfa --- /dev/null +++ b/conf/topic_schemas/status_change.json @@ -0,0 +1,287 @@ +{ + "type": "object", + "properties": { + "event_type": { + "type": "string", + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent", + "JobStartedEvent", + "JobUpdatedEvent", + "JobFinishedEvent" + ], + "description": "Lifecycle event type for job status changes." + }, + "event_id": { + "type": "string", + "format": "uuid", + "description": "Unique identifier for the event (UUID)" + }, + "job_ref": { + "type": [ + "string", + "null" + ], + "description": "Identifier of the job in its respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc.)." 
+ }, + "tenant_id": { + "type": [ + "string", + "null" + ], + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "integer", + "minimum": 0, + "description": "Timestamp of the event in epoch milliseconds" + }, + "country": { + "type": [ + "string", + "null" + ], + "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." + }, + "job_id": { + "type": "string", + "format": "uuid", + "description": "Primary job identifier (UUID)." + }, + "parent_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." + }, + "initial_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." + }, + "job_group_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Job group identifier (UUID), may or may not reference a job id." + }, + "job_name": { + "type": "string", + "description": "Human-readable job name." + }, + "attempt_number": { + "type": [ + "integer", + "null" + ], + "minimum": 1, + "description": "Attempt number for this job." + }, + "platform": { + "type": [ + "string", + "null" + ], + "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." + }, + "platform_metadata": { + "type": [ + "object", + "null" + ], + "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." + }, + "input_arguments": { + "type": [ + "object", + "null" + ], + "description": "Arguments passed to the job." 
+ }, + "definition_id": { + "type": "string", + "description": "Definition (Pipeline, Domain, Process) identifier." + }, + "definition_version": { + "type": [ + "string", + "null" + ], + "description": "Optional definition version." + }, + "status_type": { + "type": "string", + "enum": [ + "WAITING", + "RUNNING", + "SUCCEEDED", + "FAILED", + "KILLED" + ], + "description": "High-level status type for the current lifecycle event." + }, + "status_subtype": { + "type": [ + "string", + "null" + ], + "description": "Optional status subtype, e.g. NO_DATA or error code." + }, + "status_detail": { + "type": [ + "string", + "null" + ], + "description": "Optional human-readable status detail, e.g. short error message." + }, + "additional_context": { + "type": [ + "object", + "null" + ], + "description": "Additional context payload." + } + }, + "required": [ + "event_type", + "event_id", + "job_id", + "status_type" + ], + "allOf": [ + { + "if": { + "properties": { + "event_type": { + "enum": [ + "JobCreatedEvent" + ] + } + } + }, + "then": { + "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", + "platform" + ], + "properties": { + "status_type": { + "enum": [ + "WAITING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "enum": [ + "JobCreatedAndStartedEvent" + ] + } + } + }, + "then": { + "required": [ + "job_name", + "source_app", + "source_app_version", + "timestamp_event", + "environment", + "definition_id", + "platform" + ], + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobStartedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "RUNNING" + ] + } + } + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobUpdatedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "WAITING", + "RUNNING" + ] + } + } + 
} + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobFinishedEvent" + } + } + }, + "then": { + "properties": { + "status_type": { + "enum": [ + "SUCCEEDED", + "FAILED", + "KILLED" + ] + } + } + } + } + ] +} diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 9861496..7fbae8e 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -64,7 +64,12 @@ # Initialize EventGate handlers handler_token = HandlerToken(config).with_public_keys_queried() -handler_topic = HandlerTopic(config, aws_s3, handler_token, writers).with_load_access_config().with_load_topic_schemas() +handler_topic = ( + HandlerTopic(config, aws_s3, handler_token, writers) + .with_load_access_config() + .with_load_topic_keys_config() + .with_load_topic_schemas() +) handler_health = HandlerHealth(writers) handler_api = HandlerApi().with_api_definition_loaded() diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 5426805..34b0107 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -28,8 +28,8 @@ from src.handlers.handler_token import HandlerToken from src.utils.conf_path import CONF_DIR -from src.utils.config_loader import TopicAccessMap, load_access_config -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.config_loader import TopicAccessMap, TopicKeyMap, load_access_config, load_topic_keys_config +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST from src.utils.utils import build_error_response from src.writers.writer import WriteError, Writer @@ -51,6 +51,7 @@ def __init__( self.handler_token = handler_token self.writers = writers self.access_config: TopicAccessMap = {} + self.topic_keys: TopicKeyMap = {} self.topics: dict[str, dict[str, Any]] = {} def with_load_access_config(self) -> "HandlerTopic": @@ -75,10 +76,20 @@ def with_load_topic_schemas(self) -> "HandlerTopic": self.topics[TOPIC_DLCHANGE] = json.load(file) 
with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file: self.topics[TOPIC_TEST] = json.load(file) + with open(os.path.join(topic_schemas_dir, "status_change.json"), "r", encoding="utf-8") as file: + self.topics[TOPIC_STATUS_CHANGE] = json.load(file) logger.debug("Loaded topic schemas successfully.") return self + def with_load_topic_keys_config(self) -> "HandlerTopic": + """Load topic key mapping configuration from S3 or local file. + Returns: + The current instance with loaded topic key config. + """ + self.topic_keys = load_topic_keys_config(self.config, self.aws_s3) + return self + def get_topics_list(self) -> dict[str, Any]: """Return the list of available topics. Returns: @@ -169,10 +180,11 @@ def _post_topic_message(self, topic_name: str, topic_message: dict[str, Any], to except ValidationError as exc: return build_error_response(400, "validation", exc.message) + message_key = self._resolve_message_key(topic_name, topic_message) errors = [] for writer_name, writer in self.writers.items(): try: - writer.write(topic_name, topic_message) + writer.write(topic_name, topic_message, message_key) except WriteError as exc: errors.append({"type": writer_name, "message": str(exc)}) @@ -215,3 +227,26 @@ def _validate_user_permissions( return False, f"Field '{restricted_field}' value not permitted for user '{user}'" return True, None + + def _resolve_message_key(self, topic_name: str, message: dict[str, Any]) -> str: + """Resolve topic key value from message using configured field mapping. + Args: + topic_name: Target topic name. + message: Topic payload. + Returns: + String key value for writers, or empty string when no key is configured/resolvable. 
+ """ + key_field = self.topic_keys.get(topic_name) + if not key_field: + return "" + + key_value = message.get(key_field) + if key_value is None: + logger.warning("Topic key field '%s' missing for topic '%s'.", key_field, topic_name) + return "" + + if isinstance(key_value, (dict, list)): + logger.warning("Topic key field '%s' for topic '%s' is not scalar.", key_field, topic_name) + return "" + + return str(key_value) diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index 673bcbf..3093da4 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -24,12 +24,13 @@ from boto3.resources.base import ServiceResource -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST logger = logging.getLogger(__name__) FieldPatterns = dict[str, list[re.Pattern[str]]] TopicAccessMap = dict[str, dict[str, FieldPatterns]] +TopicKeyMap = dict[str, str] def load_config(conf_dir: str) -> dict[str, Any]: @@ -46,6 +47,24 @@ def load_config(conf_dir: str) -> dict[str, Any]: return config +def _load_json_from_path(path: str, aws_s3: ServiceResource) -> dict[str, Any]: + """Load JSON data from either S3 (`s3://...`) or local file path. + Args: + path: JSON source path. + aws_s3: Boto3 S3 resource used for S3-backed paths. + Returns: + Parsed JSON object. 
+ """ + if path.startswith("s3://"): + name_parts = path.split("/") + bucket_name = name_parts[2] + bucket_object_key = "/".join(name_parts[3:]) + return json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8")) + + with open(path, "r", encoding="utf-8") as file: + return json.load(file) + + def _compile_topic_patterns( topic: str, user_constraints: dict[str, dict[str, list[str]]], @@ -121,24 +140,48 @@ def load_access_config(config: dict[str, Any], aws_s3: ServiceResource) -> Topic """ access_path: str = config["access_config"] logger.debug("Loading access configuration from %s.", access_path) - - access_data: dict[str, Any] = {} - - if access_path.startswith("s3://"): - name_parts = access_path.split("/") - bucket_name = name_parts[2] - bucket_object_key = "/".join(name_parts[3:]) - access_data = json.loads( - aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8") - ) - else: - with open(access_path, "r", encoding="utf-8") as file: - access_data = json.load(file) + access_data = _load_json_from_path(access_path, aws_s3) logger.debug("Loaded access configuration.") return _normalize_access_config(access_data) +def _validate_topic_keys_config(topic_key_data: dict[str, Any]) -> None: + """Validate topic key mapping config. + Args: + topic_key_data: Parsed JSON mapping of topic to event property name. + Returns: + None + Raises: + ValueError: If any topic key mapping is not a string. + """ + for topic, field_name in topic_key_data.items(): + if not isinstance(field_name, str): + raise ValueError(f"Topic '{topic}': expected key field name as string, got {type(field_name).__name__}.") + return + + +def load_topic_keys_config(config: dict[str, Any], aws_s3: ServiceResource) -> TopicKeyMap: + """Load topic key configuration from S3 or a local file. + Args: + config: Main configuration dict. If `topic_keys_config` is absent, returns empty mapping. 
+ aws_s3: Boto3 S3 resource for loading from S3 paths. + Returns: + Mapping of topic name to message property used as Kafka key. + """ + topic_keys_path = config.get("topic_keys_config") + if not topic_keys_path: + logger.debug("No topic_keys_config configured. Using empty topic key mapping.") + return {} + + logger.debug("Loading topic key configuration from %s.", topic_keys_path) + topic_key_data = _load_json_from_path(topic_keys_path, aws_s3) + + logger.debug("Loaded topic key configuration.") + _validate_topic_keys_config(topic_key_data) + return topic_key_data + + def load_topic_names(conf_dir: str) -> list[str]: """Discover topic names from the topic_schemas directory. Args: @@ -150,6 +193,7 @@ def load_topic_names(conf_dir: str) -> list[str]: "runs.json": TOPIC_RUNS, "dlchange.json": TOPIC_DLCHANGE, "test.json": TOPIC_TEST, + "status_change.json": TOPIC_STATUS_CHANGE, } schemas_dir = os.path.join(conf_dir, "topic_schemas") topics: list[str] = [] diff --git a/src/utils/constants.py b/src/utils/constants.py index 425b376..f0af65a 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -37,6 +37,7 @@ TOPIC_RUNS = "public.cps.za.runs" TOPIC_DLCHANGE = "public.cps.za.dlchange" TOPIC_TEST = "public.cps.za.test" +TOPIC_STATUS_CHANGE = "public.cps.za.status_change" SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST}) SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS}) diff --git a/src/writers/writer.py b/src/writers/writer.py index af5f7eb..0269714 100644 --- a/src/writers/writer.py +++ b/src/writers/writer.py @@ -38,11 +38,12 @@ def __init__(self, config: dict[str, Any]) -> None: self.config = config @abstractmethod - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to the target system. Args: topic_name: Target writer topic (destination) name. 
message: JSON-serializable payload to publish. + message_key: Optional transport key for systems that support keyed messages. Raises: WriteError: If publishing fails. """ diff --git a/src/writers/writer_eventbridge.py b/src/writers/writer_eventbridge.py index 5bcac47..076f007 100644 --- a/src/writers/writer_eventbridge.py +++ b/src/writers/writer_eventbridge.py @@ -49,11 +49,12 @@ def _format_failed_entries(self) -> str: failed = [e for e in self._entries if "ErrorCode" in e or "ErrorMessage" in e] return json.dumps(failed) if failed else "[]" - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to EventBridge. Args: topic_name: Target EventBridge writer topic (destination) name. message: JSON-serializable payload to publish. + message_key: Optional transport key (unused by EventBridge). Raises: WriteError: If publishing fails. """ diff --git a/src/writers/writer_kafka.py b/src/writers/writer_kafka.py index 831789a..58a9ad3 100644 --- a/src/writers/writer_kafka.py +++ b/src/writers/writer_kafka.py @@ -88,11 +88,12 @@ def _flush_with_timeout(self, timeout: float) -> int | None: except TypeError: return self._producer.flush() - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Publish a message to Kafka. Args: topic_name: Kafka topic to publish to. message: JSON-serializable payload. + message_key: Optional Kafka key used for partitioning. Raises: WriteError: If publishing fails. 
""" @@ -115,7 +116,7 @@ def write(self, topic_name: str, message: dict[str, Any]) -> None: logger.debug("Sending to Kafka %s.", topic_name) self._producer.produce( topic_name, - key="", + key=message_key, value=json.dumps(message).encode("utf-8"), callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), ) diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index b405d77..825a230 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -158,11 +158,12 @@ def _insert_test(self, cursor: Any, message: dict[str, Any]) -> None: }, ) - def write(self, topic_name: str, message: dict[str, Any]) -> None: + def write(self, topic_name: str, message: dict[str, Any], message_key: str = "") -> None: """Dispatch insertion for a topic into the correct Postgres table(s). Args: topic_name: Incoming topic identifier. message: JSON-serializable payload. + message_key: Optional transport key (unused by Postgres writer). Raises: WriteError: If publishing fails. 
""" diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index fc4a5b7..a8123a8 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -71,6 +71,52 @@ def test_load_access_config_from_s3(): mock_aws_s3.Bucket.return_value.Object.assert_called_once_with("path/to/access.json") +## load_topic_keys_config() +def test_load_topic_keys_config_from_local_file(): + """Test loading topic key config from local file.""" + mock_handler_token = MagicMock() + mock_aws_s3 = MagicMock() + mock_writers = { + "kafka": MagicMock(), + "eventbridge": MagicMock(), + "postgres": MagicMock(), + } + config = {"topic_keys_config": "conf/topic_keys.json"} + handler = HandlerTopic(config, mock_aws_s3, mock_handler_token, mock_writers) + + topic_keys_data = {"public.cps.za.test": "event_id"} + with patch("builtins.open", mock_open(read_data=json.dumps(topic_keys_data))): + result = handler.with_load_topic_keys_config() + + assert result is handler + assert "event_id" == handler.topic_keys["public.cps.za.test"] + + +def test_load_topic_keys_config_from_s3(): + """Test loading topic key config from S3.""" + mock_handler_token = MagicMock() + mock_aws_s3 = MagicMock() + mock_writers = { + "kafka": MagicMock(), + "eventbridge": MagicMock(), + "postgres": MagicMock(), + } + config = {"topic_keys_config": "s3://my-bucket/path/to/topic_keys.json"} + handler = HandlerTopic(config, mock_aws_s3, mock_handler_token, mock_writers) + + topic_keys_data = {"public.cps.za.status_change": "job_id"} + mock_body = MagicMock() + mock_body.read.return_value = json.dumps(topic_keys_data).encode("utf-8") + mock_aws_s3.Bucket.return_value.Object.return_value.get.return_value = {"Body": mock_body} + + result = handler.with_load_topic_keys_config() + + assert result is handler + assert "job_id" == handler.topic_keys["public.cps.za.status_change"] + mock_aws_s3.Bucket.assert_called_once_with("my-bucket") + 
mock_aws_s3.Bucket.return_value.Object.assert_called_once_with("path/to/topic_keys.json") + + ## load_topic_schemas() def test_load_topic_schemas_success(): mock_handler_token = MagicMock() @@ -87,6 +133,7 @@ def test_load_topic_schemas_success(): "runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, "dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}}, "test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}}, + "status_change.json": {"type": "object", "properties": {"execution_id": {"type": "string"}}}, } def mock_open_side_effect(file_path, *_args, **_kwargs): @@ -99,10 +146,11 @@ def mock_open_side_effect(file_path, *_args, **_kwargs): result = handler.with_load_topic_schemas() assert result is handler - assert 3 == len(handler.topics) + assert 4 == len(handler.topics) assert "public.cps.za.runs" in handler.topics assert "public.cps.za.dlchange" in handler.topics assert "public.cps.za.test" in handler.topics + assert "public.cps.za.status_change" in handler.topics ## get_topics_list() @@ -112,6 +160,7 @@ def test_get_topics(event_gate_module, make_event): assert 200 == resp["statusCode"] body = json.loads(resp["body"]) assert "public.cps.za.test" in body + assert "public.cps.za.status_change" in body ## get_topic_schema() @@ -208,6 +257,50 @@ def test_post_success_all_writers(event_gate_module, make_event, valid_payload): assert 202 == body["statusCode"] +def test_post_passes_topic_key_to_writers(event_gate_module, make_event, valid_payload): + """Configured topic key field is extracted and passed to writer.write.""" + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.topic_keys["public.cps.za.test"] = "event_id" + kafka_writer = event_gate_module.handler_topic.writers["kafka"] + kafka_writer.write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["eventbridge"].write = 
MagicMock(return_value=None) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=None) + + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert 202 == resp["statusCode"] + + kafka_writer.write.assert_called_once_with("public.cps.za.test", valid_payload, "e1") + + +def test_post_missing_topic_key_field_falls_back_to_empty_key(event_gate_module, make_event, valid_payload): + """Missing configured key field falls back to empty message key.""" + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.topic_keys["public.cps.za.test"] = "job_id" + kafka_writer = event_gate_module.handler_topic.writers["kafka"] + kafka_writer.write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["eventbridge"].write = MagicMock(return_value=None) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=None) + + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert 202 == resp["statusCode"] + + kafka_writer.write.assert_called_once_with("public.cps.za.test", valid_payload, "") + + def test_post_single_writer_failure(event_gate_module, make_event, valid_payload): with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): event_gate_module.handler_topic.writers["kafka"].write = MagicMock(side_effect=WriteError("Kafka boom")) diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 82301f7..f48cba5 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -24,14 +24,24 @@ 
import pytest -from src.utils.config_loader import _normalize_access_config, load_access_config, load_config, load_topic_names +from src.utils.config_loader import ( + _normalize_access_config, + load_access_config, + load_config, + load_topic_keys_config, + load_topic_names, +) @pytest.fixture def conf_dir(tmp_path: Path) -> str: """Create a temporary config directory with topic schemas.""" # Write config.json. - config_data = {"token_provider_url": "http://test", "access_config": str(tmp_path / "access.json")} + config_data = { + "token_provider_url": "http://test", + "access_config": str(tmp_path / "access.json"), + "topic_keys_config": str(tmp_path / "topic_keys.json"), + } (tmp_path / "config.json").write_text(json.dumps(config_data), encoding="utf-8") # Write access.json. @@ -40,12 +50,19 @@ def conf_dir(tmp_path: Path) -> str: encoding="utf-8", ) + # Write topic_keys.json. + (tmp_path / "topic_keys.json").write_text( + json.dumps({"public.cps.za.runs": "event_id", "public.cps.za.status_change": "job_id"}), + encoding="utf-8", + ) + # Write topic_schemas. 
schemas_dir = tmp_path / "topic_schemas" schemas_dir.mkdir() (schemas_dir / "runs.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "dlchange.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "test.json").write_text('{"type": "object"}', encoding="utf-8") + (schemas_dir / "status_change.json").write_text('{"type": "object"}', encoding="utf-8") return str(tmp_path) @@ -101,13 +118,14 @@ class TestLoadTopicNames: """Tests for load_topic_names().""" def test_discovers_all_topics(self, conf_dir: str) -> None: - """Test that all three topics are discovered from schema files.""" + """Test that all configured topics are discovered from schema files.""" result = load_topic_names(conf_dir) - assert 3 == len(result) + assert 4 == len(result) assert "public.cps.za.runs" in result assert "public.cps.za.dlchange" in result assert "public.cps.za.test" in result + assert "public.cps.za.status_change" in result def test_missing_schema_file_excluded(self, conf_dir: str) -> None: """Test that a missing schema file excludes that topic.""" @@ -115,7 +133,7 @@ def test_missing_schema_file_excluded(self, conf_dir: str) -> None: result = load_topic_names(conf_dir) - assert 2 == len(result) + assert 3 == len(result) assert "public.cps.za.test" not in result def test_empty_schemas_dir(self, tmp_path: Path) -> None: @@ -127,6 +145,42 @@ def test_empty_schemas_dir(self, tmp_path: Path) -> None: assert [] == result +class TestLoadTopicKeysConfig: + """Tests for load_topic_keys_config().""" + + def test_loads_from_local_file(self, conf_dir: str) -> None: + """Test loading topic key config from local file path.""" + config = load_config(conf_dir) + aws_s3 = MagicMock() + + result = load_topic_keys_config(config, aws_s3) + + assert "job_id" == result["public.cps.za.status_change"] + aws_s3.Bucket.assert_not_called() + + def test_loads_from_s3(self) -> None: + """Test loading topic key config from S3 path.""" + config = {"topic_keys_config": 
"s3://my-bucket/conf/topic_keys.json"} + key_data = {"public.cps.za.test": "event_id"} + + mock_body = MagicMock() + mock_body.read.return_value = json.dumps(key_data).encode("utf-8") + + mock_s3 = MagicMock() + mock_s3.Bucket.return_value.Object.return_value.get.return_value = {"Body": mock_body} + + result = load_topic_keys_config(config, mock_s3) + + assert "event_id" == result["public.cps.za.test"] + mock_s3.Bucket.assert_called_once_with("my-bucket") + mock_s3.Bucket.return_value.Object.assert_called_once_with("conf/topic_keys.json") + + def test_missing_config_returns_empty_mapping(self) -> None: + """Test missing topic_keys_config returns empty mapping.""" + result = load_topic_keys_config({}, MagicMock()) + assert {} == result + + class TestNormalizeAccessConfig: """Tests for _normalize_access_config().""" diff --git a/tests/unit/writers/test_writer_kafka.py b/tests/unit/writers/test_writer_kafka.py index b68e834..bce2baf 100644 --- a/tests/unit/writers/test_writer_kafka.py +++ b/tests/unit/writers/test_writer_kafka.py @@ -95,6 +95,16 @@ def test_write_success(): writer.write("topic", {"b": 2}) +def test_write_uses_message_key(): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + producer = FakeProducerSuccess() + writer._producer = producer + + writer.write("topic", {"b": 2}, message_key="job-1") + + assert [("topic", "job-1", b'{"b": 2}')] == producer.produced + + def test_write_async_error(): writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) writer._producer = FakeProducerError()