diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 1bec1de..a8f2a72 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -54,3 +54,33 @@ updates: - github-actions commit-message: prefix: "chore(ci)" + + - package-ecosystem: pip + directory: /packages/data-analytics-demo + schedule: + interval: weekly + day: monday + time: "09:00" + timezone: Asia/Tokyo + open-pull-requests-limit: 5 + groups: + dbt: + patterns: + - "dbt-*" + ml: + patterns: + - "scikit-learn" + - "xgboost" + - "shap" + - "pandas" + - "numpy" + duckdb: + patterns: + - "duckdb" + dev: + dependency-type: "development" + labels: + - dependencies + - python + commit-message: + prefix: "chore(deps)" diff --git a/.github/workflows/python-audit.yml b/.github/workflows/python-audit.yml new file mode 100644 index 0000000..576109e --- /dev/null +++ b/.github/workflows/python-audit.yml @@ -0,0 +1,40 @@ +name: python audit (data-analytics-demo) + +on: + push: + branches: [main, master] + paths: + - "packages/data-analytics-demo/pyproject.toml" + - ".github/workflows/python-audit.yml" + pull_request: + branches: [main, master] + paths: + - "packages/data-analytics-demo/pyproject.toml" + - ".github/workflows/python-audit.yml" + schedule: + - cron: "0 0 * * 0" + workflow_dispatch: + +jobs: + audit: + runs-on: ubuntu-latest + permissions: + contents: read + defaults: + run: + working-directory: packages/data-analytics-demo + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: install pip-audit + package deps + run: | + python -m pip install --upgrade pip + pip install pip-audit + pip install -e . 
+
+      - name: pip-audit (fail on any known vulnerability)
+        run: pip-audit --strict --vulnerability-service osv
diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml
new file mode 100644
index 0000000..288d603
--- /dev/null
+++ b/.github/workflows/python-test.yml
@@ -0,0 +1,43 @@
+name: python test (data-analytics-demo)
+
+on:
+  push:
+    branches: [main, master]
+    paths:
+      - "packages/data-analytics-demo/**"
+      - ".github/workflows/python-test.yml"
+  pull_request:
+    branches: [main, master]
+    paths:
+      - "packages/data-analytics-demo/**"
+      - ".github/workflows/python-test.yml"
+  workflow_dispatch:
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+    defaults:
+      run:
+        working-directory: packages/data-analytics-demo
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: actions/setup-python@v5
+        with:
+          python-version: "3.11"
+
+      - name: install package + dev extras
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e ".[dev]"
+
+      - name: ruff (lint)
+        run: ruff check src tests
+
+      - name: mypy (strict type-check)
+        run: mypy -p data_analytics_demo
+
+      - name: pytest (coverage ≥ 80%)
+        run: pytest
diff --git a/packages/data-analytics-demo/Makefile b/packages/data-analytics-demo/Makefile
index 627a7e7..ccc4da5 100644
--- a/packages/data-analytics-demo/Makefile
+++ b/packages/data-analytics-demo/Makefile
@@ -20,10 +20,9 @@ help:
 install:
 	$(PIP) install -e ".[dev]"
 
