diff --git a/packages/data-analytics-demo/.gitignore b/packages/data-analytics-demo/.gitignore index a474d46..b83e29c 100644 --- a/packages/data-analytics-demo/.gitignore +++ b/packages/data-analytics-demo/.gitignore @@ -31,3 +31,4 @@ venv/ dbt_project/target/ dbt_project/dbt_packages/ dbt_project/logs/ +dbt_project/.user.yml diff --git a/packages/data-analytics-demo/Makefile b/packages/data-analytics-demo/Makefile index ccc4da5..8388694 100644 --- a/packages/data-analytics-demo/Makefile +++ b/packages/data-analytics-demo/Makefile @@ -25,8 +25,8 @@ data: $(PYTHON) -m data_analytics_demo.data.generate dbt: - @echo "[dbt] TODO T-04/T-05: dbt models not yet implemented" - @exit 1 + cd dbt_project && DBT_PROFILES_DIR=. dbt run + cd dbt_project && DBT_PROFILES_DIR=. dbt test ml: @echo "[ml] TODO T-06/T-07: ML pipelines not yet implemented" diff --git a/packages/data-analytics-demo/dbt_project/dbt_project.yml b/packages/data-analytics-demo/dbt_project/dbt_project.yml new file mode 100644 index 0000000..19b79f5 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/dbt_project.yml @@ -0,0 +1,25 @@ +name: data_analytics_demo +version: 0.1.0 +config-version: 2 + +profile: data_analytics_demo + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +clean-targets: + - target + - dbt_packages + +models: + data_analytics_demo: + staging: + +materialized: view + intermediate: + +materialized: view + marts: + +materialized: table diff --git a/packages/data-analytics-demo/dbt_project/models/intermediate/int_customer_features.sql b/packages/data-analytics-demo/dbt_project/models/intermediate/int_customer_features.sql new file mode 100644 index 0000000..deaf49f --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/intermediate/int_customer_features.sql @@ -0,0 +1,58 @@ +{{ config(materialized='view') }} + +-- Per-customer base features built once and reused by 
all mart models. +-- Joins customers + their lifetime event activity + total monetary value + latest subscription. + +with customer_event_stats as ( + select + customer_id, + count(*) as event_count_total, + count(distinct event_type) as distinct_event_types, + min(event_at) as first_event_at, + max(event_at) as last_event_at, + date_diff('day', min(event_at), max(event_at)) + 1 as active_days + from {{ ref('stg_events') }} + group by customer_id +), + +customer_invoice_stats as ( + select + customer_id, + sum(case when status = 'paid' then amount_usd else 0 end) as lifetime_paid_usd, + sum(case when status = 'failed' then 1 else 0 end) as failed_invoice_count, + count(*) as invoice_count + from {{ ref('stg_invoices') }} + group by customer_id +), +-- Most recent subscription per customer; subscription_id breaks start_date ties so current_plan_tier / current_status are the same on every run. +latest_subscription as ( + select + customer_id, + plan_tier as current_plan_tier, + status as current_status, + row_number() over (partition by customer_id order by start_date desc, subscription_id desc) as rn + from {{ ref('stg_subscriptions') }} + qualify rn = 1 +) + +select + c.customer_id, + c.email, + c.company, + c.signup_date, + c.region, + c.plan_tier_at_signup, + coalesce(ls.current_plan_tier, c.plan_tier_at_signup) as current_plan_tier, + coalesce(ls.current_status, 'unknown') as current_status, + coalesce(ces.event_count_total, 0) as event_count_total, + coalesce(ces.distinct_event_types, 0) as distinct_event_types, + ces.first_event_at, + ces.last_event_at, + coalesce(ces.active_days, 0) as active_days, + coalesce(cis.lifetime_paid_usd, 0) as lifetime_paid_usd, + coalesce(cis.failed_invoice_count, 0) as failed_invoice_count, + coalesce(cis.invoice_count, 0) as invoice_count +from {{ ref('stg_customers') }} c +left join customer_event_stats ces on c.customer_id = ces.customer_id +left join customer_invoice_stats cis on c.customer_id = cis.customer_id +left join latest_subscription ls on c.customer_id = ls.customer_id diff --git a/packages/data-analytics-demo/dbt_project/models/intermediate/int_event_aggregates.sql
b/packages/data-analytics-demo/dbt_project/models/intermediate/int_event_aggregates.sql new file mode 100644 index 0000000..6d0fe82 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/intermediate/int_event_aggregates.sql @@ -0,0 +1,14 @@ +{{ config(materialized='view') }} + +-- Per-customer × event_type counts with first/last event timestamps. Powers +-- the upsell mart (premium / advanced feature usage) and the churn mart's +-- support-ticket volume; trailing-30d activity is computed from stg_events. + +select + customer_id, + event_type, + count(*) as event_count, + min(event_at) as first_event_at, + max(event_at) as last_event_at +from {{ ref('stg_events') }} +group by customer_id, event_type diff --git a/packages/data-analytics-demo/dbt_project/models/marts/churn_features.sql b/packages/data-analytics-demo/dbt_project/models/marts/churn_features.sql new file mode 100644 index 0000000..ab32ffe --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/marts/churn_features.sql @@ -0,0 +1,55 @@ +{{ config(materialized='table') }} + +-- Churn-prediction feature table. One row per customer. +-- Label: is_churned = 1 when the customer's latest subscription is canceled. +-- Feature engineering deliberately mirrors the synthetic-data churn signal +-- (trailing-30d event drop-off vs lifetime daily average).
+ +with reference_point as ( + select max(event_at)::date as as_of_date from {{ ref('stg_events') }} +), +-- Events per customer from (as_of_date - 30 days) onward. NOTE(review): that span also includes as_of_date itself (31 calendar days) while the ratio below divides by 30.0 — confirm intended. +trailing_30d as ( + select + e.customer_id, + count(*) as events_last_30d + from {{ ref('stg_events') }} e + cross join reference_point r + where e.event_at >= r.as_of_date - interval 30 day + group by e.customer_id +), + +support_volume as ( + select + customer_id, + sum(event_count) as support_ticket_count + from {{ ref('int_event_aggregates') }} + where event_type = 'support_ticket' + group by customer_id +) + +select + f.customer_id, + f.plan_tier_at_signup, + f.current_plan_tier, + f.region, + f.event_count_total, + f.distinct_event_types, + f.lifetime_paid_usd, + f.failed_invoice_count, + f.invoice_count, + coalesce(t.events_last_30d, 0) as events_last_30d, + -- Daily lifetime average; NULLIF guards against divide-by-zero, so the result is NULL when active_days = 0. + f.event_count_total::double / NULLIF(f.active_days, 0) as lifetime_daily_avg_events, + -- Trailing-30d rate vs lifetime daily avg. < 1.0 means slowing down; NULL when active_days or event_count_total is 0. + case + when f.active_days > 0 and f.event_count_total > 0 + then (coalesce(t.events_last_30d, 0) / 30.0) + / (f.event_count_total::double / f.active_days) + else null + end as recent_to_lifetime_ratio, + coalesce(s.support_ticket_count, 0) as support_ticket_count, + case when f.current_status = 'canceled' then 1 else 0 end as is_churned +from {{ ref('int_customer_features') }} f +left join trailing_30d t on f.customer_id = t.customer_id +left join support_volume s on f.customer_id = s.customer_id diff --git a/packages/data-analytics-demo/dbt_project/models/marts/cohort_retention.sql b/packages/data-analytics-demo/dbt_project/models/marts/cohort_retention.sql new file mode 100644 index 0000000..b3b2526 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/marts/cohort_retention.sql @@ -0,0 +1,47 @@ +{{ config(materialized='table') }} + +-- Monthly signup cohort × months-since-signup retention grid.
+-- "Active" at offset M = the customer emitted at least one event in the +-- calendar month M months after the signup month. Cohort size is the count of +-- customers who signed up in that month; cells with no active customers yield no row. + +with customer_signup as ( + select + customer_id, + date_trunc('month', signup_date) as cohort_month + from {{ ref('stg_customers') }} +), + +monthly_activity as ( + select distinct + customer_id, + date_trunc('month', event_at) as active_month + from {{ ref('stg_events') }} +), + +cohort_offsets as ( + select + c.cohort_month, + c.customer_id, + date_diff('month', c.cohort_month, m.active_month) as months_since_signup + from customer_signup c + join monthly_activity m on c.customer_id = m.customer_id + where m.active_month >= c.cohort_month +), + +cohort_sizes as ( + select cohort_month, count(distinct customer_id) as cohort_size + from customer_signup + group by cohort_month +) + +select + o.cohort_month, + cs.cohort_size, + o.months_since_signup, + count(distinct o.customer_id) as active_customers, + round(count(distinct o.customer_id) * 100.0 / cs.cohort_size, 2) as retention_pct +from cohort_offsets o +join cohort_sizes cs on o.cohort_month = cs.cohort_month +group by o.cohort_month, cs.cohort_size, o.months_since_signup +order by o.cohort_month, o.months_since_signup diff --git a/packages/data-analytics-demo/dbt_project/models/marts/rfm_segments.sql b/packages/data-analytics-demo/dbt_project/models/marts/rfm_segments.sql new file mode 100644 index 0000000..743f37d --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/marts/rfm_segments.sql @@ -0,0 +1,56 @@ +{{ config(materialized='table') }} + +-- RFM segmentation built from event recency, event frequency, and lifetime +-- paid amount. Quintile scores in {1, 2, 3, 4, 5} on each axis; the +-- composite label maps the (R, F, M) triple to a coarse 5-bucket segment. +-- +-- Reference window: max(event_at) across all customers (so the data set +-- self-anchors and the mart is reproducible for any synthetic seed).
+ +with reference_point as ( + select max(event_at)::date as as_of_date from {{ ref('stg_events') }} +), + +rfm_raw as ( + select + f.customer_id, + date_diff( + 'day', + cast(f.last_event_at as date), + (select as_of_date from reference_point) + ) as recency_days, + f.event_count_total as frequency_events, + f.lifetime_paid_usd as monetary_usd + from {{ ref('int_customer_features') }} f + where f.last_event_at is not null +), + +rfm_scored as ( + select + customer_id, + recency_days, + frequency_events, + monetary_usd, + -- Recency: lower is better, so reverse the quintile. customer_id breaks ties on every axis so tied customers get the same bucket on every run. + 6 - ntile(5) over (order by recency_days, customer_id) as r_score, + ntile(5) over (order by frequency_events, customer_id) as f_score, + ntile(5) over (order by monetary_usd, customer_id) as m_score + from rfm_raw +) + +select + customer_id, + recency_days, + frequency_events, + monetary_usd, + r_score, + f_score, + m_score, + case + when r_score >= 4 and f_score >= 4 and m_score >= 4 then 'champions' + when r_score >= 4 and f_score >= 3 then 'loyal' + when r_score >= 3 and m_score >= 4 then 'big_spenders' + when r_score <= 2 and f_score <= 2 then 'at_risk' + else 'regular' + end as rfm_segment +from rfm_scored diff --git a/packages/data-analytics-demo/dbt_project/models/marts/schema.yml b/packages/data-analytics-demo/dbt_project/models/marts/schema.yml new file mode 100644 index 0000000..2337f56 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/marts/schema.yml @@ -0,0 +1,81 @@ +version: 2 + +models: + - name: rfm_segments + description: RFM (Recency / Frequency / Monetary) segmentation, one row per active customer. + columns: + - name: customer_id + description: Primary key; foreign key to stg_customers. + tests: + - not_null + - unique + - name: r_score + description: Recency score in {1..5}; 5 = most recent. + tests: + - not_null + - name: f_score + description: Frequency score in {1..5}. + tests: + - not_null + - name: m_score + description: Monetary score in {1..5}.
+ tests: + - not_null + - name: rfm_segment + description: Coarse 5-bucket label derived from the scores. + tests: + - not_null + - accepted_values: + arguments: + values: + ["champions", "loyal", "big_spenders", "at_risk", "regular"] + + - name: churn_features + description: One-row-per-customer feature table for churn prediction. + columns: + - name: customer_id + tests: + - not_null + - unique + - name: is_churned + description: Binary label (1 if latest subscription canceled). + tests: + - not_null + - accepted_values: + arguments: + values: [0, 1] + + - name: upsell_opportunities + description: One-row-per-eligible-customer feature table for upsell propensity. + columns: + - name: customer_id + tests: + - not_null + - unique + - name: upgraded + description: Binary label (1 if customer reached a tier higher than initial plan). + tests: + - not_null + - accepted_values: + arguments: + values: [0, 1] + - name: plan_tier_at_signup + tests: + - not_null + - accepted_values: + arguments: + values: ["free", "pro"] + + - name: cohort_retention + description: Monthly cohort × months-since-signup retention grid. + columns: + - name: cohort_month + tests: + - not_null + - name: months_since_signup + tests: + - not_null + - name: retention_pct + description: 0–100 retention percentage for this (cohort, offset) cell. + tests: + - not_null diff --git a/packages/data-analytics-demo/dbt_project/models/marts/upsell_opportunities.sql b/packages/data-analytics-demo/dbt_project/models/marts/upsell_opportunities.sql new file mode 100644 index 0000000..e296db1 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/marts/upsell_opportunities.sql @@ -0,0 +1,56 @@ +{{ config(materialized='table') }} + +-- Upsell propensity feature table. One row per customer who signed up on a +-- non-enterprise plan (free or pro). Label: upgraded = 1 when the customer +-- has any subscription on a tier strictly higher than their initial plan.
+-- Premium / advanced feature events are the engineered upsell signal. + +with plan_rank as ( + -- Numeric rank so we can compare tiers (`enterprise` > `pro` > `free`). + select 'free' as plan, 1 as rank union all + select 'pro', 2 union all + select 'enterprise', 3 +), +-- Rank of the tier each customer signed up on. +initial_tier as ( + select + f.customer_id, + pr.rank as initial_rank + from {{ ref('int_customer_features') }} f + join plan_rank pr on f.plan_tier_at_signup = pr.plan +), +-- Highest tier rank across all of a customer's subscriptions. +max_tier as ( + select + s.customer_id, + max(pr.rank) as max_rank + from {{ ref('stg_subscriptions') }} s + join plan_rank pr on s.plan_tier = pr.plan + group by s.customer_id +), +-- Per-customer totals of premium / advanced feature-use events. +premium_signal as ( + select + customer_id, + sum(case when event_type = 'feature_use_premium' then event_count else 0 end) as premium_event_count, + sum(case when event_type = 'feature_use_advanced' then event_count else 0 end) as advanced_event_count + from {{ ref('int_event_aggregates') }} + group by customer_id +) +-- upgraded falls back to 0 when a customer has no ranked subscription (max_rank is NULL). NOTE(review): current_plan_tier is emitted next to the label and may reveal it — confirm it is excluded from model inputs. +select + f.customer_id, + f.plan_tier_at_signup, + f.current_plan_tier, + f.region, + f.event_count_total, + coalesce(p.premium_event_count, 0) as premium_event_count, + coalesce(p.advanced_event_count, 0) as advanced_event_count, + f.active_days, + f.lifetime_paid_usd, + case when mt.max_rank > it.initial_rank then 1 else 0 end as upgraded +from {{ ref('int_customer_features') }} f +join initial_tier it on f.customer_id = it.customer_id +left join max_tier mt on f.customer_id = mt.customer_id +left join premium_signal p on f.customer_id = p.customer_id +where f.plan_tier_at_signup in ('free', 'pro') diff --git a/packages/data-analytics-demo/dbt_project/models/staging/_sources.yml b/packages/data-analytics-demo/dbt_project/models/staging/_sources.yml new file mode 100644 index 0000000..973dd92 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/staging/_sources.yml @@ -0,0 +1,15 @@ +version: 2 + +sources: + - name: raw + description: Synthetic SaaS tables produced by `make data`
(src/data_analytics_demo/data/generate.py). + schema: main + tables: + - name: customers + description: Tenant accounts. + - name: subscriptions + description: Subscription history per customer (may include multiple rows per customer). + - name: events + description: Product-usage events. Drives churn-signal and upsell-signal feature engineering. + - name: invoices + description: Monthly billing records for paid subscriptions. diff --git a/packages/data-analytics-demo/dbt_project/models/staging/stg_customers.sql b/packages/data-analytics-demo/dbt_project/models/staging/stg_customers.sql new file mode 100644 index 0000000..a93e5a7 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/staging/stg_customers.sql @@ -0,0 +1,10 @@ +{{ config(materialized='view') }} +-- Staging view over the raw `customers` source; casts signup_date to DATE and passes the other columns through. +select + customer_id, + email, + company, + cast(signup_date as date) as signup_date, + region, + plan_tier_at_signup +from {{ source('raw', 'customers') }} diff --git a/packages/data-analytics-demo/dbt_project/models/staging/stg_events.sql b/packages/data-analytics-demo/dbt_project/models/staging/stg_events.sql new file mode 100644 index 0000000..18c7877 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/staging/stg_events.sql @@ -0,0 +1,8 @@ +{{ config(materialized='view') }} +-- Staging view over the raw `events` source; exposes the raw `timestamp` column as event_at, cast to TIMESTAMP. +select + event_id, + customer_id, + cast(timestamp as timestamp) as event_at, + event_type +from {{ source('raw', 'events') }} diff --git a/packages/data-analytics-demo/dbt_project/models/staging/stg_invoices.sql b/packages/data-analytics-demo/dbt_project/models/staging/stg_invoices.sql new file mode 100644 index 0000000..162094d --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/staging/stg_invoices.sql @@ -0,0 +1,11 @@ +{{ config(materialized='view') }} +-- Staging view over the raw `invoices` source; casts period_start / period_end to DATE. +select + invoice_id, + customer_id, + subscription_id, + cast(period_start as date) as period_start, + cast(period_end as date) as period_end, + amount_usd, + status +from {{ source('raw', 'invoices') }} diff --git
a/packages/data-analytics-demo/dbt_project/models/staging/stg_subscriptions.sql b/packages/data-analytics-demo/dbt_project/models/staging/stg_subscriptions.sql new file mode 100644 index 0000000..8fa9da1 --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/models/staging/stg_subscriptions.sql @@ -0,0 +1,11 @@ +{{ config(materialized='view') }} +-- Staging view over the raw `subscriptions` source; casts start_date / end_date to DATE. +select + subscription_id, + customer_id, + plan_tier, + cast(start_date as date) as start_date, + cast(end_date as date) as end_date, + status, + monthly_amount_usd +from {{ source('raw', 'subscriptions') }} diff --git a/packages/data-analytics-demo/dbt_project/profiles.yml b/packages/data-analytics-demo/dbt_project/profiles.yml new file mode 100644 index 0000000..17bcd0e --- /dev/null +++ b/packages/data-analytics-demo/dbt_project/profiles.yml @@ -0,0 +1,11 @@ +data_analytics_demo: + target: dev + outputs: + dev: + type: duckdb + # Resolved relative to the directory dbt is invoked from. The Makefile + # invokes dbt from `dbt_project/`, so `../warehouse/analytics.duckdb` + # points at the file produced by `make data` (src/data_analytics_demo/ + # data/generate.py). Set DBT_DUCKDB_PATH to use a different file. + path: "{{ env_var('DBT_DUCKDB_PATH', '../warehouse/analytics.duckdb') }}" + threads: 4