From 1a28f645a74086e78a5b78707cbbe9b9f3a94def Mon Sep 17 00:00:00 2001 From: Jason Hernandez <7144515+jasonhernandez@users.noreply.github.com> Date: Fri, 29 May 2026 15:27:49 -0700 Subject: [PATCH] workload-replay: harden and AST-anonymize captured workloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `bin/mz-workload-anonymize` scrubs identifiers and literals from captured workloads so they can be shared. It relied on text-regex substitution, which leaked sensitive data and corrupted SQL. This reworks it to rewrite SQL on Materialize's own parsed AST, and hardens the surrounding tool. New crate `src/sql-anonymize` (`mz-sql-anonymize`): reads {mapping, rename_identifiers, redact_literals, statements} and rewrites each statement on the AST via VisitMut — - renames identifiers as whole tokens, reaching object/cluster/type references (the `Raw` AstInfo associated types, whose generic visitors are no-ops) by overriding visit_item_name_mut and friends. No substring corruption, no in-string rewrites, no word-boundary or case guesswork (Mz identifiers are case-sensitive, so exact matching is correct); - redacts query literals — strings, numbers, hex strings, intervals — to ''; - preserves config literals (CREATE CLUSTER / CLUSTER REPLICA / ALTER CLUSTER and SET / RESET / SET TRANSACTION / ALTER SYSTEM), e.g. sizes and timeouts, which replay needs and which are not sensitive. The anonymizer (Python) sends all cluster/DDL/query SQL through the helper and: - still scrubs DDL create_sql literals with a blanket regex, because option strings (broker addresses, hosts) are typed AST fields neither the visitor nor the engine's redacted Display treats as literals; - applies the structural identifier mapping (column types, child schema/db, query routing fields) directly, since those are not SQL; - rebuilds source-child dict keys from the mapped database/schema names, which previously leaked the originals; - requires the parser by default (--require-parser); falls back to the regex, with a warning, only when the binary is unavailable or a statement does not parse. A verify pass re-scans the output for surviving original identifiers (in any string, including structural keys) and non-placeholder literals, exempting reserved format keys and preserved config statements, and refuses to write if anything leaks. Output now requires an explicit -o/--in-place rather than silently overwriting the input. Validated against a production capture: 0 of 623 anonymized queries fail to re-parse (a regex prototype corrupted 65 of 565), 0 identifier leaks, query numbers redacted, cluster/SET config preserved. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 10 + Cargo.toml | 2 + .../materialize/cli/mz_workload_anonymize.py | 501 +++++++++++++++--- .../cli/mz_workload_anonymize_test.py | 450 ++++++++++++++++ src/sql-anonymize/Cargo.toml | 22 + src/sql-anonymize/src/main.rs | 316 +++++++++++ test/workload-replay/README.md | 38 +- 7 files changed, 1273 insertions(+), 66 deletions(-) create mode 100644 misc/python/materialize/cli/mz_workload_anonymize_test.py create mode 100644 src/sql-anonymize/Cargo.toml create mode 100644 src/sql-anonymize/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 30b1bf2513b74..a6df1534cb5be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7824,6 +7824,16 @@ dependencies = [ "version-compare", ] +[[package]] +name = "mz-sql-anonymize" +version = "0.0.0" +dependencies = [ + "mz-ore", + "mz-sql-parser", + "serde", + "serde_json", +] + [[package]] name = "mz-sql-lexer" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index a2be5ba92183a..d758c1ca63898 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ members = [ "src/server-core", "src/service", "src/sql", + "src/sql-anonymize", "src/sql-lexer", "src/sql-parser", "src/sql-pretty", @@ -223,6 +224,7 @@ default-members = [ "src/server-core", "src/service", "src/sql", + "src/sql-anonymize", "src/sql-lexer", "src/sql-parser", "src/sql-pretty", diff --git a/misc/python/materialize/cli/mz_workload_anonymize.py b/misc/python/materialize/cli/mz_workload_anonymize.py index dd6e5b2b6f645..442b066b84ae4 100644 --- a/misc/python/materialize/cli/mz_workload_anonymize.py +++ b/misc/python/materialize/cli/mz_workload_anonymize.py @@ -8,8 +8,12 @@ # by the Apache License, Version 2.0. import argparse +import json +import os import re +import subprocess import sys +from pathlib import Path from typing import Any import yaml @@ -17,6 +21,64 @@ from materialize import MZ_ROOT +def _locate_redactor() -> list[str] | None: + """Locate the mz-sql-anonymize helper binary, if it has been built. + + Honors MZ_SQL_ANONYMIZE_BIN, then looks for a release or debug build in the + Cargo target directory. Returns the argv prefix to run it, or None. + """ + override = os.environ.get("MZ_SQL_ANONYMIZE_BIN") + if override and Path(override).exists(): + return [override] + for profile in ("release", "debug"): + candidate = MZ_ROOT / "target" / profile / "mz-sql-anonymize" + if candidate.exists(): + return [str(candidate)] + return None + + +def anonymize_sql_via_parser( + sqls: list[str], + mapping: dict[str, str], + rename_identifiers: bool, + redact_literals: bool, +) -> list[str | None] | None: + """Rename identifiers and/or redact literals in each SQL string via the AST. + + Uses the `mz-sql-anonymize` helper, which parses each statement with + Materialize's own parser, renames identifier tokens per `mapping`, and (when + `redact_literals`) replaces literal values with `''`. Doing this on + the AST avoids the corruption a text regex causes (substring matches, + in-string rewrites, broken syntax). + + Returns a list aligned with the input — the rewritten SQL, or None for a + statement that did not parse. Returns None for the whole batch if the helper + binary is unavailable or errors, signaling the caller to fall back to regex. + """ + cmd = _locate_redactor() + if cmd is None: + return None + request = { + "mapping": mapping, + "rename_identifiers": rename_identifiers, + "redact_literals": redact_literals, + "statements": sqls, + } + proc = subprocess.run( + cmd, + input=json.dumps(request), + capture_output=True, + text=True, + ) + if proc.returncode != 0: + print( + f"warning: {cmd[0]} failed, falling back to regex:\n{proc.stderr}", + file=sys.stderr, + ) + return None + return json.loads(proc.stdout) + + def keywords() -> set[str]: with open(MZ_ROOT / "src" / "sql-lexer" / "src" / "keywords.txt") as f: result = set( @@ -34,6 +96,188 @@ def keywords() -> set[str]: return result +# Keys that are part of the workload file format itself, not user identifiers. +# The verify pass must not treat these as leaks when a user object happens to +# share a name with one of them (e.g. a column named `transaction_id`). Their +# *values* are still checked; only the structural key name is exempt. +RESERVED_FORMAT_KEYS = frozenset( + { + # top level + "databases", + "clusters", + "queries", + "mz_workload_version", + # schema-level containers + "tables", + "views", + "materialized_views", + "indexes", + "types", + "connections", + "sources", + "sinks", + # object / column fields + "create_sql", + "name", + "type", + "schema", + "database", + "columns", + "managed", + "children", + "nullable", + "default", + "rows", + "avg_size", + # source statistics fields + "bytes_total", + "messages_total", + "bytes_second", + "messages_second", + # query record fields + "sql", + "cluster", + "search_path", + "statement_type", + "finished_status", + "params", + "transaction_isolation", + "session_id", + "transaction_id", + "began_at", + "duration", + "result_size", + } +) + + +# Query statement types whose literals are non-sensitive config (session/system +# settings such as timeouts and isolation). The anonymizer preserves these — see +# `preserves_literals` in the mz-sql-anonymize helper — so verify must not flag +# them. Cluster DDL is preserved too but lives in create_sql, handled separately. +CONFIG_STATEMENT_TYPES = frozenset( + { + "set_variable", + "reset_variable", + "set_transaction", + "alter_system_set", + "alter_system_reset", + } +) + + +def _iter_sql(obj: Any, path: str = "") -> Any: + """Yield (location, sql) for every create_sql/sql string in the workload.""" + if isinstance(obj, dict): + for key, value in obj.items(): + child_path = f"{path}.{key}" + if key in ("create_sql", "sql") and isinstance(value, str): + yield child_path, value + else: + yield from _iter_sql(value, child_path) + elif isinstance(obj, list): + for i, value in enumerate(obj): + yield from _iter_sql(value, f"{path}[{i}]") + + +def _iter_strings(obj: Any, path: str = "") -> Any: + """Yield (location, string) for every string in the workload, keys included. + + Identifiers leak through structural positions too — notably dict keys such + as a source child's fully-qualified name — so the identifier check must look + beyond create_sql/sql values. + """ + if isinstance(obj, dict): + for key, value in obj.items(): + child_path = f"{path}.{key}" + if isinstance(key, str): + yield f"{child_path}.", key + yield from _iter_strings(value, child_path) + elif isinstance(obj, list): + for i, value in enumerate(obj): + yield from _iter_strings(value, f"{path}[{i}]") + elif isinstance(obj, str): + yield path, obj + + +def verify_anonymized( + new: dict[str, Any], mapping: dict[str, str], args: argparse.Namespace +) -> list[str]: + """Best-effort scan of anonymized output for data that should have been scrubbed. + + This is a backstop for the heuristic text substitution, not a proof: it + catches whole-word survivals of original identifiers (in any string, + including structural dict keys) and any single-quoted literal in SQL that + was not reduced to a placeholder ('' from the parser-based path, + or 'literal_N' from the regex fallback). It cannot detect sensitive data + hidden in dollar-quoted strings, comments, or numeric literals when the + regex fallback is in use. + + Cluster create_sql is exempt from the literal check: its literals (SIZE, + replication factor, availability zones) are non-sensitive configuration that + replay must preserve verbatim, so they are intentionally not anonymized. + """ + problems: list[str] = [] + + # Identifiers that were actually renamed (keywords map to themselves). + identifier_checks: list[tuple[str, re.Pattern[str]]] = [] + if args.identifiers: + for original, anonymized in mapping.items(): + if original == anonymized: + continue + if re.fullmatch(r"\w+", original): + pattern = re.compile(r"\b" + re.escape(original) + r"\b") + else: + pattern = re.compile(re.escape(original)) + identifier_checks.append((original, pattern)) + + string_literal = re.compile(r"'(?:[^']|'')*'") + placeholder = re.compile(r"^'(?:literal_\d+|)'$") + + # The identifier check runs over identifier positions only: SQL text and + # structural dict keys (e.g. a source child's fully-qualified key). It must + # NOT scan arbitrary scalar values — a kept literal like 'secret note' can + # contain a word that matches a renamed column without being a leak. + def check_identifiers(location: str, text: str) -> None: + for original, pattern in identifier_checks: + if pattern.search(text): + problems.append( + f"{location}: original identifier {original!r} survived" + ) + + for location, text in _iter_strings(new): + if location.endswith(".") and text not in RESERVED_FORMAT_KEYS: + check_identifiers(location, text) + + query_location = re.compile(r"\.queries\[(\d+)\]\.sql$") + + def literals_preserved(location: str) -> bool: + # Cluster create_sql and config statements (SET/RESET/ALTER SYSTEM) keep + # their literals on purpose; the anonymizer does not redact them, so the + # verify pass must not flag them. + if location.startswith(".clusters"): + return True + m = query_location.match(location) + if ( + m + and new["queries"][int(m.group(1))].get("statement_type") + in CONFIG_STATEMENT_TYPES + ): + return True + return False + + for location, sql in _iter_sql(new): + check_identifiers(location, sql) + if args.literals and not literals_preserved(location): + for match in string_literal.finditer(sql): + if not placeholder.fullmatch(match.group(0)): + problems.append( + f"{location}: non-anonymized string literal {match.group(0)!r}" + ) + + return problems + + def main() -> int: parser = argparse.ArgumentParser( prog="mz-workload-anonymize", @@ -46,7 +290,12 @@ def main() -> int: "--output", type=str, default=None, - help="Path to write the workload.yml, overrides the input file if not specified", + help="Path to write the workload.yml, or - for stdout. Required unless --in-place is given.", + ) + parser.add_argument( + "--in-place", + action="store_true", + help="Overwrite the input file with the anonymized workload. Destroys the original capture.", ) parser.add_argument( "--identifiers", action=argparse.BooleanOptionalAction, default=True @@ -54,6 +303,23 @@ def main() -> int: parser.add_argument( "--literals", action=argparse.BooleanOptionalAction, default=True ) + parser.add_argument( + "--verify", + action=argparse.BooleanOptionalAction, + default=True, + help="After anonymizing, scan the output for surviving original identifiers and " + "non-anonymized string literals, and refuse to write if any are found.", + ) + parser.add_argument( + "--require-parser", + action=argparse.BooleanOptionalAction, + default=True, + help="Require the mz-sql-anonymize parser for query literal redaction " + "(the default). With --no-require-parser, fall back to a weaker regex " + "that only redacts single-quoted strings (missing numbers, dollar-quoted " + "strings, and comments) when the parser binary is unavailable or a " + "statement does not parse.", + ) parser.add_argument( "file", @@ -63,6 +329,20 @@ def main() -> int: args = parser.parse_args() + # Resolve the output target up front so an invalid invocation fails before + # any work (and before the parser-availability check below). + if args.output: + output = args.output + elif args.in_place: + output = args.file + else: + print( + "error: specify an output with -o/--output (use '-' for stdout) " + "or pass --in-place to overwrite the input file", + file=sys.stderr, + ) + return 1 + with open(args.file) as f: workload = yaml.load(f, Loader=yaml.CSafeLoader) @@ -100,7 +380,9 @@ def set_name(name: str, new_name: str) -> str: else: return name - string_literal_pattern = re.compile(r"'(?:[^']*(?:'')?)*'") + # Matches a single-quoted SQL string literal, including '' escapes. Written + # without nested quantifiers to avoid catastrophic backtracking (ReDoS). + string_literal_pattern = re.compile(r"'(?:[^']|'')*'") def anonymize_string_literal(match: re.Match[str]) -> str: count["literals"] += 1 @@ -199,8 +481,17 @@ def anonymize_column_default(column: dict[str, Any]) -> None: if args.literals: anonymize_column_default(column) child["columns"].append(column) + # Build the child's fully-qualified key from the mapped + # database/schema names. child["name"] is already + # anonymized above, but database/schema are remapped + # later (pass 2); without mapping them here the key + # leaks the original database and schema names. When + # --identifiers is off the mapping is empty and these + # resolve back to the originals, as intended. source["children"][ - f"{child['database']}.{child['schema']}.{child['name']}" + f"{mapping.get(child['database'], child['database'])}." + f"{mapping.get(child['schema'], child['schema'])}." + f"{child['name']}" ] = child new_schema["sources"][new_source_name] = source @@ -245,75 +536,165 @@ def anonymize_column_default(column: dict[str, Any]) -> None: new_database[new_schema_name] = new_schema new["databases"][new_db_name] = new_database - # TODO: Case discrepancies are not handled. You can call a column `mintimestamp`, but then use it as `minTimestamp` - if args.identifiers: - pattern = re.compile( - "|".join(map(re.escape, sorted(mapping, key=len, reverse=True))) + # --- Pass 2: rewrite SQL text --- + # + # Identifier renaming and query-literal redaction run on the AST via the + # mz-sql-anonymize helper, which renames whole identifier tokens and redacts + # literal values without the corruption a text regex causes. DDL create_sql + # literals are still scrubbed with a blanket regex, because option strings + # (connection hosts/broker addresses, sink topics) are typed fields the AST + # — like the engine's own redacted Display — does not treat as redactable + # literals. Cluster/SET config literals (sizes, timeouts) are preserved by + # the helper and never regex-redacted here. + + binary_available = _locate_redactor() is not None + if not binary_available: + if args.require_parser: + print( + "error: mz-sql-anonymize helper not found, so SQL cannot be " + "anonymized with the parser. Build it with:\n" + " cargo build --release -p mz-sql-anonymize\n" + "or pass --no-require-parser to fall back to a regex that renames " + "identifiers by text substitution (which can corrupt SQL) and only " + "redacts single-quoted string literals.", + file=sys.stderr, + ) + return 1 + print( + "warning: mz-sql-anonymize helper not found; falling back to regex " + "identifier substitution and literal redaction for all SQL " + "(--no-require-parser).", + file=sys.stderr, ) - def replace_identifiers(d: dict[str, Any], entry: str) -> None: - if args.identifiers: - d[entry] = pattern.sub(lambda m: mapping[m.group(0)], d[entry]) + # Regex fallback, used only when the helper is unavailable or a statement + # does not parse. The identifier substitution is the corruption-prone + # heuristic the AST replaces; it is the degraded path. + fallback_pattern = ( + re.compile("|".join(map(re.escape, sorted(mapping, key=len, reverse=True)))) + if args.identifiers and mapping + else None + ) - def replace_literals(d: dict[str, Any], entry: str) -> None: - if args.literals: - d[entry] = anonymize_literals_in_sql(d[entry]) + def fallback_rewrite(sql: str, redact: bool) -> str: + if fallback_pattern is not None: + sql = fallback_pattern.sub(lambda m: mapping[m.group(0)], sql) + if redact and args.literals: + sql = anonymize_literals_in_sql(sql) + return sql - # TODO: The create_sql replacements are more of a heuristic because we might overwrite identifiers and same name existing twice. There are two alternatives: - # 1. Wrap the Mz parser in Python, parse the SQL, resolve what they map to, rename, and reserialize. - # 2. Spin up Materialize, RENAME everything. Doesn't work for column names? Then take a new recording - for cluster in new["clusters"].values(): - replace_identifiers(cluster, "create_sql") - for db in new["databases"].values(): - for schema in db.values(): - for table in schema["tables"].values(): - for column in table["columns"]: - if args.identifiers and column["type"] in mapping: - column["type"] = mapping[column["type"]] - replace_identifiers(table, "create_sql") - replace_literals(table, "create_sql") - for typ in schema["types"].values(): - replace_identifiers(typ, "create_sql") - for conn in schema["connections"].values(): - replace_identifiers(conn, "create_sql") - for source in schema["sources"].values(): - for column in source.get("columns", []): - if args.identifiers and column["type"] in mapping: - column["type"] = mapping[column["type"]] - for child in source.get("children", {}).values(): - if args.identifiers: - child["schema"] = mapping[child["schema"]] - child["database"] = mapping[child["database"]] - for column in child["columns"]: - if args.identifiers and column["type"] in mapping: + # Structural identifier fields that are not SQL text (the helper never sees + # them): column type references, child schema/database, and query routing. + if args.identifiers: + for db in new["databases"].values(): + for schema in db.values(): + for table in schema["tables"].values(): + for column in table["columns"]: + if column["type"] in mapping: column["type"] = mapping[column["type"]] - replace_identifiers(child, "create_sql") - replace_literals(child, "create_sql") - replace_identifiers(source, "create_sql") - replace_literals(source, "create_sql") - for view in schema["views"].values(): - replace_identifiers(view, "create_sql") - replace_literals(view, "create_sql") - for mv in schema["materialized_views"].values(): - replace_identifiers(mv, "create_sql") - replace_literals(mv, "create_sql") - for index in schema["indexes"].values(): - replace_identifiers(index, "create_sql") - for sink in schema["sinks"].values(): - replace_identifiers(sink, "create_sql") - for query in workload["queries"]: - if args.identifiers: + for source in schema["sources"].values(): + for column in source.get("columns", []): + if column["type"] in mapping: + column["type"] = mapping[column["type"]] + for child in source.get("children", {}).values(): + # A child's schema/database may be a builtin or otherwise + # uncaptured name not in the mapping; leave those as-is. + child["schema"] = mapping.get(child["schema"], child["schema"]) + child["database"] = mapping.get( + child["database"], child["database"] + ) + for column in child["columns"]: + if column["type"] in mapping: + column["type"] = mapping[column["type"]] + for query in workload["queries"]: query["cluster"] = mapping.get(query["cluster"], query["cluster"]) query["database"] = mapping.get(query["database"], query["database"]) query["search_path"] = [ mapping.get(schema, schema) for schema in query["search_path"] ] - replace_identifiers(query, "sql") - if args.literals: - replace_literals(query, "sql") + for query in workload["queries"]: new["queries"].append(query) - output = args.output or args.file + # Rewrite each group of create_sql/sql strings through the helper. `redact` + # asks the AST to redact literals (queries); `regex_literals` applies the + # DDL literal regex on top (DDL, but not clusters, whose config is kept). + n_unparsed = 0 + + def anonymize_group( + items: list[dict[str, Any]], + key: str, + *, + redact: bool, + regex_literals: bool, + ) -> None: + nonlocal n_unparsed + targets = [d for d in items if isinstance(d.get(key), str)] + if not targets: + return + sqls = [d[key] for d in targets] + results = ( + anonymize_sql_via_parser(sqls, mapping, args.identifiers, redact) + if binary_available + else None + ) + for i, d in enumerate(targets): + out = results[i] if results is not None else None + if out is None: + if results is not None: + n_unparsed += 1 + d[key] = fallback_rewrite(d[key], redact=redact or regex_literals) + else: + d[key] = out + if regex_literals and args.literals: + d[key] = anonymize_literals_in_sql(d[key]) + + clusters = list(new["clusters"].values()) + ddl: list[dict[str, Any]] = [] + for db in new["databases"].values(): + for schema in db.values(): + for group in ( + "tables", + "types", + "connections", + "sources", + "views", + "materialized_views", + "indexes", + "sinks", + ): + for obj in schema[group].values(): + ddl.append(obj) + if group == "sources": + ddl.extend(obj.get("children", {}).values()) + + # Clusters: rename only, keep config literals. + anonymize_group(clusters, "create_sql", redact=False, regex_literals=False) + # Other DDL: rename via AST, redact literals via regex (catches option strings). + anonymize_group(ddl, "create_sql", redact=False, regex_literals=True) + # Queries: rename + redact literals, both on the AST. + anonymize_group(new["queries"], "sql", redact=args.literals, regex_literals=False) + + if n_unparsed: + print( + f"warning: mz-sql-anonymize could not parse {n_unparsed} statement(s); " + "fell back to the regex for those (it only redacts single-quoted " + "strings, and its identifier substitution can corrupt SQL). The verify " + "pass still scans them.", + file=sys.stderr, + ) + + if args.verify: + problems = verify_anonymized(new, mapping, args) + if problems: + print( + "Refusing to write output: anonymization left sensitive data behind.\n" + "Pass --no-verify to write anyway.", + file=sys.stderr, + ) + for problem in problems: + print(f" {problem}", file=sys.stderr) + return 1 + if output == "-": yaml.dump(new, sys.stdout, Dumper=yaml.CSafeDumper) else: diff --git a/misc/python/materialize/cli/mz_workload_anonymize_test.py b/misc/python/materialize/cli/mz_workload_anonymize_test.py new file mode 100644 index 0000000000000..1e97d7893ed13 --- /dev/null +++ b/misc/python/materialize/cli/mz_workload_anonymize_test.py @@ -0,0 +1,450 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Tests for the workload anonymizer (`mz-workload-anonymize`). + +These exercise the pure-Python anonymization logic and the regex-based literal +fallback, which run without a built `mz-sql-anonymize` binary. One test +covers the parser-backed path and is skipped when that binary is absent. +""" + +from __future__ import annotations + +import sys +from typing import Any +from unittest import mock + +import pytest +import yaml + +from materialize.cli import mz_workload_anonymize + + +def base_workload() -> dict[str, Any]: + """A small but structurally complete workload capture. + + Carries sensitive identifiers (database/table/column/connection/sink names) + and literals (a connection host and user, a sink topic, a column default, + and string + numeric predicates in a query) so tests can assert they are + scrubbed. + """ + return { + "clusters": { + "prod_cluster": { + "create_sql": "CREATE CLUSTER prod_cluster (SIZE = '100cc')", + }, + }, + "databases": { + "customers_db": { + "public": { + "tables": { + "orders": { + "create_sql": "CREATE TABLE orders (id int, note text DEFAULT 'secret note')", + "columns": [ + {"name": "id", "type": "int4"}, + { + "name": "note", + "type": "text", + "default": "'secret note'", + }, + ], + "rows": 5, + }, + }, + "views": {}, + "materialized_views": {}, + "indexes": {}, + "types": {}, + "connections": { + "kafka_conn": { + "create_sql": "CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'prod.internal.acme.com:9092', SASL USERNAME 'admin')", + }, + }, + "sources": {}, + "sinks": { + "out_sink": { + "create_sql": "CREATE SINK out_sink FROM orders INTO KAFKA CONNECTION kafka_conn (TOPIC 'customer-orders-prod')", + }, + }, + }, + }, + }, + "queries": [ + { + "sql": "SELECT id, note FROM customers_db.public.orders WHERE note = 'hunter2' AND id = 987654321", + "cluster": "prod_cluster", + "database": "customers_db", + "search_path": ["public"], + "statement_type": "select", + "finished_status": "success", + }, + ], + } + + +def run_tool( + tmp_path: Any, + workload: dict[str, Any], + *extra_args: str, + in_place: bool = False, + require_parser: bool = False, +) -> tuple[int, dict[str, Any] | None, str]: + """Run main() against a workload, returning (exit_code, output, dumped_text). + + Defaults to --no-require-parser so tests are deterministic whether or not + the mz-sql-anonymize binary is built; tests that exercise the parser + requirement pass require_parser=True. + """ + inp = tmp_path / "workload.yml" + inp.write_text(yaml.safe_dump(workload)) + argv = ["mz-workload-anonymize", str(inp)] + if in_place: + argv.append("--in-place") + out = inp + else: + out = tmp_path / "out.yml" + argv += ["-o", str(out)] + if not require_parser: + argv.append("--no-require-parser") + argv += list(extra_args) + + with mock.patch.object(sys, "argv", argv): + rc = mz_workload_anonymize.main() + + if rc == 0 and out.exists(): + text = out.read_text() + return rc, yaml.safe_load(text), text + return rc, None, "" + + +@pytest.fixture +def force_regex(monkeypatch: pytest.MonkeyPatch) -> None: + """Force the regex literal fallback by hiding the parser binary. + + Keeps the bulk of the tests deterministic regardless of whether the + `mz-sql-anonymize` helper happens to be built in the dev environment. + """ + monkeypatch.setattr(mz_workload_anonymize, "_locate_redactor", lambda: None) + + +def test_anonymizes_identifiers(tmp_path: Any, force_regex: None) -> None: + rc, out, text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert out is not None + # Original object names must not survive anywhere in the output. + for original in ("customers_db", "orders", "kafka_conn", "out_sink"): + assert original not in text, f"{original!r} leaked" + # And anonymized names should be present. + assert "db_0" in text + assert "table_1" in text + + +def test_connection_and_sink_literals_scrubbed( + tmp_path: Any, force_regex: None +) -> None: + # Regression test for the connection/sink literal leak: hostnames, + # usernames, and topic names live in DDL option strings. + rc, _out, text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert "prod.internal.acme.com" not in text + assert "admin" not in text + assert "customer-orders-prod" not in text + + +def test_table_default_literal_scrubbed(tmp_path: Any, force_regex: None) -> None: + rc, _out, text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert "secret note" not in text + + +def test_query_string_literal_scrubbed(tmp_path: Any, force_regex: None) -> None: + rc, _out, text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert "hunter2" not in text + + +def test_cluster_size_preserved(tmp_path: Any, force_regex: None) -> None: + # Cluster SIZE is non-sensitive config that replay must keep verbatim. + rc, _out, text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert "100cc" in text + + +def test_no_output_target_errors( + tmp_path: Any, force_regex: None, capsys: pytest.CaptureFixture[str] +) -> None: + inp = tmp_path / "workload.yml" + inp.write_text(yaml.safe_dump(base_workload())) + with mock.patch.object(sys, "argv", ["mz-workload-anonymize", str(inp)]): + rc = mz_workload_anonymize.main() + assert rc == 1 + assert "in-place" in capsys.readouterr().err + + +def test_in_place_overwrites_input(tmp_path: Any, force_regex: None) -> None: + rc, _out, text = run_tool(tmp_path, base_workload(), in_place=True) + assert rc == 0 + assert "customers_db" not in text + assert "hunter2" not in text + + +def test_no_literals_keeps_literals_but_anonymizes_identifiers( + tmp_path: Any, force_regex: None +) -> None: + rc, _out, text = run_tool(tmp_path, base_workload(), "--no-literals") + assert rc == 0 + # Literals retained... + assert "hunter2" in text + # ...but identifiers still anonymized. + assert "customers_db" not in text + + +def test_verify_catches_surviving_identifier() -> None: + new = { + "databases": {}, + "clusters": {}, + "queries": [{"sql": "SELECT * FROM orders"}], + } + mapping = {"orders": "table_1"} + args = mock.Mock(identifiers=True, literals=True) + problems = mz_workload_anonymize.verify_anonymized(new, mapping, args) + assert any("orders" in p for p in problems) + + +def test_verify_catches_unanonymized_literal() -> None: + new = { + "databases": {}, + "clusters": {}, + "queries": [{"sql": "SELECT * FROM t WHERE x = 'leak'"}], + } + args = mock.Mock(identifiers=True, literals=True) + problems = mz_workload_anonymize.verify_anonymized(new, {}, args) + assert any("leak" in p for p in problems) + + +def test_verify_accepts_both_placeholder_styles() -> None: + new = { + "databases": {}, + "clusters": {}, + "queries": [ + {"sql": "SELECT * FROM t WHERE a = 'literal_1' AND b = ''"}, + ], + } + args = mock.Mock(identifiers=True, literals=True) + assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] + + +def test_verify_exempts_config_statement_literals() -> None: + # A preserved SET/timeout literal in a config query must not be flagged. + new = { + "clusters": {}, + "databases": {}, + "queries": [ + {"sql": "SET statement_timeout = '5s'", "statement_type": "set_variable"}, + ], + } + args = mock.Mock(identifiers=True, literals=True) + assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] + + +def test_verify_exempts_cluster_literals() -> None: + # Cluster create_sql keeps its SIZE literal; verify must not flag it. + new = { + "clusters": { + "cluster_0": {"create_sql": "CREATE CLUSTER cluster_0 (SIZE = '100cc')"} + }, + "databases": {}, + "queries": [], + } + args = mock.Mock(identifiers=True, literals=True) + assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] + + +def test_anonymize_via_parser_returns_none_without_binary( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(mz_workload_anonymize, "_locate_redactor", lambda: None) + assert ( + mz_workload_anonymize.anonymize_sql_via_parser(["SELECT 1"], {}, True, True) + is None + ) + + +def test_regex_fallback_warns( + tmp_path: Any, force_regex: None, capsys: pytest.CaptureFixture[str] +) -> None: + rc, _out, _text = run_tool(tmp_path, base_workload()) + assert rc == 0 + assert "mz-sql-anonymize helper not found" in capsys.readouterr().err + + +def test_require_parser_errors_without_binary( + tmp_path: Any, force_regex: None, capsys: pytest.CaptureFixture[str] +) -> None: + # By default the parser is required: with no binary the tool must refuse to + # run rather than silently fall back to the weaker regex. + rc, out, _text = run_tool(tmp_path, base_workload(), require_parser=True) + assert rc == 1 + assert out is None + err = capsys.readouterr().err + assert "mz-sql-anonymize" in err + assert "--no-require-parser" in err + + +@pytest.mark.skipif( + mz_workload_anonymize._locate_redactor() is None, + reason="mz-sql-anonymize binary not built; run `cargo build --release -p mz-sql-anonymize`", +) +def test_parser_path_redacts_numeric_literal(tmp_path: Any) -> None: + # The parser path (unlike the regex) redacts numbers in query predicates. + rc, _out, text = run_tool(tmp_path, base_workload(), require_parser=True) + assert rc == 0 + assert "987654321" not in text + assert "" in text + + +@pytest.mark.skipif( + mz_workload_anonymize._locate_redactor() is None, + reason="mz-sql-anonymize binary not built; run `cargo build --release -p mz-sql-anonymize`", +) +def test_parser_path_renames_table_reference(tmp_path: Any) -> None: + # End-to-end proof the AST path renames a custom object reference as a whole + # token (where the old text regex risked substring corruption). `zorp` and + # `zorpcol` are non-keyword names, so both must be renamed and neither may + # survive as a bare word. + import re as _re + + wl = base_workload() + wl["databases"]["customers_db"]["public"]["tables"]["zorp"] = { + "create_sql": "CREATE TABLE zorp (zorpcol int)", + "columns": [{"name": "zorpcol", "type": "int4"}], + "rows": 1, + } + wl["queries"][0]["sql"] = "SELECT zorpcol FROM zorp" + rc, out, text = run_tool(tmp_path, wl, require_parser=True) + assert rc == 0 + assert out is not None + assert not _re.search(r"(? dict[str, Any]: + """A workload with a CDC source whose subsources (children) are keyed by + their fully-qualified `database.schema.name`.""" + return { + "clusters": {}, + "databases": { + "upstream_db": { + "ingest_schema": { + "tables": {}, + "views": {}, + "materialized_views": {}, + "indexes": {}, + "types": {}, + "connections": {}, + "sources": { + "pg_src": { + "create_sql": "CREATE SOURCE pg_src FROM POSTGRES CONNECTION c", + "type": "postgres", + "children": { + "upstream_db.ingest_schema.people": { + "name": "people", + "database": "upstream_db", + "schema": "ingest_schema", + "create_sql": "CREATE SUBSOURCE people (id int)", + "columns": [{"name": "id", "type": "int4"}], + }, + }, + }, + }, + "sinks": {}, + }, + }, + }, + "queries": [], + } + + +def test_subsource_child_key_is_anonymized(tmp_path: Any, force_regex: None) -> None: + # Regression: the child's fully-qualified dict key must not leak the + # original database/schema/name. + rc, _out, text = run_tool(tmp_path, cdc_workload()) + assert rc == 0 + for original in ("upstream_db", "ingest_schema", "people"): + assert original not in text, f"{original!r} leaked via a child key" + + +def test_verify_catches_leaked_child_key() -> None: + # A surviving original identifier in a structural dict key must be flagged. + new = { + "clusters": {}, + "databases": { + "db_0": { + "schema_1": { + "sources": { + "source_1": { + "children": { + # Leaky: original schema name survived in the key. + "db_0.ingest_schema.child_1": {"name": "child_1"}, + }, + }, + }, + }, + }, + }, + "queries": [], + } + mapping = {"ingest_schema": "schema_1"} + args = mock.Mock(identifiers=True, literals=True) + problems = mz_workload_anonymize.verify_anonymized(new, mapping, args) + assert any("ingest_schema" in p for p in problems) + + +def test_verify_ignores_reserved_format_key_collision() -> None: + # A user column named like a reserved format key (e.g. transaction_id) must + # not make verify flag the query record's own field name. + new = { + "clusters": {}, + "databases": {}, + "queries": [{"sql": "SELECT column_1 FROM table_1", "transaction_id": 7}], + } + mapping = {"transaction_id": "column_1"} + args = mock.Mock(identifiers=True, literals=True) + assert mz_workload_anonymize.verify_anonymized(new, mapping, args) == [] + + +def test_verify_ignores_identifier_word_in_scalar_value() -> None: + # A scalar value (here a column default) may contain a word matching a + # renamed column; that is data, not an identifier leak, so the identifier + # check must not scan scalar values. (SQL text is scanned separately.) + new = { + "clusters": {}, + "databases": { + "db_0": { + "public": { + "tables": { + "table_1": { + "create_sql": "CREATE TABLE table_1 (column_1 text)", + "columns": [ + { + "name": "column_1", + "type": "text", + "default": "'secret note'", + } + ], + } + }, + } + } + }, + "queries": [], + } + mapping = {"note": "column_1"} + args = mock.Mock(identifiers=True, literals=False) + assert mz_workload_anonymize.verify_anonymized(new, mapping, args) == [] diff --git a/src/sql-anonymize/Cargo.toml b/src/sql-anonymize/Cargo.toml new file mode 100644 index 0000000000000..a5da37d63ab2a --- /dev/null +++ b/src/sql-anonymize/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "mz-sql-anonymize" +description = "A small CLI that redacts literals from SQL using Materialize's own parser." +version = "0.0.0" +edition.workspace = true +rust-version.workspace = true +publish = false + +[lints] +workspace = true + +[[bin]] +name = "mz-sql-anonymize" +path = "src/main.rs" + +[dependencies] +mz-sql-parser = { path = "../sql-parser", default-features = false } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true + +[dev-dependencies] +mz-ore = { path = "../ore", default-features = false, features = ["test"] } diff --git a/src/sql-anonymize/src/main.rs b/src/sql-anonymize/src/main.rs new file mode 100644 index 0000000000000..b0ad490289a44 --- /dev/null +++ b/src/sql-anonymize/src/main.rs @@ -0,0 +1,316 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Anonymizes SQL using Materialize's own parser. +//! +//! This is the SQL-rewriting half of `bin/mz-workload-anonymize`. Doing the +//! work on the parsed AST — rather than with text substitution — is what makes +//! it correct: identifiers are renamed as whole tokens (no substring or +//! in-string corruption, no word-boundary or case guesswork), and literals are +//! redacted in every position the dialect allows, including option values like +//! connection hosts and sink topics that the engine's redacted Display leaves +//! intact. +//! +//! Protocol: reads a JSON request on stdin and writes a JSON array on stdout, +//! one element per input statement — the rewritten SQL, or `null` when the +//! statement could not be parsed (the caller falls back to its regex for those). +//! +//! ```json +//! { +//! "mapping": {"orders": "table_1", "auction_house": "db_0"}, +//! "rename_identifiers": true, +//! "redact_literals": true, +//! "statements": ["SELECT * FROM orders WHERE id = 7", "..."] +//! } +//! ``` + +use std::collections::BTreeMap; +use std::io::{self, Read, Write}; + +use mz_sql_parser::ast::display::AstDisplay; +use mz_sql_parser::ast::visit_mut::{self, VisitMut}; +use mz_sql_parser::ast::{ + Ident, Raw, RawClusterName, RawDataType, RawItemName, RawNetworkPolicyName, Statement, + UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, +}; +use mz_sql_parser::parser; +use serde::Deserialize; + +#[derive(Deserialize)] +struct Request { + /// Original identifier -> anonymized identifier. Entries that map a name to + /// itself (e.g. preserved keywords) are no-ops. + #[serde(default)] + mapping: BTreeMap, + #[serde(default)] + rename_identifiers: bool, + #[serde(default)] + redact_literals: bool, + statements: Vec, +} + +struct Anonymizer<'a> { + mapping: &'a BTreeMap, + rename: bool, + redact: bool, +} + +impl Anonymizer<'_> { + fn rename_ident(&self, ident: &mut Ident) { + if self.rename { + if let Some(new) = self.mapping.get(ident.as_str()) { + *ident = Ident::new_unchecked(new.clone()); + } + } + } + + fn rename_idents(&self, idents: &mut [Ident]) { + for ident in idents { + self.rename_ident(ident); + } + } +} + +impl<'ast> VisitMut<'ast, Raw> for Anonymizer<'_> { + fn visit_ident_mut(&mut self, node: &'ast mut Ident) { + self.rename_ident(node); + } + + fn visit_value_mut(&mut self, node: &'ast mut Value) { + if self.redact { + match node { + // Collapse every data-bearing literal to a single string + // placeholder. Using a string (rather than redacting in place) + // keeps the output reparseable — `x = ''` is valid + // even where `x` is numeric — and matches the `''` + // sentinel the replay tooling already recognizes. + Value::Number(_) | Value::String(_) | Value::HexString(_) | Value::Interval(_) => { + *node = Value::String("".to_string()); + } + // Booleans and NULL are not sensitive. + Value::Boolean(_) | Value::Null => {} + } + } + } + + // The `Raw` AstInfo associated types below have no-op default visitors (the + // generic visitor cannot see into an associated type), yet they hold the + // identifiers for object/cluster/type references. Override each to descend + // to its `Ident`s so renaming reaches them. + + fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) { + match node { + RawItemName::Name(n) | RawItemName::Id(_, n, _) => self.rename_idents(&mut n.0), + } + } + + fn visit_unresolved_item_name_mut(&mut self, node: &'ast mut UnresolvedItemName) { + self.rename_idents(&mut node.0); + } + + fn visit_column_reference_mut(&mut self, node: &'ast mut Ident) { + self.rename_ident(node); + } + + fn visit_schema_name_mut(&mut self, node: &'ast mut UnresolvedSchemaName) { + self.rename_idents(&mut node.0); + } + + fn visit_database_name_mut(&mut self, node: &'ast mut UnresolvedDatabaseName) { + self.rename_ident(&mut node.0); + } + + fn visit_cluster_name_mut(&mut self, node: &'ast mut RawClusterName) { + if let RawClusterName::Unresolved(ident) = node { + self.rename_ident(ident); + } + } + + fn visit_network_policy_name_mut(&mut self, node: &'ast mut RawNetworkPolicyName) { + if let RawNetworkPolicyName::Unresolved(ident) = node { + self.rename_ident(ident); + } + } + + fn visit_data_type_mut(&mut self, node: &'ast mut RawDataType) { + match node { + RawDataType::Array(inner) | RawDataType::List(inner) => self.visit_data_type_mut(inner), + RawDataType::Map { + key_type, + value_type, + } => { + self.visit_data_type_mut(key_type); + self.visit_data_type_mut(value_type); + } + RawDataType::Other { name, .. } => self.visit_item_name_mut(name), + } + } + + fn visit_object_name_mut(&mut self, node: &'ast mut UnresolvedObjectName) { + match node { + UnresolvedObjectName::Cluster(i) + | UnresolvedObjectName::Role(i) + | UnresolvedObjectName::NetworkPolicy(i) => self.rename_ident(i), + UnresolvedObjectName::Database(n) => self.rename_ident(&mut n.0), + UnresolvedObjectName::Schema(n) => self.rename_idents(&mut n.0), + UnresolvedObjectName::Item(n) => self.rename_idents(&mut n.0), + UnresolvedObjectName::ClusterReplica(_) => visit_mut::visit_object_name_mut(self, node), + } + } +} + +/// Statement kinds whose literals are non-sensitive configuration — cluster +/// sizing/replication and session/system settings (timeouts, isolation, feature +/// flags) — which replay needs preserved verbatim. Identifiers in these +/// statements are still renamed; only literal redaction is skipped. +fn preserves_literals(stmt: &Statement) -> bool { + matches!( + stmt, + Statement::CreateCluster(_) + | Statement::CreateClusterReplica(_) + | Statement::AlterCluster(_) + | Statement::SetVariable(_) + | Statement::ResetVariable(_) + | Statement::SetTransaction(_) + | Statement::AlterSystemSet(_) + | Statement::AlterSystemReset(_) + ) +} + +/// Anonymizes one workload entry (which may hold more than one statement), +/// returning `None` if it does not parse. +fn anonymize( + sql: &str, + mapping: &BTreeMap, + rename: bool, + redact: bool, +) -> Option { + let stmts = parser::parse_statements(sql).ok()?; + let rewritten: Vec = stmts + .into_iter() + .map(|parsed| { + let mut ast = parsed.ast; + let mut visitor = Anonymizer { + mapping, + rename, + redact: redact && !preserves_literals(&ast), + }; + visitor.visit_statement_mut(&mut ast); + ast.to_ast_string_simple() + }) + .collect(); + Some(rewritten.join("; ")) +} + +fn main() -> io::Result<()> { + let mut input = String::new(); + io::stdin().read_to_string(&mut input)?; + + let req: Request = + serde_json::from_str(&input).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let result: Vec> = req + .statements + .iter() + .map(|sql| { + anonymize( + sql, + &req.mapping, + req.rename_identifiers, + req.redact_literals, + ) + }) + .collect(); + + let out = serde_json::to_string(&result)?; + io::stdout().write_all(out.as_bytes())?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::anonymize; + + fn map(pairs: &[(&str, &str)]) -> BTreeMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[mz_ore::test] + fn renames_table_reference() { + // The whole point of the AST approach: an object reference in FROM is an + // item name (an AstInfo associated type), which the old regex mangled + // and a naive visitor skips. It must be renamed as a whole token. + let m = map(&[("orders", "table_1")]); + let out = anonymize("SELECT id FROM orders", &m, true, false).expect("parses"); + assert!(out.contains("table_1"), "{out}"); + assert!(out.contains("id"), "id is unmapped, keep it: {out}"); + assert!(!out.contains("orders"), "{out}"); + } + + #[mz_ore::test] + fn renames_qualified_reference() { + let m = map(&[("mydb", "db_0"), ("myschema", "schema_1"), ("t", "table_1")]); + let out = anonymize("SELECT * FROM mydb.myschema.t", &m, true, false).expect("parses"); + assert!(out.contains("db_0.schema_1.table_1"), "{out}"); + } + + #[mz_ore::test] + fn does_not_rename_inside_other_identifiers() { + // The old regex would rewrite `id` inside `user_id`; the AST does not. + let m = map(&[("id", "column_1")]); + let out = anonymize("SELECT user_id FROM t", &m, true, false).expect("parses"); + assert!(out.contains("user_id"), "{out}"); + } + + #[mz_ore::test] + fn redacts_query_literals_including_numbers() { + let m = map(&[]); + let out = + anonymize("SELECT 'secret', 42 FROM t WHERE x = 'a'", &m, false, true).expect("parses"); + assert!(!out.contains("secret"), "{out}"); + assert!(!out.contains("42"), "{out}"); + assert!(out.contains(""), "{out}"); + } + + #[mz_ore::test] + fn does_not_rename_inside_string_literals() { + // A literal containing a word that matches a renamed identifier must not + // be touched by renaming (it is data). + let m = map(&[("orders", "table_1")]); + let out = anonymize("SELECT 'orders' FROM orders", &m, true, false).expect("parses"); + assert!(out.contains("'orders'"), "{out}"); + assert!(out.contains("table_1"), "{out}"); + } + + #[mz_ore::test] + fn preserves_cluster_config_literals() { + // Cluster sizing is config replay needs; rename the name, keep the size. + let m = map(&[("prod", "cluster_0")]); + let out = + anonymize("CREATE CLUSTER prod (SIZE = '100cc')", &m, true, true).expect("parses"); + assert!(out.contains("'100cc'"), "size must be preserved: {out}"); + assert!(out.contains("cluster_0"), "{out}"); + } + + #[mz_ore::test] + fn preserves_set_config_literals() { + let m = map(&[]); + let out = anonymize("SET statement_timeout = '5s'", &m, true, true).expect("parses"); + assert!(out.contains("'5s'"), "timeout must be preserved: {out}"); + } + + #[mz_ore::test] + fn returns_none_on_parse_error() { + assert_eq!(anonymize("SELEC not valid", &map(&[]), true, true), None); + } +} diff --git a/test/workload-replay/README.md b/test/workload-replay/README.md index c0adccb4cb649..19df69e4c4f6c 100644 --- a/test/workload-replay/README.md +++ b/test/workload-replay/README.md @@ -75,12 +75,35 @@ Anonymizes identifiers and literals in workload captures for sharing without exp - Table names → `table_1`, `table_2`, ... - Column names → `column_1`, `column_2`, ... - View, materialized view, source, sink, connection names -- All identifiers in `create_sql` definitions and queries +- All identifier references in `create_sql` definitions and queries + +Identifier renaming is done on the parsed AST by `mz-sql-anonymize`, which +renames whole identifier tokens. A text regex (the previous approach) corrupted +SQL by matching identifiers as substrings or inside string literals; the AST +does not. *Literals (`--literals`, enabled by default):* -- String literals in SQL → `'literal_1'`, `'literal_2'`, ... -- String default values in table/source/child columns -- String literals in queries +- Query SQL is redacted on the AST, replacing every literal — strings, numbers, + hex strings, intervals — with `''`. +- `create_sql` strings (connection hosts/users, sink topics, source options, + column defaults) → `'literal_1'`, `'literal_2'`, ... via a blanket regex. The + AST is not used for these because option values like broker addresses are + typed fields the parser does not treat as redactable literals (the engine's + own redacted Display leaves them intact too). +- Cluster sizing/replication and session/system config (`SET`/`RESET`/`ALTER + SYSTEM`, e.g. timeouts) are **preserved** — replay needs them and they are not + sensitive. + +**The parser binary is required by default**: if it is not built, the tool +errors rather than fall back to the corruption-prone regex for everything. Pass +`--no-require-parser` to allow that fallback. Individual statements that do not +parse fall back to the regex with a warning regardless, and the verify pass +still scans them. + +Build the helper once (required for the default `--require-parser` mode): +```bash +cargo build --release -p mz-sql-anonymize +``` **Usage:** ```bash @@ -90,9 +113,12 @@ bin/mz-workload-anonymize [OPTIONS] **Options:** | Option | Description | Default | |--------|-------------|---------| -| `-o, --output` | Path to write output | overwrites input file | +| `-o, --output` | Path to write output (`-` for stdout); required unless `--in-place` | — | +| `--in-place` | Overwrite the input file (destroys the original capture) | off | | `--identifiers` / `--no-identifiers` | Anonymize object names | enabled | -| `--literals` / `--no-literals` | Anonymize string literals | enabled | +| `--literals` / `--no-literals` | Anonymize literals | enabled | +| `--verify` / `--no-verify` | Re-scan output for leaks and refuse to write if any are found | enabled | +| `--require-parser` / `--no-require-parser` | Require the parser for query literals; error rather than fall back to the weaker regex | enabled | **Examples:** ```bash