Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

322 changes: 190 additions & 132 deletions misc/python/materialize/cli/mz_workload_anonymize.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,42 @@ def _locate_redactor() -> list[str] | None:
return None


def redact_literals_via_parser(sqls: list[str]) -> list[str | None] | None:
"""Redact literals in each SQL string using Materialize's own parser.

Returns a list aligned with the input, where each element is the redacted
SQL or None if that statement could not be parsed. Returns None for the
whole batch if the helper binary is unavailable or errors, signaling the
caller to fall back to regex-based redaction.
def anonymize_sql_via_parser(
sqls: list[str],
mapping: dict[str, str],
rename_identifiers: bool,
redact_literals: bool,
) -> list[str | None] | None:
"""Rename identifiers and/or redact literals in each SQL string via the AST.

Uses the `mz-sql-anonymize` helper, which parses each statement with
Materialize's own parser, renames identifier tokens per `mapping`, and (when
`redact_literals`) replaces literal values with `'<REDACTED>'`. Doing this on
the AST avoids the corruption a text regex causes (substring matches,
in-string rewrites, broken syntax).

Returns a list aligned with the input — the rewritten SQL, or None for a
statement that did not parse. Returns None for the whole batch if the helper
binary is unavailable or errors, signaling the caller to fall back to regex.
"""
cmd = _locate_redactor()
if cmd is None:
return None
request = {
"mapping": mapping,
"rename_identifiers": rename_identifiers,
"redact_literals": redact_literals,
"statements": sqls,
}
proc = subprocess.run(
cmd,
input=json.dumps(sqls),
input=json.dumps(request),
capture_output=True,
text=True,
)
if proc.returncode != 0:
print(
f"warning: {cmd[0]} failed, falling back to regex redaction:\n{proc.stderr}",
f"warning: {cmd[0]} failed, falling back to regex:\n{proc.stderr}",
file=sys.stderr,
)
return None
Expand Down Expand Up @@ -135,6 +151,21 @@ def keywords() -> set[str]:
)


# Statement types of queries that only carry session/system configuration
# (timeouts, isolation levels and similar settings). Their literals are treated
# as non-sensitive and are preserved by the anonymizer -- see
# `preserves_literals` in the mz-sql-anonymize helper -- so the verify pass must
# not report them. Cluster DDL is preserved as well, but it lives in create_sql
# and is handled separately.
CONFIG_STATEMENT_TYPES = frozenset(
    (
        "set_variable",
        "reset_variable",
        "set_transaction",
        "alter_system_set",
        "alter_system_reset",
    )
)


def _iter_sql(obj: Any, path: str = "") -> Any:
"""Yield (location, sql) for every create_sql/sql string in the workload."""
if isinstance(obj, dict):
Expand Down Expand Up @@ -218,10 +249,26 @@ def check_identifiers(location: str, text: str) -> None:
if location.endswith(".<KEY>") and text not in RESERVED_FORMAT_KEYS:
check_identifiers(location, text)

query_location = re.compile(r"\.queries\[(\d+)\]\.sql$")

def literals_preserved(location: str) -> bool:
    """Return True when the SQL at `location` keeps its literals on purpose.

    Cluster create_sql and config statements (SET/RESET/ALTER SYSTEM) are not
    redacted by the anonymizer, so the verify pass must not flag them.
    """
    # Anything under `.clusters` is exempt.
    if location.startswith(".clusters"):
        return True
    # Otherwise only a query location can be exempt, and only when the
    # recorded statement type of that query is a config statement.
    hit = query_location.match(location)
    if not hit:
        return False
    query = new["queries"][int(hit.group(1))]
    return query.get("statement_type") in CONFIG_STATEMENT_TYPES

for location, sql in _iter_sql(new):
check_identifiers(location, sql)
# Literal check runs only over SQL, and exempts cluster create_sql.
if args.literals and not location.startswith(".clusters"):
if args.literals and not literals_preserved(location):
for match in string_literal.finditer(sql):
if not placeholder.fullmatch(match.group(0)):
problems.append(
Expand Down Expand Up @@ -489,141 +536,152 @@ def anonymize_column_default(column: dict[str, Any]) -> None:
new_database[new_schema_name] = new_schema
new["databases"][new_db_name] = new_database

# TODO: Case discrepancies are not handled. You can call a column `mintimestamp`, but then use it as `minTimestamp`
if args.identifiers:
pattern = re.compile(
"|".join(map(re.escape, sorted(mapping, key=len, reverse=True)))
# --- Pass 2: rewrite SQL text ---
#
# Identifier renaming and query-literal redaction run on the AST via the
# mz-sql-anonymize helper, which renames whole identifier tokens and redacts
# literal values without the corruption a text regex causes. DDL create_sql
# literals are still scrubbed with a blanket regex, because option strings
# (connection hosts/broker addresses, sink topics) are typed fields the AST
# — like the engine's own redacted Display — does not treat as redactable
# literals. Cluster/SET config literals (sizes, timeouts) are preserved by
# the helper and never regex-redacted here.

binary_available = _locate_redactor() is not None
if not binary_available:
if args.require_parser:
print(
"error: mz-sql-anonymize helper not found, so SQL cannot be "
"anonymized with the parser. Build it with:\n"
" cargo build --release -p mz-sql-anonymize\n"
"or pass --no-require-parser to fall back to a regex that renames "
"identifiers by text substitution (which can corrupt SQL) and only "
"redacts single-quoted string literals.",
file=sys.stderr,
)
return 1
print(
"warning: mz-sql-anonymize helper not found; falling back to regex "
"identifier substitution and literal redaction for all SQL "
"(--no-require-parser).",
file=sys.stderr,
)

def replace_identifiers(d: dict[str, Any], entry: str) -> None:
if args.identifiers:
d[entry] = pattern.sub(lambda m: mapping[m.group(0)], d[entry])

# DDL create_sql is redacted with the blanket regex: option strings like
# connection hosts, sink topics, and source options must be scrubbed, and
# the parser's to_ast_string_redacted() intentionally does NOT redact those
# (it only redacts expression/value literals, treating DDL options as
# config). Query SQL is handled separately, via the parser (see below).
def replace_literals(d: dict[str, Any], entry: str) -> None:
if args.literals:
d[entry] = anonymize_literals_in_sql(d[entry])

# TODO: The create_sql replacements are more of a heuristic because we might overwrite identifiers and same name existing twice. There are two alternatives:
# 1. Wrap the Mz parser in Python, parse the SQL, resolve what they map to, rename, and reserialize.
# 2. Spin up Materialize, RENAME everything. Doesn't work for column names? Then take a new recording
for cluster in new["clusters"].values():
replace_identifiers(cluster, "create_sql")
for db in new["databases"].values():
for schema in db.values():
for table in schema["tables"].values():
for column in table["columns"]:
if args.identifiers and column["type"] in mapping:
column["type"] = mapping[column["type"]]
replace_identifiers(table, "create_sql")
replace_literals(table, "create_sql")
for typ in schema["types"].values():
replace_identifiers(typ, "create_sql")
replace_literals(typ, "create_sql")
for conn in schema["connections"].values():
replace_identifiers(conn, "create_sql")
# Connection create_sql carries hostnames, usernames, regions,
# bucket/broker URLs, etc. as string literals; anonymize them.
replace_literals(conn, "create_sql")
for source in schema["sources"].values():
for column in source.get("columns", []):
if args.identifiers and column["type"] in mapping:
column["type"] = mapping[column["type"]]
for child in source.get("children", {}).values():
if args.identifiers:
# Regex fallback, used only when the helper is unavailable or a statement
# does not parse. The identifier substitution is the corruption-prone
# heuristic the AST replaces; it is the degraded path.
fallback_pattern = (
re.compile("|".join(map(re.escape, sorted(mapping, key=len, reverse=True))))
if args.identifiers and mapping
else None
)

def fallback_rewrite(sql: str, redact: bool) -> str:
    """Rewrite `sql` with the regex heuristics instead of the AST helper.

    Identifiers are substituted first (only when a fallback pattern was
    built); literals are then scrubbed when both `redact` and `args.literals`
    are set. Returns the rewritten SQL text.
    """
    rewritten = sql
    if fallback_pattern is not None:
        # Each matched name is replaced by its entry in `mapping`.
        rewritten = fallback_pattern.sub(
            lambda hit: mapping[hit.group(0)], rewritten
        )
    if redact and args.literals:
        return anonymize_literals_in_sql(rewritten)
    return rewritten

# Structural identifier fields that are not SQL text (the helper never sees
# them): column type references, child schema/database, and query routing.
if args.identifiers:
for db in new["databases"].values():
for schema in db.values():
for table in schema["tables"].values():
for column in table["columns"]:
if column["type"] in mapping:
column["type"] = mapping[column["type"]]
for source in schema["sources"].values():
for column in source.get("columns", []):
if column["type"] in mapping:
column["type"] = mapping[column["type"]]
for child in source.get("children", {}).values():
# A child's schema/database may be a builtin or otherwise
# uncaptured name that never entered the mapping; leave
# those as-is rather than crashing.
# uncaptured name not in the mapping; leave those as-is.
child["schema"] = mapping.get(child["schema"], child["schema"])
child["database"] = mapping.get(
child["database"], child["database"]
)
for column in child["columns"]:
if args.identifiers and column["type"] in mapping:
column["type"] = mapping[column["type"]]
replace_identifiers(child, "create_sql")
replace_literals(child, "create_sql")
replace_identifiers(source, "create_sql")
replace_literals(source, "create_sql")
for view in schema["views"].values():
replace_identifiers(view, "create_sql")
replace_literals(view, "create_sql")
for mv in schema["materialized_views"].values():
replace_identifiers(mv, "create_sql")
replace_literals(mv, "create_sql")
for index in schema["indexes"].values():
replace_identifiers(index, "create_sql")
replace_literals(index, "create_sql")
for sink in schema["sinks"].values():
replace_identifiers(sink, "create_sql")
# Sink create_sql carries topic names, broker lists, and
# bucket/path URLs as string literals; anonymize them.
replace_literals(sink, "create_sql")
query_literal_targets: list[dict[str, Any]] = []
for query in workload["queries"]:
if args.identifiers:
for column in child["columns"]:
if column["type"] in mapping:
column["type"] = mapping[column["type"]]
for query in workload["queries"]:
query["cluster"] = mapping.get(query["cluster"], query["cluster"])
query["database"] = mapping.get(query["database"], query["database"])
query["search_path"] = [
mapping.get(schema, schema) for schema in query["search_path"]
]
replace_identifiers(query, "sql")
if args.literals:
query_literal_targets.append(query)
for query in workload["queries"]:
new["queries"].append(query)

# Redact literals in query SQL with Materialize's own parser, in one batch.
# The parser handles every literal form the dialect supports (numbers, hex
# strings, intervals, dollar-quoted and escape strings) where the regex only
# caught single-quoted strings.
#
# --require-parser gates only the wholesale case: if the parser binary is
# unavailable, the tool errors rather than silently redacting every query
# with the weaker regex. Individual statements that do not parse fall back
# to the regex with a warning either way (this is a property of the captured
# SQL, not of whether the parser is present), and the verify pass still
# scans the result.
if query_literal_targets:
sqls = [q["sql"] for q in query_literal_targets]
redacted = redact_literals_via_parser(sqls)
if redacted is None:
if args.require_parser:
print(
"error: mz-sql-anonymize helper not found, so query literals "
"cannot be redacted with the parser. Build it with:\n"
" cargo build --release -p mz-sql-anonymize\n"
"or pass --no-require-parser to fall back to a weaker regex "
"that only redacts single-quoted strings (missing numbers, "
"dollar-quoted strings, and comments).",
file=sys.stderr,
)
return 1
print(
"warning: mz-sql-anonymize helper not found; using regex literal "
"redaction for queries, which misses numbers, dollar-quoted "
"strings, and comments (--no-require-parser).",
file=sys.stderr,
)
for q in query_literal_targets:
q["sql"] = anonymize_literals_in_sql(q["sql"])
else:
unparsed = [i for i, red in enumerate(redacted) if red is None]
if unparsed:
print(
f"warning: mz-sql-anonymize could not parse {len(unparsed)} of "
f"{len(redacted)} captured queries; falling back to the regex "
"for those (it only redacts single-quoted strings). The verify "
"pass still scans them.",
file=sys.stderr,
)
for q, red in zip(query_literal_targets, redacted):
q["sql"] = (
red if red is not None else anonymize_literals_in_sql(q["sql"])
)
# Rewrite each group of create_sql/sql strings through the helper. `redact`
# asks the AST to redact literals (queries); `regex_literals` applies the
# DDL literal regex on top (DDL, but not clusters, whose config is kept).
n_unparsed = 0

# Rewrite the SQL string stored under `key` of every dict in `items`, in place.
# The strings are sent to the helper as one batch; any statement without an AST
# result is rewritten with the regex fallback instead.
#   redact         -- passed to the helper as its literal-redaction flag.
#   regex_literals -- additionally apply the literal regex (when args.literals).
# NOTE(review): leading indentation is not visible in this view, so the nesting
# of the statements below -- in particular the final `regex_literals` check --
# should be confirmed against the real file.
def anonymize_group(
items: list[dict[str, Any]],
key: str,
*,
redact: bool,
regex_literals: bool,
) -> None:
nonlocal n_unparsed
# Only entries that actually hold a string under `key` are rewritten.
targets = [d for d in items if isinstance(d.get(key), str)]
if not targets:
return
sqls = [d[key] for d in targets]
# `results` is None when the helper binary was not located; the call itself can
# also return None for the whole batch. Either way every statement below takes
# the fallback path.
results = (
anonymize_sql_via_parser(sqls, mapping, args.identifiers, redact)
if binary_available
else None
)
for i, d in enumerate(targets):
out = results[i] if results is not None else None
if out is None:
# Count a statement as unparsed only when the helper did return a batch but
# had no result for this entry.
if results is not None:
n_unparsed += 1
# The fallback redacts literals if either the AST redaction or the regex
# redaction was requested for this group.
d[key] = fallback_rewrite(d[key], redact=redact or regex_literals)
else:
d[key] = out
if regex_literals and args.literals:
d[key] = anonymize_literals_in_sql(d[key])

clusters = list(new["clusters"].values())
ddl: list[dict[str, Any]] = []
for db in new["databases"].values():
for schema in db.values():
for group in (
"tables",
"types",
"connections",
"sources",
"views",
"materialized_views",
"indexes",
"sinks",
):
for obj in schema[group].values():
ddl.append(obj)
if group == "sources":
ddl.extend(obj.get("children", {}).values())

# Clusters: rename only, keep config literals.
anonymize_group(clusters, "create_sql", redact=False, regex_literals=False)
# Other DDL: rename via AST, redact literals via regex (catches option strings).
anonymize_group(ddl, "create_sql", redact=False, regex_literals=True)
# Queries: rename + redact literals, both on the AST.
anonymize_group(new["queries"], "sql", redact=args.literals, regex_literals=False)

if n_unparsed:
print(
f"warning: mz-sql-anonymize could not parse {n_unparsed} statement(s); "
"fell back to the regex for those (it only redacts single-quoted "
"strings, and its identifier substitution can corrupt SQL). The verify "
"pass still scans them.",
file=sys.stderr,
)

if args.verify:
problems = verify_anonymized(new, mapping, args)
Expand Down
Loading
Loading