DQ Monitoring Bundle

Overview

A reusable, config-driven Databricks Asset Bundle that monitors data quality across any catalog (observability, security, network, etc.) and any environment (DEV, NONPROD, PROD). A scheduled job runs DQ checks, writes results to a Delta table in a dedicated monitoring schema, and Databricks-native SQL alerts read from that table to trigger Slack + Email notifications — with full alert history and clickable Alert links.


Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                        DEVELOPER WORKFLOW                               │
│                                                                         │
│   1. Edit config/observability.yml (add/remove tables or thresholds)    │
│   2. git push                                                           │
│   3. CI/CD: databricks bundle deploy --target <env>                     │
│                                                                         │
└──────────────────────────────────┬──────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     DATABRICKS WORKSPACE                                │
│                                                                         │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │  JOB (Serverless, every 15 min)                                │     │
│  │  Identity: Service Principal                                   │     │
│  │                                                                │     │
│  │  1. Read config YAML (catalog_name + table list + thresholds)  │     │
│  │  2. Construct full catalog: {catalog_name}_{environment}       │     │
│  │  3. Run 5 DQ checks (UNION ALL SQL across all tables)          │     │
│  │  4. Compare with previous run → calculate state_change         │     │
│  │  5. Write ALL results to Delta table                           │     │
│  └────────────────────────────────┬───────────────────────────────┘     │
│                                   │ writes                              │
│                                   ▼                                     │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │  DELTA TABLE: {catalog}.monitoring.dq_results                  │     │
│  │  (Dedicated monitoring schema — separate from bronze/silver)   │     │
│  │                                                                │     │
│  │  Stores ALL results: PASSED + FAILED + RESOLVED                │     │
│  └────────────────────────────────┬───────────────────────────────┘     │
│                                   │ reads (5 min offset)                │
│                                   ▼                                     │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │  6 SQL ALERTS (static, NEVER change regardless of table count) │     │
│  │  SQL Serverless Warehouse                                      │     │
│  │                                                                │     │
│  │  SELECT FROM monitoring.dq_results                             │     │
│  │  WHERE check_type = '...' AND status = 'FAILED'                │     │
│  └────────────────────────────────┬───────────────────────────────┘     │
│                                   │ triggers                            │
└───────────────────────────────────┼─────────────────────────────────────┘
                                    │
                     ┌──────────────┼──────────────┐
                     ▼              ▼              ▼
              ┌───────────┐  ┌──────────┐  ┌──────────────┐
              │   SLACK   │  │  EMAIL   │  │  ALERT UI    │
              │  (native) │  │ (native) │  │  (history)   │
              └───────────┘  └──────────┘  └──────────────┘

Catalog & Schema Structure

observability_dev (catalog)
├── bronze (schema)              ← Source data (ingested from AppD, Dynatrace, EOP, etc.)
│   ├── appd_metrics
│   ├── dynatrace_metrics
│   ├── eop_logs
│   ├── eop_metrics
│   ├── eop_traces
│   ├── incident
│   ├── incident_alert
│   └── entra_logs_eventhub_bronze
│
├── silver (schema)              ← Transformed data
│   ├── appd_metrics
│   ├── dynatrace_metrics
│   ├── eop_logs
│   ├── eop_metrics
│   └── eop_traces
│
├── gold (schema)                ← Business-ready data
│   └── ...
│
└── monitoring (schema)          ← DEDICATED DQ schema (separate from data layers)
    └── dq_results               ← ONE table stores ALL check results

security_dev (catalog)
├── bronze/
│   ├── authentication
│   ├── akeyless_kv
│   └── entra_logs_eventhub_bronze
├── silver/
│   └── authentication
└── monitoring/
    └── dq_results               ← ONE table stores ALL check results

Why monitoring is a Separate Schema

| Benefit | Explanation |
| --- | --- |
| Clean separation | Business data in bronze/silver/gold, monitoring data in monitoring |
| Independent permissions | Grant monitoring access without granting bronze/silver access |
| No interference | DQ writes don't affect bronze/silver table performance |
| Team ownership | Platform team owns monitoring, data eng owns bronze/silver |
| Alert isolation | SQL alerts only need SELECT on monitoring (minimal permissions) |
| Future-proof | Add more tables later (audit_log, sla_metrics, etc.) |

Bundle File Structure

observability-alerts/
├── databricks.yml                      ← Bundle entry point (multi-catalog targets)
├── config/
│   ├── observability.yml               ← Tables + thresholds for observability catalog
│   └── security.yml                    ← Tables + thresholds for security catalog
├── notebooks/
│   └── dq_monitor.py                   ← All check logic inline (runs checks → writes Delta)
└── resources/
    ├── variables.yml                   ← All bundle variables
    ├── dq_job.yml                      ← Scheduled job definition (every 15 min)
    └── alerts.yml                      ← 6 STATIC SQL alerts (NEVER changes)

7 files total. No utils/ folder, no external dependencies.


Components

1. databricks.yml — Bundle Entry Point

Defines targets (environments + catalogs) and includes resource files. Each target sets config_file, environment, and pause_status.

2. config/{catalog}.yml — Developer Config (ONLY file developer edits)

Single source of truth for which tables to monitor + thresholds. Includes catalog_name so the config is self-documenting.

catalog_name: observability

bronze_tables:
  - appd_metrics
  - dynatrace_metrics
  - eop_logs
  ...

silver_tables:
  - appd_metrics
  - dynatrace_metrics
  ...

bronze_null_columns:
  - elt_uuid
  - bronze_observed_timestamp
  - bronze_processed_timestamp
  - raw

silver_null_columns:
  - elt_uuid
  - bronze_observed_timestamp
  - bronze_processed_timestamp
  - silver_processed_timestamp

thresholds:
  null_pct: 1.0
  corrupt_pct: 5.0
  window_minutes: 30
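
Because this file drives every check, it can help to validate it before any SQL runs. The following is a minimal sketch, not the bundle's actual code: `DQConfig` and `parse_config` are hypothetical names, and the input is assumed to be the dict that a YAML loader would return for the file above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DQConfig:
    """Typed view of config/{catalog}.yml (hypothetical helper)."""
    catalog_name: str
    bronze_tables: tuple
    silver_tables: tuple
    null_pct: float
    corrupt_pct: float
    window_minutes: int


def parse_config(raw: dict) -> DQConfig:
    """Validate an already-parsed config dict and return a DQConfig."""
    required = ("catalog_name", "bronze_tables", "silver_tables", "thresholds")
    missing = [key for key in required if key not in raw]
    if missing:
        raise ValueError(f"config is missing keys: {missing}")

    thresholds = raw["thresholds"]
    for name in ("null_pct", "corrupt_pct"):
        if not 0 <= thresholds[name] <= 100:
            raise ValueError(f"{name} must be between 0 and 100")
    if thresholds["window_minutes"] <= 0:
        raise ValueError("window_minutes must be positive")

    return DQConfig(
        catalog_name=raw["catalog_name"],
        bronze_tables=tuple(raw["bronze_tables"]),
        silver_tables=tuple(raw["silver_tables"]),
        null_pct=float(thresholds["null_pct"]),
        corrupt_pct=float(thresholds["corrupt_pct"]),
        window_minutes=int(thresholds["window_minutes"]),
    )


# Example input mirroring the YAML above (table lists shortened).
raw_config = {
    "catalog_name": "observability",
    "bronze_tables": ["appd_metrics", "dynatrace_metrics", "eop_logs"],
    "silver_tables": ["appd_metrics", "dynatrace_metrics"],
    "thresholds": {"null_pct": 1.0, "corrupt_pct": 5.0, "window_minutes": 30},
}
config = parse_config(raw_config)
```

Failing fast on a malformed config keeps a typo in the YAML from surfacing later as a confusing SQL error.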

3. notebooks/dq_monitor.py — Check Engine

Runs 5 DQ checks and writes ALL results (PASSED + FAILED) to Delta table. Does NOT send notifications — SQL alerts handle that.

What it does every 15 minutes:

  1. Read parameters (environment, config_path, bundle_name)
  2. Load config YAML (catalog_name + table list + thresholds)
  3. Construct full catalog: {catalog_name}_{environment.lower()}
  4. Run 5 checks via spark.sql() UNION ALL
  5. Compare with previous run → calculate state_change
  6. Write all results to {catalog}.monitoring.dq_results
  7. Exit with status JSON
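
Step 4 can be pictured as building one SQL statement per check type, with one SELECT per table joined by UNION ALL. The sketch below only generates the SQL text for a row-count check; the function name is hypothetical, and using `bronze_processed_timestamp` as the window column is an assumption, not something the notebook is known to do.

```python
import re

# Plain identifiers only, so table names from config cannot inject SQL.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_row_count_sql(catalog, schema, tables, timestamp_column, window_minutes):
    """Return one UNION ALL statement that counts recent rows per table."""
    if not tables:
        raise ValueError("at least one table is required")
    names = [catalog, schema, timestamp_column, *tables]
    invalid = [name for name in names if not _IDENTIFIER.fullmatch(name)]
    if invalid:
        raise ValueError(f"invalid identifiers: {invalid}")

    selects = [
        f"SELECT '{table}' AS table_name, COUNT(*) AS row_count "
        f"FROM {catalog}.{schema}.{table} "
        f"WHERE {timestamp_column} >= current_timestamp() - INTERVAL {int(window_minutes)} MINUTES"
        for table in tables
    ]
    return "\nUNION ALL\n".join(selects)


sql = build_row_count_sql(
    catalog="observability_dev",
    schema="bronze",
    tables=["appd_metrics", "eop_logs"],
    timestamp_column="bronze_processed_timestamp",  # assumed window column
    window_minutes=30,
)
print(sql)
```

In the notebook, the resulting string would be passed to `spark.sql()`, so each check type costs one query regardless of table count.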

4. resources/dq_job.yml — Job Definition

Scheduled job: every 15 min, serverless compute, retries on failure, timeout 15 min.

5. resources/alerts.yml — SQL Alerts (STATIC, NEVER CHANGES)

6 SQL alerts that read from monitoring.dq_results. Each alert is a simple SELECT filtered by check_type and status. The file does not change when tables are added or removed; it changes only when a new check type is introduced.

6. resources/variables.yml — Bundle Variables

All configurable values: config_file, environment, warehouse_id, notification_destination_id, etc.


DQ Checks

| # | Check | Layer | Severity | Fails when |
| --- | --- | --- | --- | --- |
| 1 | Null Values (Bronze) | Bronze | WARNING | >1% nulls in critical columns |
| 2 | Null Values (Silver) | Silver | WARNING | >1% nulls in critical columns |
| 3 | Row Count (Bronze) | Bronze | CRITICAL | 0 records in window |
| 4 | Row Count (Silver) | Silver | CRITICAL | 0 records in window |
| 5 | Corrupt Records | Bronze | WARNING | >5% corrupt records |
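
The pass/fail rules in this table reduce to two comparisons. The sketch below is illustrative only: the function names are hypothetical, and returning severity `OK` for passing rows is an assumption based on the severity values listed in the results schema.

```python
def evaluate_pct_check(bad_rows, total_rows, threshold_pct, metric_name):
    """Null and corrupt checks: FAILED when the percentage exceeds the threshold."""
    pct = 0.0 if total_rows == 0 else 100.0 * bad_rows / total_rows
    failed = pct > threshold_pct
    return {
        "metric_name": metric_name,
        "metric_value": pct,
        "threshold_value": threshold_pct,
        "status": "FAILED" if failed else "PASSED",
        "severity": "WARNING" if failed else "OK",
    }


def evaluate_row_count_check(row_count):
    """Row count checks: FAILED when no records arrived in the window."""
    failed = row_count == 0
    return {
        "metric_name": "row_count",
        "metric_value": float(row_count),
        "threshold_value": 0.0,
        "status": "FAILED" if failed else "PASSED",
        "severity": "CRITICAL" if failed else "OK",
    }


print(evaluate_pct_check(2, 100, 1.0, "null_pct")["status"])
print(evaluate_row_count_check(0)["severity"])
```

Note the strict comparison: a table at exactly the threshold passes, which matches the ">1%" and ">5%" wording above.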

SQL Alerts

| # | Alert | Fires when | Schedule |
| --- | --- | --- | --- |
| 1 | Bronze Null Values | Any bronze table > 1% nulls | Every 15 min |
| 2 | Silver Null Values | Any silver table > 1% nulls | Every 15 min |
| 3 | Bronze Row Count | Any bronze table = 0 records | Every 30 min |
| 4 | Silver Row Count | Any silver table = 0 records | Every 30 min |
| 5 | Corrupt Records | Any bronze table > 5% corrupt | Every 30 min |
| 6 | ✅ Tables Recovered | Any table state_change = RESOLVED | Every 15 min |

State Tracking

The notebook compares current run results with the previous run to determine state transitions:

| state_change | Meaning | Notification? |
| --- | --- | --- |
| NEW_FAILURE | Was PASSED last run, now FAILED | ✅ Failure alert fires |
| STILL_FAILING | Was FAILED last run, still FAILED | ✅ Failure alert fires |
| RESOLVED | Was FAILED last run, now PASSED | ✅ Resolved alert fires |
| STILL_OK | Was PASSED last run, still PASSED | ❌ No notification |
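
The four transitions can be expressed as a small pure function. This is a sketch rather than the notebook's code: the function name is hypothetical, and treating a missing previous result (first run) or an `ERROR` status as "not failed" is an assumption.

```python
def derive_state_change(previous_status, current_status):
    """Map (previous, current) status to a state_change value.

    previous_status may be None when a table has no earlier result.
    """
    previous_failed = previous_status == "FAILED"
    current_failed = current_status == "FAILED"
    if current_failed:
        return "STILL_FAILING" if previous_failed else "NEW_FAILURE"
    return "RESOLVED" if previous_failed else "STILL_OK"


for previous, current in [
    ("PASSED", "FAILED"),
    ("FAILED", "FAILED"),
    ("FAILED", "PASSED"),
    ("PASSED", "PASSED"),
]:
    print(previous, "->", current, "=", derive_state_change(previous, current))
```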

What Slack looks like:

:00 — 10 tables fail → Alert: "10 tables broken" (all NEW_FAILURE)
:15 — 3 still fail, 7 recovered → Two alerts:
       🔴 "3 tables broken" (STILL_FAILING)
       ✅ "7 tables recovered" (RESOLVED)
:30 — All pass → One alert:
       ✅ "3 tables recovered" (RESOLVED)
:45 — All healthy → No alerts (STILL_OK, empty result)

Delta Table Schema: {catalog}.monitoring.dq_results

| Column | Type | Description |
| --- | --- | --- |
| check_id | STRING | UUID per result row |
| check_type | STRING | null_values_bronze, bronze_row_count, etc. |
| table_name | STRING | Which table was checked |
| status | STRING | PASSED / FAILED / ERROR |
| severity | STRING | CRITICAL / WARNING / OK |
| state_change | STRING | NEW_FAILURE / STILL_FAILING / RESOLVED / STILL_OK |
| metric_value | DOUBLE | The number that decides pass/fail |
| threshold_value | DOUBLE | What it was compared against |
| metric_name | STRING | What metric_value represents |
| details | STRING | Full JSON with all check-specific data |
| check_timestamp | TIMESTAMP | When the check ran |
| catalog_name | STRING | Full catalog name (e.g., observability_dev) |
| environment | STRING | DEV / NONPROD / PROD |
| bundle_name | STRING | observability-alerts |
| run_id | STRING | Groups all results from one job run |

Partitioned by: check_type

Table properties: delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true
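
To make the schema concrete, here is a sketch of how one result row might be assembled in Python before being written. The helper name `make_result_row` and the example values are hypothetical; only the column names come from the table above.

```python
import json
import uuid
from datetime import datetime, timezone

RESULT_COLUMNS = (
    "check_id", "check_type", "table_name", "status", "severity",
    "state_change", "metric_value", "threshold_value", "metric_name",
    "details", "check_timestamp", "catalog_name", "environment",
    "bundle_name", "run_id",
)


def make_result_row(*, run_id, check_type, table_name, status, severity,
                    state_change, metric_name, metric_value, threshold_value,
                    catalog_name, environment, bundle_name, details=None):
    """Build one dq_results row as a plain dict keyed by column name."""
    return {
        "check_id": str(uuid.uuid4()),           # unique per result row
        "check_type": check_type,
        "table_name": table_name,
        "status": status,
        "severity": severity,
        "state_change": state_change,
        "metric_value": float(metric_value),
        "threshold_value": float(threshold_value),
        "metric_name": metric_name,
        "details": json.dumps(details or {}),    # check-specific data as JSON text
        "check_timestamp": datetime.now(timezone.utc),
        "catalog_name": catalog_name,
        "environment": environment,
        "bundle_name": bundle_name,
        "run_id": run_id,                        # shared by all rows of one job run
    }


row = make_result_row(
    run_id="run-001", check_type="null_values_bronze", table_name="eop_logs",
    status="FAILED", severity="WARNING", state_change="NEW_FAILURE",
    metric_name="null_pct", metric_value=2.4, threshold_value=1.0,
    catalog_name="observability_dev", environment="DEV",
    bundle_name="observability-alerts", details={"null_count": 12},
)
```

Keeping check-specific fields inside `details` is what lets new check types reuse the same columns without altering the table.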


Permission Model

Service Principal (job runner): mi-corp-eodl-dev-use2-dev-etl-dabcicd
  READS:  catalog.bronze.*  (SELECT — run DQ checks)
  READS:  catalog.silver.*  (SELECT — run DQ checks)
  WRITES: catalog.monitoring.dq_results (CREATE TABLE + MODIFY)

SQL Alert Owner (same SP or dedicated):
  READS:  catalog.monitoring.dq_results (SELECT only)
  Does NOT need access to bronze/silver/gold

Developer Team: observability-platform-team
  READS:  catalog.monitoring.dq_results (SELECT — view results, debug)
  Does NOT need bronze access — job handles it

One-Time GRANT Setup (per catalog, run by admin)

-- Unity Catalog requires USE CATALOG on the parent catalog (skip if already granted)
GRANT USE CATALOG ON CATALOG {catalog} TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;
GRANT USE CATALOG ON CATALOG {catalog} TO `observability-platform-team`;

-- SP reads source tables
GRANT USE SCHEMA ON SCHEMA {catalog}.bronze TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;
GRANT SELECT ON SCHEMA {catalog}.bronze TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;
GRANT USE SCHEMA ON SCHEMA {catalog}.silver TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;
GRANT SELECT ON SCHEMA {catalog}.silver TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;

-- SP creates the monitoring schema and, as its owner, writes dq_results
GRANT CREATE SCHEMA ON CATALOG {catalog} TO `mi-corp-eodl-dev-use2-dev-etl-dabcicd`;

-- Dev team reads monitoring results (run after the monitoring schema exists)
GRANT USE SCHEMA ON SCHEMA {catalog}.monitoring TO `observability-platform-team`;
GRANT SELECT ON SCHEMA {catalog}.monitoring TO `observability-platform-team`;

Multi-Catalog Support

Same bundle deploys to any catalog — only the target and config file differ:

| Command | Config | Full Catalog | Results Table |
| --- | --- | --- | --- |
| --target dev | observability.yml | observability_dev | observability_dev.monitoring.dq_results |
| --target prod | observability.yml | observability_prod | observability_prod.monitoring.dq_results |
| --target security-dev | security.yml | security_dev | security_dev.monitoring.dq_results |
| --target security-prod | security.yml | security_prod | security_prod.monitoring.dq_results |
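
The naming rule behind this table is `{catalog_name}_{environment.lower()}` plus a fixed schema and table name. A minimal sketch, with a hypothetical function name:

```python
def resolve_names(catalog_name, environment):
    """Return (full catalog, fully qualified results table)."""
    catalog = f"{catalog_name}_{environment.lower()}"
    return catalog, f"{catalog}.monitoring.dq_results"


for name, env in [("observability", "DEV"), ("security", "PROD")]:
    print(resolve_names(name, env))
```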

Adding a New Catalog

  1. Create config/{new_catalog}.yml with catalog_name, tables, thresholds
  2. Add targets in databricks.yml (new-catalog-dev, new-catalog-prod)
  3. git push → CI/CD deploys
  4. Done — no code changes

Adding a New Check

| Step | What | File |
| --- | --- | --- |
| 1 | Add threshold | config/{catalog}.yml |
| 2 | Add check function | notebooks/dq_monitor.py |
| 3 | Call add_results("new_check", ...) | notebooks/dq_monitor.py |
| 4 | Add one alert block (~20 lines) | resources/alerts.yml |
| 5 | Delta table schema? | NO CHANGE |

Notification Behavior

How many alerts fire?

Alerts fire per check type, NOT per table:

| Tables failing | Checks affected | Slack messages | Emails |
| --- | --- | --- | --- |
| 1 table, 1 check | 1 | 1 | 1 |
| 10 tables, same check | 1 | 1 | 1 |
| 10 tables, 3 checks | 3 | 3 | 3 |
| 100 tables, all 5 checks | 5 | 5 | 5 |
| Max (all checks + resolved) | 6 | 6 | 6 |
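
The counts in this table follow from grouping by check type: one failure alert per distinct failing check_type, plus at most one recovery alert. The sketch below reproduces that arithmetic; the function name is hypothetical, and check_type values other than `null_values_bronze` and `bronze_row_count` are made up for illustration.

```python
def count_notifications(rows):
    """Count alerts that would fire for one run's result rows.

    Each row is a dict with at least check_type, status and state_change.
    """
    failing_check_types = {r["check_type"] for r in rows if r["status"] == "FAILED"}
    any_resolved = any(r["state_change"] == "RESOLVED" for r in rows)
    return len(failing_check_types) + (1 if any_resolved else 0)


# Ten tables failing the same check still produce a single alert.
same_check = [
    {"check_type": "bronze_row_count", "table_name": f"t{i}",
     "status": "FAILED", "state_change": "NEW_FAILURE"}
    for i in range(10)
]
# One recovered table adds the single "Tables Recovered" alert.
with_recovery = same_check + [
    {"check_type": "null_values_bronze", "table_name": "t0",
     "status": "PASSED", "state_change": "RESOLVED"}
]
print(count_notifications(same_check), count_notifications(with_recovery))
```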

What user sees in Slack:

🔔 Alert: [PROD] observability-alerts (observability) - No Data in Bronze

[PROD] 🔴 NO DATA IN BRONZE | Catalog: observability
Bundle: observability-alerts
Check ingestion job logs. Click Alert link to see which tables.
Runbook: https://cvsdigital.atlassian.net/wiki/...

[🔗 View Alert Results] ← clicks → sees table_name column with failed tables

Reliability

| Feature | Protection |
| --- | --- |
| safe_check() wrapper | One failed check doesn't kill the run |
| max_retries: 2 | Transient failures auto-recover |
| timeout_seconds: 900 | Hung queries get killed and retried |
| max_concurrent_runs: 1 | No duplicate runs |
| on_failure email | Team knows if the job itself crashes |
| Delta write before alert eval | Data never lost even if alert service fails |
| 5-min offset | Alerts always have fresh data to read |
| empty_result_state: OK | No false positives when everything is healthy |
| Decoupled design | Job failure ≠ notification failure |
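
The README names a `safe_check()` wrapper but does not show it. One plausible shape, offered only as a sketch with an assumed signature and return format, is to catch any exception and convert it into a single `ERROR` row so the remaining checks still run:

```python
import json


def safe_check(check_type, check_fn):
    """Run check_fn(); on any exception return one ERROR row instead of raising."""
    try:
        return check_fn()
    except Exception as exc:  # deliberately broad: one bad check must not stop the run
        return [{
            "check_type": check_type,
            "table_name": None,
            "status": "ERROR",
            "details": json.dumps({"error": f"{type(exc).__name__}: {exc}"}),
        }]


def healthy_check():
    return [{"check_type": "bronze_row_count", "table_name": "eop_logs",
             "status": "PASSED", "details": "{}"}]


def broken_check():
    raise RuntimeError("table not found")


results = []
results += safe_check("bronze_row_count", healthy_check)
results += safe_check("null_values_bronze", broken_check)
print([r["status"] for r in results])
```

Recording the failure as a row, rather than only logging it, keeps check errors visible in the same table the alerts read.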

Scalability

| Scale | Tables | Runtime | Status |
| --- | --- | --- | --- |
| Current | 8 bronze + 5 silver | ~30s | |
| Medium | 50 bronze + 30 silver | ~3 min | |
| Large | 100+ tables | ~5 min | |
| Massive | 500+ tables | ~15 min | ⚠️ Consider chunking |
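
"Chunking" here presumably means splitting the table list into batches so that each UNION ALL statement stays a manageable size. A minimal sketch, where the helper name and the batch size of 100 are arbitrary assumptions:

```python
def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


tables = [f"table_{i}" for i in range(520)]
batches = chunked(tables, 100)
print(len(batches), len(batches[-1]))
```

Each batch would then get its own UNION ALL query, trading one very large statement for several smaller ones.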

Key Design Principles

| Principle | How |
| --- | --- |
| Config-driven | Only config YAML changes when tables are added/removed |
| Reusable | Same bundle deploys to any catalog |
| Decoupled | Job writes data, alerts handle notifications: independent failure domains |
| Static alerts | alerts.yml NEVER changes regardless of table count |
| Generic schema | Delta table handles any check type without ALTER |
| No external infra | Zero Azure Functions, Logic Apps, or custom APIs |
| Native Databricks | Uses only platform components (Jobs, Alerts, Delta, UC) |

Databricks Components Used

| Component | Purpose |
| --- | --- |
| Declarative Automation Bundles | Deploy job + alerts as code via CI/CD |
| Lakeflow Jobs | Scheduled notebook execution (serverless) |
| Serverless Compute | Run notebook without managing clusters |
| SQL Serverless Warehouse | Execute SQL alert queries |
| SQL Alerts | Read Delta table → trigger notifications |
| Notification Destinations | Route alerts to Slack + Email (Databricks-managed) |
| Delta Table | Store check results (bridge: job → alerts) |
| Unity Catalog | Table governance, multi-catalog support |

Deploy Commands

# Validate
databricks bundle validate --target dev

# Deploy (creates job + 6 alerts + monitoring schema/table)
databricks bundle deploy --target dev

# Test manually (one-time run)
databricks bundle run dq_monitoring_job --target dev

# Deploy to another catalog
databricks bundle deploy --target security-dev

One-Time Setup (per workspace)

# 1. Create Slack notification destination in Databricks UI
#    Settings → Notification Destinations → Add → Slack webhook
#    Note the destination ID → set in variables.yml

# 2. (Optional) Create Email notification destination
#    Settings → Notification Destinations → Add → Email

# 3. Run GRANT statements (see Permission Model section)

Timing Diagram

:00  :05  :15  :20  :30  :35  :45  :50  :60
 │    │    │    │    │    │    │    │    │
 ▼    ▼    ▼    ▼    ▼    ▼    ▼    ▼    ▼
 JOB  ALT  JOB  ALT  JOB  ALT  JOB  ALT  JOB
 run  eval run  eval run  eval run  eval run

JOB: Runs checks, writes to Delta (serverless compute)
ALT: SQL alerts evaluate, trigger Slack/Email if FAILED rows exist (SQL warehouse)
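
The diagram implies a constraint worth making explicit: an alert evaluation sees the current run's rows only if the job finishes writing before the offset elapses. The sketch below reproduces the minute marks and that comparison; both function names are hypothetical, and the freshness rule is an inference from the diagram rather than a documented guarantee.

```python
def schedule_marks(period_min=15, alert_offset_min=5, horizon_min=60):
    """Return (job start minutes, alert evaluation minutes) within the horizon."""
    job_runs = list(range(0, horizon_min + 1, period_min))
    alert_evals = [
        start + alert_offset_min
        for start in job_runs
        if start + alert_offset_min <= horizon_min
    ]
    return job_runs, alert_evals


def alert_reads_current_run(runtime_min, alert_offset_min=5):
    """True if the job finishes before the next alert evaluation starts."""
    return runtime_min < alert_offset_min


job_runs, alert_evals = schedule_marks()
print(job_runs, alert_evals)
print(alert_reads_current_run(0.5), alert_reads_current_run(15))
```

Under this reading, runs that take longer than the offset would be evaluated against the previous run's rows, so the offset may need to grow with the table count.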

Roadmap

| Phase | Features |
| --- | --- |
| Phase 1 (Now) | 5 DQ checks, state tracking, multi-catalog, 6 SQL alerts |
| Phase 2 | Add gold layer checks, freshness lag, latency SLA, record ratio |
| Phase 3 | Dashboard on DQ trends (point BI at dq_results) |
| Phase 4 | Custom thresholds per table, anomaly detection |
| Phase 5 | Self-service: teams add tables via PR to config YAML |
