diff --git a/QUICKSTART.md b/QUICKSTART.md index 5317e6b3..45b65d1c 100644 --- a/QUICKSTART.md +++ b/QUICKSTART.md @@ -29,6 +29,21 @@ O Data Boar **não substitui** assessoria jurídica; produz **sinais técnicos** --- +## Caminho 0 — Zero-config (recomendado no Windows após `pip install data-boar`) + +Sem `config.yaml`, sem Docker, sem YAML — corpus **sintético** embutido: + +```powershell +pip install data-boar +data-boar --demo +``` + +Abra [http://127.0.0.1:8088/pt-br/](http://127.0.0.1:8088/pt-br/) — achados de demonstração já carregados. + +**No clone (desenvolvimento):** `uv sync` na raiz, depois `uv run python main.py --demo` ou `.\scripts\demo.sh`. + +--- + ## Caminho A — Docker (menos fricção para não desenvolvedores) Execute na **raiz do clone** (ajuste o caminho do repositório): diff --git a/README.md b/README.md index 925c3d78..1e582f8f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Data Boar -> **Try it in 30 seconds (no real data) — any OS (Docker):** `docker run --rm -p 8088:8088 fabioleitao/data_boar:latest demo` → open `http://127.0.0.1:8088/pt-br/`. **Linux/macOS shell (local `uv`, no Docker):** `./scripts/demo.sh` (requires `uv` — [install](https://docs.astral.sh/uv/getting-started/installation/)). **Windows or step-by-step:** [5-min QuickStart](QUICKSTART.md). All demo paths use synthetic data and plaintext loopback (`--allow-insecure-http`). +> **Try it in 30 seconds (no real data):** `data-boar --demo` (or `python main.py --demo`) → open `http://127.0.0.1:8088/pt-br/`. **Docker:** `docker run --rm -p 8088:8088 fabioleitao/data_boar:latest demo`. **Shell wrapper:** `./scripts/demo.sh` (requires `uv`). **Windows step-by-step:** [5-min QuickStart](QUICKSTART.md). Synthetic data only; loopback plaintext (`--allow-insecure-http`). **Data Boar** — enterprise data discovery and risk governance: compliance-aware mapping of personal and sensitive data across your data soup (intelligence engine, not a single-jurisdiction “audit app”). 
diff --git a/api/templates/help.html b/api/templates/help.html index 5723b6ab..c8afec21 100644 --- a/api/templates/help.html +++ b/api/templates/help.html @@ -34,7 +34,9 @@
{{ t('help.run_cli_p1') }}
{{ t('help.run_cli_oneshot') }}
-uv run python main.py --config config.yaml
+ uv run python main.py --demo
+data-boar --demo
+uv run python main.py --config config.yaml
uv run python main.py --config config.yaml --validate-config
uv run python main.py --config config.yaml --diff <session_a> <session_b>
uv run python main.py --config config.yaml --diff <session_a> <session_b> --fail-on-new-high
diff --git a/core/demo/__init__.py b/core/demo/__init__.py
new file mode 100644
index 00000000..8dabc40c
--- /dev/null
+++ b/core/demo/__init__.py
@@ -0,0 +1,21 @@
+"""Installable demo corpus and workspace helpers (#1113)."""
+
+from core.demo.runtime import (
+ prepare_demo_workspace,
+ print_demo_banner,
+ register_demo_cleanup,
+)
+from core.demo.synthetic_corpus import (
+ ALL_SCENARIOS,
+ generate_corpus,
+ main as generate_corpus_cli,
+)
+
+__all__ = [
+ "ALL_SCENARIOS",
+ "generate_corpus",
+ "generate_corpus_cli",
+ "prepare_demo_workspace",
+ "print_demo_banner",
+ "register_demo_cleanup",
+]
diff --git a/core/demo/runtime.py b/core/demo/runtime.py
new file mode 100644
index 00000000..c0f6958a
--- /dev/null
+++ b/core/demo/runtime.py
@@ -0,0 +1,111 @@
+"""Demo workspace preparation for ``data-boar --demo`` (#1113, #834)."""
+
+from __future__ import annotations
+
+import atexit
+import os
+import tempfile
+from pathlib import Path
+from typing import Any
+
+from core.demo.synthetic_corpus import ALL_SCENARIOS, generate_corpus
+
+_DEFAULT_SCENARIOS = "happy,unhappy,false_positive"
+_DEMO_DIRNAME = "data_boar_demo"
+_registered_cleanup: Path | None = None
+
+
+def _default_demo_root() -> Path:
+    """Return the default demo workspace path: ``_DEMO_DIRNAME`` under the system temp dir."""
+    temp_base = Path(tempfile.gettempdir())
+    return temp_base.joinpath(_DEMO_DIRNAME)
+
+
+def _write_demo_config(demo_dir: Path, port: int) -> Path:
+    """
+    Write the minimal demo configuration and return its path.
+
+    Creates ``demo_dir / "reports"`` (and parents) and writes
+    ``demo_dir / "demo.config.yaml"`` with one filesystem target pointing at
+    ``demo_dir / "corpus"``, the report output directory, the SQLite path and
+    a loopback-only API section on ``port`` with plaintext HTTP allowed.
+
+    Args:
+        demo_dir: Root of the demo workspace.
+        port: TCP port written to ``api.port``.
+
+    Returns:
+        Path of the written YAML file.
+    """
+    import json
+
+    def _yaml_str(value: object) -> str:
+        # A JSON string literal is also a valid YAML double-quoted scalar.
+        # Quoting keeps paths containing " #", ": ", quotes or backslashes
+        # (typical for Windows temp directories) from corrupting the config.
+        return json.dumps(str(value), ensure_ascii=False)
+
+    corpus = demo_dir / "corpus"
+    reports = demo_dir / "reports"
+    reports.mkdir(parents=True, exist_ok=True)
+    config_path = demo_dir / "demo.config.yaml"
+    config_path.write_text(
+        (
+            "targets:\n"
+            "  - name: demo-corpus\n"
+            "    type: filesystem\n"
+            f"    path: {_yaml_str(corpus)}\n"
+            "    recursive: true\n"
+            "\n"
+            "report:\n"
+            f"  output_dir: {_yaml_str(reports)}\n"
+            "\n"
+            f"sqlite_path: {_yaml_str(demo_dir / 'audit_results.db')}\n"
+            "\n"
+            "api:\n"
+            f"  port: {port}\n"
+            "  host: 127.0.0.1\n"
+            "  allow_insecure_http: true\n"
+        ),
+        encoding="utf-8",
+    )
+    return config_path
+
+
+def _cleanup_demo_dir(demo_dir: Path) -> None:
+    """Best-effort recursive removal of ``demo_dir``; a missing directory is a no-op."""
+    from shutil import rmtree
+
+    if not demo_dir.exists():
+        return
+    # Removal errors are ignored on purpose: this runs during interpreter exit.
+    rmtree(demo_dir, ignore_errors=True)
+
+
+def register_demo_cleanup(demo_dir: Path) -> None:
+    """
+    Arrange for ``demo_dir`` to be removed when the interpreter exits.
+
+    Only the first call in a process registers a handler; once a directory has
+    been recorded in ``_registered_cleanup``, later calls return without
+    registering anything (even for a different directory).
+    """
+    global _registered_cleanup
+    if _registered_cleanup is None:
+        _registered_cleanup = demo_dir
+        atexit.register(lambda: _cleanup_demo_dir(demo_dir))
+
+
+def print_demo_banner(port: int, demo_dir: Path) -> None:
+    """Print the demo start-up banner (workspace path, dashboard URL, stop hint) to stdout."""
+    banner_lines = (
+        "",
+        "╔══════════════════════════════════════════════════════════╗",
+        "║ Data Boar — Demo (synthetic corpus, zero real data) ║",
+        "╚══════════════════════════════════════════════════════════╝",
+        f"[demo] Workspace: {demo_dir}",
+        f"[demo] Dashboard: http://127.0.0.1:{port}/pt-br/",
+        "[demo] Press Ctrl+C to stop (temp files removed on exit).",
+        "",
+    )
+    for line in banner_lines:
+        print(line)
+
+
+def prepare_demo_workspace(
+    *,
+    port: int = 8088,
+    scenarios: str = _DEFAULT_SCENARIOS,
+    demo_root: Path | None = None,
+    register_cleanup: bool = True,
+) -> tuple[Path, Path, dict[str, Any]]:
+    """
+    Generate the synthetic corpus and a minimal config in a demo workspace.
+
+    Side effects: creates ``<workspace>/corpus``, writes
+    ``<workspace>/demo.config.yaml``, sets ``os.environ["CONFIG_PATH"]`` to
+    that file and, when ``register_cleanup`` is true, registers removal of the
+    workspace at interpreter exit.
+
+    Args:
+        port: API port written into the generated config.
+        scenarios: Comma-separated scenario names; blanks are ignored.
+        demo_root: Workspace directory; defaults to ``_default_demo_root()``.
+        register_cleanup: Remove the workspace at interpreter exit.
+
+    Returns:
+        ``(demo_dir, config_path, config)`` where ``config`` is the value
+        returned by ``load_config`` for the generated file.
+    """
+    from config.loader import load_config
+
+    demo_dir = (demo_root or _default_demo_root()).resolve()
+    demo_dir.mkdir(parents=True, exist_ok=True)
+
+    # Register before doing any work that can fail, so a crash while
+    # generating the corpus or loading the config does not leave the
+    # workspace behind.
+    if register_cleanup:
+        register_demo_cleanup(demo_dir)
+
+    corpus_dir = demo_dir / "corpus"
+    corpus_dir.mkdir(parents=True, exist_ok=True)
+
+    selected = [s.strip() for s in scenarios.split(",") if s.strip()]
+    # NOTE(review): the empty-selection fallback is the first three entries of
+    # ALL_SCENARIOS, which may not match _DEFAULT_SCENARIOS -- confirm ordering.
+    generate_corpus(corpus_dir, selected or ALL_SCENARIOS[:3])
+
+    config_path = _write_demo_config(demo_dir, port)
+    os.environ["CONFIG_PATH"] = str(config_path)
+    config = load_config(str(config_path))
+
+    return demo_dir, config_path, config
diff --git a/core/demo/synthetic_corpus.py b/core/demo/synthetic_corpus.py
new file mode 100644
index 00000000..c5bb2f1b
--- /dev/null
+++ b/core/demo/synthetic_corpus.py
@@ -0,0 +1,939 @@
+#!/usr/bin/env python3
+"""
+generate_synthetic_poc_corpus.py
+================================
+Generates a synthetic test corpus for Data Boar POC validation.
+
+Covers nine test scenarios:
+ 1. happy -- clear PII in plain-text formats (should be found)
+ 2. unhappy -- PII with OCR noise, encoding quirks (should be found, harder)
+ 3. catastrophic -- nested archives, password-protected zips (may be missed)
+ 4. false_positive -- data that LOOKS like PII but is invalid (should NOT trigger)
+ 5. manual_review -- ambiguous / partial data (flag for human review)
+ 6. stego -- CPF/RG hidden in image LSB (NOT found without stego module)
+ 7. extensions -- one file per supported extension, all containing a CPF
+ 8. stress_load -- large file, directory flood, deep nesting (scanner must not crash or miss PII)
+ 9. config_errors -- intentionally broken configs (error-message QA, not PII detection)
+
+Usage:
+ uv run python scripts/generate_synthetic_poc_corpus.py
+ uv run python scripts/generate_synthetic_poc_corpus.py --scenario happy,stego
+ uv run python scripts/generate_synthetic_poc_corpus.py --output /tmp/poc_corpus
+
+Collaborator note:
+ After generating, point Data Boar at each sub-folder and compare findings
+ against the expected results in EXPECTED.txt (each sub-folder) and
+ docs/TESTING_POC_GUIDE.md (full validation checklist).
+"""
+
+from __future__ import annotations
+
+import argparse
+import csv
+import io
+import json
+import sqlite3
+import tarfile
+import textwrap
+import zipfile
+from pathlib import Path
+from typing import Callable
+
+# ---------------------------------------------------------------------------
+# Synthetic PII (deterministic, never real persons)
+# ---------------------------------------------------------------------------
+_CPFS = [
+ "123.456.789-09",
+ "987.654.321-00",
+ "111.222.333-96",
+ "000.000.001-91",
+ "529.982.247-25",
+]
+_CNPJS = ["11.222.333/0001-81", "00.000.000/0001-91", "12.345.678/0001-95"]
+_RGS = ["12.345.678-9", "98.765.432-1", "00.111.222-3"]
+_NAMES = [
+ "Ana Paula Souza",
+ "Carlos Eduardo Lima",
+ "Fernanda Beatriz Costa",
+ "Joao Roberto Colleague-E",
+ "Maria Oliveira Santos",
+]
+_EMAILS = [
+ "ana.souza@example-test.com",
+ "carlos.lima@demo.invalid",
+ "f.costa@poc-databoar.test",
+]
+_PHONES = ["(11) 99999-0001", "+55 21 98888-0002", "0800 123 4567"]
+_DATES = ["15/03/1985", "1990-07-22", "01/01/1970"]
+_ADDRS = [
+ "Rua das Flores, 123, Sao Paulo - SP, CEP 01234-567",
+ "Av. Brasil, 4500, Rio de Janeiro - RJ",
+]
+
+EXPECTED: dict[str, str] = {
+ "1_happy": "DEVE ENCONTRAR -- PII em claro, sem ofuscacao",
+ "2_unhappy": "DEVE ENCONTRAR -- mas pode requerer OCR ou tolerancia a ruido",
+ "3_catastrophic": "PODE NAO ENCONTRAR -- dados em arquivos aninhados ou com senha",
+ "4_false_positive": "NAO DEVE ENCONTRAR -- strings similares a PII mas invalidas",
+ "5_manual_review": "DEVE SINALIZAR PARA REVISAO MANUAL -- dados parcialmente mascarados",
+ "6_stego": "NAO DEVE ENCONTRAR sem modulo estego -- CPF em LSB de imagem PNG",
+ "7_extensions": "DEVE ENCONTRAR em todos os formatos suportados",
+}
+
+
+def _p(lst: list[str], i: int = 0) -> str:
+    """Return the item at position ``i`` of ``lst``, wrapping around its length."""
+    wrapped = i % len(lst)
+    return lst[wrapped]
+
+
+def _w(path: Path, content: str | bytes, enc: str = "utf-8") -> None:
+    """
+    Write ``content`` to ``path``, creating parent directories as needed.
+
+    Text is encoded with ``enc`` and written as bytes, so the file content is
+    identical on every platform. Text-mode writing would translate ``"\\n"``
+    to the OS line separator, which on Windows turns an intentional
+    ``"\\r\\n"`` into ``"\\r\\r\\n"``.
+
+    Args:
+        path: Destination file.
+        content: Text (encoded with ``enc``) or raw bytes (written as-is).
+        enc: Text encoding; ignored for ``bytes`` content.
+    """
+    path.parent.mkdir(parents=True, exist_ok=True)
+    data = content if isinstance(content, bytes) else content.encode(enc)
+    path.write_bytes(data)
+
+
+# ---------------------------------------------------------------------------
+# Scenario 1 - Happy path
+# ---------------------------------------------------------------------------
+def gen_scenario_1(base: Path) -> None:
+ """
+ Write the "happy path" corpus (PII in clear text) under ``base / "1_happy"``.
+
+ Always written: employees.txt, employees.csv, employees.json, employees.db
+ (SQLite) and EXPECTED.txt. PDF, DOCX, XLSX and PNG samples are written only
+ when reportlab, docx, openpyxl and PIL respectively can be imported;
+ otherwise a ``*_fallback.txt`` file containing a CPF is written instead.
+ """
+ out = base / "1_happy"
+
+ # Plain-text report.
+ _w(
+ out / "employees.txt",
+ textwrap.dedent(f"""\
+ RELATORIO DE FUNCIONARIOS -- FICTICIO -- APENAS PARA TESTES POC
+ Nome: {_p(_NAMES, 0)} CPF: {_p(_CPFS, 0)} RG: {_p(_RGS, 0)}
+ Email: {_p(_EMAILS, 0)} Tel: {_p(_PHONES, 0)}
+ Nasc: {_p(_DATES, 0)} End: {_p(_ADDRS, 0)}
+ Nome: {_p(_NAMES, 1)} CPF: {_p(_CPFS, 1)} CNPJ: {_p(_CNPJS, 0)}
+ """),
+ )
+
+ # CSV: header row plus five records (lists shorter than five wrap via _p).
+ buf = io.StringIO()
+ csv.writer(buf).writerows(
+ [["nome", "cpf", "rg", "email"]]
+ + [[_p(_NAMES, i), _p(_CPFS, i), _p(_RGS, i), _p(_EMAILS, i)] for i in range(5)]
+ )
+ _w(out / "employees.csv", buf.getvalue())
+
+ # JSON: four records.
+ _w(
+ out / "employees.json",
+ json.dumps(
+ [
+ {"nome": _p(_NAMES, i), "cpf": _p(_CPFS, i), "rg": _p(_RGS, i)}
+ for i in range(4)
+ ],
+ ensure_ascii=False,
+ indent=2,
+ ),
+ )
+
+ # PDF (optional dependency: reportlab).
+ try:
+ from reportlab.pdfgen import canvas as rc
+
+ c = rc.Canvas(str(out / "employees.pdf"))
+ y = 780
+ c.drawString(50, y, "DADOS FICTICIOS -- POC Data Boar")
+ y -= 20
+ for i in range(3):
+ for lbl, v in [
+ ("Nome", _p(_NAMES, i)),
+ ("CPF", _p(_CPFS, i)),
+ ("Email", _p(_EMAILS, i)),
+ ]:
+ c.drawString(50, y, f"{lbl}: {v}")
+ y -= 15
+ c.save()
+ except ImportError:
+ _w(out / "employees_pdf_fallback.txt", f"PDF nao gerado. CPF: {_p(_CPFS, 0)}")
+
+ # DOCX (optional dependency: docx).
+ try:
+ import docx as _d
+
+ doc = _d.Document()
+ doc.add_heading("Dados Ficticios POC", 0)
+ for i in range(3):
+ doc.add_paragraph(
+ f"Nome: {_p(_NAMES, i)}\nCPF: {_p(_CPFS, i)}\nRG: {_p(_RGS, i)}\n"
+ )
+ doc.save(str(out / "employees.docx"))
+ except ImportError:
+ _w(out / "employees_docx_fallback.txt", f"DOCX nao gerado. CPF: {_p(_CPFS, 1)}")
+
+ # XLSX (optional dependency: openpyxl).
+ try:
+ import openpyxl
+
+ wb = openpyxl.Workbook()
+ ws = wb.active
+ ws.title = "Funcionarios"
+ ws.append(["Nome", "CPF", "RG", "Email", "Tel"])
+ for i in range(5):
+ ws.append(
+ [
+ _p(_NAMES, i),
+ _p(_CPFS, i),
+ _p(_RGS, i),
+ _p(_EMAILS, i),
+ _p(_PHONES, i),
+ ]
+ )
+ wb.save(str(out / "employees.xlsx"))
+ except ImportError:
+ _w(out / "employees_xlsx_fallback.txt", f"XLSX nao gerado. CPF: {_p(_CPFS, 2)}")
+
+ # SQLite database with an ``emp`` table.
+ # NOTE(review): the table is created with IF NOT EXISTS but the five rows are
+ # inserted unconditionally, so re-running into an existing output directory
+ # appends duplicate rows; the connection is also not closed if an insert raises.
+ conn = sqlite3.connect(str(out / "employees.db"))
+ conn.execute(
+ "CREATE TABLE IF NOT EXISTS emp (id INTEGER PRIMARY KEY,nome TEXT,cpf TEXT,rg TEXT,email TEXT)"
+ )
+ for i in range(5):
+ conn.execute(
+ "INSERT INTO emp(nome,cpf,rg,email) VALUES(?,?,?,?)",
+ (_p(_NAMES, i), _p(_CPFS, i), _p(_RGS, i), _p(_EMAILS, i)),
+ )
+ conn.commit()
+ conn.close()
+
+ # PNG with the CPF and name drawn as visible text (optional dependency: PIL).
+ try:
+ from PIL import Image, ImageDraw
+
+ img = Image.new("RGB", (400, 120), (255, 255, 255))
+ draw = ImageDraw.Draw(img)
+ draw.text((10, 20), f"CPF: {_p(_CPFS, 0)}", (0, 0, 0))
+ draw.text((10, 50), f"Nome: {_p(_NAMES, 0)}", (0, 0, 0))
+ img.save(str(out / "id_card_visible.png"))
+ except ImportError:
+ _w(out / "image_fallback.txt", f"PNG nao gerado. CPF: {_p(_CPFS, 0)}")
+
+ _w(out / "EXPECTED.txt", EXPECTED["1_happy"])
+ print(f" v Scenario 1 (happy) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 2 - Unhappy path
+# ---------------------------------------------------------------------------
+def gen_scenario_2(base: Path) -> None:
+    """
+    Write the "unhappy path" corpus (noisy or oddly encoded PII) under ``base / "2_unhappy"``.
+
+    Files written: ocr_noisy.txt (character substitutions), latin1_encoded.txt,
+    bom_utf8.csv (UTF-8 with a byte-order mark), crlf_endings.txt,
+    partial_redaction.txt, base64_embedded.txt and EXPECTED.txt.
+    """
+    import base64 as _b64
+
+    out = base / "2_unhappy"
+
+    # Simulated OCR noise: letters swapped for look-alikes, separators altered.
+    _w(
+        out / "ocr_noisy.txt",
+        f"N0me: {_p(_NAMES, 2).replace('a', '@').replace('e', '3')}\n"
+        f"CPF: {_p(_CPFS, 2).replace('.', ',')} (possivel ruido OCR)\n"
+        f"RG: {_p(_RGS, 2).replace('-', '_')}\n"
+        f"Email: {_p(_EMAILS, 2).replace('@', '[at]')}\n",
+    )
+
+    _w(
+        out / "latin1_encoded.txt",
+        f"Nome: {_p(_NAMES, 0)}\nCPF: {_p(_CPFS, 0)}\nObservacao: dado em latin-1\n",
+        enc="latin-1",
+    )
+
+    # The "utf-8-sig" codec writes the BOM itself. Putting "\ufeff" in the text
+    # as well produced two BOMs, leaving a stray U+FEFF glued to the first
+    # header name for any reader that strips only one.
+    _w(
+        out / "bom_utf8.csv",
+        f"Nome;CPF;RG\n{_p(_NAMES, 1)};{_p(_CPFS, 1)};{_p(_RGS, 1)}\n",
+        enc="utf-8-sig",
+    )
+
+    _w(
+        out / "crlf_endings.txt",
+        f"CPF: {_p(_CPFS, 3)}\r\nTel: {_p(_PHONES, 0)}\r\nEnd: {_p(_ADDRS, 0)}\r\n",
+    )
+
+    # Only the middle group of the CPF stays visible; email and RG are intact.
+    _w(
+        out / "partial_redaction.txt",
+        f"Nome: {_p(_NAMES, 0)}\n"
+        f"CPF: ***.{_p(_CPFS, 0)[4:7]}.***-** (parcialmente redactado)\n"
+        f"Email: {_p(_EMAILS, 0)}\nRG: {_p(_RGS, 0)}\n",
+    )
+
+    blob = _b64.b64encode(f"CPF:{_p(_CPFS, 1)},Nome:{_p(_NAMES, 1)}".encode()).decode()
+    _w(out / "base64_embedded.txt", f"campo_documento: {blob}\n# dado acima e base64\n")
+
+    _w(out / "EXPECTED.txt", EXPECTED["2_unhappy"])
+    print(f" v Scenario 2 (unhappy) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 3 - Catastrophic
+# ---------------------------------------------------------------------------
+def gen_scenario_3(base: Path) -> None:
+ """
+ Write the "catastrophic" corpus (PII inside containers) under ``base / "3_catastrophic"``.
+
+ The same PII payload is placed in a zip nested inside a zip, a zip intended
+ to be password-protected, tar.gz and tar.bz2 archives, a text file named
+ ``.jpg`` and a single very long line. EXPECTED.txt and PASSWORD_HINT.txt
+ are written alongside.
+ """
+ out = base / "3_catastrophic"
+ out.mkdir(parents=True, exist_ok=True)
+ pii = f"DADOS FICTICIOS\nCPF: {_p(_CPFS, 4)}\nNome: {_p(_NAMES, 4)}\nRG: {_p(_RGS, 2)}\n".encode()
+
+ # nested zip
+ inner_buf = io.BytesIO()
+ with zipfile.ZipFile(inner_buf, "w", zipfile.ZIP_DEFLATED) as z:
+ z.writestr("pii.txt", pii)
+ with zipfile.ZipFile(out / "nested.zip", "w", zipfile.ZIP_DEFLATED) as z:
+ z.writestr("inner.zip", inner_buf.getvalue())
+
+ # password-protected zip
+ # NOTE(review): per the zipfile documentation, setpassword() only sets the
+ # default password used when *extracting* encrypted members, and the module
+ # cannot create encrypted archives. secret.txt is therefore presumably stored
+ # unencrypted (ZIP_STORED, so the CPF is readable in the raw bytes) -- confirm
+ # and, if real encryption is wanted, ship a pre-built fixture instead.
+ with zipfile.ZipFile(out / "password_protected.zip", "w", zipfile.ZIP_STORED) as z:
+ z.setpassword(b"poc-test-123")
+ z.writestr("secret.txt", pii)
+
+ # tar.gz
+ with tarfile.open(str(out / "archive.tar.gz"), "w:gz") as t:
+ info = tarfile.TarInfo("pii.txt")
+ info.size = len(pii)
+ t.addfile(info, io.BytesIO(pii))
+
+ # tar.bz2
+ with tarfile.open(str(out / "archive.tar.bz2"), "w:bz2") as t:
+ info = tarfile.TarInfo("pii.txt")
+ info.size = len(pii)
+ t.addfile(info, io.BytesIO(pii))
+
+ # disguised extension (text file named .jpg)
+ _w(
+ out / "report_2026.jpg",
+ pii.decode() + "\n# Arquivo de texto mascarado como .jpg\n",
+ )
+
+ # very long line stress test
+ _w(
+ out / "long_line_stress.txt",
+ "x" * 5000 + f" CPF: {_p(_CPFS, 0)} " + "y" * 5000 + "\n",
+ )
+
+ _w(out / "EXPECTED.txt", EXPECTED["3_catastrophic"])
+ _w(
+ out / "PASSWORD_HINT.txt",
+ "Senha: poc-test-123\nConfiguracao: zip_password no config.yaml\n",
+ )
+ print(f" v Scenario 3 (catastrophic) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 4 - False positive pressure
+# ---------------------------------------------------------------------------
+def _cpf_check_digit(digits: list[int]) -> int:
+    """
+    Return the CPF mod-11 check digit that follows ``digits``.
+
+    Pass the nine base digits to get the first check digit, or those nine plus
+    the first check digit to get the second one.
+    """
+    weights = range(len(digits) + 1, 1, -1)
+    return sum(d * w for d, w in zip(digits, weights)) * 10 % 11 % 10
+
+
+def gen_scenario_4(base: Path) -> None:
+    """
+    Write the "false positive" corpus under ``base / "4_false_positive"``.
+
+    Every file holds values that merely look like identifiers: CPF-shaped
+    serials, a CNPJ-shaped reference, 11-digit codes, IP addresses and version
+    strings. The CPF-shaped values are generated with a fixed seed, so the
+    corpus is reproducible, and with both check digits deliberately wrong, so
+    none of them can be a valid CPF by chance.
+    """
+    import random as _r
+
+    out = base / "4_false_positive"
+    rng = _r.Random(4)  # fixed seed: the same corpus on every run
+
+    def _invalid_cpf_digits(first_min: int = 0) -> str:
+        # Nine random base digits, then each check digit shifted by one from the
+        # value the mod-11 rule requires, so both validation steps fail.
+        digits = [rng.randint(first_min, 9)] + [rng.randint(0, 9) for _ in range(8)]
+        for _ in range(2):
+            digits.append((_cpf_check_digit(digits) + 1) % 10)
+        return "".join(map(str, digits))
+
+    def _invalid_cpf_shaped() -> str:
+        d = _invalid_cpf_digits()
+        return f"{d[:3]}.{d[3:6]}.{d[6:9]}-{d[9:]}"
+
+    _w(
+        out / "serial_numbers.txt",
+        "CATALOGO -- FICÇÃO\n"
+        + "\n".join(f"Serial: {_invalid_cpf_shaped()}" for _ in range(10)),
+    )
+    _w(out / "cnpj_shaped_refs.txt", "Ref: 00.111.222/0099-00\n" * 5)  # invalid CNPJ
+    # Unformatted 11-digit codes with a non-zero leading digit.
+    _w(
+        out / "random_codes.txt",
+        "\n".join(f"Cod: {_invalid_cpf_digits(first_min=1)}" for _ in range(20)),
+    )
+    _w(
+        out / "ip_addresses.txt",
+        "\n".join(f"IP: 10.0.{i}.{j}" for i in range(5) for j in range(5)),
+    )
+    _w(
+        out / "version_strings.txt",
+        "\n".join(f"v: 1.{i}.{i + 1}-{i + 2}" for i in range(10)),
+    )
+
+    _w(out / "EXPECTED.txt", EXPECTED["4_false_positive"])
+    print(f" v Scenario 4 (false_positive) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 5 - Manual review triggers
+# ---------------------------------------------------------------------------
+def gen_scenario_5(base: Path) -> None:
+ """
+ Write the "manual review" corpus (ambiguous or partial data) under ``base / "5_manual_review"``.
+
+ Files written: masked_pii.txt, pii_in_prose.txt, foreign_pii.txt,
+ anonymized_columns.csv and EXPECTED.txt. All content is fixed text; none of
+ the shared synthetic PII lists is used here.
+ """
+ out = base / "5_manual_review"
+ # Partially masked identifiers: some digits or letters remain visible.
+ _w(
+ out / "masked_pii.txt",
+ textwrap.dedent("""\
+ CPF: ***.456.789-** (mascarado -- padrao parcial visivel)
+ CPF: 123.***.***-09 (mascarado -- inicio e fim visiveis)
+ RG: 12.345.***-*
+ Email: a***.s***@example.com
+ Tel: (11) 9****-0001
+ Nome: Ana P. S. (iniciais -- identificacao possivel com contexto)
+ """),
+ )
+ # Identifiers described in running prose rather than labelled fields.
+ _w(
+ out / "pii_in_prose.txt",
+ textwrap.dedent("""\
+ O documento de CPF terminado em 09 foi verificado.
+ O numero de registro e 123456789 (sem pontuacao -- validacao manual necessaria).
+ O titular nasceu em quinze de marco de 1985.
+ """),
+ )
+ # Identifier formats from other countries.
+ _w(
+ out / "foreign_pii.txt",
+ textwrap.dedent("""\
+ DNI: 12345678A (Espanha -- nao e CPF brasileiro)
+ SSN: 123-45-6789 (EUA -- nao e CPF)
+ NIF: X1234567L (Espanha -- estrangeiro)
+ """),
+ )
+ # The whole two-line string is repeated, so the header row recurs five times too.
+ _w(
+ out / "anonymized_columns.csv",
+ "cpf,nome,email\n[ANONIMIZADO],[ANONIMIZADO],[ANONIMIZADO]\n" * 5,
+ )
+
+ _w(out / "EXPECTED.txt", EXPECTED["5_manual_review"])
+ print(f" v Scenario 5 (manual_review) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 6 - Steganography (LSB + EXIF metadata)
+# ---------------------------------------------------------------------------
+def _embed_lsb(img_path: Path, secret: str) -> None:
+    """
+    Save a 200x200 grey PNG at ``img_path`` with ``secret`` hidden in the red channel.
+
+    Each character contributes eight bits, followed by eight zero bits as a
+    terminator. One bit is stored per pixel in the least significant bit of
+    the red value; pixels beyond the payload keep their original colour.
+    """
+    from PIL import Image
+
+    size = (200, 200)
+    carrier = Image.new("RGB", size, (200, 200, 200))
+    payload = "".join(format(ord(ch), "08b") for ch in secret) + "0" * 8
+    stamped = [
+        ((red & 0xFE) | int(payload[idx]), green, blue)
+        if idx < len(payload)
+        else (red, green, blue)
+        for idx, (red, green, blue) in enumerate(carrier.getdata())
+    ]
+    result = Image.new("RGB", size)
+    result.putdata(stamped)
+    result.save(str(img_path), format="PNG")
+
+
+def _extract_lsb(img_path: Path) -> str:
+    """
+    Recover text hidden by ``_embed_lsb`` from the image at ``img_path``.
+
+    Reads the least significant bit of each pixel's red value, groups the bits
+    into bytes and decodes one character per byte until a zero byte is met or
+    the image runs out of complete bytes.
+
+    Returns:
+        The decoded text (empty when the first byte is zero).
+    """
+    from PIL import Image
+
+    pixels = list(Image.open(str(img_path)).getdata())
+    bits = [str(r & 1) for r, _g, _b in pixels]
+    chars = []
+    # "- 7" rather than "- 8": the last complete group of eight bits must be
+    # decoded as well; the previous bound always skipped it.
+    for i in range(0, len(bits) - 7, 8):
+        c = chr(int("".join(bits[i : i + 8]), 2))
+        if c == "\x00":
+            break
+        chars.append(c)
+    return "".join(chars)
+
+
+def gen_scenario_6(base: Path) -> None:
+ """
+ Write the steganography corpus under ``base / "6_stego"``.
+
+ With PIL available: innocent_photo.png (CPF and name hidden in red-channel
+ LSBs, verified by reading them back), STEGO_KEY.txt (documents the hidden
+ value), photo_with_exif_pii.png (CPF and name in PNG text metadata) and
+ EXPECTED.txt with manual validation steps. Without PIL only EXPECTED.txt is
+ written, stating that the scenario was not generated.
+ """
+ out = base / "6_stego"
+ out.mkdir(parents=True, exist_ok=True)
+ try:
+ from PIL import Image
+ from PIL.PngImagePlugin import PngInfo
+
+ secret = f"CPF:{_p(_CPFS, 0)};Nome:{_p(_NAMES, 0)}"
+ stego_path = out / "innocent_photo.png"
+ _embed_lsb(stego_path, secret)
+ recovered = _extract_lsb(stego_path)
+ # Round-trip self-check. NOTE(review): an ``assert`` is skipped when Python
+ # runs with -O, so this verification disappears in optimized mode.
+ assert recovered == secret, f"LSB mismatch: {recovered!r}"
+
+ _w(
+ out / "STEGO_KEY.txt",
+ f"Arquivo: innocent_photo.png\n"
+ f"Dado oculto (LSB canal R): {secret}\n"
+ f"Metodo: LSB -- canal R da imagem PNG (1 bit por pixel)\n"
+ f"Para extrair manualmente: use stegosuite, steghide, ou a funcao _extract_lsb() neste script.\n"
+ f"Verificacao OK: dado recuperado = {recovered!r}\n",
+ )
+
+ # EXIF / PNG metadata injection
+ img = Image.new("RGB", (200, 200), (180, 200, 220))
+ meta = PngInfo()
+ meta.add_text("Comment", f"CPF:{_p(_CPFS, 1)} Nome:{_p(_NAMES, 1)}")
+ meta.add_text("Author", _p(_NAMES, 1))
+ img.save(str(out / "photo_with_exif_pii.png"), pnginfo=meta)
+
+ # NOTE(review): the extraction hint below imports from
+ # ``scripts.generate_synthetic_poc_corpus`` and uses a fixed
+ # ``tests/synthetic_corpus/...`` path rather than this module and ``out`` --
+ # confirm both still resolve for a corpus generated elsewhere.
+ _w(
+ out / "EXPECTED.txt",
+ EXPECTED["6_stego"] + "\n\n"
+ "VALIDACAO MANUAL:\n"
+ "1. innocent_photo.png -- CPF em LSB. Scanner padrao NAO detecta.\n"
+ ' Extrair: uv run python -c "'
+ "from scripts.generate_synthetic_poc_corpus import _extract_lsb;"
+ "from pathlib import Path; print(_extract_lsb(Path('tests/synthetic_corpus/6_stego/innocent_photo.png')))\"\n"
+ "2. photo_with_exif_pii.png -- CPF em metadado PNG (Comment). Scanner PODE detectar se ler metadata.\n",
+ )
+
+ print(f" v Scenario 6 (stego) -> {out} [LSB OK, recovered={recovered!r}]")
+
+ except ImportError:
+ # PIL missing: record why the images are absent instead of failing.
+ _w(
+ out / "EXPECTED.txt",
+ "Pillow nao disponivel -- cenario 6 nao gerado.\nInstale: pip install pillow\n"
+ + EXPECTED["6_stego"],
+ )
+ print(
+ " ! Scenario 6 (stego) -> Pillow indisponivel, documentado sem gerar imagem"
+ )
+
+
+# ---------------------------------------------------------------------------
+# Scenario 7 - Extension coverage (one file per supported extension)
+# ---------------------------------------------------------------------------
+def gen_all_extensions(base: Path) -> None:
+ """
+ Write one sample file per extension under ``base / "7_extensions"``.
+
+ Covers text-like extensions, JSON, XML, CSV, TSV, zip, tar.gz, tar.bz2 and
+ a SQLite database, each carrying the first synthetic CPF and name. XLSX,
+ DOCX, PDF and PNG/JPG samples are added only when openpyxl, docx, reportlab
+ and PIL respectively can be imported; a missing library is skipped silently
+ (no fallback file). EXPECTED.txt is written last.
+ """
+ out = base / "7_extensions"
+ out.mkdir(parents=True, exist_ok=True)
+ pii = f"CPF: {_p(_CPFS, 0)}\nNome: {_p(_NAMES, 0)}\n"
+ pii_b = pii.encode()
+
+ # Same two-line payload under every plain-text extension.
+ for ext in [
+ ".txt",
+ ".log",
+ ".md",
+ ".rst",
+ ".cfg",
+ ".ini",
+ ".env",
+ ".yml",
+ ".yaml",
+ ".sql",
+ ]:
+ _w(out / f"sample{ext}", pii)
+
+ _w(
+ out / "sample.json",
+ json.dumps({"cpf": _p(_CPFS, 0), "nome": _p(_NAMES, 0)}, ensure_ascii=False),
+ )
+ # NOTE(review): this literal contains no XML markup in the reviewed text;
+ # confirm the file written is well-formed XML.
+ _w(
+ out / "sample.xml",
+ f'{_p(_CPFS, 0)} {_p(_NAMES, 0)} ',
+ )
+ _w(out / "sample.csv", f"cpf,nome\n{_p(_CPFS, 0)},{_p(_NAMES, 0)}\n")
+ _w(out / "sample.tsv", f"cpf\tnome\n{_p(_CPFS, 0)}\t{_p(_NAMES, 0)}\n")
+
+ # Archives, each holding a single pii.txt member.
+ with zipfile.ZipFile(out / "sample.zip", "w") as z:
+ z.writestr("pii.txt", pii)
+ with tarfile.open(str(out / "sample.tar.gz"), "w:gz") as t:
+ i = tarfile.TarInfo("pii.txt")
+ i.size = len(pii_b)
+ t.addfile(i, io.BytesIO(pii_b))
+ with tarfile.open(str(out / "sample.tar.bz2"), "w:bz2") as t:
+ i = tarfile.TarInfo("pii.txt")
+ i.size = len(pii_b)
+ t.addfile(i, io.BytesIO(pii_b))
+
+ # NOTE(review): unlike gen_scenario_1 this CREATE TABLE has no IF NOT EXISTS,
+ # so re-running into a directory that already holds sample.db is expected to
+ # raise sqlite3.OperationalError -- confirm whether re-runs must be supported.
+ conn = sqlite3.connect(str(out / "sample.db"))
+ conn.execute("CREATE TABLE t(cpf TEXT,nome TEXT)")
+ conn.execute("INSERT INTO t VALUES(?,?)", (_p(_CPFS, 0), _p(_NAMES, 0)))
+ conn.commit()
+ conn.close()
+
+ # Optional formats: each is skipped without trace when its library is missing.
+ try:
+ import openpyxl
+
+ wb = openpyxl.Workbook()
+ wb.active.append(["cpf", "nome"])
+ wb.active.append([_p(_CPFS, 0), _p(_NAMES, 0)])
+ wb.save(str(out / "sample.xlsx"))
+ except ImportError:
+ pass
+ try:
+ import docx as _d
+
+ doc = _d.Document()
+ doc.add_paragraph(pii)
+ doc.save(str(out / "sample.docx"))
+ except ImportError:
+ pass
+ try:
+ from reportlab.pdfgen import canvas as rc
+
+ c = rc.Canvas(str(out / "sample.pdf"))
+ c.drawString(50, 750, f"CPF: {_p(_CPFS, 0)}")
+ c.save()
+ except ImportError:
+ pass
+ try:
+ from PIL import Image, ImageDraw
+
+ img = Image.new("RGB", (300, 80), (255, 255, 255))
+ draw = ImageDraw.Draw(img)
+ draw.text((10, 20), f"CPF: {_p(_CPFS, 0)}", (0, 0, 0))
+ img.save(str(out / "sample.png"))
+ img.save(str(out / "sample.jpg"))
+ except ImportError:
+ pass
+
+ _w(
+ out / "EXPECTED.txt",
+ "Todos os arquivos contem CPF 123.456.789-09.\n"
+ "O scanner DEVE encontrar em todos os formatos suportados.\n"
+ "Formatos NAO encontrados = gap de cobertura para documentar.\n",
+ )
+ print(f" v Scenario 7 (extensions) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Main
+
+
+# ---------------------------------------------------------------------------
+# Scenario 8 - Stress / Load (OOM, large files, high concurrency corpus)
+# ---------------------------------------------------------------------------
+def gen_stress_load(base: Path) -> None:
+    """
+    Write the stress/load corpus under ``base / "8_stress_load"``.
+
+    Generates:
+      - large_50mb.txt: about 50 MB of padding with 10 CPF records, one after
+        every 5000th padding line
+      - directory_flood/: 500 small files, 10 of them containing a CPF
+      - deep_nesting/: ten nested directories with hidden_pii.txt at the bottom
+      - million_lines.txt: one million lines with a CPF on three of them
+      - EXPECTED.txt and STRESS_TEST_COMMANDS.sh
+
+    All PII is expected to be found; OOM or timeout is a reportable failure.
+    """
+    out = base / "8_stress_load"
+    out.mkdir(parents=True, exist_ok=True)
+
+    # Large file: ~50 MB of padding with 10 CPF instances.
+    large = out / "large_50mb.txt"
+    chunk = "x" * 1000 + "\n"
+    with large.open("w", encoding="utf-8") as f:
+        for i in range(50000):  # ~50 MB
+            f.write(chunk)
+            # Count lines from 1 so that exactly 10 records are written
+            # (after lines 5000, 10000, ..., 50000), matching EXPECTED.txt.
+            # The previous "i % 5000 == 0 and i > 0" test produced only 9.
+            block_no, remainder = divmod(i + 1, 5000)
+            if remainder == 0:
+                f.write(f"CPF: {_p(_CPFS, block_no)}\nNome: {_p(_NAMES, block_no)}\n")
+    print(f" -> large file: {large} ({large.stat().st_size // 1024 // 1024} MB)")
+
+    # Directory flood: 500 tiny files, every 50th one carries a CPF.
+    flood_dir = out / "directory_flood"
+    flood_dir.mkdir(exist_ok=True)
+    for i in range(500):
+        (flood_dir / f"file_{i:04d}.txt").write_text(
+            f"ref:{i}\nCPF: {_p(_CPFS, i)}\n"
+            if i % 50 == 0
+            else f"ref:{i}\nnada aqui\n",
+            encoding="utf-8",
+        )
+    print(f" -> directory flood: 500 files (10 with PII) -> {flood_dir}")
+
+    # Deep nesting: 10 levels, PII at the bottom.
+    deep = out / "deep_nesting"
+    current = deep
+    for lvl in range(10):
+        current = current / f"level_{lvl:02d}"
+    current.mkdir(parents=True, exist_ok=True)
+    (current / "hidden_pii.txt").write_text(
+        f"CPF: {_p(_CPFS, 0)}\nNome: {_p(_NAMES, 0)}\n# 10 levels deep\n",
+        encoding="utf-8",
+    )
+    print(f" -> deep nesting (10 levels): {current / 'hidden_pii.txt'}")
+
+    # High line count: 1 million lines, PII on lines 100000, 500000, 999999.
+    million_lines = out / "million_lines.txt"
+    with million_lines.open("w", encoding="utf-8") as f:
+        for i in range(1_000_000):
+            if i in {100_000, 500_000, 999_999}:
+                f.write(f"CPF: {_p(_CPFS, i % len(_CPFS))}\n")
+            else:
+                f.write(f"linha {i}\n")
+    print(f" -> million lines: {million_lines}")
+
+    _w(
+        out / "EXPECTED.txt",
+        "STRESS TEST -- OBJETIVO: scanner nao deve crashar nem perder PIIs.\n"
+        "Esperado: CPF encontrado em large_50mb.txt (10x), directory_flood (10 arquivos),\n"
+        "deep_nesting/hidden_pii.txt, e million_lines.txt (3x).\n"
+        "Falha: OOM, timeout, crash, ou PII nao encontrado.\n"
+        "Metrica: tempo de scan, memoria maxima (medir com /usr/bin/time -v ou psutil).\n",
+    )
+
+    _w(
+        out / "STRESS_TEST_COMMANDS.sh",
+        "#!/bin/bash\n"
+        "# Medir tempo e memoria do scan de stress\n"
+        "/usr/bin/time -v uv run python main.py \\\n"
+        " --config config.yaml \\\n"
+        " --scan --target tests/synthetic_corpus/8_stress_load \\\n"
+        " --report 2> stress_metrics.txt\n"
+        "echo 'Metricas em stress_metrics.txt'\n"
+        "grep -E 'Maximum resident|Elapsed|Exit code' stress_metrics.txt\n",
+    )
+    print(f" v Scenario 8 (stress/load) -> {out}")
+
+
+# ---------------------------------------------------------------------------
+# Scenario 9 - Config errors (intentional misconfigs for UX/error message QA)
+# ---------------------------------------------------------------------------
+def gen_config_errors(base: Path) -> None:
+ """
+ Generates intentionally broken config files + a test script to run each.
+ The goal is NOT to scan PII — it is to evaluate:
+ - Quality of error messages (stdout/stderr)
+ - Dashboard troubleshooting recommendations
+ - Recovery / retry behavior
+ Each config has a documented EXPECTED_ERROR and TROUBLESHOOT hint.
+ """
+ out = base / "9_config_errors"
+ out.mkdir(parents=True, exist_ok=True)
+
+ configs: list[dict] = [
+ {
+ "name": "wrong_db_host",
+ "description": "Database host does not exist (DNS failure)",
+ "config": {
+ "targets": [
+ {
+ "type": "postgresql",
+ "host": "nonexistent-db.local",
+ "port": 5432,
+ "database": "testdb",
+ "user": "admin",
+ "password": "secret",
+ }
+ ],
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "connection refused OR DNS resolution failure",
+ "troubleshoot": "Verifique se o host esta acessivel (ping / nslookup). "
+ "Confirme VPN ativa se DB for interno.",
+ },
+ {
+ "name": "wrong_db_credentials",
+ "description": "Valid host but wrong username/password",
+ "config": {
+ "targets": [
+ {
+ "type": "postgresql",
+ "host": "localhost",
+ "port": 5432,
+ "database": "testdb",
+ "user": "wrong_user",
+ "password": "wrong_pass",
+ }
+ ],
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "authentication failed for user 'wrong_user'",
+ "troubleshoot": "Verifique as credenciais. Use variavel de ambiente DB_PASSWORD "
+ "em vez de senha em texto no config.",
+ },
+ {
+ "name": "missing_output_dir",
+ "description": "Report output_dir does not exist and cannot be created",
+ "config": {
+ "targets": [
+ {"type": "filesystem", "path": "./tests/synthetic_corpus/1_happy"}
+ ],
+ "report": {"output_dir": "/nonexistent/readonly/path"},
+ },
+ "expected_error": "permission denied OR directory not found",
+ "troubleshoot": "Crie o diretorio manualmente ou aponte para um caminho gravavel. "
+ "Em Docker: monte o volume correto.",
+ },
+ {
+ "name": "invalid_target_type",
+ "description": "Unknown connector type specified",
+ "config": {
+ "targets": [
+ {"type": "oracle_xyz_invalid", "host": "localhost", "port": 1521}
+ ],
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "unknown connector type 'oracle_xyz_invalid'",
+ "troubleshoot": "Tipos validos: postgresql, mysql, mssql, oracle, mongodb, redis, "
+ "filesystem. Verifique a documentacao em docs/USAGE.md.",
+ },
+ {
+ "name": "malformed_yaml",
+ "description": "Syntactically invalid YAML config",
+ "raw_content": "targets:\n - type: postgresql\n host: localhost\n bad yaml: [unclosed\n",
+ "expected_error": "YAML parse error",
+ "troubleshoot": "Valide o YAML em https://www.yamllint.com/ ou com: "
+ "python -c \"import yaml; yaml.safe_load(open('config.yaml'))\"",
+ },
+ {
+ "name": "missing_required_field",
+ "description": "Config missing required 'targets' key",
+ "config": {
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "missing required field 'targets' in config",
+ "troubleshoot": "Copie o config de exemplo: cp deploy/config.example.yaml config.yaml "
+ "e edite os targets.",
+ },
+ {
+ "name": "path_not_found",
+ "description": "Filesystem target path does not exist",
+ "config": {
+ "targets": [
+ {"type": "filesystem", "path": "/nonexistent/data/path/12345"}
+ ],
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "path '/nonexistent/data/path/12345' does not exist",
+ "troubleshoot": "Confirme que o caminho existe e que o usuario tem permissao de leitura. "
+ "Em Docker: monte o volume com -v /seu/caminho:/data.",
+ },
+ {
+ "name": "api_key_wrong",
+ "description": "API request with wrong X-API-Key header",
+ "config": {
+ "targets": [
+ {"type": "filesystem", "path": "./tests/synthetic_corpus/1_happy"}
+ ],
+ "api": {"require_api_key": True, "api_key": "correct-key-12345"},
+ "report": {"output_dir": "./reports"},
+ },
+ "expected_error": "HTTP 401 Unauthorized when calling API with wrong key",
+ "troubleshoot": "Use X-API-Key: correct-key-12345 no header. "
+ "Para testar: curl -H 'X-API-Key: wrong-key' http://localhost:8088/api/v1/scan",
+ "test_curl": (
+ "curl -s -o /dev/null -w '%{http_code}' "
+ "-H 'X-API-Key: WRONG-KEY' http://localhost:8088/api/v1/status"
+ ),
+ },
+ ]
+
+ import yaml as _yaml # may not be available; fall back to json dump
+
+ test_script_lines = [
+ "#!/bin/bash",
+ "# Auto-generated: test each broken config and capture exit code + output",
+ "# Usage: bash 9_config_errors/run_error_tests.sh 2>&1 | tee error_test_results.txt",
+ "",
+ "PASS=0; FAIL=0; SKIP=0",
+ "",
+ ]
+
+ for cfg in configs:
+ cfg_path = out / f"config_{cfg['name']}.yaml"
+ if "raw_content" in cfg:
+ _w(cfg_path, cfg["raw_content"])
+ else:
+ try:
+ import yaml as _yaml
+
+ _w(
+ cfg_path,
+ _yaml.dump(
+ cfg["config"], allow_unicode=True, default_flow_style=False
+ ),
+ )
+ except ImportError:
+ _w(cfg_path, json.dumps(cfg["config"], ensure_ascii=False, indent=2))
+
+ doc_path = out / f"doc_{cfg['name']}.txt"
+ _w(
+ doc_path,
+ (
+ f"Config: {cfg['name']}\n"
+ f"Descricao: {cfg['description']}\n"
+ f"Erro esperado: {cfg['expected_error']}\n"
+ f"Troubleshoot: {cfg['troubleshoot']}\n"
+ + (f"Teste curl: {cfg.get('test_curl', 'N/A')}\n")
+ ),
+ )
+
+ name_val = cfg["name"]
+ cfg_file = cfg_path.name
+ scan_tgt = "./tests/synthetic_corpus/1_happy"
+ test_script_lines += [
+ f'echo "--- Testing: {name_val} ---"',
+ f"uv run python main.py --config {cfg_file} --scan --target {scan_tgt} 2>&1 | head -20",
+ "RC=$?; if [ $RC -ne 0 ]; then"
+ f' echo "EXPECTED FAILURE (rc=$RC): {name_val} -- OK"; PASS=$((PASS+1));'
+ f' else echo "UNEXPECTED SUCCESS: {name_val} -- REVIEW"; FAIL=$((FAIL+1)); fi',
+ "",
+ ]
+
+ test_script_lines += [
+ 'echo ""',
+ 'echo "Results: PASS=$PASS FAIL=$FAIL SKIP=$SKIP"',
+ 'echo "(PASS = expected failure triggered correctly)"',
+ ]
+
+ _w(out / "run_error_tests.sh", "\n".join(test_script_lines))
+
+ _w(
+ out / "EXPECTED.txt",
+ "CENARIO 9 -- CONFIG ERRORS\n"
+ "Objetivo: avaliar qualidade das mensagens de erro e recomendacoes de troubleshooting.\n"
+ "Cada config_*.yaml e proposital e incorreto.\n\n"
+ "Para cada caso, avaliar:\n"
+ " [ ] Mensagem de erro e clara e actionable?\n"
+ " [ ] Exit code nao-zero (distingue erro de sucesso)?\n"
+ " [ ] Dashboard mostra recomendacao de troubleshooting?\n"
+ " [ ] Nenhum stacktrace interno exposto para usuario final?\n"
+ " [ ] Log tem nivel correto (ERROR vs WARNING vs INFO)?\n\n"
+ "Score qualitativo (1-5 por caso):\n"
+ " 5 = mensagem clara, troubleshoot acionavel, sem stacktrace, exit code correto\n"
+ " 1 = crash sem mensagem, stacktrace exposto, exit 0 em erro\n",
+ )
+ print(f" v Scenario 9 (config_errors) -> {out} ({len(configs)} configs)")
+
+
+# ---------------------------------------------------------------------------
+_SCENARIO_MAP: dict[str, Callable[[Path], None]] = {  # scenario name -> generator, invoked as gen(base)
+ "happy": gen_scenario_1,
+ "unhappy": gen_scenario_2,
+ "catastrophic": gen_scenario_3,
+ "false_positive": gen_scenario_4,
+ "manual_review": gen_scenario_5,
+ "stego": gen_scenario_6,
+ "extensions": gen_all_extensions,
+ "stress_load": gen_stress_load,
+ "config_errors": gen_config_errors,
+}  # insertion order matters: generate_corpus() looks up EXPECTED under "<1-based position>_<name>"
+ALL_SCENARIOS = list(_SCENARIO_MAP)  # names in registry order; also the default value of --scenario
+
+
+def generate_corpus(base: Path, scenarios: list[str] | None = None) -> Path:
+ """Generate synthetic POC files under ``base``; return ``base``."""
+ base = Path(base)
+ base.mkdir(parents=True, exist_ok=True)
+ selected = scenarios or list(ALL_SCENARIOS)
+ unknown = [s for s in selected if s not in _SCENARIO_MAP]
+ if unknown:
+ raise ValueError(f"Unknown scenarios: {unknown}")
+ for name in selected:
+ _SCENARIO_MAP[name](base)
+ manifest = {
+ "generated_by": "core.demo.synthetic_corpus",
+ "scenarios": {
+ name: EXPECTED.get(f"{i + 1}_{name}", "see EXPECTED.txt")
+ for i, name in enumerate(ALL_SCENARIOS)
+ },
+ "note": "All PII is synthetic -- generated for testing only. Not real individuals.",
+ }
+ (base / "CORPUS_MANIFEST.json").write_text(
+ json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
+ )
+ return base
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(
+ description="Generate synthetic POC corpus for Data Boar."
+ )
+ parser.add_argument(
+ "--output",
+ default="tests/synthetic_corpus",
+ help="Output directory (default: tests/synthetic_corpus)",
+ )
+ parser.add_argument(
+ "--scenario",
+ default=",".join(ALL_SCENARIOS),
+ help=f"Comma-separated scenarios. Options: {', '.join(ALL_SCENARIOS)}",
+ )
+ args = parser.parse_args()
+
+ base = Path(args.output)
+ selected = [s.strip() for s in args.scenario.split(",")]
+ unknown = [s for s in selected if s not in _SCENARIO_MAP]
+ if unknown:
+ parser.error(f"Unknown scenarios: {unknown}")
+
+ print("\nData Boar -- Synthetic POC Corpus Generator")
+ print(f"Output: {base.resolve()}")
+ print(f"Scenarios: {selected}\n")
+
+ generate_corpus(base, selected)
+
+ print(f"\nManifest -> {base / 'CORPUS_MANIFEST.json'}")
+ print("Next: uv run python main.py --demo")
+ print(" Or: data-boar --demo (zero-config dashboard on loopback).")
+ print(" Compare findings against EXPECTED.txt in each sub-folder.")
+ print(" See docs/TESTING_POC_GUIDE.md for the full validation checklist.\n")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/docs/data_boar.1 b/docs/data_boar.1
index 0058c743..3403c999 100644
--- a/docs/data_boar.1
+++ b/docs/data_boar.1
@@ -7,6 +7,9 @@ data-boar, data_boar, lgpd_crawler \- enterprise data discovery and risk governa
.SH SYNOPSIS
.B data-boar
[
+.B \-\-demo
+]
+[
.B \-\-config
.I FILE
]
@@ -103,6 +106,16 @@ and
.
.SH OPTIONS
.TP
+.B \-\-demo
+Zero\-config demonstration mode: generate a synthetic filesystem corpus in a temporary directory, run an initial scan, and start the dashboard on loopback (\fB127.0.0.1\fR) with plaintext HTTP. Does not require
+.BR \-\-config .
+Implies
+.BR \-\-web
+and
+.BR \-\-allow\-insecure\-http .
+Temporary files are removed when the process exits.
+.
+.TP
.BI "\-\-config " FILE
Path to the configuration file (YAML or JSON).
Defaults to
@@ -573,6 +586,14 @@ Without TLS certificate and key paths, you must pass
in the configuration) for plaintext HTTP.
.
.TP
+Zero\-config demo (no configuration file):
+.RS
+.B data-boar \-\-demo
+.br
+.B python main.py \-\-demo
+.RE
+.
+.TP
Start the API on port 8088 (plaintext, explicit opt\-in):
.RS
.B python main.py \-\-config config.yaml \-\-web \-\-allow\-insecure\-http \-\-port 8088
diff --git a/docs/plans/PLANS_HUB.md b/docs/plans/PLANS_HUB.md
index 1f743f98..b0336ea5 100644
--- a/docs/plans/PLANS_HUB.md
+++ b/docs/plans/PLANS_HUB.md
@@ -49,6 +49,7 @@ Do **not** edit the table manually; refresh with `python scripts/plans_hub_sync.
| **Open** | [PLAN_ADR_GOVERNANCE_LIFECYCLE.md](PLAN_ADR_GOVERNANCE_LIFECYCLE.md) | Plan: ADR governance lifecycle (ADR 0045 amendment) | UMADR constitution — append-only Status history, Obsolete/Quarantined/Duplicate statuses, en_US ADRs; GitHub #803 | — |
| **Open** | [PLAN_BUILD_IDENTITY_RELEASE_INTEGRITY.md](PLAN_BUILD_IDENTITY_RELEASE_INTEGRITY.md) | Plan: Build identity, runtime version display, and release integrity | **Status:** In progress — Phase E core landed (#856): SQLite integrity anchor (`core/integrity_anchor.py`), startup re-verify in any mode, TINTED/`-alpha` trust surfaces, `integrity_events`, open-mode worker clamp. Signe | — |
| **Open** | [PLAN_CLAIMS_CONSISTENCY_AND_ANTI_OVERCLAIM.md](PLAN_CLAIMS_CONSISTENCY_AND_ANTI_OVERCLAIM.md) | PLAN: Claims consistency and anti-overclaim gate | gate determinístico offline anti-overclaim — invariante connector↔tier (build-time do #854) + manifesto docs/CLAIMS.yml com backed_by verificável; contraparte light do auditor on-demand claim-audit (lab-op) | [PLAN_CONNECTOR_TIER_GATING.md](PLAN_CONNECTOR_TIER_GATING.md) [PLAN_PRODUCT_TIERS_AND_OPEN_CORE.md](PLAN_PRODUCT_TIERS_AND_OPEN_CORE.md) |
+| **Open** | [PLAN_CLI_DEMO_SUBCOMMAND.md](PLAN_CLI_DEMO_SUBCOMMAND.md) | PLAN: CLI `--demo` subcommand (#1113) | **Status:** In progress **Issue:** [#1113](https://github.com/DataBoar/data-boar/issues/1113) | — |
| **Open** | [PLAN_CLOJURE_AUGMENTATION.md](PLAN_CLOJURE_AUGMENTATION.md) | Plan: Clojure/Lisp augmentation feasibility for Data Boar | Evaluate whether a Clojure sidecar adds measurable value for policy logic and temporal evidence without regressing Rust/Python baseline. | [PLAN_LATO_SENSU_THESIS.md](PLAN_LATO_SENSU_THESIS.md) [PLAN_STRICTO_SENSU_RESEARCH_PATH.md](PLAN_STRICTO_SENSU_RESEARCH_PATH.md) [PLAN_NEXT_WAVE_PLATFORM_AND_GTM.md](PLAN_NEXT_WAVE_PLATFORM_AND_GTM.md) |
| **Open** | [PLAN_CLOJURE_AUGMENTATION.pt_BR.md](PLAN_CLOJURE_AUGMENTATION.pt_BR.md) | Plano: viabilidade de augmentação Clojure/Lisp no Data Boar | Avaliar se um sidecar em Clojure agrega valor mensurável para lógica de políticas e evidência temporal sem regredir a base Rust/Python. | [PLAN_LATO_SENSU_THESIS.md](PLAN_LATO_SENSU_THESIS.md) [PLAN_STRICTO_SENSU_RESEARCH_PATH.md](PLAN_STRICTO_SENSU_RESEARCH_PATH.md) [PLAN_NEXT_WAVE_PLATFORM_AND_GTM.md](PLAN_NEXT_WAVE_PLATFORM_AND_GTM.md) |
| **Open** | [PLAN_COMPLIANCE_EVIDENCE_MAPPING.md](PLAN_COMPLIANCE_EVIDENCE_MAPPING.md) | Plan: Compliance evidence mapping – regulations to app features and reports | Remember **where** the product can **honestly** help (inventory, metadata-only findings, config-led labels) versus **what** requires **specialist tools**, **certified cryptography**, or **legal/sector counsel**. This sec | — |
diff --git a/docs/plans/PLAN_CLI_DEMO_SUBCOMMAND.md b/docs/plans/PLAN_CLI_DEMO_SUBCOMMAND.md
new file mode 100644
index 00000000..18a0ad87
--- /dev/null
+++ b/docs/plans/PLAN_CLI_DEMO_SUBCOMMAND.md
@@ -0,0 +1,31 @@
+# PLAN: CLI `--demo` subcommand (#1113)
+
+**Status:** In progress
+**Issue:** [#1113](https://github.com/DataBoar/data-boar/issues/1113)
+
+## Goal
+
+Turnkey `data-boar --demo` for Windows operators (Estela): zero-config synthetic corpus, initial scan, loopback dashboard on port 8088.
+
+## Scope
+
+| Item | Status |
+| ---- | ------ |
+| `core/demo/synthetic_corpus.py` (installable generator) | Done |
+| `core/demo/runtime.py` (workspace + atexit) | Done |
+| `main.py --demo` | Done |
+| `scripts/demo.sh` thin wrapper | Done |
+| Excel praise sheet sanitization | Done |
+| Tests (`test_cli_demo`, excel sheet) | Done |
+| QUICKSTART / README / operator help | Done |
+
+## Steering (locked)
+
+- **Cleanup:** single owner — `atexit` for `main.py --demo`; bash `trap` + `register_cleanup=False` for `demo.sh --headless`.
+- **Loopback:** `--demo` forces `127.0.0.1` bind.
+- **Excel:** `_SHEET_PRAISE_CONTROLS` sanitizes `/` in sheet title; headless test expects `returncode == 0`.
+
+## Follow-up
+
+- PyPI publish after PR merge (operator).
+- #1112 Windows quickstart docs alignment after this change lands.
diff --git a/main.py b/main.py
index e5abb3a0..3e1e1e8e 100644
--- a/main.py
+++ b/main.py
@@ -310,6 +310,10 @@ def main() -> None:
" python main.py --config config.yaml --web --allow-insecure-http --port 9090\n"
" python main.py --config config.yaml --web --allow-insecure-http --host 0.0.0.0\n"
"\n"
+ " # Zero-config demo (synthetic corpus, loopback dashboard — no config.yaml)\n"
+ " python main.py --demo\n"
+ " data-boar --demo\n"
+ "\n"
"Once a one-shot scan finishes, an Excel report and heatmap PNG are written under\n"
"the configured report.output_dir (default: current directory). When the API is\n"
"running, you can navigate to the documented endpoints (see README.md) to trigger\n"
@@ -323,6 +327,16 @@ def main() -> None:
version=_cli_public_version_line(),
help="Show the public product version and exit (no scan or API startup).",
)
+ parser.add_argument(
+ "--demo",
+ action="store_true",
+ help=(
+ "Zero-config demo: generate a synthetic filesystem corpus in a temp directory, "
+ "run an initial scan, and start the dashboard on loopback (127.0.0.1) with "
+ "plaintext HTTP (--allow-insecure-http). Does not require --config. "
+ "Temp files are removed when the process exits."
+ ),
+ )
parser.add_argument(
"--config",
default="config.yaml",
@@ -529,6 +543,41 @@ def main() -> None:
)
args = parser.parse_args()
+ demo_mode = bool(getattr(args, "demo", False))
+ demo_dir: Path | None = None
+
+ if demo_mode:
+ demo_incompatible = (
+ args.validate_config
+ or args.reset_data
+ or args.export_audit_trail is not None
+ or args.export_dsar is not None
+ or args.diff_sessions
+ )
+ if demo_incompatible:
+ print(
+ "Cannot combine --demo with --validate-config, --reset-data, "
+ "--export-audit-trail, --export-dsar, or --diff.",
+ file=sys.stderr,
+ )
+ sys.exit(2)
+ from core.demo.runtime import prepare_demo_workspace, print_demo_banner
+
+ demo_dir, config_path, _preloaded = prepare_demo_workspace(
+ port=args.port,
+ register_cleanup=True,
+ )
+ args.config = str(config_path)
+ args.web = True
+ args.allow_insecure_http = True
+ if args.host and args.host not in ("127.0.0.1", "localhost", "::1"):
+ print(
+ f"[demo] Ignoring --host {args.host!r}; demo binds loopback only.",
+ file=sys.stderr,
+ )
+ args.host = "127.0.0.1"
+ print_demo_banner(args.port, demo_dir)
+
if args.validate_config and (
args.web
or args.reset_data
@@ -582,6 +631,11 @@ def main() -> None:
config = load_config(args.config)
except FileNotFoundError as e:
print(f"Config not found: {e}")
+ if not demo_mode:
+ print(
+ "Tip: run `data-boar --demo` for a zero-config synthetic demo "
+ "(no config.yaml required)."
+ )
print("Probable cause: The config file path is wrong or the file was moved.")
print(
"What to do: Check the path, use --config to point to your YAML/JSON, or create config.yaml in the current directory."
@@ -693,6 +747,28 @@ def main() -> None:
return
if args.web and not args.reset_data:
+ if demo_mode:
+ from core.validation import sanitize_tenant_technician
+
+ engine = AuditEngine(config)
+ try:
+ _emit_runtime_trust_info(runtime_trust, to_stdout=True, to_stderr=True)
+ tenant = sanitize_tenant_technician(args.tenant)
+ technician = sanitize_tenant_technician(args.technician)
+ session_id = engine.start_audit(
+ tenant_name=tenant,
+ technician_name=technician,
+ jurisdiction_hint=bool(args.jurisdiction_hint),
+ )
+ print(f"[demo] Scan session: {session_id}")
+ report_path = engine.generate_final_reports(session_id)
+ if report_path:
+ print(f"[demo] Report written: {report_path}")
+ else:
+ print("[demo] No findings to report.")
+ finally:
+ engine.db_manager.dispose()
+
_emit_runtime_trust_info(runtime_trust, to_stdout=True, to_stderr=True)
import uvicorn
from api.routes import app
@@ -707,6 +783,9 @@ def main() -> None:
)
api_cfg = config.get("api", {})
+ if demo_mode:
+ api_cfg = {**api_cfg, "host": "127.0.0.1", "allow_insecure_http": True}
+ config["api"] = api_cfg
if bool(api_cfg.get("require_api_key")) and not effective_api_key_configured(
api_cfg
):
diff --git a/report/generator.py b/report/generator.py
index 4583d11a..9adb5e7a 100644
--- a/report/generator.py
+++ b/report/generator.py
@@ -268,6 +268,7 @@ def _create_heatmap(
_SHEET_DATA_SOURCE_INVENTORY = "Data source inventory"
# LOW findings persisted for ID-like column names (FN reduction); see core.suggested_review
_SHEET_SUGGESTED_REVIEW = "Suggested review (LOW)"
+_SHEET_PRAISE_CONTROLS = _excel_safe_sheet_title("Praise / existing controls")
_REPORT_INFO_CNPJ_FORMAT_COMPAT = "CNPJ format compatibility"
@@ -1042,7 +1043,7 @@ def _write_excel_sheets(
praise = _praise_rows(db_rows_for_sheets, fs_rows_for_sheets)
if praise:
_excel_safe_dataframe(praise).to_excel(
- writer, sheet_name="Praise / existing controls", index=False
+ writer, sheet_name=_SHEET_PRAISE_CONTROLS, index=False
)
trends = _trends_rows(
db_manager, session_id, current_db, current_fs, current_fail, current_started_at
diff --git a/scripts/demo.sh b/scripts/demo.sh
index ab1cb1b3..7096b38b 100755
--- a/scripts/demo.sh
+++ b/scripts/demo.sh
@@ -1,22 +1,16 @@
#!/usr/bin/env bash
-# scripts/demo.sh — zero-config demo entrypoint for Data Boar (#834)
+# scripts/demo.sh — thin wrapper for ``data-boar --demo`` (#834, #1113)
#
# Usage:
-# ./scripts/demo.sh # generates corpus, starts dashboard
-# ./scripts/demo.sh --no-web # generates corpus only (no dashboard)
-# ./scripts/demo.sh --headless # generates corpus + runs CLI scan (non-interactive)
-#
-# No real data required. All synthetic files are written to /tmp/data_boar_demo/
-# and cleaned up on exit (Ctrl+C).
+# ./scripts/demo.sh # dashboard (default)
+# ./scripts/demo.sh --no-web # corpus + config only (headless scan, then exit)
+# ./scripts/demo.sh --headless # alias for --no-web
#
# Docker variant (no local Python needed):
# docker run --rm -p 8088:8088 fabioleitao/data_boar:latest demo
-# (passes "demo" arg → container runs this script via entrypoint)
set -euo pipefail
-DEMO_DIR="${TMPDIR:-/tmp}/data_boar_demo"
-CONFIG_FILE="$DEMO_DIR/demo.config.yaml"
PORT="${DATA_BOAR_DEMO_PORT:-8088}"
NO_WEB=false
HEADLESS=false
@@ -26,73 +20,49 @@ for arg in "$@"; do
--no-web) NO_WEB=true ;;
--headless) HEADLESS=true; NO_WEB=true ;;
--help|-h)
- grep '^#' "$0" | head -15 | sed 's/^# \?//'
+ grep '^#' "$0" | head -18 | sed 's/^# \?//'
exit 0
;;
esac
done
-cleanup() {
- echo ""
- echo "[demo] Limpando $DEMO_DIR ..."
- rm -rf "$DEMO_DIR"
- echo "[demo] Pronto. Até logo!"
-}
-trap cleanup EXIT INT TERM
-
-echo ""
-echo "╔══════════════════════════════════════════════════════════╗"
-echo "║ Data Boar — Demo (corpus sintético, zero dados reais) ║"
-echo "╚══════════════════════════════════════════════════════════╝"
-echo ""
-
-# 1. Gera corpus sintético
-echo "[demo] Gerando corpus sintético em $DEMO_DIR/corpus ..."
-mkdir -p "$DEMO_DIR/corpus"
-uv run python scripts/generate_synthetic_poc_corpus.py \
- --output "$DEMO_DIR/corpus" \
- --scenario "happy,unhappy,false_positive"
-echo "[demo] Corpus gerado com sucesso."
-echo ""
+REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
+cd "$REPO_ROOT"
-# 2. Gera config mínimo apontando para o corpus
-cat > "$CONFIG_FILE" < str:
- return lst[i % len(lst)]
-
-
-def _w(path: Path, content: str | bytes, enc: str = "utf-8") -> None:
- path.parent.mkdir(parents=True, exist_ok=True)
- if isinstance(content, bytes):
- path.write_bytes(content)
- else:
- path.write_text(content, encoding=enc)
-
-
-# ---------------------------------------------------------------------------
-# Scenario 1 - Happy path
-# ---------------------------------------------------------------------------
-def gen_scenario_1(base: Path) -> None:
- out = base / "1_happy"
-
- _w(
- out / "employees.txt",
- textwrap.dedent(f"""\
- RELATORIO DE FUNCIONARIOS -- FICTICIO -- APENAS PARA TESTES POC
- Nome: {_p(_NAMES, 0)} CPF: {_p(_CPFS, 0)} RG: {_p(_RGS, 0)}
- Email: {_p(_EMAILS, 0)} Tel: {_p(_PHONES, 0)}
- Nasc: {_p(_DATES, 0)} End: {_p(_ADDRS, 0)}
- Nome: {_p(_NAMES, 1)} CPF: {_p(_CPFS, 1)} CNPJ: {_p(_CNPJS, 0)}
- """),
- )
-
- buf = io.StringIO()
- csv.writer(buf).writerows(
- [["nome", "cpf", "rg", "email"]]
- + [[_p(_NAMES, i), _p(_CPFS, i), _p(_RGS, i), _p(_EMAILS, i)] for i in range(5)]
- )
- _w(out / "employees.csv", buf.getvalue())
-
- _w(
- out / "employees.json",
- json.dumps(
- [
- {"nome": _p(_NAMES, i), "cpf": _p(_CPFS, i), "rg": _p(_RGS, i)}
- for i in range(4)
- ],
- ensure_ascii=False,
- indent=2,
- ),
- )
-
- try:
- from reportlab.pdfgen import canvas as rc
-
- c = rc.Canvas(str(out / "employees.pdf"))
- y = 780
- c.drawString(50, y, "DADOS FICTICIOS -- POC Data Boar")
- y -= 20
- for i in range(3):
- for lbl, v in [
- ("Nome", _p(_NAMES, i)),
- ("CPF", _p(_CPFS, i)),
- ("Email", _p(_EMAILS, i)),
- ]:
- c.drawString(50, y, f"{lbl}: {v}")
- y -= 15
- c.save()
- except ImportError:
- _w(out / "employees_pdf_fallback.txt", f"PDF nao gerado. CPF: {_p(_CPFS, 0)}")
-
- try:
- import docx as _d
-
- doc = _d.Document()
- doc.add_heading("Dados Ficticios POC", 0)
- for i in range(3):
- doc.add_paragraph(
- f"Nome: {_p(_NAMES, i)}\nCPF: {_p(_CPFS, i)}\nRG: {_p(_RGS, i)}\n"
- )
- doc.save(str(out / "employees.docx"))
- except ImportError:
- _w(out / "employees_docx_fallback.txt", f"DOCX nao gerado. CPF: {_p(_CPFS, 1)}")
-
- try:
- import openpyxl
-
- wb = openpyxl.Workbook()
- ws = wb.active
- ws.title = "Funcionarios"
- ws.append(["Nome", "CPF", "RG", "Email", "Tel"])
- for i in range(5):
- ws.append(
- [
- _p(_NAMES, i),
- _p(_CPFS, i),
- _p(_RGS, i),
- _p(_EMAILS, i),
- _p(_PHONES, i),
- ]
- )
- wb.save(str(out / "employees.xlsx"))
- except ImportError:
- _w(out / "employees_xlsx_fallback.txt", f"XLSX nao gerado. CPF: {_p(_CPFS, 2)}")
-
- conn = sqlite3.connect(str(out / "employees.db"))
- conn.execute(
- "CREATE TABLE IF NOT EXISTS emp (id INTEGER PRIMARY KEY,nome TEXT,cpf TEXT,rg TEXT,email TEXT)"
- )
- for i in range(5):
- conn.execute(
- "INSERT INTO emp(nome,cpf,rg,email) VALUES(?,?,?,?)",
- (_p(_NAMES, i), _p(_CPFS, i), _p(_RGS, i), _p(_EMAILS, i)),
- )
- conn.commit()
- conn.close()
-
- try:
- from PIL import Image, ImageDraw
-
- img = Image.new("RGB", (400, 120), (255, 255, 255))
- draw = ImageDraw.Draw(img)
- draw.text((10, 20), f"CPF: {_p(_CPFS, 0)}", (0, 0, 0))
- draw.text((10, 50), f"Nome: {_p(_NAMES, 0)}", (0, 0, 0))
- img.save(str(out / "id_card_visible.png"))
- except ImportError:
- _w(out / "image_fallback.txt", f"PNG nao gerado. CPF: {_p(_CPFS, 0)}")
-
- _w(out / "EXPECTED.txt", EXPECTED["1_happy"])
- print(f" v Scenario 1 (happy) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 2 - Unhappy path
-# ---------------------------------------------------------------------------
-def gen_scenario_2(base: Path) -> None:
- out = base / "2_unhappy"
- import base64 as _b64
-
- _w(
- out / "ocr_noisy.txt",
- f"N0me: {_p(_NAMES, 2).replace('a', '@').replace('e', '3')}\n"
- f"CPF: {_p(_CPFS, 2).replace('.', ',')} (possivel ruido OCR)\n"
- f"RG: {_p(_RGS, 2).replace('-', '_')}\n"
- f"Email: {_p(_EMAILS, 2).replace('@', '[at]')}\n",
- )
-
- _w(
- out / "latin1_encoded.txt",
- f"Nome: {_p(_NAMES, 0)}\nCPF: {_p(_CPFS, 0)}\nObservacao: dado em latin-1\n",
- enc="latin-1",
- )
-
- _w(
- out / "bom_utf8.csv",
- f"\ufeffNome;CPF;RG\n{_p(_NAMES, 1)};{_p(_CPFS, 1)};{_p(_RGS, 1)}\n",
- enc="utf-8-sig",
- )
-
- _w(
- out / "crlf_endings.txt",
- f"CPF: {_p(_CPFS, 3)}\r\nTel: {_p(_PHONES, 0)}\r\nEnd: {_p(_ADDRS, 0)}\r\n",
- )
-
- _w(
- out / "partial_redaction.txt",
- f"Nome: {_p(_NAMES, 0)}\n"
- f"CPF: ***.{_p(_CPFS, 0)[4:7]}.***-** (parcialmente redactado)\n"
- f"Email: {_p(_EMAILS, 0)}\nRG: {_p(_RGS, 0)}\n",
- )
-
- blob = _b64.b64encode(f"CPF:{_p(_CPFS, 1)},Nome:{_p(_NAMES, 1)}".encode()).decode()
- _w(out / "base64_embedded.txt", f"campo_documento: {blob}\n# dado acima e base64\n")
-
- _w(out / "EXPECTED.txt", EXPECTED["2_unhappy"])
- print(f" v Scenario 2 (unhappy) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 3 - Catastrophic
-# ---------------------------------------------------------------------------
-def gen_scenario_3(base: Path) -> None:
- out = base / "3_catastrophic"
- out.mkdir(parents=True, exist_ok=True)
- pii = f"DADOS FICTICIOS\nCPF: {_p(_CPFS, 4)}\nNome: {_p(_NAMES, 4)}\nRG: {_p(_RGS, 2)}\n".encode()
-
- # nested zip
- inner_buf = io.BytesIO()
- with zipfile.ZipFile(inner_buf, "w", zipfile.ZIP_DEFLATED) as z:
- z.writestr("pii.txt", pii)
- with zipfile.ZipFile(out / "nested.zip", "w", zipfile.ZIP_DEFLATED) as z:
- z.writestr("inner.zip", inner_buf.getvalue())
-
- # password-protected zip
- with zipfile.ZipFile(out / "password_protected.zip", "w", zipfile.ZIP_STORED) as z:
- z.setpassword(b"poc-test-123")
- z.writestr("secret.txt", pii)
-
- # tar.gz
- with tarfile.open(str(out / "archive.tar.gz"), "w:gz") as t:
- info = tarfile.TarInfo("pii.txt")
- info.size = len(pii)
- t.addfile(info, io.BytesIO(pii))
-
- # tar.bz2
- with tarfile.open(str(out / "archive.tar.bz2"), "w:bz2") as t:
- info = tarfile.TarInfo("pii.txt")
- info.size = len(pii)
- t.addfile(info, io.BytesIO(pii))
-
- # disguised extension (text file named .jpg)
- _w(
- out / "report_2026.jpg",
- pii.decode() + "\n# Arquivo de texto mascarado como .jpg\n",
- )
-
- # very long line stress test
- _w(
- out / "long_line_stress.txt",
- "x" * 5000 + f" CPF: {_p(_CPFS, 0)} " + "y" * 5000 + "\n",
- )
-
- _w(out / "EXPECTED.txt", EXPECTED["3_catastrophic"])
- _w(
- out / "PASSWORD_HINT.txt",
- "Senha: poc-test-123\nConfiguracao: zip_password no config.yaml\n",
- )
- print(f" v Scenario 3 (catastrophic) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 4 - False positive pressure
-# ---------------------------------------------------------------------------
-def gen_scenario_4(base: Path) -> None:
- import random as _r
-
- out = base / "4_false_positive"
-
- def _invalid_cpf_shaped() -> str:
- d = [_r.randint(0, 9) for _ in range(9)]
- return f"{d[0]}{d[1]}{d[2]}.{d[3]}{d[4]}{d[5]}.{d[6]}{d[7]}{d[8]}-{(d[0] + 1) % 10}{(d[1] + 1) % 10}"
-
- _w(
- out / "serial_numbers.txt",
- "CATALOGO -- FICÇÃO\n"
- + "\n".join(f"Serial: {_invalid_cpf_shaped()}" for _ in range(10)),
- )
- _w(out / "cnpj_shaped_refs.txt", "Ref: 00.111.222/0099-00\n" * 5) # invalid CNPJ
- _w(
- out / "random_codes.txt",
- "\n".join(f"Cod: {_r.randint(10000000000, 99999999999)}" for _ in range(20)),
- )
- _w(
- out / "ip_addresses.txt",
- "\n".join(f"IP: 10.0.{i}.{j}" for i in range(5) for j in range(5)),
- )
- _w(
- out / "version_strings.txt",
- "\n".join(f"v: 1.{i}.{i + 1}-{i + 2}" for i in range(10)),
- )
-
- _w(out / "EXPECTED.txt", EXPECTED["4_false_positive"])
- print(f" v Scenario 4 (false_positive) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 5 - Manual review triggers
-# ---------------------------------------------------------------------------
-def gen_scenario_5(base: Path) -> None:
- out = base / "5_manual_review"
- _w(
- out / "masked_pii.txt",
- textwrap.dedent("""\
- CPF: ***.456.789-** (mascarado -- padrao parcial visivel)
- CPF: 123.***.***-09 (mascarado -- inicio e fim visiveis)
- RG: 12.345.***-*
- Email: a***.s***@example.com
- Tel: (11) 9****-0001
- Nome: Ana P. S. (iniciais -- identificacao possivel com contexto)
- """),
- )
- _w(
- out / "pii_in_prose.txt",
- textwrap.dedent("""\
- O documento de CPF terminado em 09 foi verificado.
- O numero de registro e 123456789 (sem pontuacao -- validacao manual necessaria).
- O titular nasceu em quinze de marco de 1985.
- """),
- )
- _w(
- out / "foreign_pii.txt",
- textwrap.dedent("""\
- DNI: 12345678A (Espanha -- nao e CPF brasileiro)
- SSN: 123-45-6789 (EUA -- nao e CPF)
- NIF: X1234567L (Espanha -- estrangeiro)
- """),
- )
- _w(
- out / "anonymized_columns.csv",
- "cpf,nome,email\n[ANONIMIZADO],[ANONIMIZADO],[ANONIMIZADO]\n" * 5,
- )
-
- _w(out / "EXPECTED.txt", EXPECTED["5_manual_review"])
- print(f" v Scenario 5 (manual_review) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 6 - Steganography (LSB + EXIF metadata)
-# ---------------------------------------------------------------------------
-def _embed_lsb(img_path: Path, secret: str) -> None:
- from PIL import Image
-
- img = Image.new("RGB", (200, 200), (200, 200, 200))
- pixels = list(img.getdata())
- bits = "".join(f"{ord(c):08b}" for c in secret) + "00000000"
- new_pixels = []
- for i, (r, g, b) in enumerate(pixels):
- if i < len(bits):
- r = (r & 0xFE) | int(bits[i])
- new_pixels.append((r, g, b))
- out_img = Image.new("RGB", (200, 200))
- out_img.putdata(new_pixels)
- out_img.save(str(img_path), format="PNG")
-
-
-def _extract_lsb(img_path: Path) -> str:
- from PIL import Image
-
- pixels = list(Image.open(str(img_path)).getdata())
- bits = [str(r & 1) for r, g, b in pixels]
- chars = []
- for i in range(0, len(bits) - 8, 8):
- c = chr(int("".join(bits[i : i + 8]), 2))
- if c == "\x00":
- break
- chars.append(c)
- return "".join(chars)
-
-
-def gen_scenario_6(base: Path) -> None:
- out = base / "6_stego"
- out.mkdir(parents=True, exist_ok=True)
- try:
- from PIL import Image
- from PIL.PngImagePlugin import PngInfo
-
- secret = f"CPF:{_p(_CPFS, 0)};Nome:{_p(_NAMES, 0)}"
- stego_path = out / "innocent_photo.png"
- _embed_lsb(stego_path, secret)
- recovered = _extract_lsb(stego_path)
- assert recovered == secret, f"LSB mismatch: {recovered!r}"
-
- _w(
- out / "STEGO_KEY.txt",
- f"Arquivo: innocent_photo.png\n"
- f"Dado oculto (LSB canal R): {secret}\n"
- f"Metodo: LSB -- canal R da imagem PNG (1 bit por pixel)\n"
- f"Para extrair manualmente: use stegosuite, steghide, ou a funcao _extract_lsb() neste script.\n"
- f"Verificacao OK: dado recuperado = {recovered!r}\n",
- )
-
- # EXIF / PNG metadata injection
- img = Image.new("RGB", (200, 200), (180, 200, 220))
- meta = PngInfo()
- meta.add_text("Comment", f"CPF:{_p(_CPFS, 1)} Nome:{_p(_NAMES, 1)}")
- meta.add_text("Author", _p(_NAMES, 1))
- img.save(str(out / "photo_with_exif_pii.png"), pnginfo=meta)
-
- _w(
- out / "EXPECTED.txt",
- EXPECTED["6_stego"] + "\n\n"
- "VALIDACAO MANUAL:\n"
- "1. innocent_photo.png -- CPF em LSB. Scanner padrao NAO detecta.\n"
- ' Extrair: uv run python -c "'
- "from scripts.generate_synthetic_poc_corpus import _extract_lsb;"
- "from pathlib import Path; print(_extract_lsb(Path('tests/synthetic_corpus/6_stego/innocent_photo.png')))\"\n"
- "2. photo_with_exif_pii.png -- CPF em metadado PNG (Comment). Scanner PODE detectar se ler metadata.\n",
- )
-
- print(f" v Scenario 6 (stego) -> {out} [LSB OK, recovered={recovered!r}]")
-
- except ImportError:
- _w(
- out / "EXPECTED.txt",
- "Pillow nao disponivel -- cenario 6 nao gerado.\nInstale: pip install pillow\n"
- + EXPECTED["6_stego"],
- )
- print(
- " ! Scenario 6 (stego) -> Pillow indisponivel, documentado sem gerar imagem"
- )
-
-
-# ---------------------------------------------------------------------------
-# Scenario 7 - Extension coverage (one file per supported extension)
-# ---------------------------------------------------------------------------
-def gen_all_extensions(base: Path) -> None:
- out = base / "7_extensions"
- out.mkdir(parents=True, exist_ok=True)
- pii = f"CPF: {_p(_CPFS, 0)}\nNome: {_p(_NAMES, 0)}\n"
- pii_b = pii.encode()
-
- for ext in [
- ".txt",
- ".log",
- ".md",
- ".rst",
- ".cfg",
- ".ini",
- ".env",
- ".yml",
- ".yaml",
- ".sql",
- ]:
- _w(out / f"sample{ext}", pii)
-
- _w(
- out / "sample.json",
- json.dumps({"cpf": _p(_CPFS, 0), "nome": _p(_NAMES, 0)}, ensure_ascii=False),
- )
- _w(
- out / "sample.xml",
- f'{_p(_CPFS, 0)} {_p(_NAMES, 0)} ',
- )
- _w(out / "sample.csv", f"cpf,nome\n{_p(_CPFS, 0)},{_p(_NAMES, 0)}\n")
- _w(out / "sample.tsv", f"cpf\tnome\n{_p(_CPFS, 0)}\t{_p(_NAMES, 0)}\n")
-
- with zipfile.ZipFile(out / "sample.zip", "w") as z:
- z.writestr("pii.txt", pii)
- with tarfile.open(str(out / "sample.tar.gz"), "w:gz") as t:
- i = tarfile.TarInfo("pii.txt")
- i.size = len(pii_b)
- t.addfile(i, io.BytesIO(pii_b))
- with tarfile.open(str(out / "sample.tar.bz2"), "w:bz2") as t:
- i = tarfile.TarInfo("pii.txt")
- i.size = len(pii_b)
- t.addfile(i, io.BytesIO(pii_b))
-
- conn = sqlite3.connect(str(out / "sample.db"))
- conn.execute("CREATE TABLE t(cpf TEXT,nome TEXT)")
- conn.execute("INSERT INTO t VALUES(?,?)", (_p(_CPFS, 0), _p(_NAMES, 0)))
- conn.commit()
- conn.close()
-
- try:
- import openpyxl
-
- wb = openpyxl.Workbook()
- wb.active.append(["cpf", "nome"])
- wb.active.append([_p(_CPFS, 0), _p(_NAMES, 0)])
- wb.save(str(out / "sample.xlsx"))
- except ImportError:
- pass
- try:
- import docx as _d
-
- doc = _d.Document()
- doc.add_paragraph(pii)
- doc.save(str(out / "sample.docx"))
- except ImportError:
- pass
- try:
- from reportlab.pdfgen import canvas as rc
-
- c = rc.Canvas(str(out / "sample.pdf"))
- c.drawString(50, 750, f"CPF: {_p(_CPFS, 0)}")
- c.save()
- except ImportError:
- pass
- try:
- from PIL import Image, ImageDraw
-
- img = Image.new("RGB", (300, 80), (255, 255, 255))
- draw = ImageDraw.Draw(img)
- draw.text((10, 20), f"CPF: {_p(_CPFS, 0)}", (0, 0, 0))
- img.save(str(out / "sample.png"))
- img.save(str(out / "sample.jpg"))
- except ImportError:
- pass
-
- _w(
- out / "EXPECTED.txt",
- "Todos os arquivos contem CPF 123.456.789-09.\n"
- "O scanner DEVE encontrar em todos os formatos suportados.\n"
- "Formatos NAO encontrados = gap de cobertura para documentar.\n",
- )
- print(f" v Scenario 7 (extensions) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Main
-
-
-# ---------------------------------------------------------------------------
-# Scenario 8 - Stress / Load (OOM, large files, high concurrency corpus)
-# ---------------------------------------------------------------------------
-def gen_stress_load(base: Path) -> None:
- """
- Generates files designed to stress the scanner:
- - Very large text file with PII scattered at known offsets
- - Many small files (directory flood)
- - Deeply nested directory tree
- - File with millions of lines (minimal PII density)
- - Binary file with PII embedded in non-printable bytes
- All are expected to be found; OOM or timeout = reportable failure.
- """
- out = base / "8_stress_load"
- out.mkdir(parents=True, exist_ok=True)
-
- # Large file: 50 MB of padding with 10 CPF instances
- large = out / "large_50mb.txt"
- chunk = "x" * 1000 + "\n"
- with large.open("w", encoding="utf-8") as f:
- for i in range(50000): # ~50 MB
- f.write(chunk)
- if i % 5000 == 0 and i > 0:
- f.write(f"CPF: {_p(_CPFS, i // 5000)}\nNome: {_p(_NAMES, i // 5000)}\n")
- print(f" -> large file: {large} ({large.stat().st_size // 1024 // 1024} MB)")
-
- # Directory flood: 500 tiny files
- flood_dir = out / "directory_flood"
- flood_dir.mkdir(exist_ok=True)
- for i in range(500):
- (flood_dir / f"file_{i:04d}.txt").write_text(
- f"ref:{i}\nCPF: {_p(_CPFS, i)}\n"
- if i % 50 == 0
- else f"ref:{i}\nnada aqui\n",
- encoding="utf-8",
- )
- print(f" -> directory flood: 500 files (10 with PII) -> {flood_dir}")
-
- # Deep nesting: 10 levels, PII at the bottom
- deep = out / "deep_nesting"
- current = deep
- for lvl in range(10):
- current = current / f"level_{lvl:02d}"
- current.mkdir(parents=True, exist_ok=True)
- (current / "hidden_pii.txt").write_text(
- f"CPF: {_p(_CPFS, 0)}\nNome: {_p(_NAMES, 0)}\n# 10 levels deep\n",
- encoding="utf-8",
- )
- print(f" -> deep nesting (10 levels): {current / 'hidden_pii.txt'}")
-
- # High line count: 1 million lines, PII on lines 100000, 500000, 999999
- million_lines = out / "million_lines.txt"
- with million_lines.open("w", encoding="utf-8") as f:
- for i in range(1_000_000):
- if i in {100_000, 500_000, 999_999}:
- f.write(f"CPF: {_p(_CPFS, i % len(_CPFS))}\n")
- else:
- f.write(f"linha {i}\n")
- print(f" -> million lines: {million_lines}")
-
- _w(
- out / "EXPECTED.txt",
- "STRESS TEST -- OBJETIVO: scanner nao deve crashar nem perder PIIs.\n"
- "Esperado: CPF encontrado em large_50mb.txt (10x), directory_flood (10 arquivos),\n"
- "deep_nesting/hidden_pii.txt, e million_lines.txt (3x).\n"
- "Falha: OOM, timeout, crash, ou PII nao encontrado.\n"
- "Metrica: tempo de scan, memoria maxima (medir com /usr/bin/time -v ou psutil).\n",
- )
-
- _w(
- out / "STRESS_TEST_COMMANDS.sh",
- "#!/bin/bash\n"
- "# Medir tempo e memoria do scan de stress\n"
- "/usr/bin/time -v uv run python main.py \\\n"
- " --config config.yaml \\\n"
- " --scan --target tests/synthetic_corpus/8_stress_load \\\n"
- " --report 2> stress_metrics.txt\n"
- "echo 'Metricas em stress_metrics.txt'\n"
- "grep -E 'Maximum resident|Elapsed|Exit code' stress_metrics.txt\n",
- )
- print(f" v Scenario 8 (stress/load) -> {out}")
-
-
-# ---------------------------------------------------------------------------
-# Scenario 9 - Config errors (intentional misconfigs for UX/error message QA)
-# ---------------------------------------------------------------------------
-def gen_config_errors(base: Path) -> None:
- """
- Generates intentionally broken config files + a test script to run each.
- The goal is NOT to scan PII — it is to evaluate:
- - Quality of error messages (stdout/stderr)
- - Dashboard troubleshooting recommendations
- - Recovery / retry behavior
- Each config has a documented EXPECTED_ERROR and TROUBLESHOOT hint.
- """
- out = base / "9_config_errors"
- out.mkdir(parents=True, exist_ok=True)
-
- configs: list[dict] = [
- {
- "name": "wrong_db_host",
- "description": "Database host does not exist (DNS failure)",
- "config": {
- "targets": [
- {
- "type": "postgresql",
- "host": "nonexistent-db.local",
- "port": 5432,
- "database": "testdb",
- "user": "admin",
- "password": "secret",
- }
- ],
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "connection refused OR DNS resolution failure",
- "troubleshoot": "Verifique se o host esta acessivel (ping / nslookup). "
- "Confirme VPN ativa se DB for interno.",
- },
- {
- "name": "wrong_db_credentials",
- "description": "Valid host but wrong username/password",
- "config": {
- "targets": [
- {
- "type": "postgresql",
- "host": "localhost",
- "port": 5432,
- "database": "testdb",
- "user": "wrong_user",
- "password": "wrong_pass",
- }
- ],
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "authentication failed for user 'wrong_user'",
- "troubleshoot": "Verifique as credenciais. Use variavel de ambiente DB_PASSWORD "
- "em vez de senha em texto no config.",
- },
- {
- "name": "missing_output_dir",
- "description": "Report output_dir does not exist and cannot be created",
- "config": {
- "targets": [
- {"type": "filesystem", "path": "./tests/synthetic_corpus/1_happy"}
- ],
- "report": {"output_dir": "/nonexistent/readonly/path"},
- },
- "expected_error": "permission denied OR directory not found",
- "troubleshoot": "Crie o diretorio manualmente ou aponte para um caminho gravavel. "
- "Em Docker: monte o volume correto.",
- },
- {
- "name": "invalid_target_type",
- "description": "Unknown connector type specified",
- "config": {
- "targets": [
- {"type": "oracle_xyz_invalid", "host": "localhost", "port": 1521}
- ],
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "unknown connector type 'oracle_xyz_invalid'",
- "troubleshoot": "Tipos validos: postgresql, mysql, mssql, oracle, mongodb, redis, "
- "filesystem. Verifique a documentacao em docs/USAGE.md.",
- },
- {
- "name": "malformed_yaml",
- "description": "Syntactically invalid YAML config",
- "raw_content": "targets:\n - type: postgresql\n host: localhost\n bad yaml: [unclosed\n",
- "expected_error": "YAML parse error",
- "troubleshoot": "Valide o YAML em https://www.yamllint.com/ ou com: "
- "python -c \"import yaml; yaml.safe_load(open('config.yaml'))\"",
- },
- {
- "name": "missing_required_field",
- "description": "Config missing required 'targets' key",
- "config": {
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "missing required field 'targets' in config",
- "troubleshoot": "Copie o config de exemplo: cp deploy/config.example.yaml config.yaml "
- "e edite os targets.",
- },
- {
- "name": "path_not_found",
- "description": "Filesystem target path does not exist",
- "config": {
- "targets": [
- {"type": "filesystem", "path": "/nonexistent/data/path/12345"}
- ],
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "path '/nonexistent/data/path/12345' does not exist",
- "troubleshoot": "Confirme que o caminho existe e que o usuario tem permissao de leitura. "
- "Em Docker: monte o volume com -v /seu/caminho:/data.",
- },
- {
- "name": "api_key_wrong",
- "description": "API request with wrong X-API-Key header",
- "config": {
- "targets": [
- {"type": "filesystem", "path": "./tests/synthetic_corpus/1_happy"}
- ],
- "api": {"require_api_key": True, "api_key": "correct-key-12345"},
- "report": {"output_dir": "./reports"},
- },
- "expected_error": "HTTP 401 Unauthorized when calling API with wrong key",
- "troubleshoot": "Use X-API-Key: correct-key-12345 no header. "
- "Para testar: curl -H 'X-API-Key: wrong-key' http://localhost:8088/api/v1/scan",
- "test_curl": (
- "curl -s -o /dev/null -w '%{http_code}' "
- "-H 'X-API-Key: WRONG-KEY' http://localhost:8088/api/v1/status"
- ),
- },
- ]
-
- import yaml as _yaml # may not be available; fall back to json dump
-
- test_script_lines = [
- "#!/bin/bash",
- "# Auto-generated: test each broken config and capture exit code + output",
- "# Usage: bash 9_config_errors/run_error_tests.sh 2>&1 | tee error_test_results.txt",
- "",
- "PASS=0; FAIL=0; SKIP=0",
- "",
- ]
-
- for cfg in configs:
- cfg_path = out / f"config_{cfg['name']}.yaml"
- if "raw_content" in cfg:
- _w(cfg_path, cfg["raw_content"])
- else:
- try:
- import yaml as _yaml
-
- _w(
- cfg_path,
- _yaml.dump(
- cfg["config"], allow_unicode=True, default_flow_style=False
- ),
- )
- except ImportError:
- _w(cfg_path, json.dumps(cfg["config"], ensure_ascii=False, indent=2))
-
- doc_path = out / f"doc_{cfg['name']}.txt"
- _w(
- doc_path,
- (
- f"Config: {cfg['name']}\n"
- f"Descricao: {cfg['description']}\n"
- f"Erro esperado: {cfg['expected_error']}\n"
- f"Troubleshoot: {cfg['troubleshoot']}\n"
- + (f"Teste curl: {cfg.get('test_curl', 'N/A')}\n")
- ),
- )
-
- name_val = cfg["name"]
- cfg_file = cfg_path.name
- scan_tgt = "./tests/synthetic_corpus/1_happy"
- test_script_lines += [
- f'echo "--- Testing: {name_val} ---"',
- f"uv run python main.py --config {cfg_file} --scan --target {scan_tgt} 2>&1 | head -20",
- "RC=$?; if [ $RC -ne 0 ]; then"
- f' echo "EXPECTED FAILURE (rc=$RC): {name_val} -- OK"; PASS=$((PASS+1));'
- f' else echo "UNEXPECTED SUCCESS: {name_val} -- REVIEW"; FAIL=$((FAIL+1)); fi',
- "",
- ]
-
- test_script_lines += [
- 'echo ""',
- 'echo "Results: PASS=$PASS FAIL=$FAIL SKIP=$SKIP"',
- 'echo "(PASS = expected failure triggered correctly)"',
- ]
-
- _w(out / "run_error_tests.sh", "\n".join(test_script_lines))
-
- _w(
- out / "EXPECTED.txt",
- "CENARIO 9 -- CONFIG ERRORS\n"
- "Objetivo: avaliar qualidade das mensagens de erro e recomendacoes de troubleshooting.\n"
- "Cada config_*.yaml e proposital e incorreto.\n\n"
- "Para cada caso, avaliar:\n"
- " [ ] Mensagem de erro e clara e actionable?\n"
- " [ ] Exit code nao-zero (distingue erro de sucesso)?\n"
- " [ ] Dashboard mostra recomendacao de troubleshooting?\n"
- " [ ] Nenhum stacktrace interno exposto para usuario final?\n"
- " [ ] Log tem nivel correto (ERROR vs WARNING vs INFO)?\n\n"
- "Score qualitativo (1-5 por caso):\n"
- " 5 = mensagem clara, troubleshoot acionavel, sem stacktrace, exit code correto\n"
- " 1 = crash sem mensagem, stacktrace exposto, exit 0 em erro\n",
- )
- print(f" v Scenario 9 (config_errors) -> {out} ({len(configs)} configs)")
-
-
-# ---------------------------------------------------------------------------
-_SCENARIO_MAP: dict[str, Callable[[Path], None]] = {
- "happy": gen_scenario_1,
- "unhappy": gen_scenario_2,
- "catastrophic": gen_scenario_3,
- "false_positive": gen_scenario_4,
- "manual_review": gen_scenario_5,
- "stego": gen_scenario_6,
- "extensions": gen_all_extensions,
- "stress_load": gen_stress_load,
- "config_errors": gen_config_errors,
-}
-ALL_SCENARIOS = list(_SCENARIO_MAP)
-
-
-def main() -> None:
- parser = argparse.ArgumentParser(
- description="Generate synthetic POC corpus for Data Boar."
- )
- parser.add_argument(
- "--output",
- default="tests/synthetic_corpus",
- help="Output directory (default: tests/synthetic_corpus)",
- )
- parser.add_argument(
- "--scenario",
- default=",".join(ALL_SCENARIOS),
- help=f"Comma-separated scenarios. Options: {', '.join(ALL_SCENARIOS)}",
- )
- args = parser.parse_args()
-
- base = Path(args.output)
- base.mkdir(parents=True, exist_ok=True)
- selected = [s.strip() for s in args.scenario.split(",")]
- unknown = [s for s in selected if s not in _SCENARIO_MAP]
- if unknown:
- parser.error(f"Unknown scenarios: {unknown}")
-
- print("\nData Boar -- Synthetic POC Corpus Generator")
- print(f"Output: {base.resolve()}")
- print(f"Scenarios: {selected}\n")
-
- for name in selected:
- _SCENARIO_MAP[name](base)
-
- manifest = {
- "generated_by": "generate_synthetic_poc_corpus.py",
- "scenarios": {
- name: EXPECTED.get(f"{i + 1}_{name}", "see EXPECTED.txt")
- for i, name in enumerate(ALL_SCENARIOS)
- },
- "note": "All PII is synthetic -- generated for testing only. Not real individuals.",
- }
- (base / "CORPUS_MANIFEST.json").write_text(
- json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
- )
- print(f"\nManifest -> {base / 'CORPUS_MANIFEST.json'}")
- print(
- f"Next: uv run python main.py --config config.yaml --scan --target {base.resolve()}"
- )
- print(" Compare findings against EXPECTED.txt in each sub-folder.")
- print(" See docs/TESTING_POC_GUIDE.md for the full validation checklist.\n")
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from core.demo.synthetic_corpus import main
if __name__ == "__main__":
main()
diff --git a/tests/operator_help_sync_manifest.py b/tests/operator_help_sync_manifest.py
index dce9d243..0fc3f6ad 100644
--- a/tests/operator_help_sync_manifest.py
+++ b/tests/operator_help_sync_manifest.py
@@ -53,9 +53,16 @@ class OperatorHelpMarker:
_MAN_EXPORT_DSAR = r"\-\-export\-dsar"
_MAN_DSAR_OUTPUT = r"\-\-dsar\-output"
_MAN_DSAR_INCLUDE_SAMPLES = r"\-\-dsar\-include\-samples"
+_MAN_DEMO = r"\-\-demo"
OPERATOR_HELP_MARKERS: tuple[OperatorHelpMarker, ...] = (
OperatorHelpMarker("config", "--config", "config.yaml", _MAN_CONFIG),
+ OperatorHelpMarker(
+ "demo",
+ "--demo",
+ "--demo",
+ _MAN_DEMO,
+ ),
OperatorHelpMarker("web", "--web", "--web", _MAN_WEB),
OperatorHelpMarker("host", "--host", "--host", _MAN_HOST),
OperatorHelpMarker(
diff --git a/tests/test_cli_demo.py b/tests/test_cli_demo.py
new file mode 100644
index 00000000..b39c7c70
--- /dev/null
+++ b/tests/test_cli_demo.py
@@ -0,0 +1,93 @@
+"""CLI ``--demo`` contract (#1113)."""
+
+from __future__ import annotations
+
+import subprocess
+import sys
+from pathlib import Path
+
+import pytest
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+MAIN = REPO_ROOT / "main.py"
+
+
+def _run_demo_args(*extra: str, timeout: int = 180) -> subprocess.CompletedProcess[str]:
+    """Run ``main.py --demo`` plus *extra* from the repo root and capture its output."""
+    return subprocess.run(
+        [sys.executable, str(MAIN), "--demo", *extra],
+        cwd=REPO_ROOT,
+        capture_output=True,
+        text=True,
+        timeout=timeout,
+        check=False,  # a non-zero exit does not raise; the result is returned as-is
+    )
+
+
+def test_main_py_demo_flag_in_help() -> None:
+    """``main.py --help`` must exit 0 and list the ``--demo`` flag on stdout."""
+    help_cmd = [sys.executable, str(MAIN), "--help"]
+    # check=True: a non-zero exit raises CalledProcessError and fails the test.
+    result = subprocess.run(
+        help_cmd, cwd=REPO_ROOT, capture_output=True, text=True, check=True
+    )
+
+    assert "--demo" in result.stdout
+
+
+def test_demo_headless_scan_completes() -> None:
+    """Headless demo.sh must exit 0 and write a report; skipped if bash is missing."""
+    import shutil
+    bash = shutil.which("bash")  # do not assume /bin/bash (Windows, NixOS, BSD)
+    if bash is None:
+        pytest.skip("bash not found on PATH; scripts/demo.sh cannot run")
+    cmd = [bash, str(REPO_ROOT / "scripts" / "demo.sh"), "--headless"]
+    proc = subprocess.run(
+        cmd, cwd=REPO_ROOT, capture_output=True, text=True, timeout=300, check=False
+    )
+    assert proc.returncode == 0, proc.stderr or proc.stdout
+    assert "Report written:" in proc.stdout or "Report written:" in proc.stderr
+
+
+def test_demo_sh_multi_step_disables_python_atexit_cleanup() -> None:
+    """demo.sh must contain ``register_cleanup=False`` (bash trap owns the cleanup)."""
+    demo_sh = REPO_ROOT / "scripts" / "demo.sh"
+    assert "register_cleanup=False" in demo_sh.read_text(encoding="utf-8")
+
+
+def test_config_not_found_suggests_demo() -> None:
+    """A missing ``--config`` file must exit 1 and mention ``--demo`` in the output."""
+    missing_config = "/nonexistent/data_boar_config_missing.yaml"
+    argv = [sys.executable, str(MAIN), "--config", missing_config]
+    # check=False: the non-zero exit is the expected outcome, asserted below.
+    result = subprocess.run(
+        argv,
+        cwd=REPO_ROOT,
+        capture_output=True,
+        text=True,
+        check=False,
+    )
+
+    assert result.returncode == 1
+    # The hint may be printed to either stream, so search both.
+    assert "--demo" in result.stdout + result.stderr
+
+
+@pytest.mark.skipif(
+    not (REPO_ROOT / "core" / "demo" / "synthetic_corpus.py").exists(),
+    reason="core.demo package required",
+)
+def test_prepare_demo_workspace_loopback_host() -> None:
+    """Demo workspace must have a config file, a loopback API host and a corpus dir."""
+    import shutil
+    from core.demo.runtime import prepare_demo_workspace
+
+    workspace = prepare_demo_workspace(port=18088, register_cleanup=False)
+    demo_dir, config_path, config = workspace
+    try:
+        assert config_path.exists()
+        assert config["api"]["host"] == "127.0.0.1"
+        assert (demo_dir / "corpus").is_dir()
+    finally:
+        # Remove the workspace explicitly (register_cleanup=False was passed above).
+        shutil.rmtree(demo_dir, ignore_errors=True)
diff --git a/tests/test_demo_entrypoint.py b/tests/test_demo_entrypoint.py
index d9c11e1e..e22d662a 100644
--- a/tests/test_demo_entrypoint.py
+++ b/tests/test_demo_entrypoint.py
@@ -24,19 +24,18 @@ def test_demo_sh_exists_and_is_executable() -> None:
def test_demo_sh_uses_synthetic_corpus_generator() -> None:
- """Anti-regression #834: demo.sh must delegate corpus generation to generate_synthetic_poc_corpus.py."""
+ """Anti-regression #834/#1113: demo delegates to ``data-boar --demo`` or core.demo."""
demo = (_repo_root() / "scripts" / "demo.sh").read_text(encoding="utf-8")
- assert "generate_synthetic_poc_corpus.py" in demo, (
- "scripts/demo.sh must call generate_synthetic_poc_corpus.py to produce "
- "the demo corpus without requiring real data (#834)"
+ assert "main.py --demo" in demo or "data-boar --demo" in demo, (
+ "scripts/demo.sh must call main.py --demo (#1113)"
)
def test_demo_sh_starts_web_dashboard() -> None:
- """Anti-regression #834: demo.sh must start the dashboard (main.py --web)."""
+ """Anti-regression #834/#1113: default path uses --demo (implies --web)."""
demo = (_repo_root() / "scripts" / "demo.sh").read_text(encoding="utf-8")
- assert "--web" in demo, (
- "scripts/demo.sh must include main.py --web so the dashboard opens (#834)"
+ assert "--demo" in demo, (
+ "scripts/demo.sh must invoke --demo so the dashboard opens (#1113)"
)
diff --git a/tests/test_report_excel_sheet_names.py b/tests/test_report_excel_sheet_names.py
new file mode 100644
index 00000000..313b5555
--- /dev/null
+++ b/tests/test_report_excel_sheet_names.py
@@ -0,0 +1,10 @@
+"""Excel worksheet titles must be openpyxl-safe (#1113 demo exposed Praise sheet bug)."""
+
+from report.generator import _SHEET_PRAISE_CONTROLS, _excel_safe_sheet_title
+
+
+def test_praise_sheet_title_sanitizes_slash() -> None:
+    """Praise sheet title is the sanitized raw title: no ``/``, at most 31 chars."""
+    title = _SHEET_PRAISE_CONTROLS
+    assert "/" not in title and len(title) <= 31
+    assert title == _excel_safe_sheet_title("Praise / existing controls")