-# Stage targets — placeholders until T-03 ..
T-10 are implemented +# Stage targets data: - @echo "[data] TODO T-03: synthetic data generation not yet implemented" - @exit 1 + $(PYTHON) -m data_analytics_demo.data.generate dbt: @echo "[dbt] TODO T-04/T-05: dbt models not yet implemented" @@ -53,7 +52,7 @@ test: lint: $(PYTHON) -m ruff check src tests - $(PYTHON) -m mypy src + $(PYTHON) -m mypy -p data_analytics_demo clean: rm -rf warehouse/*.duckdb ml/artifacts/* dashboard/build narrative/output.md diff --git a/packages/data-analytics-demo/pyproject.toml b/packages/data-analytics-demo/pyproject.toml index 2d8855f..e9f3e31 100644 --- a/packages/data-analytics-demo/pyproject.toml +++ b/packages/data-analytics-demo/pyproject.toml @@ -49,6 +49,9 @@ build-backend = "setuptools.build_meta" [tool.setuptools.packages.find] where = ["src"] +[tool.setuptools.package-data] +data_analytics_demo = ["py.typed"] + [tool.ruff] line-length = 120 target-version = "py311" @@ -65,6 +68,14 @@ strict = true python_version = "3.11" namespace_packages = true explicit_package_bases = true +mypy_path = "src" + +# Third-party libraries without published type stubs. `pandas-stubs` exists +# but lags behind `pandas` releases; treating these as untyped is the +# pragmatic choice for a Python 3.11 + pandas 3.x stack. 
+[[tool.mypy.overrides]] +module = ["pandas", "pandas.*", "duckdb", "faker", "shap", "xgboost", "sklearn.*"] +ignore_missing_imports = true [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/packages/data-analytics-demo/src/data_analytics_demo/cli.py b/packages/data-analytics-demo/src/data_analytics_demo/cli.py index daaec05..46a6d15 100644 --- a/packages/data-analytics-demo/src/data_analytics_demo/cli.py +++ b/packages/data-analytics-demo/src/data_analytics_demo/cli.py @@ -26,9 +26,11 @@ def version() -> None: @app.command() def data() -> None: - """Generate synthetic SaaS data (T-03, not yet implemented).""" - typer.echo("[data] TODO T-03: synthetic data generation not yet implemented", err=True) - sys.exit(1) + """Generate synthetic SaaS data into warehouse/analytics.duckdb.""" + from data_analytics_demo.data import generate as gen + + out = gen.main() + typer.echo(f"wrote {out}") @app.command() diff --git a/packages/data-analytics-demo/src/data_analytics_demo/data/__init__.py b/packages/data-analytics-demo/src/data_analytics_demo/data/__init__.py new file mode 100644 index 0000000..833376b --- /dev/null +++ b/packages/data-analytics-demo/src/data_analytics_demo/data/__init__.py @@ -0,0 +1,6 @@ +"""Synthetic SaaS data generation for the customer-analytics demo. + +Public surface: + generate.main() Run the full synthesis and persist to DuckDB. + schemas.{Customer, Event, Subscription, Invoice} Pydantic models. +""" diff --git a/packages/data-analytics-demo/src/data_analytics_demo/data/generate.py b/packages/data-analytics-demo/src/data_analytics_demo/data/generate.py new file mode 100644 index 0000000..b129ed3 --- /dev/null +++ b/packages/data-analytics-demo/src/data_analytics_demo/data/generate.py @@ -0,0 +1,339 @@ +"""Synthetic SaaS data generator. + +Produces 4 tables (`customers`, `subscriptions`, `events`, `invoices`) into a +DuckDB file at `/warehouse/analytics.duckdb`. 
All data is
+synthetic — no real PII (Faker-generated emails / companies only).
+
+Determinism: the seed (env var `DEMO_RANDOM_SEED`, default 42) controls both
+Faker and numpy RNG. Re-running with the same seed produces identical table
+contents, row for row.
+
+Engineered signal — the generator biases two patterns so the downstream ML
+layer (T-06 churn, T-07 upsell) has something to learn:
+
+  - Churn signal: customers with no active subscription are sampled for events
+    at half the weight of customers with one. The intended trailing-30-day
+    drop-off before cancellation is not implemented yet.
+  - Upsell signal: `feature_use_premium` is drawn at a tier-dependent rate
+    (5% on free vs 15% on pro/enterprise, keyed to the latest subscription).
+
+Both signals are observable through SQL alone (no leakage from the generator
+into the ML feature surface).
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+import time
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+
+import duckdb
+import numpy as np
+import pandas as pd
+from faker import Faker
+
+# --- Defaults (overridable via env vars; documented in .env.example) ---
+DEFAULT_N_CUSTOMERS = 1000
+DEFAULT_N_SUBSCRIPTIONS = 2000
+DEFAULT_N_EVENTS = 50_000
+DEFAULT_N_INVOICES = 5000
+DEFAULT_SEED = 42
+
+# Reference window: synthetic "now" = 2026-05-01 UTC. Events span 2 years back.
+REFERENCE_NOW = datetime(2026, 5, 1, tzinfo=UTC) +HISTORY_WINDOW_DAYS = 730 + +PLAN_TIERS = ["free", "pro", "enterprise"] +PLAN_PRICES = {"free": 0.0, "pro": 49.0, "enterprise": 499.0} +REGIONS = ["us", "eu", "apac", "latam"] +EVENT_TYPES = [ + "login", + "feature_use_core", + "feature_use_premium", + "feature_use_advanced", + "support_ticket", + "doc_view", + "export", +] +EVENT_WEIGHTS_BY_TIER = { + "free": [0.40, 0.30, 0.05, 0.02, 0.10, 0.10, 0.03], + "pro": [0.30, 0.30, 0.15, 0.10, 0.05, 0.07, 0.03], + "enterprise": [0.25, 0.25, 0.15, 0.20, 0.05, 0.05, 0.05], +} + + +def _warehouse_path() -> Path: + """Resolve the package-relative warehouse directory.""" + # src/data_analytics_demo/data/generate.py -> package root is parents[3] + return Path(__file__).resolve().parents[3] / "warehouse" + + +def _emit(msg: str) -> None: + """Progress emitter — stderr only, satisfies AC-1.3. + + `_emit` is the deliberate single exception to the T20 print-suppression + rule for this package; downstream stages must continue to route output + through this function for consistency. 
+ """ + print(f"[data] {msg}", file=sys.stderr, flush=True) # noqa: T201 + + +def _read_env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"env var {name}={raw!r} is not an integer") from exc + + +def _generate_customers(fake: Faker, rng: np.random.Generator, n: int) -> pd.DataFrame: + """Generate `n` customers with deterministic signup distribution.""" + signup_days = rng.integers(0, HISTORY_WINDOW_DAYS, size=n) + customers = pd.DataFrame( + { + "customer_id": np.arange(1, n + 1), + "email": [fake.unique.company_email() for _ in range(n)], + "company": [fake.company() for _ in range(n)], + "signup_date": [ + REFERENCE_NOW - timedelta(days=int(d)) for d in signup_days + ], + "region": rng.choice(REGIONS, size=n, p=[0.45, 0.30, 0.20, 0.05]), + "plan_tier_at_signup": rng.choice( + PLAN_TIERS, size=n, p=[0.60, 0.30, 0.10] + ), + } + ) + return customers + + +def _generate_subscriptions( + rng: np.random.Generator, customers: pd.DataFrame, n: int +) -> pd.DataFrame: + """Generate `n` subscription rows. Customers may appear multiple times. + + First subscription per customer starts at signup_date with their signup + plan_tier. Additional subscriptions model upgrades / cancellations. + """ + # Each customer gets at least 1 subscription; the remainder distributes + # ~uniformly across the customer base (some customers will have 2-3). 
+    base_subs = customers[["customer_id", "signup_date", "plan_tier_at_signup"]].copy()
+    base_subs = base_subs.rename(columns={"plan_tier_at_signup": "plan_tier"})
+    base_subs["start_date"] = base_subs["signup_date"]
+
+    extra_count = max(0, n - len(customers))
+    if extra_count > 0:
+        extra_customers = rng.choice(
+            customers["customer_id"].to_numpy(), size=extra_count, replace=True
+        )
+        extra_signups = customers.set_index("customer_id").loc[extra_customers]
+        extras = pd.DataFrame(
+            {
+                "customer_id": extra_customers,
+                "signup_date": extra_signups["signup_date"].to_numpy(),
+                "plan_tier": rng.choice(PLAN_TIERS, size=extra_count, p=[0.40, 0.40, 0.20]),
+            }
+        )
+        # Subsequent subscriptions start between signup and now (offsets past REFERENCE_NOW wrap into that window).
+        offsets = rng.integers(30, HISTORY_WINDOW_DAYS, size=extra_count)
+        extras["start_date"] = [
+            row["signup_date"] + timedelta(days=int(off) % ((REFERENCE_NOW - row["signup_date"]).days + 1))
+            for (_, row), off in zip(extras.iterrows(), offsets, strict=True)
+        ]
+        all_subs = pd.concat([base_subs[["customer_id", "plan_tier", "start_date"]], extras], ignore_index=True)
+    else:
+        all_subs = base_subs[["customer_id", "plan_tier", "start_date"]].copy()
+
+    all_subs = all_subs.head(n).reset_index(drop=True)
+    all_subs["subscription_id"] = np.arange(1, len(all_subs) + 1)
+
+    # status: ~25% canceled, ~5% paused, rest active. Canceled get end_date.
+ status_roll = rng.random(len(all_subs)) + statuses = np.where(status_roll < 0.25, "canceled", np.where(status_roll < 0.30, "paused", "active")) + all_subs["status"] = statuses + + end_offsets = rng.integers(30, 365, size=len(all_subs)) + all_subs["end_date"] = [ + row["start_date"] + timedelta(days=int(off)) if row["status"] == "canceled" else None + for (_, row), off in zip(all_subs.iterrows(), end_offsets, strict=True) + ] + + all_subs["monthly_amount_usd"] = all_subs["plan_tier"].map(PLAN_PRICES).astype(float) + return all_subs[ + [ + "subscription_id", + "customer_id", + "plan_tier", + "start_date", + "end_date", + "status", + "monthly_amount_usd", + ] + ] + + +def _generate_events( + rng: np.random.Generator, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, + n: int, +) -> pd.DataFrame: + """Generate `n` events with engineered churn + upsell signals.""" + # Active-status customers get more weight; canceled customers see drop-off + # near their end_date (the churn signal). + customer_ids = customers["customer_id"].to_numpy() + # Build a per-customer event-volume weight that biases active customers up. + is_active = subscriptions.groupby("customer_id")["status"].apply( + lambda s: (s == "active").any() + ) + weights = np.array([2.0 if is_active.get(cid, False) else 1.0 for cid in customer_ids]) + weights = weights / weights.sum() + + chosen_customers = rng.choice(customer_ids, size=n, p=weights) + timestamp_offsets = rng.integers(0, HISTORY_WINDOW_DAYS, size=n) + timestamps = [ + REFERENCE_NOW - timedelta(days=int(d), seconds=int(rng.integers(0, 86400))) + for d in timestamp_offsets + ] + + # Per-customer event-type distribution depends on their *current* plan tier + # (latest subscription). Cheaper than per-row lookup: precompute a map. 
+ latest_tier = ( + subscriptions.sort_values("start_date") + .groupby("customer_id")["plan_tier"] + .last() + .to_dict() + ) + event_types: list[str] = [] + for cid in chosen_customers: + tier = latest_tier.get(int(cid), "free") + event_types.append(str(rng.choice(EVENT_TYPES, p=EVENT_WEIGHTS_BY_TIER[tier]))) + + events = pd.DataFrame( + { + "event_id": np.arange(1, n + 1), + "customer_id": chosen_customers, + "timestamp": timestamps, + "event_type": event_types, + } + ) + return events + + +def _generate_invoices( + rng: np.random.Generator, subscriptions: pd.DataFrame, n: int +) -> pd.DataFrame: + """Generate `n` invoices keyed to subscription periods.""" + # Sample subscriptions (paid plans only — free tier has no invoices). + paid = subscriptions[subscriptions["monthly_amount_usd"] > 0].copy() + if len(paid) == 0: + raise ValueError("no paid subscriptions to bill") + + chosen = paid.sample(n=n, replace=True, random_state=rng.integers(0, 2**31 - 1)) + period_starts = [] + period_ends = [] + for _, row in chosen.iterrows(): + # Random month within the subscription window. 
+        start = row["start_date"]
+        end = REFERENCE_NOW if pd.isna(row["end_date"]) else row["end_date"]  # NaT/None => still open
+        if end <= start:
+            end = start + timedelta(days=30)
+        max_offset_days = max(1, (end - start).days)
+        offset = int(rng.integers(0, max_offset_days))
+        ps = start + timedelta(days=offset)
+        pe = ps + timedelta(days=30)
+        period_starts.append(ps)
+        period_ends.append(pe)
+
+    statuses = rng.choice(["paid", "pending", "failed"], size=n, p=[0.85, 0.10, 0.05])
+
+    invoices = pd.DataFrame(
+        {
+            "invoice_id": np.arange(1, n + 1),
+            "customer_id": chosen["customer_id"].to_numpy(),
+            "subscription_id": chosen["subscription_id"].to_numpy(),
+            "period_start": period_starts,
+            "period_end": period_ends,
+            "amount_usd": chosen["monthly_amount_usd"].to_numpy(),
+            "status": statuses,
+        }
+    )
+    return invoices
+
+
+def main(  # noqa: PLR0913
+    *,
+    n_customers: int | None = None,
+    n_subscriptions: int | None = None,
+    n_events: int | None = None,
+    n_invoices: int | None = None,
+    seed: int | None = None,
+    output_path: Path | None = None,
+) -> Path:
+    """Run the full synthesis pipeline and return the DuckDB path.
+
+    Returns
+    -------
+    Path
+        Location of the written DuckDB file.
+    """
+    n_customers = n_customers if n_customers is not None else _read_env_int("DEMO_N_CUSTOMERS", DEFAULT_N_CUSTOMERS)
+    n_subscriptions = n_subscriptions if n_subscriptions is not None else _read_env_int(
+        "DEMO_N_SUBSCRIPTIONS", DEFAULT_N_SUBSCRIPTIONS
+    )
+    n_events = n_events if n_events is not None else _read_env_int("DEMO_N_EVENTS", DEFAULT_N_EVENTS)
+    n_invoices = n_invoices if n_invoices is not None else _read_env_int("DEMO_N_INVOICES", DEFAULT_N_INVOICES)
+    seed = seed if seed is not None else _read_env_int("DEMO_RANDOM_SEED", DEFAULT_SEED)
+
+    warehouse_dir = _warehouse_path() if output_path is None else output_path.parent
+    warehouse_dir.mkdir(parents=True, exist_ok=True)  # AC-1.4
+    duckdb_path = output_path or (warehouse_dir / "analytics.duckdb")
+
+    started = time.monotonic()
+    _emit(f"output: {duckdb_path}")
+    _emit(f"seed: {seed}")
+
+    # Determinism (AC-1.5 + AC-δ.2)
+    fake = Faker()
+    Faker.seed(seed)
+    fake.unique.clear()
+    rng = np.random.default_rng(seed)
+
+    _emit(f"generating customers ({n_customers})")
+    customers = _generate_customers(fake, rng, n_customers)
+
+    _emit(f"generating subscriptions ({n_subscriptions})")
+    subscriptions = _generate_subscriptions(rng, customers, n_subscriptions)
+
+    _emit(f"generating events ({n_events})")
+    events = _generate_events(rng, customers, subscriptions, n_events)
+
+    _emit(f"generating invoices ({n_invoices})")
+    invoices = _generate_invoices(rng, subscriptions, n_invoices)
+
+    _emit("writing duckdb")
+    con = duckdb.connect(str(duckdb_path))
+    try:
+        for table in ("invoices", "events", "subscriptions", "customers"):
+            con.execute(f"DROP TABLE IF EXISTS {table}")
+        con.register("df_customers", customers)
+        con.execute("CREATE TABLE customers AS SELECT * FROM df_customers")
+        con.register("df_subscriptions", subscriptions)
+        con.execute("CREATE TABLE subscriptions AS SELECT * FROM df_subscriptions")
+        con.register("df_events", events)
+        con.execute("CREATE TABLE events AS SELECT * FROM df_events")
+        con.register("df_invoices", invoices)
+        con.execute("CREATE TABLE invoices AS SELECT * FROM df_invoices")
+    finally:
+        con.close()
+
+    elapsed = time.monotonic() - started
+    _emit(f"done in {elapsed:.1f}s")
+    return duckdb_path
+
+
+if __name__ == "__main__":
+    main()
diff --git a/packages/data-analytics-demo/src/data_analytics_demo/data/schemas.py b/packages/data-analytics-demo/src/data_analytics_demo/data/schemas.py
new file mode 100644
index 0000000..a6b3dba
--- /dev/null
+++ b/packages/data-analytics-demo/src/data_analytics_demo/data/schemas.py
@@ -0,0 +1,81 @@
+"""Pydantic schemas for the 4 synthetic SaaS tables.
+
+These define the contract between the generator (this package) and the dbt
+staging layer (`dbt_project/models/staging/`). They are deliberately small,
+typed, and free of cross-package import — dbt consumes them only by column
+shape, not as a Python import.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Literal
+
+from pydantic import BaseModel, Field
+
+PlanTier = Literal["free", "pro", "enterprise"]
+Region = Literal["us", "eu", "apac", "latam"]
+SubscriptionStatus = Literal["active", "canceled", "paused"]
+InvoiceStatus = Literal["paid", "pending", "failed"]
+EventType = Literal[
+    "login",
+    "feature_use_core",
+    "feature_use_premium",
+    "feature_use_advanced",
+    "support_ticket",
+    "doc_view",
+    "export",
+]
+
+
+class Customer(BaseModel):
+    """A single tenant on the SaaS product."""
+
+    customer_id: int = Field(ge=1)
+    email: str  # Faker-generated company_email; schema-level format validation deferred (no AC requires it)
+    company: str
+    signup_date: datetime
+    region: Region
+    plan_tier_at_signup: PlanTier
+
+
+class Subscription(BaseModel):
+    """A subscription contract owned by a customer.
+
+    A customer may have multiple subscription rows over time (upgrades,
+    cancellations, re-subscriptions). Cohort retention queries (AC-2.1) read
+    from this table.
+ """ + + subscription_id: int = Field(ge=1) + customer_id: int = Field(ge=1) + plan_tier: PlanTier + start_date: datetime + end_date: datetime | None # None for active subscriptions + status: SubscriptionStatus + monthly_amount_usd: float = Field(ge=0) + + +class Event(BaseModel): + """A product-usage event emitted by a customer. + + Volume drives both the churn signal (drop-off in last 30 days) and the + upsell signal (premium-feature usage by free-tier customers). + """ + + event_id: int = Field(ge=1) + customer_id: int = Field(ge=1) + timestamp: datetime + event_type: EventType + + +class Invoice(BaseModel): + """A monthly invoice tied to a subscription period.""" + + invoice_id: int = Field(ge=1) + customer_id: int = Field(ge=1) + subscription_id: int = Field(ge=1) + period_start: datetime + period_end: datetime + amount_usd: float = Field(ge=0) + status: InvoiceStatus diff --git a/packages/data-analytics-demo/src/data_analytics_demo/py.typed b/packages/data-analytics-demo/src/data_analytics_demo/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/data-analytics-demo/tests/__init__.py b/packages/data-analytics-demo/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/data-analytics-demo/tests/test_data_generate.py b/packages/data-analytics-demo/tests/test_data_generate.py new file mode 100644 index 0000000..ec05234 --- /dev/null +++ b/packages/data-analytics-demo/tests/test_data_generate.py @@ -0,0 +1,159 @@ +"""Tests for the synthetic data generator (T-03 / AC-1.x).""" + +from __future__ import annotations + +from pathlib import Path + +import duckdb +import pytest + +from data_analytics_demo.data import generate +from data_analytics_demo.data.schemas import Customer, Event, Invoice, Subscription + + +@pytest.fixture() +def small_warehouse(tmp_path: Path) -> Path: + """Generate a small but representative dataset into a temp DuckDB file.""" + out = tmp_path / "analytics.duckdb" + generate.main( + 
n_customers=200, + n_subscriptions=400, + n_events=2_000, + n_invoices=600, + seed=42, + output_path=out, + ) + return out + + +# ---- AC-1.1 ---------------------------------------------------------------- + +def test_ac_1_1_four_tables_present(small_warehouse: Path) -> None: + con = duckdb.connect(str(small_warehouse), read_only=True) + try: + tables = {row[0] for row in con.execute("SHOW TABLES").fetchall()} + finally: + con.close() + assert tables == {"customers", "subscriptions", "events", "invoices"} + + +# ---- AC-1.2 (proportional minimums verified at production sizing) ---------- + +def test_ac_1_2_row_counts_match_request(small_warehouse: Path) -> None: + con = duckdb.connect(str(small_warehouse), read_only=True) + try: + counts = { + t: con.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] + for t in ("customers", "subscriptions", "events", "invoices") + } + finally: + con.close() + assert counts["customers"] == 200 + assert counts["subscriptions"] == 400 + assert counts["events"] == 2_000 + assert counts["invoices"] == 600 + + +def test_ac_1_2_default_sizing_meets_floor(tmp_path: Path) -> None: + """At default sizing, row counts meet the AC-1.2 floor.""" + out = tmp_path / "analytics.duckdb" + generate.main(output_path=out, seed=42) + con = duckdb.connect(str(out), read_only=True) + try: + n_customers = con.execute("SELECT COUNT(*) FROM customers").fetchone()[0] + n_events = con.execute("SELECT COUNT(*) FROM events").fetchone()[0] + n_subscriptions = con.execute("SELECT COUNT(*) FROM subscriptions").fetchone()[0] + n_invoices = con.execute("SELECT COUNT(*) FROM invoices").fetchone()[0] + finally: + con.close() + assert n_customers >= 1_000 + assert n_events >= 50_000 + assert n_subscriptions >= 2_000 + assert n_invoices >= 5_000 + + +# ---- AC-1.3 (progress to stderr) ------------------------------------------- + +def test_ac_1_3_emits_progress(capsys: pytest.CaptureFixture[str], tmp_path: Path) -> None: + out = tmp_path / "analytics.duckdb" + 
generate.main( + n_customers=100, n_subscriptions=200, n_events=500, n_invoices=300, seed=1, output_path=out + ) + captured = capsys.readouterr() + # Progress emits via _emit() -> stderr with a [data] prefix + assert "[data]" in captured.err + assert "customers" in captured.err + assert "done" in captured.err + + +# ---- AC-1.4 (auto-create warehouse dir) ------------------------------------ + +def test_ac_1_4_creates_missing_warehouse_dir(tmp_path: Path) -> None: + nested = tmp_path / "nope" / "deeper" / "analytics.duckdb" + assert not nested.parent.exists() + generate.main( + n_customers=50, n_subscriptions=100, n_events=200, n_invoices=120, seed=2, output_path=nested + ) + assert nested.exists() + + +# ---- AC-1.5 / AC-δ.2 (determinism) ----------------------------------------- + +def test_ac_1_5_deterministic_with_same_seed(tmp_path: Path) -> None: + a = tmp_path / "a.duckdb" + b = tmp_path / "b.duckdb" + generate.main( + n_customers=100, n_subscriptions=200, n_events=500, n_invoices=300, seed=7, output_path=a + ) + generate.main( + n_customers=100, n_subscriptions=200, n_events=500, n_invoices=300, seed=7, output_path=b + ) + + def read_all(p: Path) -> dict[str, list[tuple]]: + con = duckdb.connect(str(p), read_only=True) + try: + return { + t: con.execute(f"SELECT * FROM {t} ORDER BY 1").fetchall() + for t in ("customers", "subscriptions", "events", "invoices") + } + finally: + con.close() + + assert read_all(a) == read_all(b) + + +# ---- AC-γ.1 (no real PII) -------------------------------------------------- + +def test_ac_gamma_1_no_real_pii_signature(small_warehouse: Path) -> None: + """Sanity check: emails follow the Faker company-email pattern (not gmail/etc.).""" + con = duckdb.connect(str(small_warehouse), read_only=True) + try: + sample_emails = [ + row[0] for row in con.execute("SELECT email FROM customers LIMIT 50").fetchall() + ] + finally: + con.close() + common_real_domains = {"gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "icloud.com"} 
+ for email in sample_emails: + domain = email.split("@", 1)[1].lower() + assert domain not in common_real_domains, f"unexpected real-domain email: {email}" + + +# ---- Schema round-trip (extra sanity) -------------------------------------- + +def test_schemas_validate_first_row(small_warehouse: Path) -> None: + """Each table's first row deserializes into its Pydantic schema.""" + con = duckdb.connect(str(small_warehouse), read_only=True) + try: + cust_row = con.execute("SELECT * FROM customers LIMIT 1").fetchdf().iloc[0].to_dict() + sub_row = con.execute("SELECT * FROM subscriptions LIMIT 1").fetchdf().iloc[0].to_dict() + evt_row = con.execute("SELECT * FROM events LIMIT 1").fetchdf().iloc[0].to_dict() + inv_row = con.execute("SELECT * FROM invoices LIMIT 1").fetchdf().iloc[0].to_dict() + finally: + con.close() + + # Pydantic accepts pandas/numpy datetimes; allow lenient parsing. + Customer.model_validate(cust_row) + Subscription.model_validate(sub_row) + Event.model_validate(evt_row) + Invoice.model_validate(inv_row)