diff --git a/Cargo.lock b/Cargo.lock index 4c9e8217ac32b..74983defa8bd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7815,6 +7815,7 @@ version = "0.0.0" dependencies = [ "mz-ore", "mz-sql-parser", + "serde", "serde_json", ] diff --git a/misc/python/materialize/cli/mz_workload_anonymize.py b/misc/python/materialize/cli/mz_workload_anonymize.py index 2dd754a6b17fd..442b066b84ae4 100644 --- a/misc/python/materialize/cli/mz_workload_anonymize.py +++ b/misc/python/materialize/cli/mz_workload_anonymize.py @@ -37,26 +37,42 @@ def _locate_redactor() -> list[str] | None: return None -def redact_literals_via_parser(sqls: list[str]) -> list[str | None] | None: - """Redact literals in each SQL string using Materialize's own parser. - - Returns a list aligned with the input, where each element is the redacted - SQL or None if that statement could not be parsed. Returns None for the - whole batch if the helper binary is unavailable or errors, signaling the - caller to fall back to regex-based redaction. +def anonymize_sql_via_parser( + sqls: list[str], + mapping: dict[str, str], + rename_identifiers: bool, + redact_literals: bool, +) -> list[str | None] | None: + """Rename identifiers and/or redact literals in each SQL string via the AST. + + Uses the `mz-sql-anonymize` helper, which parses each statement with + Materialize's own parser, renames identifier tokens per `mapping`, and (when + `redact_literals`) replaces literal values with `''`. Doing this on + the AST avoids the corruption a text regex causes (substring matches, + in-string rewrites, broken syntax). + + Returns a list aligned with the input — the rewritten SQL, or None for a + statement that did not parse. Returns None for the whole batch if the helper + binary is unavailable or errors, signaling the caller to fall back to regex. """ cmd = _locate_redactor() if cmd is None: return None + request = { + "mapping": mapping, + "rename_identifiers": rename_identifiers, + "redact_literals": redact_literals, + "statements": sqls, + } proc = subprocess.run( cmd, - input=json.dumps(sqls), + input=json.dumps(request), capture_output=True, text=True, ) if proc.returncode != 0: print( - f"warning: {cmd[0]} failed, falling back to regex redaction:\n{proc.stderr}", + f"warning: {cmd[0]} failed, falling back to regex:\n{proc.stderr}", file=sys.stderr, ) return None @@ -135,6 +151,21 @@ def keywords() -> set[str]: ) +# Query statement types whose literals are non-sensitive config (session/system +# settings such as timeouts and isolation). The anonymizer preserves these — see +# `preserves_literals` in the mz-sql-anonymize helper — so verify must not flag +# them. Cluster DDL is preserved too but lives in create_sql, handled separately. +CONFIG_STATEMENT_TYPES = frozenset( + { + "set_variable", + "reset_variable", + "set_transaction", + "alter_system_set", + "alter_system_reset", + } +) + + def _iter_sql(obj: Any, path: str = "") -> Any: """Yield (location, sql) for every create_sql/sql string in the workload.""" if isinstance(obj, dict): @@ -218,10 +249,26 @@ def check_identifiers(location: str, text: str) -> None: if location.endswith(".") and text not in RESERVED_FORMAT_KEYS: check_identifiers(location, text) + query_location = re.compile(r"\.queries\[(\d+)\]\.sql$") + + def literals_preserved(location: str) -> bool: + # Cluster create_sql and config statements (SET/RESET/ALTER SYSTEM) keep + # their literals on purpose; the anonymizer does not redact them, so the + # verify pass must not flag them. + if location.startswith(".clusters"): + return True + m = query_location.match(location) + if ( + m + and new["queries"][int(m.group(1))].get("statement_type") + in CONFIG_STATEMENT_TYPES + ): + return True + return False + for location, sql in _iter_sql(new): check_identifiers(location, sql) - # Literal check runs only over SQL, and exempts cluster create_sql. - if args.literals and not location.startswith(".clusters"): + if args.literals and not literals_preserved(location): for match in string_literal.finditer(sql): if not placeholder.fullmatch(match.group(0)): problems.append( @@ -489,141 +536,152 @@ def anonymize_column_default(column: dict[str, Any]) -> None: new_database[new_schema_name] = new_schema new["databases"][new_db_name] = new_database - # TODO: Case discrepancies are not handled. You can call a column `mintimestamp`, but then use it as `minTimestamp` - if args.identifiers: - pattern = re.compile( - "|".join(map(re.escape, sorted(mapping, key=len, reverse=True))) + # --- Pass 2: rewrite SQL text --- + # + # Identifier renaming and query-literal redaction run on the AST via the + # mz-sql-anonymize helper, which renames whole identifier tokens and redacts + # literal values without the corruption a text regex causes. DDL create_sql + # literals are still scrubbed with a blanket regex, because option strings + # (connection hosts/broker addresses, sink topics) are typed fields the AST + # — like the engine's own redacted Display — does not treat as redactable + # literals. Cluster/SET config literals (sizes, timeouts) are preserved by + # the helper and never regex-redacted here. + + binary_available = _locate_redactor() is not None + if not binary_available: + if args.require_parser: + print( + "error: mz-sql-anonymize helper not found, so SQL cannot be " + "anonymized with the parser. Build it with:\n" + " cargo build --release -p mz-sql-anonymize\n" + "or pass --no-require-parser to fall back to a regex that renames " + "identifiers by text substitution (which can corrupt SQL) and only " + "redacts single-quoted string literals.", + file=sys.stderr, + ) + return 1 + print( + "warning: mz-sql-anonymize helper not found; falling back to regex " + "identifier substitution and literal redaction for all SQL " + "(--no-require-parser).", + file=sys.stderr, ) - def replace_identifiers(d: dict[str, Any], entry: str) -> None: - if args.identifiers: - d[entry] = pattern.sub(lambda m: mapping[m.group(0)], d[entry]) - - # DDL create_sql is redacted with the blanket regex: option strings like - # connection hosts, sink topics, and source options must be scrubbed, and - # the parser's to_ast_string_redacted() intentionally does NOT redact those - # (it only redacts expression/value literals, treating DDL options as - # config). Query SQL is handled separately, via the parser (see below). - def replace_literals(d: dict[str, Any], entry: str) -> None: - if args.literals: - d[entry] = anonymize_literals_in_sql(d[entry]) - - # TODO: The create_sql replacements are more of a heuristic because we might overwrite identifiers and same name existing twice. There are two alternatives: - # 1. Wrap the Mz parser in Python, parse the SQL, resolve what they map to, rename, and reserialize. - # 2. Spin up Materialize, RENAME everything. Doesn't work for column names? Then take a new recording - for cluster in new["clusters"].values(): - replace_identifiers(cluster, "create_sql") - for db in new["databases"].values(): - for schema in db.values(): - for table in schema["tables"].values(): - for column in table["columns"]: - if args.identifiers and column["type"] in mapping: - column["type"] = mapping[column["type"]] - replace_identifiers(table, "create_sql") - replace_literals(table, "create_sql") - for typ in schema["types"].values(): - replace_identifiers(typ, "create_sql") - replace_literals(typ, "create_sql") - for conn in schema["connections"].values(): - replace_identifiers(conn, "create_sql") - # Connection create_sql carries hostnames, usernames, regions, - # bucket/broker URLs, etc. as string literals; anonymize them. - replace_literals(conn, "create_sql") - for source in schema["sources"].values(): - for column in source.get("columns", []): - if args.identifiers and column["type"] in mapping: - column["type"] = mapping[column["type"]] - for child in source.get("children", {}).values(): - if args.identifiers: + # Regex fallback, used only when the helper is unavailable or a statement + # does not parse. The identifier substitution is the corruption-prone + # heuristic the AST replaces; it is the degraded path. + fallback_pattern = ( + re.compile("|".join(map(re.escape, sorted(mapping, key=len, reverse=True)))) + if args.identifiers and mapping + else None + ) + + def fallback_rewrite(sql: str, redact: bool) -> str: + if fallback_pattern is not None: + sql = fallback_pattern.sub(lambda m: mapping[m.group(0)], sql) + if redact and args.literals: + sql = anonymize_literals_in_sql(sql) + return sql + + # Structural identifier fields that are not SQL text (the helper never sees + # them): column type references, child schema/database, and query routing. + if args.identifiers: + for db in new["databases"].values(): + for schema in db.values(): + for table in schema["tables"].values(): + for column in table["columns"]: + if column["type"] in mapping: + column["type"] = mapping[column["type"]] + for source in schema["sources"].values(): + for column in source.get("columns", []): + if column["type"] in mapping: + column["type"] = mapping[column["type"]] + for child in source.get("children", {}).values(): # A child's schema/database may be a builtin or otherwise - # uncaptured name that never entered the mapping; leave - # those as-is rather than crashing. + # uncaptured name not in the mapping; leave those as-is. child["schema"] = mapping.get(child["schema"], child["schema"]) child["database"] = mapping.get( child["database"], child["database"] ) - for column in child["columns"]: - if args.identifiers and column["type"] in mapping: - column["type"] = mapping[column["type"]] - replace_identifiers(child, "create_sql") - replace_literals(child, "create_sql") - replace_identifiers(source, "create_sql") - replace_literals(source, "create_sql") - for view in schema["views"].values(): - replace_identifiers(view, "create_sql") - replace_literals(view, "create_sql") - for mv in schema["materialized_views"].values(): - replace_identifiers(mv, "create_sql") - replace_literals(mv, "create_sql") - for index in schema["indexes"].values(): - replace_identifiers(index, "create_sql") - replace_literals(index, "create_sql") - for sink in schema["sinks"].values(): - replace_identifiers(sink, "create_sql") - # Sink create_sql carries topic names, broker lists, and - # bucket/path URLs as string literals; anonymize them. - replace_literals(sink, "create_sql") - query_literal_targets: list[dict[str, Any]] = [] - for query in workload["queries"]: - if args.identifiers: + for column in child["columns"]: + if column["type"] in mapping: + column["type"] = mapping[column["type"]] + for query in workload["queries"]: query["cluster"] = mapping.get(query["cluster"], query["cluster"]) query["database"] = mapping.get(query["database"], query["database"]) query["search_path"] = [ mapping.get(schema, schema) for schema in query["search_path"] ] - replace_identifiers(query, "sql") - if args.literals: - query_literal_targets.append(query) + for query in workload["queries"]: new["queries"].append(query) - # Redact literals in query SQL with Materialize's own parser, in one batch. - # The parser handles every literal form the dialect supports (numbers, hex - # strings, intervals, dollar-quoted and escape strings) where the regex only - # caught single-quoted strings. - # - # --require-parser gates only the wholesale case: if the parser binary is - # unavailable, the tool errors rather than silently redacting every query - # with the weaker regex. Individual statements that do not parse fall back - # to the regex with a warning either way (this is a property of the captured - # SQL, not of whether the parser is present), and the verify pass still - # scans the result. - if query_literal_targets: - sqls = [q["sql"] for q in query_literal_targets] - redacted = redact_literals_via_parser(sqls) - if redacted is None: - if args.require_parser: - print( - "error: mz-sql-anonymize helper not found, so query literals " - "cannot be redacted with the parser. Build it with:\n" - " cargo build --release -p mz-sql-anonymize\n" - "or pass --no-require-parser to fall back to a weaker regex " - "that only redacts single-quoted strings (missing numbers, " - "dollar-quoted strings, and comments).", - file=sys.stderr, - ) - return 1 - print( - "warning: mz-sql-anonymize helper not found; using regex literal " - "redaction for queries, which misses numbers, dollar-quoted " - "strings, and comments (--no-require-parser).", - file=sys.stderr, - ) - for q in query_literal_targets: - q["sql"] = anonymize_literals_in_sql(q["sql"]) - else: - unparsed = [i for i, red in enumerate(redacted) if red is None] - if unparsed: - print( - f"warning: mz-sql-anonymize could not parse {len(unparsed)} of " - f"{len(redacted)} captured queries; falling back to the regex " - "for those (it only redacts single-quoted strings). The verify " - "pass still scans them.", - file=sys.stderr, - ) - for q, red in zip(query_literal_targets, redacted): - q["sql"] = ( - red if red is not None else anonymize_literals_in_sql(q["sql"]) - ) + # Rewrite each group of create_sql/sql strings through the helper. `redact` + # asks the AST to redact literals (queries); `regex_literals` applies the + # DDL literal regex on top (DDL, but not clusters, whose config is kept). + n_unparsed = 0 + + def anonymize_group( + items: list[dict[str, Any]], + key: str, + *, + redact: bool, + regex_literals: bool, + ) -> None: + nonlocal n_unparsed + targets = [d for d in items if isinstance(d.get(key), str)] + if not targets: + return + sqls = [d[key] for d in targets] + results = ( + anonymize_sql_via_parser(sqls, mapping, args.identifiers, redact) + if binary_available + else None + ) + for i, d in enumerate(targets): + out = results[i] if results is not None else None + if out is None: + if results is not None: + n_unparsed += 1 + d[key] = fallback_rewrite(d[key], redact=redact or regex_literals) + else: + d[key] = out + if regex_literals and args.literals: + d[key] = anonymize_literals_in_sql(d[key]) + + clusters = list(new["clusters"].values()) + ddl: list[dict[str, Any]] = [] + for db in new["databases"].values(): + for schema in db.values(): + for group in ( + "tables", + "types", + "connections", + "sources", + "views", + "materialized_views", + "indexes", + "sinks", + ): + for obj in schema[group].values(): + ddl.append(obj) + if group == "sources": + ddl.extend(obj.get("children", {}).values()) + + # Clusters: rename only, keep config literals. + anonymize_group(clusters, "create_sql", redact=False, regex_literals=False) + # Other DDL: rename via AST, redact literals via regex (catches option strings). + anonymize_group(ddl, "create_sql", redact=False, regex_literals=True) + # Queries: rename + redact literals, both on the AST. + anonymize_group(new["queries"], "sql", redact=args.literals, regex_literals=False) + + if n_unparsed: + print( + f"warning: mz-sql-anonymize could not parse {n_unparsed} statement(s); " + "fell back to the regex for those (it only redacts single-quoted " + "strings, and its identifier substitution can corrupt SQL). The verify " + "pass still scans them.", + file=sys.stderr, + ) if args.verify: problems = verify_anonymized(new, mapping, args) diff --git a/misc/python/materialize/cli/mz_workload_anonymize_test.py b/misc/python/materialize/cli/mz_workload_anonymize_test.py index 17c462272e0d1..1e97d7893ed13 100644 --- a/misc/python/materialize/cli/mz_workload_anonymize_test.py +++ b/misc/python/materialize/cli/mz_workload_anonymize_test.py @@ -240,6 +240,19 @@ def test_verify_accepts_both_placeholder_styles() -> None: assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] +def test_verify_exempts_config_statement_literals() -> None: + # A preserved SET/timeout literal in a config query must not be flagged. + new = { + "clusters": {}, + "databases": {}, + "queries": [ + {"sql": "SET statement_timeout = '5s'", "statement_type": "set_variable"}, + ], + } + args = mock.Mock(identifiers=True, literals=True) + assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] + + def test_verify_exempts_cluster_literals() -> None: # Cluster create_sql keeps its SIZE literal; verify must not flag it. new = { @@ -253,11 +266,14 @@ def test_verify_exempts_cluster_literals() -> None: assert mz_workload_anonymize.verify_anonymized(new, {}, args) == [] -def test_redact_via_parser_returns_none_without_binary( +def test_anonymize_via_parser_returns_none_without_binary( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(mz_workload_anonymize, "_locate_redactor", lambda: None) - assert mz_workload_anonymize.redact_literals_via_parser(["SELECT 1"]) is None + assert ( + mz_workload_anonymize.anonymize_sql_via_parser(["SELECT 1"], {}, True, True) + is None + ) def test_regex_fallback_warns( @@ -287,12 +303,37 @@ def test_require_parser_errors_without_binary( ) def test_parser_path_redacts_numeric_literal(tmp_path: Any) -> None: # The parser path (unlike the regex) redacts numbers in query predicates. - rc, _out, text = run_tool(tmp_path, base_workload()) + rc, _out, text = run_tool(tmp_path, base_workload(), require_parser=True) assert rc == 0 assert "987654321" not in text assert "" in text +@pytest.mark.skipif( + mz_workload_anonymize._locate_redactor() is None, + reason="mz-sql-anonymize binary not built; run `cargo build --release -p mz-sql-anonymize`", +) +def test_parser_path_renames_table_reference(tmp_path: Any) -> None: + # End-to-end proof the AST path renames a custom object reference as a whole + # token (where the old text regex risked substring corruption). `zorp` and + # `zorpcol` are non-keyword names, so both must be renamed and neither may + # survive as a bare word. + import re as _re + + wl = base_workload() + wl["databases"]["customers_db"]["public"]["tables"]["zorp"] = { + "create_sql": "CREATE TABLE zorp (zorpcol int)", + "columns": [{"name": "zorpcol", "type": "int4"}], + "rows": 1, + } + wl["queries"][0]["sql"] = "SELECT zorpcol FROM zorp" + rc, out, text = run_tool(tmp_path, wl, require_parser=True) + assert rc == 0 + assert out is not None + assert not _re.search(r"(? dict[str, Any]: """A workload with a CDC source whose subsources (children) are keyed by their fully-qualified `database.schema.name`.""" diff --git a/src/sql-anonymize/Cargo.toml b/src/sql-anonymize/Cargo.toml index e4eee1c042a93..a5da37d63ab2a 100644 --- a/src/sql-anonymize/Cargo.toml +++ b/src/sql-anonymize/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" [dependencies] mz-sql-parser = { path = "../sql-parser", default-features = false } +serde = { workspace = true, features = ["derive"] } serde_json.workspace = true [dev-dependencies] diff --git a/src/sql-anonymize/src/main.rs b/src/sql-anonymize/src/main.rs index e31cfbd36ab6f..b0ad490289a44 100644 --- a/src/sql-anonymize/src/main.rs +++ b/src/sql-anonymize/src/main.rs @@ -7,83 +7,310 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -//! Redacts literals from SQL using Materialize's own parser. +//! Anonymizes SQL using Materialize's own parser. //! -//! This is a helper for `bin/mz-workload-anonymize`. Doing literal redaction -//! with the real parser (rather than a regex) handles every literal form the -//! dialect supports — quoted strings with `''` escapes, escape strings, -//! dollar-quoted strings, numbers, hex strings, and intervals — and reuses the -//! exact `''` placeholder the rest of Materialize uses to turn -//! "customer data" into "usage data". +//! This is the SQL-rewriting half of `bin/mz-workload-anonymize`. Doing the +//! work on the parsed AST — rather than with text substitution — is what makes +//! it correct: identifiers are renamed as whole tokens (no substring or +//! in-string corruption, no word-boundary or case guesswork), and literals are +//! redacted in every position the dialect allows, including option values like +//! connection hosts and sink topics that the engine's redacted Display leaves +//! intact. //! -//! Protocol: reads a JSON array of SQL strings on stdin and writes a JSON -//! array of the same length on stdout. Each output element is either the -//! redacted SQL or `null` when the input could not be parsed, in which case -//! the caller should fall back to its own redaction for that element. +//! Protocol: reads a JSON request on stdin and writes a JSON array on stdout, +//! one element per input statement — the rewritten SQL, or `null` when the +//! statement could not be parsed (the caller falls back to its regex for those). +//! +//! ```json +//! { +//! "mapping": {"orders": "table_1", "auction_house": "db_0"}, +//! "rename_identifiers": true, +//! "redact_literals": true, +//! "statements": ["SELECT * FROM orders WHERE id = 7", "..."] +//! } +//! ``` +use std::collections::BTreeMap; use std::io::{self, Read, Write}; use mz_sql_parser::ast::display::AstDisplay; +use mz_sql_parser::ast::visit_mut::{self, VisitMut}; +use mz_sql_parser::ast::{ + Ident, Raw, RawClusterName, RawDataType, RawItemName, RawNetworkPolicyName, Statement, + UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, +}; use mz_sql_parser::parser; +use serde::Deserialize; + +#[derive(Deserialize)] +struct Request { + /// Original identifier -> anonymized identifier. Entries that map a name to + /// itself (e.g. preserved keywords) are no-ops. + #[serde(default)] + mapping: BTreeMap, + #[serde(default)] + rename_identifiers: bool, + #[serde(default)] + redact_literals: bool, + statements: Vec, +} + +struct Anonymizer<'a> { + mapping: &'a BTreeMap, + rename: bool, + redact: bool, +} + +impl Anonymizer<'_> { + fn rename_ident(&self, ident: &mut Ident) { + if self.rename { + if let Some(new) = self.mapping.get(ident.as_str()) { + *ident = Ident::new_unchecked(new.clone()); + } + } + } + + fn rename_idents(&self, idents: &mut [Ident]) { + for ident in idents { + self.rename_ident(ident); + } + } +} + +impl<'ast> VisitMut<'ast, Raw> for Anonymizer<'_> { + fn visit_ident_mut(&mut self, node: &'ast mut Ident) { + self.rename_ident(node); + } + + fn visit_value_mut(&mut self, node: &'ast mut Value) { + if self.redact { + match node { + // Collapse every data-bearing literal to a single string + // placeholder. Using a string (rather than redacting in place) + // keeps the output reparseable — `x = ''` is valid + // even where `x` is numeric — and matches the `''` + // sentinel the replay tooling already recognizes. + Value::Number(_) | Value::String(_) | Value::HexString(_) | Value::Interval(_) => { + *node = Value::String("".to_string()); + } + // Booleans and NULL are not sensitive. + Value::Boolean(_) | Value::Null => {} + } + } + } + + // The `Raw` AstInfo associated types below have no-op default visitors (the + // generic visitor cannot see into an associated type), yet they hold the + // identifiers for object/cluster/type references. Override each to descend + // to its `Ident`s so renaming reaches them. + + fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) { + match node { + RawItemName::Name(n) | RawItemName::Id(_, n, _) => self.rename_idents(&mut n.0), + } + } + + fn visit_unresolved_item_name_mut(&mut self, node: &'ast mut UnresolvedItemName) { + self.rename_idents(&mut node.0); + } + + fn visit_column_reference_mut(&mut self, node: &'ast mut Ident) { + self.rename_ident(node); + } + + fn visit_schema_name_mut(&mut self, node: &'ast mut UnresolvedSchemaName) { + self.rename_idents(&mut node.0); + } + + fn visit_database_name_mut(&mut self, node: &'ast mut UnresolvedDatabaseName) { + self.rename_ident(&mut node.0); + } + + fn visit_cluster_name_mut(&mut self, node: &'ast mut RawClusterName) { + if let RawClusterName::Unresolved(ident) = node { + self.rename_ident(ident); + } + } + + fn visit_network_policy_name_mut(&mut self, node: &'ast mut RawNetworkPolicyName) { + if let RawNetworkPolicyName::Unresolved(ident) = node { + self.rename_ident(ident); + } + } + + fn visit_data_type_mut(&mut self, node: &'ast mut RawDataType) { + match node { + RawDataType::Array(inner) | RawDataType::List(inner) => self.visit_data_type_mut(inner), + RawDataType::Map { + key_type, + value_type, + } => { + self.visit_data_type_mut(key_type); + self.visit_data_type_mut(value_type); + } + RawDataType::Other { name, .. } => self.visit_item_name_mut(name), + } + } + + fn visit_object_name_mut(&mut self, node: &'ast mut UnresolvedObjectName) { + match node { + UnresolvedObjectName::Cluster(i) + | UnresolvedObjectName::Role(i) + | UnresolvedObjectName::NetworkPolicy(i) => self.rename_ident(i), + UnresolvedObjectName::Database(n) => self.rename_ident(&mut n.0), + UnresolvedObjectName::Schema(n) => self.rename_idents(&mut n.0), + UnresolvedObjectName::Item(n) => self.rename_idents(&mut n.0), + UnresolvedObjectName::ClusterReplica(_) => visit_mut::visit_object_name_mut(self, node), + } + } +} + +/// Statement kinds whose literals are non-sensitive configuration — cluster +/// sizing/replication and session/system settings (timeouts, isolation, feature +/// flags) — which replay needs preserved verbatim. Identifiers in these +/// statements are still renamed; only literal redaction is skipped. +fn preserves_literals(stmt: &Statement) -> bool { + matches!( + stmt, + Statement::CreateCluster(_) + | Statement::CreateClusterReplica(_) + | Statement::AlterCluster(_) + | Statement::SetVariable(_) + | Statement::ResetVariable(_) + | Statement::SetTransaction(_) + | Statement::AlterSystemSet(_) + | Statement::AlterSystemReset(_) + ) +} -/// Redacts every literal in `sql`, or returns `None` if it does not parse. -/// -/// A workload may hold more than one statement per entry, so we redact each -/// parsed statement and rejoin them with `; `. -fn redact(sql: &str) -> Option { +/// Anonymizes one workload entry (which may hold more than one statement), +/// returning `None` if it does not parse. +fn anonymize( + sql: &str, + mapping: &BTreeMap, + rename: bool, + redact: bool, +) -> Option { let stmts = parser::parse_statements(sql).ok()?; - let redacted: Vec = stmts - .iter() - .map(|s| s.ast.to_ast_string_redacted()) + let rewritten: Vec = stmts + .into_iter() + .map(|parsed| { + let mut ast = parsed.ast; + let mut visitor = Anonymizer { + mapping, + rename, + redact: redact && !preserves_literals(&ast), + }; + visitor.visit_statement_mut(&mut ast); + ast.to_ast_string_simple() + }) .collect(); - Some(redacted.join("; ")) + Some(rewritten.join("; ")) } fn main() -> io::Result<()> { let mut input = String::new(); io::stdin().read_to_string(&mut input)?; - let sqls: Vec = + let req: Request = serde_json::from_str(&input).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let redacted: Vec> = sqls.iter().map(|sql| redact(sql)).collect(); + let result: Vec> = req + .statements + .iter() + .map(|sql| { + anonymize( + sql, + &req.mapping, + req.rename_identifiers, + req.redact_literals, + ) + }) + .collect(); - let out = serde_json::to_string(&redacted)?; + let out = serde_json::to_string(&result)?; io::stdout().write_all(out.as_bytes())?; Ok(()) } #[cfg(test)] mod tests { - use super::redact; + use std::collections::BTreeMap; + + use super::anonymize; + + fn map(pairs: &[(&str, &str)]) -> BTreeMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } #[mz_ore::test] - fn redacts_strings_and_numbers() { - let redacted = redact("SELECT 'secret', 42 FROM t WHERE name = 'alice'").expect("parses"); - assert!(!redacted.contains("secret"), "{redacted}"); - assert!(!redacted.contains("alice"), "{redacted}"); - assert!(!redacted.contains("42"), "{redacted}"); - assert!(redacted.contains(""), "{redacted}"); - // Identifiers must be preserved; only literals are redacted here. - assert!(redacted.contains("name"), "{redacted}"); + fn renames_table_reference() { + // The whole point of the AST approach: an object reference in FROM is an + // item name (an AstInfo associated type), which the old regex mangled + // and a naive visitor skips. It must be renamed as a whole token. + let m = map(&[("orders", "table_1")]); + let out = anonymize("SELECT id FROM orders", &m, true, false).expect("parses"); + assert!(out.contains("table_1"), "{out}"); + assert!(out.contains("id"), "id is unmapped, keep it: {out}"); + assert!(!out.contains("orders"), "{out}"); } #[mz_ore::test] - fn preserves_quoted_identifier_that_looks_like_a_literal() { - // A column named with embedded spaces is double-quoted, not a literal, - // and must survive redaction. - let redacted = redact(r#"SELECT "my col" FROM t"#).expect("parses"); - assert!(redacted.contains(r#""my col""#), "{redacted}"); + fn renames_qualified_reference() { + let m = map(&[("mydb", "db_0"), ("myschema", "schema_1"), ("t", "table_1")]); + let out = anonymize("SELECT * FROM mydb.myschema.t", &m, true, false).expect("parses"); + assert!(out.contains("db_0.schema_1.table_1"), "{out}"); } #[mz_ore::test] - fn returns_none_on_parse_error() { - assert_eq!(redact("SELEC not valid sql"), None); + fn does_not_rename_inside_other_identifiers() { + // The old regex would rewrite `id` inside `user_id`; the AST does not. + let m = map(&[("id", "column_1")]); + let out = anonymize("SELECT user_id FROM t", &m, true, false).expect("parses"); + assert!(out.contains("user_id"), "{out}"); + } + + #[mz_ore::test] + fn redacts_query_literals_including_numbers() { + let m = map(&[]); + let out = + anonymize("SELECT 'secret', 42 FROM t WHERE x = 'a'", &m, false, true).expect("parses"); + assert!(!out.contains("secret"), "{out}"); + assert!(!out.contains("42"), "{out}"); + assert!(out.contains(""), "{out}"); + } + + #[mz_ore::test] + fn does_not_rename_inside_string_literals() { + // A literal containing a word that matches a renamed identifier must not + // be touched by renaming (it is data). + let m = map(&[("orders", "table_1")]); + let out = anonymize("SELECT 'orders' FROM orders", &m, true, false).expect("parses"); + assert!(out.contains("'orders'"), "{out}"); + assert!(out.contains("table_1"), "{out}"); } #[mz_ore::test] - fn redacts_each_statement_in_a_multi_statement_entry() { - let redacted = redact("SELECT 'a'; SELECT 'b'").expect("parses"); - assert!(!redacted.contains("'a'"), "{redacted}"); - assert!(!redacted.contains("'b'"), "{redacted}"); + fn preserves_cluster_config_literals() { + // Cluster sizing is config replay needs; rename the name, keep the size. + let m = map(&[("prod", "cluster_0")]); + let out = + anonymize("CREATE CLUSTER prod (SIZE = '100cc')", &m, true, true).expect("parses"); + assert!(out.contains("'100cc'"), "size must be preserved: {out}"); + assert!(out.contains("cluster_0"), "{out}"); + } + + #[mz_ore::test] + fn preserves_set_config_literals() { + let m = map(&[]); + let out = anonymize("SET statement_timeout = '5s'", &m, true, true).expect("parses"); + assert!(out.contains("'5s'"), "timeout must be preserved: {out}"); + } + + #[mz_ore::test] + fn returns_none_on_parse_error() { + assert_eq!(anonymize("SELEC not valid", &map(&[]), true, true), None); } } diff --git a/test/workload-replay/README.md b/test/workload-replay/README.md index 9a2800ad78ce3..19df69e4c4f6c 100644 --- a/test/workload-replay/README.md +++ b/test/workload-replay/README.md @@ -75,21 +75,30 @@ Anonymizes identifiers and literals in workload captures for sharing without exp - Table names → `table_1`, `table_2`, ... - Column names → `column_1`, `column_2`, ... - View, materialized view, source, sink, connection names -- All identifiers in `create_sql` definitions and queries +- All identifier references in `create_sql` definitions and queries + +Identifier renaming is done on the parsed AST by `mz-sql-anonymize`, which +renames whole identifier tokens. A text regex (the previous approach) corrupted +SQL by matching identifiers as substrings or inside string literals; the AST +does not. *Literals (`--literals`, enabled by default):* -- Query SQL is redacted with Materialize's own parser (`mz-sql-anonymize`), - replacing all literals — strings, numbers, hex strings, intervals — with - `''`. **The parser binary is required by default**: if it is not - built, the tool errors instead of silently redacting every query with the - weaker regex. Pass `--no-require-parser` to allow that regex fallback (it - only catches single-quoted strings, missing numbers, dollar-quoted strings, - and comments). Individual statements that do not parse fall back to the regex - with a warning regardless, and the verify pass still scans them. -- `create_sql` strings (including connection hosts/users, sink topics, source - options, and column defaults) → `'literal_1'`, `'literal_2'`, ... via regex. - The parser is not used here because `to_ast_string_redacted()` intentionally - does not redact DDL option strings. +- Query SQL is redacted on the AST, replacing every literal — strings, numbers, + hex strings, intervals — with `''`. +- `create_sql` strings (connection hosts/users, sink topics, source options, + column defaults) → `'literal_1'`, `'literal_2'`, ... via a blanket regex. The + AST is not used for these because option values like broker addresses are + typed fields the parser does not treat as redactable literals (the engine's + own redacted Display leaves them intact too). +- Cluster sizing/replication and session/system config (`SET`/`RESET`/`ALTER + SYSTEM`, e.g. timeouts) are **preserved** — replay needs them and they are not + sensitive. + +**The parser binary is required by default**: if it is not built, the tool +errors rather than fall back to the corruption-prone regex for everything. Pass +`--no-require-parser` to allow that fallback. Individual statements that do not +parse fall back to the regex with a warning regardless, and the verify pass +still scans them. Build the helper once (required for the default `--require-parser` mode): ```bash