From 1b226d386f4939b7b8e02b51d5142cedb9f42de3 Mon Sep 17 00:00:00 2001 From: Gabe Pesco Date: Thu, 21 May 2026 12:04:04 -0400 Subject: [PATCH 1/5] Adding schema ownership Signed-off-by: Gabe Pesco --- sqlmesh/core/config/__init__.py | 1 + sqlmesh/core/config/ownership.py | 42 +++++++++++ sqlmesh/core/config/root.py | 4 ++ sqlmesh/core/config/scheduler.py | 6 ++ sqlmesh/core/engine_adapter/base.py | 14 ++++ sqlmesh/core/engine_adapter/spark.py | 14 ++++ sqlmesh/core/plan/evaluator.py | 7 ++ sqlmesh/core/snapshot/evaluator.py | 11 ++- tests/core/engine_adapter/test_spark.py | 46 +++++++++++++ tests/core/integration/test_config.py | 45 ++++++++++++ tests/core/test_config.py | 92 +++++++++++++++++++++++++ tests/core/test_snapshot_evaluator.py | 68 ++++++++++++++++++ 12 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 sqlmesh/core/config/ownership.py diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index 42ed82c6e6..a57af66b2e 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -32,6 +32,7 @@ load_configs as load_configs, ) from sqlmesh.core.config.migration import MigrationConfig as MigrationConfig +from sqlmesh.core.config.ownership import OwnershipConfig as OwnershipConfig from sqlmesh.core.config.model import ModelDefaultsConfig as ModelDefaultsConfig from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig from sqlmesh.core.config.linter import LinterConfig as LinterConfig diff --git a/sqlmesh/core/config/ownership.py b/sqlmesh/core/config/ownership.py new file mode 100644 index 0000000000..0aca8f1f07 --- /dev/null +++ b/sqlmesh/core/config/ownership.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import re +import typing as t + +from pydantic.functional_validators import BeforeValidator + +from sqlmesh.core.config.base import BaseConfig +from sqlmesh.core.config.common import compile_regex_mapping + +if t.TYPE_CHECKING: + OwnershipMapping = t.Dict[re.Pattern, str] +else: + OwnershipMapping = t.Annotated[ + t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping) + ] + + +class OwnershipConfig(BaseConfig): + """Configuration for object ownership rules applied at creation time. + + Maps environment name regex patterns to owner principals. The first + matching pattern wins. Ownership is applied immediately when schemas and + views are created, so even a partially-completed run leaves objects in a + manageable state. + + Example:: + + ownership: + environment_owner_mapping: + "^prod$": "svc_prod_spn" + ".*": "group:shared-developers" + """ + + environment_owner_mapping: OwnershipMapping = {} + + def resolve_owner(self, environment_name: str) -> t.Optional[str]: + """Return the configured owner for the given environment name, or None.""" + for pattern, owner in self.environment_owner_mapping.items(): + if pattern.fullmatch(environment_name): + return owner + return None diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 211d271b01..fae30f6fbd 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -30,6 +30,7 @@ from sqlmesh.core.config.format import FormatConfig from sqlmesh.core.config.gateway import GatewayConfig from sqlmesh.core.config.janitor import JanitorConfig +from sqlmesh.core.config.ownership import OwnershipConfig from sqlmesh.core.config.migration import MigrationConfig from sqlmesh.core.config.model import ModelDefaultsConfig from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig @@ -118,6 +119,7 @@ class Config(BaseConfig): gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway. infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements. environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment. + ownership: Ownership rules applied at schema/view creation time. Maps environment name patterns to owner principals so objects are correctly owned even after a partial run. default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands. log_limit: The default number of logs to keep. format: The formatting options for SQL code. @@ -175,6 +177,7 @@ class Config(BaseConfig): janitor: JanitorConfig = JanitorConfig() cache_dir: t.Optional[str] = None dbt: t.Optional[DbtConfig] = None + ownership: OwnershipConfig = Field(default_factory=OwnershipConfig) _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { "gateways": UpdateStrategy.NESTED_UPDATE, @@ -194,6 +197,7 @@ class Config(BaseConfig): "after_all": UpdateStrategy.EXTEND, "linter": UpdateStrategy.NESTED_UPDATE, "dbt": UpdateStrategy.NESTED_UPDATE, + "ownership": UpdateStrategy.NESTED_UPDATE, } _connection_config_validator = connection_config_validator diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 9d9d1d3c79..d8c5f7c96e 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -128,12 +128,18 @@ class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig) type_: t.Literal["builtin"] = Field(alias="type", default="builtin") def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: + from sqlmesh.core.config.ownership import OwnershipConfig + + ownership_config = getattr(context.config, "ownership", None) + if isinstance(ownership_config, OwnershipConfig) and not ownership_config.environment_owner_mapping: + ownership_config = None return BuiltInPlanEvaluator( state_sync=context.state_sync, snapshot_evaluator=context.snapshot_evaluator, create_scheduler=context.create_scheduler, default_catalog=context.default_catalog, console=context.console, + ownership_config=ownership_config, ) def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]: diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 5465ea1197..9bb408c582 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1418,6 +1418,20 @@ def _create_schema( raise logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e) + def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None: + """Set the owner of a schema. + + No-op by default. Override in dialect-specific adapters that support ownership control + (e.g. Spark/Databricks Unity Catalog: ALTER SCHEMA ... OWNER TO ...). + """ + + def alter_view_owner(self, view_name: TableName, owner: str) -> None: + """Set the owner of a view. + + No-op by default. Override in dialect-specific adapters that support ownership control + (e.g. Spark/Databricks Unity Catalog: ALTER VIEW ... OWNER TO ...). + """ + def drop_schema( self, schema_name: SchemaName, diff --git a/sqlmesh/core/engine_adapter/spark.py b/sqlmesh/core/engine_adapter/spark.py index 9199aa3bcd..6bcb19415b 100644 --- a/sqlmesh/core/engine_adapter/spark.py +++ b/sqlmesh/core/engine_adapter/spark.py @@ -553,6 +553,20 @@ def _build_create_comment_column_exp( return f"ALTER TABLE {table_sql} ALTER COLUMN {column_sql} COMMENT {comment_sql}" + def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None: + schema_sql = exp.to_table(schema_name, dialect=self.dialect).sql( + dialect=self.dialect, identify=True + ) + owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect) + self.execute(f"ALTER SCHEMA {schema_sql} OWNER TO {owner_sql}") + + def alter_view_owner(self, view_name: TableName, owner: str) -> None: + view_sql = exp.to_table(view_name, dialect=self.dialect).sql( + dialect=self.dialect, identify=True + ) + owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect) + self.execute(f"ALTER VIEW {view_sql} OWNER TO {owner_sql}") + @classmethod def _wap_branch_name(cls, wap_id: str) -> str: return f"{cls.WAP_PREFIX}{wap_id}" diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index f2f432a97e..75e85c8082 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -40,6 +40,7 @@ from sqlmesh.core.plan.common import identify_restatement_intervals_across_snapshot_versions from sqlmesh.utils import CorrelationId from sqlmesh.utils.concurrency import NodeExecutionFailedError +from sqlmesh.core.config.ownership import OwnershipConfig from sqlmesh.utils.errors import PlanError, ConflictingPlanError, SQLMeshError from sqlmesh.utils.date import now, to_timestamp @@ -74,12 +75,14 @@ def __init__( create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler], default_catalog: t.Optional[str], console: t.Optional[Console] = None, + ownership_config: t.Optional[OwnershipConfig] = None, ): self.state_sync = state_sync self.snapshot_evaluator = snapshot_evaluator self.create_scheduler = create_scheduler self.default_catalog = default_catalog self.console = console or get_console() + self.ownership_config = ownership_config self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None def evaluate( @@ -434,6 +437,9 @@ def _promote_snapshots( deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: + owner: t.Optional[str] = None + if self.ownership_config: + owner = self.ownership_config.resolve_owner(environment_naming_info.name) self.snapshot_evaluator.promote( target_snapshots, start=plan.start, @@ -449,6 +455,7 @@ def _promote_snapshots( environment_naming_info=environment_naming_info, deployability_index=deployability_index, on_complete=on_complete, + owner=owner, ) def _demote_snapshots( diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 4df9ecb695..1f91cf344f 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -275,6 +275,7 @@ def promote( snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None, table_mapping: t.Optional[t.Dict[str, str]] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, + owner: t.Optional[str] = None, ) -> None: """Promotes the given collection of snapshots in the target environment by replacing a corresponding view with a physical table associated with the given snapshot. @@ -306,7 +307,7 @@ def promote( gateway_table_pairs = [ (gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables ] - self._create_schemas(gateway_table_pairs=gateway_table_pairs) + self._create_schemas(gateway_table_pairs=gateway_table_pairs, owner=owner) # Fetch the view data objects for the promoted snapshots to get them cached self._get_virtual_data_objects(target_snapshots, environment_naming_info) @@ -325,6 +326,7 @@ def promote( environment_naming_info=environment_naming_info, deployability_index=deployability_index, # type: ignore on_complete=on_complete, + owner=owner, ), self.ddl_concurrent_tasks, ) @@ -1257,6 +1259,7 @@ def _promote_snapshot( execution_time: t.Optional[TimeLike] = None, snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None, table_mapping: t.Optional[t.Dict[str, str]] = None, + owner: t.Optional[str] = None, ) -> None: if not snapshot.is_model: return @@ -1298,6 +1301,9 @@ def _promote_snapshot( render_kwargs["snapshots"] = snapshot_by_name adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs)) + if owner: + adapter.alter_view_owner(view_name, owner) + if on_complete is not None: on_complete(snapshot) @@ -1449,6 +1455,7 @@ def _create_catalogs( def _create_schemas( self, gateway_table_pairs: t.Iterable[t.Tuple[t.Optional[str], t.Union[exp.Table, str]]], + owner: t.Optional[str] = None, ) -> None: table_exprs = [(gateway, exp.to_table(t)) for gateway, t in gateway_table_pairs] unique_schemas = { @@ -1464,6 +1471,8 @@ def _create_schema( logger.info("Creating schema '%s'", schema) adapter = self.get_adapter(gateway) adapter.create_schema(schema) + if owner: + adapter.alter_schema_owner(schema, owner) with self.concurrent_context(): concurrent_apply_to_values( diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index d7c3127f05..fd15e29a35 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -1092,6 +1092,52 @@ def test_table_format(adapter: SparkEngineAdapter, mocker: MockerFixture): ] +def test_alter_schema_owner(make_mocked_engine_adapter: t.Callable): + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_schema_owner("catalog.my_schema", "svc_prod_spn") + assert to_sql_calls(adapter) == [ + "ALTER SCHEMA `catalog`.`my_schema` OWNER TO `svc_prod_spn`" + ] + + +def test_alter_schema_owner_three_part_name(make_mocked_engine_adapter: t.Callable): + # Schema references are typically 2-part (catalog.schema), but verify quoting is correct. + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_schema_owner("my_schema", "svc_prod_spn") + assert to_sql_calls(adapter) == ["ALTER SCHEMA `my_schema` OWNER TO `svc_prod_spn`"] + + +def test_alter_view_owner(make_mocked_engine_adapter: t.Callable): + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_view_owner("catalog.my_schema.my_view", "svc_prod_spn") + assert to_sql_calls(adapter) == [ + "ALTER VIEW `catalog`.`my_schema`.`my_view` OWNER TO `svc_prod_spn`" + ] + + +def test_alter_view_owner_special_chars_in_principal(make_mocked_engine_adapter: t.Callable): + # Databricks Unity Catalog principals can contain colons and @ signs. + # Verify they are safely backtick-quoted and not interpreted as SQL syntax. + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_view_owner("catalog.sushi__dev.orders", "group:devs@company.com") + assert to_sql_calls(adapter) == [ + "ALTER VIEW `catalog`.`sushi__dev`.`orders` OWNER TO `group:devs@company.com`" + ] + + +def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable): + # The base EngineAdapter.alter_schema_owner is a no-op: adapters that don't + # support ownership control silently skip it without emitting any SQL. + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + + adapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.alter_schema_owner("my_schema", "some_owner") + adapter.alter_view_owner("my_schema.my_view", "some_owner") + # No ALTER SQL should have been emitted + alter_calls = [s for s in to_sql_calls(adapter) if "OWNER" in s.upper()] + assert alter_calls == [] + + def test_get_data_object_wap_branch(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): adapter = make_mocked_engine_adapter(SparkEngineAdapter, patch_get_data_objects=False) mocker.patch.object(adapter, "_get_data_objects", return_value=[]) diff --git a/tests/core/integration/test_config.py b/tests/core/integration/test_config.py index 5d571cd7c5..2845057079 100644 --- a/tests/core/integration/test_config.py +++ b/tests/core/integration/test_config.py @@ -15,6 +15,7 @@ GatewayConfig, ModelDefaultsConfig, DuckDBConnectionConfig, + OwnershipConfig, TableNamingConvention, AutoCategorizationMode, ) @@ -578,3 +579,47 @@ def test_auto_categorization(sushi_context: Context): sushi_context.get_snapshot("sushi.waiter_as_customer_by_day", raise_if_missing=True).version == version ) + + +def test_ownership_config_plan_applies_without_error( + tmp_path: Path, monkeypatch: MonkeyPatch +) -> None: + """OwnershipConfig flows through the full plan/apply lifecycle without errors. + + DuckDB's alter_schema_owner/alter_view_owner are no-ops, so we cannot verify + that ownership was actually changed — but we confirm the config plumbing + doesn't break schema creation, view promotion, or dev environment application. + """ + monkeypatch.chdir(tmp_path) + + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + default_connection=DuckDBConnectionConfig(), + ownership=OwnershipConfig( + environment_owner_mapping={ + "^prod$": "svc_prod_owner", + ".*": "group:shared-developers", + } + ), + ) + + models_dir = tmp_path / "models" + models_dir.mkdir() + (models_dir / "model.sql").write_text( + """ + MODEL (name example_schema.test_model, kind FULL); + SELECT '1' AS a + """ + ) + + ctx = Context(config=config, paths=tmp_path) + + # Prod plan/apply — exercises virtual layer schema and view creation with ownership config + ctx.plan(auto_apply=True) + assert ctx.engine_adapter.table_exists("example_schema.test_model") + + # Dev plan/apply — exercises env-suffixed schema and view creation with ownership config + ctx.plan(environment="dev", include_unmodified=True, auto_apply=True) + metadata = DuckDBMetadata.from_context(ctx) + dev_schemas = {s for s in metadata.schemas if "__dev" in s} + assert len(dev_schemas) > 0 diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 8c81a90b8d..ee07d8e31b 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -14,6 +14,7 @@ DuckDBConnectionConfig, GatewayConfig, ModelDefaultsConfig, + OwnershipConfig, BigQueryConnectionConfig, MotherDuckConnectionConfig, BuiltInSchedulerConfig, @@ -606,6 +607,97 @@ def test_load_duckdb_attach_config(tmp_path): assert config.gateways["another_gateway"].connection.catalogs.get("memory") == ":memory:" + +# --------------------------------------------------------------------------- +# OwnershipConfig tests +# --------------------------------------------------------------------------- + + +def test_ownership_config_resolve_owner(): + config = OwnershipConfig( + environment_owner_mapping={ + "^prod$": "svc_prod_spn", + ".*": "group:shared-developers", + } + ) + assert config.resolve_owner("prod") == "svc_prod_spn" + assert config.resolve_owner("dev_alice") == "group:shared-developers" + assert config.resolve_owner("staging") == "group:shared-developers" + # "production" does not match ^prod$ so falls through to .* + assert config.resolve_owner("production") == "group:shared-developers" + + +def test_ownership_config_empty_returns_none(): + assert OwnershipConfig().resolve_owner("prod") is None + assert OwnershipConfig().resolve_owner("dev_env") is None + + +def test_ownership_config_first_match_wins(): + # The catch-all .* comes before a more specific pattern — it always wins. + # This documents the ordering contract: users must put specific patterns first. + config = OwnershipConfig( + environment_owner_mapping={ + ".*": "catch_all_owner", + "^prod$": "prod_owner", + } + ) + assert config.resolve_owner("prod") == "catch_all_owner" + + +def test_ownership_config_case_sensitive(): + # Patterns are compiled without re.IGNORECASE, so matching is case-sensitive. + config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"}) + assert config.resolve_owner("prod") == "svc_prod" + assert config.resolve_owner("PROD") is None + assert config.resolve_owner("Prod") is None + + +def test_ownership_config_no_match_returns_none(): + config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"}) + assert config.resolve_owner("staging") is None + assert config.resolve_owner("dev_bob") is None + + +def test_ownership_config_deserialization_from_dict(): + # Simulates YAML/dict-based config loading (as produced by load_config_from_yaml). + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ownership={ + "environment_owner_mapping": { + "^prod$": "svc_prod_spn", + ".*": "group:shared-developers", + } + }, + ) + assert config.ownership.resolve_owner("prod") == "svc_prod_spn" + assert config.ownership.resolve_owner("dev") == "group:shared-developers" + + +def test_ownership_config_nested_update(): + # Config.ownership uses UpdateStrategy.NESTED_UPDATE. + # When two Configs are merged, the second one's environment_owner_mapping + # replaces the first's (REPLACE semantics within OwnershipConfig since + # environment_owner_mapping has no explicit strategy). + c1 = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ownership=OwnershipConfig(environment_owner_mapping={"^prod$": "spn_prod"}), + ) + c2 = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ownership=OwnershipConfig(environment_owner_mapping={".*": "grp_devs"}), + ) + merged = c1.update_with(c2) + # c2's mapping fully replaces c1's — the ^prod$ pattern is gone + assert merged.ownership.resolve_owner("prod") == "grp_devs" + assert merged.ownership.resolve_owner("dev_alice") == "grp_devs" + + +def test_config_ownership_defaults_to_empty(): + # Configs without an explicit ownership block have a no-op OwnershipConfig. + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + assert config.ownership.environment_owner_mapping == {} + assert config.ownership.resolve_owner("prod") is None + attach_config_1 = config.gateways["another_gateway"].connection.catalogs.get("sqlite") assert isinstance(attach_config_1, DuckDBAttachOptions) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index d7aa9e4a80..a32121cfca 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -436,6 +436,74 @@ def test_promote_forward_only(mocker: MockerFixture, adapter_mock, make_snapshot ) +def test_promote_with_owner(mocker: MockerFixture, adapter_mock, make_snapshot): + """When owner is supplied, alter_schema_owner and alter_view_owner are called.""" + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.promote( + [snapshot], EnvironmentNamingInfo(name="test_env"), owner="group:shared-developers" + ) + + adapter_mock.alter_schema_owner.assert_called_once_with( + to_schema("test_schema__test_env"), "group:shared-developers" + ) + adapter_mock.alter_view_owner.assert_called_once_with( + "test_schema__test_env.test_model", "group:shared-developers" + ) + + +def test_promote_without_owner_skips_alter(mocker: MockerFixture, adapter_mock, make_snapshot): + """When no owner is configured (the default), ownership DDL is never issued.""" + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + + adapter_mock.alter_schema_owner.assert_not_called() + adapter_mock.alter_view_owner.assert_not_called() + + +def test_promote_owner_applied_per_view(mocker: MockerFixture, adapter_mock, make_snapshot): + """alter_view_owner is called once per promoted snapshot.""" + evaluator = SnapshotEvaluator(adapter_mock) + + snapshots = [] + for name in ("model_a", "model_b", "model_c"): + model = SqlModel( + name=f"test_schema.{name}", + kind=ViewKind(), + query=parse_one("SELECT 1"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + snapshots.append(snapshot) + + evaluator.promote( + snapshots, EnvironmentNamingInfo(name="test_env"), owner="svc_prod_spn" + ) + + assert adapter_mock.alter_view_owner.call_count == 3 + called_owners = {c.args[1] for c in adapter_mock.alter_view_owner.call_args_list} + assert called_owners == {"svc_prod_spn"} + + def test_cleanup(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator = SnapshotEvaluator(adapter_mock) From 671f7621638d3af5a86b3610764d73b19577a79e Mon Sep 17 00:00:00 2001 From: Gabe Pesco Date: Thu, 21 May 2026 12:31:07 -0400 Subject: [PATCH 2/5] feat: extend ownership control to physical layer tables Add `physical_owner` field to `OwnershipConfig` so that SQLMesh__* physical tables get ownership applied at creation time, not just virtual-layer views and schemas. * `OwnershipConfig.physical_owner` - optional plain string, no env-pattern matching needed * `EngineAdapterBase.alter_table_owner` - no-op default * `SparkEngineAdapter.alter_table_owner` - ALTER TABLE ... OWNER TO ... with backtick-quoting * `SnapshotEvaluator.create_snapshot` - calls alter_table_owner after _execute_create (skips ViewKind) * `SnapshotEvaluator.create` / `create_physical_schemas` - accept and thread owner param * `BuiltInPlanEvaluator` - resolves physical_owner in both PhysicalLayerSchemaCreation and PhysicalLayerUpdate stages Signed-off-by: Gabe Pesco Signed-off-by: Gabe Pesco --- sqlmesh/core/config/ownership.py | 2 + sqlmesh/core/engine_adapter/base.py | 7 +++ sqlmesh/core/engine_adapter/spark.py | 7 +++ sqlmesh/core/plan/evaluator.py | 5 +- sqlmesh/core/snapshot/evaluator.py | 24 ++++++-- tests/core/engine_adapter/test_spark.py | 18 ++++++ tests/core/test_config.py | 22 +++++++ tests/core/test_snapshot_evaluator.py | 82 +++++++++++++++++++++++++ 8 files changed, 162 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/config/ownership.py b/sqlmesh/core/config/ownership.py index 0aca8f1f07..463e6608b2 100644 --- a/sqlmesh/core/config/ownership.py +++ b/sqlmesh/core/config/ownership.py @@ -30,9 +30,11 @@ class OwnershipConfig(BaseConfig): environment_owner_mapping: "^prod$": "svc_prod_spn" ".*": "group:shared-developers" + physical_owner: "group:shared-developers" """ environment_owner_mapping: OwnershipMapping = {} + physical_owner: t.Optional[str] = None def resolve_owner(self, environment_name: str) -> t.Optional[str]: """Return the configured owner for the given environment name, or None.""" diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 9bb408c582..f7d91118d2 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1432,6 +1432,13 @@ def alter_view_owner(self, view_name: TableName, owner: str) -> None: (e.g. Spark/Databricks Unity Catalog: ALTER VIEW ... OWNER TO ...). """ + def alter_table_owner(self, table_name: TableName, owner: str) -> None: + """Set the owner of a table. + + No-op by default. Override in dialect-specific adapters that support ownership control + (e.g. Spark/Databricks Unity Catalog: ALTER TABLE ... OWNER TO ...). + """ + def drop_schema( self, schema_name: SchemaName, diff --git a/sqlmesh/core/engine_adapter/spark.py b/sqlmesh/core/engine_adapter/spark.py index 6bcb19415b..411dbd69f8 100644 --- a/sqlmesh/core/engine_adapter/spark.py +++ b/sqlmesh/core/engine_adapter/spark.py @@ -567,6 +567,13 @@ def alter_view_owner(self, view_name: TableName, owner: str) -> None: owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect) self.execute(f"ALTER VIEW {view_sql} OWNER TO {owner_sql}") + def alter_table_owner(self, table_name: TableName, owner: str) -> None: + table_sql = exp.to_table(table_name, dialect=self.dialect).sql( + dialect=self.dialect, identify=True + ) + owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect) + self.execute(f"ALTER TABLE {table_sql} OWNER TO {owner_sql}") + @classmethod def _wap_branch_name(cls, wap_id: str) -> str: return f"{cls.WAP_PREFIX}{wap_id}" diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 75e85c8082..7858bbbec8 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -175,6 +175,7 @@ def visit_physical_layer_update_stage( self.console.log_success(skip_message) return + physical_owner = self.ownership_config.physical_owner if self.ownership_config else None completion_status = None progress_stopped = False try: @@ -188,6 +189,7 @@ def visit_physical_layer_update_stage( x, plan.environment, self.default_catalog ), on_complete=self.console.update_creation_progress, + owner=physical_owner, ) if completion_status.is_nothing_to_do: self.console.log_success(skip_message) @@ -212,9 +214,10 @@ def visit_physical_layer_update_stage( def visit_physical_layer_schema_creation_stage( self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan ) -> None: + physical_owner = self.ownership_config.physical_owner if self.ownership_config else None try: self.snapshot_evaluator.create_physical_schemas( - stage.snapshots, stage.deployability_index + stage.snapshots, stage.deployability_index, owner=physical_owner ) except Exception as ex: raise PlanError("Plan application failed.") from ex diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 1f91cf344f..89ff5b6b82 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -368,6 +368,7 @@ def create( on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, allow_destructive_snapshots: t.Optional[t.Set[str]] = None, allow_additive_snapshots: t.Optional[t.Set[str]] = None, + owner: t.Optional[str] = None, ) -> CompletionStatus: """Creates a physical snapshot schema and table for the given collection of snapshots. @@ -379,6 +380,7 @@ def create( on_complete: A callback to call on each successfully created snapshot. allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes. allow_additive_snapshots: Set of snapshots that are allowed to have additive schema changes. + owner: Optional principal to set as table owner after creation. Returns: CompletionStatus: The status of the creation operation (success, failure, nothing to do). @@ -398,17 +400,22 @@ def create( on_complete=on_complete, allow_destructive_snapshots=allow_destructive_snapshots or set(), allow_additive_snapshots=allow_additive_snapshots or set(), + owner=owner, ) return CompletionStatus.SUCCESS def create_physical_schemas( - self, snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex + self, + snapshots: t.Iterable[Snapshot], + deployability_index: DeployabilityIndex, + owner: t.Optional[str] = None, ) -> None: """Creates the physical schemas for the given snapshots. Args: snapshots: Snapshots to create physical schemas for. deployability_index: Determines snapshots that are deployable in the context of this creation. + owner: Optional principal to set as schema owner after creation. """ tables_by_gateway: t.Dict[t.Optional[str], t.List[str]] = defaultdict(list) for snapshot in snapshots: @@ -420,7 +427,7 @@ def create_physical_schemas( gateway_table_pairs = [ (gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables ] - self._create_schemas(gateway_table_pairs=gateway_table_pairs) + self._create_schemas(gateway_table_pairs=gateway_table_pairs, owner=owner) def get_snapshots_to_create( self, target_snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex @@ -453,6 +460,7 @@ def _create_snapshots( on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], allow_destructive_snapshots: t.Set[str], allow_additive_snapshots: t.Set[str], + owner: t.Optional[str] = None, ) -> None: """Internal method to create tables in parallel.""" with self.concurrent_context(): @@ -465,6 +473,7 @@ def _create_snapshots( allow_destructive_snapshots=allow_destructive_snapshots, allow_additive_snapshots=allow_additive_snapshots, on_complete=on_complete, + owner=owner, ), self.ddl_concurrent_tasks, raise_on_error=False, @@ -870,6 +879,7 @@ def create_snapshot( allow_destructive_snapshots: t.Set[str], allow_additive_snapshots: t.Set[str], on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, + owner: t.Optional[str] = None, ) -> None: """Creates a physical table for the given snapshot. @@ -880,6 +890,7 @@ def create_snapshot( on_complete: A callback to call on each successfully created database object. allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed. allow_additive_snapshots: Snapshots for which additive schema changes are allowed. + owner: Optional principal to set as table owner after creation. """ if not snapshot.is_model: return @@ -907,6 +918,9 @@ def create_snapshot( **create_render_kwargs ) + is_table_deployable = deployability_index.is_deployable(snapshot) + table_name = snapshot.table_name(is_deployable=is_table_deployable) + if self._can_clone(snapshot, deployability_index): self._clone_snapshot_in_dev( snapshot=snapshot, @@ -919,10 +933,9 @@ def create_snapshot( run_pre_post_statements=True, ) else: - is_table_deployable = deployability_index.is_deployable(snapshot) self._execute_create( snapshot=snapshot, - table_name=snapshot.table_name(is_deployable=is_table_deployable), + table_name=table_name, is_table_deployable=is_table_deployable, deployability_index=deployability_index, create_render_kwargs=create_render_kwargs, @@ -930,6 +943,9 @@ def create_snapshot( dry_run=True, ) + if owner and not isinstance(snapshot.model.kind, ViewKind): + adapter.alter_table_owner(table_name, owner) + evaluation_strategy.run_post_statements( snapshot=snapshot, render_kwargs={**create_render_kwargs, "inside_transaction": False} ) diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index fd15e29a35..2d4c9705b7 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -1125,6 +1125,23 @@ def test_alter_view_owner_special_chars_in_principal(make_mocked_engine_adapter: ] +def test_alter_table_owner(make_mocked_engine_adapter: t.Callable): + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_table_owner("catalog.sqlmesh__sushi.orders__abc123", "svc_prod_spn") + assert to_sql_calls(adapter) == [ + "ALTER TABLE `catalog`.`sqlmesh__sushi`.`orders__abc123` OWNER TO `svc_prod_spn`" + ] + + +def test_alter_table_owner_special_chars_in_principal(make_mocked_engine_adapter: t.Callable): + # Databricks Unity Catalog principals can contain colons and @ signs. + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.alter_table_owner("catalog.sqlmesh__sushi.orders__abc123", "group:data@company.com") + assert to_sql_calls(adapter) == [ + "ALTER TABLE `catalog`.`sqlmesh__sushi`.`orders__abc123` OWNER TO `group:data@company.com`" + ] + + def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable): # The base EngineAdapter.alter_schema_owner is a no-op: adapters that don't # support ownership control silently skip it without emitting any SQL. @@ -1133,6 +1150,7 @@ def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable): adapter = make_mocked_engine_adapter(DuckDBEngineAdapter) adapter.alter_schema_owner("my_schema", "some_owner") adapter.alter_view_owner("my_schema.my_view", "some_owner") + adapter.alter_table_owner("my_schema.my_table", "some_owner") # No ALTER SQL should have been emitted alter_calls = [s for s in to_sql_calls(adapter) if "OWNER" in s.upper()] assert alter_calls == [] diff --git a/tests/core/test_config.py b/tests/core/test_config.py index ee07d8e31b..03d35b37f6 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -713,6 +713,28 @@ def test_config_ownership_defaults_to_empty(): assert attach_config_2.read_only is True +def test_ownership_config_physical_owner(): + # physical_owner is a simple optional string — no pattern matching. + config = OwnershipConfig(physical_owner="group:data-platform") + assert config.physical_owner == "group:data-platform" + + +def test_ownership_config_physical_owner_default_none(): + assert OwnershipConfig().physical_owner is None + + +def test_ownership_config_physical_owner_deserialization(): + config = Config( + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ownership={ + "environment_owner_mapping": {"^prod$": "svc_prod"}, + "physical_owner": "group:data-platform", + }, + ) + assert config.ownership.physical_owner == "group:data-platform" + assert config.ownership.resolve_owner("prod") == "svc_prod" + + def test_load_model_defaults_audits(tmp_path): config_path = tmp_path / "config_model_defaults_audits.yaml" with open(config_path, "w", encoding="utf-8") as fd: diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index a32121cfca..8720148679 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -504,6 +504,88 @@ def test_promote_owner_applied_per_view(mocker: MockerFixture, adapter_mock, mak assert called_owners == {"svc_prod_spn"} +def test_create_with_physical_owner(mocker: MockerFixture, adapter_mock, make_snapshot): + """alter_table_owner is called for each non-view table when physical owner is set.""" + adapter_mock.get_data_objects.return_value = [] + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="ds"), + storage_format="parquet", + query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.create([snapshot], {}, owner="group:data-platform") + + adapter_mock.alter_table_owner.assert_called_once() + call_args = adapter_mock.alter_table_owner.call_args + assert call_args.args[1] == "group:data-platform" + + +def test_create_without_physical_owner_skips_alter( + mocker: MockerFixture, adapter_mock, make_snapshot +): + """When no physical owner is set, alter_table_owner is never called.""" + adapter_mock.get_data_objects.return_value = [] + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="ds"), + storage_format="parquet", + query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.create([snapshot], {}) + + adapter_mock.alter_table_owner.assert_not_called() + + +def test_create_view_kind_skips_physical_owner( + mocker: MockerFixture, adapter_mock, make_snapshot +): + """ViewKind snapshots skip alter_table_owner even when physical_owner is set.""" + adapter_mock.get_data_objects.return_value = [] + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="test_schema.test_view", + kind=ViewKind(), + query=parse_one("SELECT 1"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.create([snapshot], {}, owner="group:data-platform") + + adapter_mock.alter_table_owner.assert_not_called() + + +def test_create_physical_schemas_with_owner(mocker: MockerFixture, adapter_mock, make_snapshot): + """create_physical_schemas passes owner to _create_schemas so alter_schema_owner is called.""" + evaluator = SnapshotEvaluator(adapter_mock) + deployability_index = DeployabilityIndex.all_deployable() + + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="ds"), + storage_format="parquet", + query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"), + ) + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + evaluator.create_physical_schemas([snapshot], deployability_index, owner="svc_prod_spn") + + adapter_mock.alter_schema_owner.assert_called_once() + assert adapter_mock.alter_schema_owner.call_args.args[1] == "svc_prod_spn" + + def test_cleanup(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator = SnapshotEvaluator(adapter_mock) From c628df9a1ebfcf4a082ddb7ba032c258bd0516ab Mon Sep 17 00:00:00 2001 From: Gabe Pesco Date: Thu, 21 May 2026 13:22:25 -0400 Subject: [PATCH 3/5] style: apply ruff-format and fix stray test assertions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Run ruff-format on changed files. Remove DuckDB attach assertions accidentally included in test_config_ownership_defaults_to_empty — those are already covered by test_load_duckdb_attach_config. Signed-off-by: Gabe Pesco Signed-off-by: Gabe Pesco --- sqlmesh/core/config/ownership.py | 4 +--- tests/core/engine_adapter/test_spark.py | 4 +--- tests/core/test_config.py | 14 -------------- tests/core/test_snapshot_evaluator.py | 8 ++------ 4 files changed, 4 insertions(+), 26 deletions(-) diff --git a/sqlmesh/core/config/ownership.py b/sqlmesh/core/config/ownership.py index 463e6608b2..3d2f35c541 100644 --- a/sqlmesh/core/config/ownership.py +++ b/sqlmesh/core/config/ownership.py @@ -11,9 +11,7 @@ if t.TYPE_CHECKING: OwnershipMapping = t.Dict[re.Pattern, str] else: - OwnershipMapping = t.Annotated[ - t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping) - ] + OwnershipMapping = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping)] class OwnershipConfig(BaseConfig): diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index 2d4c9705b7..f88b47d3b2 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -1095,9 +1095,7 @@ def test_table_format(adapter: SparkEngineAdapter, mocker: MockerFixture): def test_alter_schema_owner(make_mocked_engine_adapter: t.Callable): adapter = make_mocked_engine_adapter(SparkEngineAdapter) adapter.alter_schema_owner("catalog.my_schema", "svc_prod_spn") - assert to_sql_calls(adapter) == [ - "ALTER SCHEMA `catalog`.`my_schema` OWNER TO `svc_prod_spn`" - ] + assert to_sql_calls(adapter) == ["ALTER SCHEMA `catalog`.`my_schema` OWNER TO `svc_prod_spn`"] def test_alter_schema_owner_three_part_name(make_mocked_engine_adapter: t.Callable): diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 03d35b37f6..9bde352f22 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -698,20 +698,6 @@ def test_config_ownership_defaults_to_empty(): assert config.ownership.environment_owner_mapping == {} assert config.ownership.resolve_owner("prod") is None - attach_config_1 = config.gateways["another_gateway"].connection.catalogs.get("sqlite") - - assert isinstance(attach_config_1, DuckDBAttachOptions) - assert attach_config_1.type == "sqlite" - assert attach_config_1.path == "test.db" - assert attach_config_1.read_only is False - - attach_config_2 = config.gateways["another_gateway"].connection.catalogs.get("postgres") - - assert isinstance(attach_config_2, DuckDBAttachOptions) - assert attach_config_2.type == "postgres" - assert attach_config_2.path == "dbname=postgres user=postgres host=127.0.0.1" - assert attach_config_2.read_only is True - def test_ownership_config_physical_owner(): # physical_owner is a simple optional string — no pattern matching. diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 8720148679..a333fb1746 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -495,9 +495,7 @@ def test_promote_owner_applied_per_view(mocker: MockerFixture, adapter_mock, mak snapshot.categorize_as(SnapshotChangeCategory.BREAKING) snapshots.append(snapshot) - evaluator.promote( - snapshots, EnvironmentNamingInfo(name="test_env"), owner="svc_prod_spn" - ) + evaluator.promote(snapshots, EnvironmentNamingInfo(name="test_env"), owner="svc_prod_spn") assert adapter_mock.alter_view_owner.call_count == 3 called_owners = {c.args[1] for c in adapter_mock.alter_view_owner.call_args_list} @@ -546,9 +544,7 @@ def test_create_without_physical_owner_skips_alter( adapter_mock.alter_table_owner.assert_not_called() -def test_create_view_kind_skips_physical_owner( - mocker: MockerFixture, adapter_mock, make_snapshot -): +def test_create_view_kind_skips_physical_owner(mocker: MockerFixture, adapter_mock, make_snapshot): """ViewKind snapshots skip alter_table_owner even when physical_owner is set.""" adapter_mock.get_data_objects.return_value = [] evaluator = SnapshotEvaluator(adapter_mock) From ed2ce65b26d7f3d8001106dec13a31a2975a827e Mon Sep 17 00:00:00 2001 From: Gabe Pesco Date: Thu, 21 May 2026 13:29:26 -0400 Subject: [PATCH 4/5] style: apply ruff-format to scheduler.py Signed-off-by: Gabe Pesco --- sqlmesh/core/config/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index d8c5f7c96e..cd24fedcc4 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -131,7 +131,10 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: from sqlmesh.core.config.ownership import OwnershipConfig ownership_config = getattr(context.config, "ownership", None) - if isinstance(ownership_config, OwnershipConfig) and not ownership_config.environment_owner_mapping: + if ( + isinstance(ownership_config, OwnershipConfig) + and not ownership_config.environment_owner_mapping + ): ownership_config = None return BuiltInPlanEvaluator( state_sync=context.state_sync, From c75f3766467a7363eca1674599e2a9427e0aaba4 Mon Sep 17 00:00:00 2001 From: Gabe Pesco Date: Thu, 21 May 2026 16:50:52 -0400 Subject: [PATCH 5/5] feat: add callable resolver support to OwnershipConfig - OwnershipConfig gains environment_owner_resolver and physical_owner_resolver callable fields for cases where the owner principal must be resolved at plan execution time (e.g. dynamic SPN identity via adapter.current_user()). - Callable resolvers take precedence over the static mapping/string fields when both are set. - resolve_owner() and resolve_physical_owner() now accept the active EngineAdapter so callables can query the connection. - Add current_user() to the base EngineAdapter (SELECT CURRENT_USER()). - Replace the getattr/isinstance guard in BuiltInSchedulerConfig with OwnershipConfig.is_active. - Update all resolve_owner/resolve_physical_owner call sites in BuiltInPlanEvaluator to thread the adapter through. Co-Authored-By: Claude Sonnet 4.6 --- sqlmesh/core/config/ownership.py | 50 +++++++++-- sqlmesh/core/config/scheduler.py | 10 +-- sqlmesh/core/engine_adapter/base.py | 11 +++ sqlmesh/core/plan/evaluator.py | 16 +++- tests/core/engine_adapter/test_spark.py | 18 ++++ tests/core/test_config.py | 107 ++++++++++++++++++++---- 6 files changed, 176 insertions(+), 36 deletions(-) diff --git a/sqlmesh/core/config/ownership.py b/sqlmesh/core/config/ownership.py index 3d2f35c541..9730be8d78 100644 --- a/sqlmesh/core/config/ownership.py +++ b/sqlmesh/core/config/ownership.py @@ -9,34 +9,70 @@ from sqlmesh.core.config.common import compile_regex_mapping if t.TYPE_CHECKING: + from sqlmesh.core.engine_adapter.base import EngineAdapter OwnershipMapping = t.Dict[re.Pattern, str] + EnvironmentOwnerResolver = t.Callable[[str, EngineAdapter], t.Optional[str]] + PhysicalOwnerResolver = t.Callable[[EngineAdapter], t.Optional[str]] else: OwnershipMapping = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping)] + EnvironmentOwnerResolver = t.Callable + PhysicalOwnerResolver = t.Callable class OwnershipConfig(BaseConfig): """Configuration for object ownership rules applied at creation time. - Maps environment name regex patterns to owner principals. The first - matching pattern wins. Ownership is applied immediately when schemas and - views are created, so even a partially-completed run leaves objects in a - manageable state. + For static YAML-based config, use ``environment_owner_mapping`` and + ``physical_owner``. For programmatic config where the principal must be + resolved at plan-execution time (e.g. via ``adapter.current_user()`` or a + Databricks API call), supply ``environment_owner_resolver`` and/or + ``physical_owner_resolver`` instead — callables take precedence over the + static fields. - Example:: + Example (YAML):: ownership: environment_owner_mapping: "^prod$": "svc_prod_spn" ".*": "group:shared-developers" physical_owner: "group:shared-developers" + + Example (Python):: + + OwnershipConfig( + environment_owner_resolver=lambda env, adapter: ( + adapter.current_user() if env == "prod" else "group:shared-developers" + ), + physical_owner="group:shared-developers", + ) """ environment_owner_mapping: OwnershipMapping = {} + environment_owner_resolver: t.Optional[EnvironmentOwnerResolver] = None physical_owner: t.Optional[str] = None + physical_owner_resolver: t.Optional[PhysicalOwnerResolver] = None - def resolve_owner(self, environment_name: str) -> t.Optional[str]: - """Return the configured owner for the given environment name, or None.""" + @property + def is_active(self) -> bool: + """True when any ownership rule is configured.""" + return bool( + self.environment_owner_resolver is not None + or self.environment_owner_mapping + or self.physical_owner is not None + or self.physical_owner_resolver is not None + ) + + def resolve_owner(self, environment_name: str, adapter: "EngineAdapter") -> t.Optional[str]: + """Return the configured owner for the given environment, or None.""" + if self.environment_owner_resolver is not None: + return self.environment_owner_resolver(environment_name, adapter) for pattern, owner in self.environment_owner_mapping.items(): if pattern.fullmatch(environment_name): return owner return None + + def resolve_physical_owner(self, adapter: "EngineAdapter") -> t.Optional[str]: + """Return the configured physical-layer owner, or None.""" + if self.physical_owner_resolver is not None: + return self.physical_owner_resolver(adapter) + return self.physical_owner diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index cd24fedcc4..dd8f9caaa7 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -128,14 +128,8 @@ class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig) type_: t.Literal["builtin"] = Field(alias="type", default="builtin") def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: - from sqlmesh.core.config.ownership import OwnershipConfig - - ownership_config = getattr(context.config, "ownership", None) - if ( - isinstance(ownership_config, OwnershipConfig) - and not ownership_config.environment_owner_mapping - ): - ownership_config = None + ownership = context.config.ownership + ownership_config = ownership if ownership.is_active else None return BuiltInPlanEvaluator( state_sync=context.state_sync, snapshot_evaluator=context.snapshot_evaluator, diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index f7d91118d2..b6954d3f3d 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1418,6 +1418,17 @@ def _create_schema( raise logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e) + def current_user(self) -> str: + """Return the identity of the currently-connected principal. + + Uses SQL ``CURRENT_USER()`` which is supported by Spark/Databricks and + DuckDB. Override in adapters where a different mechanism is required. + """ + row = self.fetchone("SELECT CURRENT_USER()") + if not row: + raise SQLMeshError("Could not determine current user: CURRENT_USER() returned no rows") + return row[0] + def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None: """Set the owner of a schema. diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 7858bbbec8..e69bf4b78f 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -175,7 +175,11 @@ def visit_physical_layer_update_stage( self.console.log_success(skip_message) return - physical_owner = self.ownership_config.physical_owner if self.ownership_config else None + physical_owner = ( + self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter) + if self.ownership_config + else None + ) completion_status = None progress_stopped = False try: @@ -214,7 +218,11 @@ def visit_physical_layer_update_stage( def visit_physical_layer_schema_creation_stage( self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan ) -> None: - physical_owner = self.ownership_config.physical_owner if self.ownership_config else None + physical_owner = ( + self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter) + if self.ownership_config + else None + ) try: self.snapshot_evaluator.create_physical_schemas( stage.snapshots, stage.deployability_index, owner=physical_owner @@ -442,7 +450,9 @@ def _promote_snapshots( ) -> None: owner: t.Optional[str] = None if self.ownership_config: - owner = self.ownership_config.resolve_owner(environment_naming_info.name) + owner = self.ownership_config.resolve_owner( + environment_naming_info.name, self.snapshot_evaluator.adapter + ) self.snapshot_evaluator.promote( target_snapshots, start=plan.start, diff --git a/tests/core/engine_adapter/test_spark.py b/tests/core/engine_adapter/test_spark.py index f88b47d3b2..618bd381cb 100644 --- a/tests/core/engine_adapter/test_spark.py +++ b/tests/core/engine_adapter/test_spark.py @@ -1154,6 +1154,24 @@ def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable): assert alter_calls == [] +def test_current_user(make_mocked_engine_adapter: t.Callable): + adapter = make_mocked_engine_adapter(SparkEngineAdapter) + adapter.cursor.fetchone.return_value = ("spn-abc-123",) + result = adapter.current_user() + assert result == "spn-abc-123" + sql_calls = to_sql_calls(adapter) + assert any("CURRENT_USER" in s.upper() for s in sql_calls) + + +def test_current_user_base_noop(make_mocked_engine_adapter: t.Callable): + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + + adapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + adapter.cursor.fetchone.return_value = ("duckdb-user",) + result = adapter.current_user() + assert result == "duckdb-user" + + def test_get_data_object_wap_branch(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): adapter = make_mocked_engine_adapter(SparkEngineAdapter, patch_get_data_objects=False) mocker.patch.object(adapter, "_get_data_objects", return_value=[]) diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 9bde352f22..582434c0c7 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -614,52 +614,58 @@ def test_load_duckdb_attach_config(tmp_path): def test_ownership_config_resolve_owner(): + mock_adapter = mock.MagicMock() config = OwnershipConfig( environment_owner_mapping={ "^prod$": "svc_prod_spn", ".*": "group:shared-developers", } ) - assert config.resolve_owner("prod") == "svc_prod_spn" - assert config.resolve_owner("dev_alice") == "group:shared-developers" - assert config.resolve_owner("staging") == "group:shared-developers" + assert config.resolve_owner("prod", mock_adapter) == "svc_prod_spn" + assert config.resolve_owner("dev_alice", mock_adapter) == "group:shared-developers" + assert config.resolve_owner("staging", mock_adapter) == "group:shared-developers" # "production" does not match ^prod$ so falls through to .* - assert config.resolve_owner("production") == "group:shared-developers" + assert config.resolve_owner("production", mock_adapter) == "group:shared-developers" def test_ownership_config_empty_returns_none(): - assert OwnershipConfig().resolve_owner("prod") is None - assert OwnershipConfig().resolve_owner("dev_env") is None + mock_adapter = mock.MagicMock() + assert OwnershipConfig().resolve_owner("prod", mock_adapter) is None + assert OwnershipConfig().resolve_owner("dev_env", mock_adapter) is None def test_ownership_config_first_match_wins(): # The catch-all .* comes before a more specific pattern — it always wins. # This documents the ordering contract: users must put specific patterns first. + mock_adapter = mock.MagicMock() config = OwnershipConfig( environment_owner_mapping={ ".*": "catch_all_owner", "^prod$": "prod_owner", } ) - assert config.resolve_owner("prod") == "catch_all_owner" + assert config.resolve_owner("prod", mock_adapter) == "catch_all_owner" def test_ownership_config_case_sensitive(): # Patterns are compiled without re.IGNORECASE, so matching is case-sensitive. + mock_adapter = mock.MagicMock() config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"}) - assert config.resolve_owner("prod") == "svc_prod" - assert config.resolve_owner("PROD") is None - assert config.resolve_owner("Prod") is None + assert config.resolve_owner("prod", mock_adapter) == "svc_prod" + assert config.resolve_owner("PROD", mock_adapter) is None + assert config.resolve_owner("Prod", mock_adapter) is None def test_ownership_config_no_match_returns_none(): + mock_adapter = mock.MagicMock() config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"}) - assert config.resolve_owner("staging") is None - assert config.resolve_owner("dev_bob") is None + assert config.resolve_owner("staging", mock_adapter) is None + assert config.resolve_owner("dev_bob", mock_adapter) is None def test_ownership_config_deserialization_from_dict(): # Simulates YAML/dict-based config loading (as produced by load_config_from_yaml). + mock_adapter = mock.MagicMock() config = Config( model_defaults=ModelDefaultsConfig(dialect="duckdb"), ownership={ @@ -669,8 +675,8 @@ def test_ownership_config_deserialization_from_dict(): } }, ) - assert config.ownership.resolve_owner("prod") == "svc_prod_spn" - assert config.ownership.resolve_owner("dev") == "group:shared-developers" + assert config.ownership.resolve_owner("prod", mock_adapter) == "svc_prod_spn" + assert config.ownership.resolve_owner("dev", mock_adapter) == "group:shared-developers" def test_ownership_config_nested_update(): @@ -678,6 +684,7 @@ def test_ownership_config_nested_update(): # When two Configs are merged, the second one's environment_owner_mapping # replaces the first's (REPLACE semantics within OwnershipConfig since # environment_owner_mapping has no explicit strategy). + mock_adapter = mock.MagicMock() c1 = Config( model_defaults=ModelDefaultsConfig(dialect="duckdb"), ownership=OwnershipConfig(environment_owner_mapping={"^prod$": "spn_prod"}), @@ -688,15 +695,16 @@ def test_ownership_config_nested_update(): ) merged = c1.update_with(c2) # c2's mapping fully replaces c1's — the ^prod$ pattern is gone - assert merged.ownership.resolve_owner("prod") == "grp_devs" - assert merged.ownership.resolve_owner("dev_alice") == "grp_devs" + assert merged.ownership.resolve_owner("prod", mock_adapter) == "grp_devs" + assert merged.ownership.resolve_owner("dev_alice", mock_adapter) == "grp_devs" def test_config_ownership_defaults_to_empty(): # Configs without an explicit ownership block have a no-op OwnershipConfig. + mock_adapter = mock.MagicMock() config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) assert config.ownership.environment_owner_mapping == {} - assert config.ownership.resolve_owner("prod") is None + assert config.ownership.resolve_owner("prod", mock_adapter) is None def test_ownership_config_physical_owner(): @@ -718,7 +726,70 @@ def test_ownership_config_physical_owner_deserialization(): }, ) assert config.ownership.physical_owner == "group:data-platform" - assert config.ownership.resolve_owner("prod") == "svc_prod" + assert config.ownership.resolve_owner("prod", mock.MagicMock()) == "svc_prod" + + +def test_ownership_config_resolve_owner_callable(): + # A callable resolver takes precedence over environment_owner_mapping and + # receives (env_name, adapter) so it can call adapter.current_user() etc. + mock_adapter = mock.MagicMock() + mock_adapter.current_user.return_value = "spn-dynamic-uuid" + + config = OwnershipConfig( + environment_owner_mapping={".*": "group:fallback"}, + environment_owner_resolver=lambda env, adapter: ( + adapter.current_user() if env == "prod" else "group:shared-developers" + ), + ) + + assert config.resolve_owner("prod", mock_adapter) == "spn-dynamic-uuid" + assert config.resolve_owner("dev_alice", mock_adapter) == "group:shared-developers" + mock_adapter.current_user.assert_called_once() + + +def test_ownership_config_resolver_overrides_mapping(): + # Resolver always wins when set, even if the mapping would also match. + mock_adapter = mock.MagicMock() + config = OwnershipConfig( + environment_owner_mapping={"^prod$": "static-owner"}, + environment_owner_resolver=lambda env, adapter: "dynamic-owner", + ) + assert config.resolve_owner("prod", mock_adapter) == "dynamic-owner" + + +def test_ownership_config_resolve_physical_owner_callable(): + mock_adapter = mock.MagicMock() + mock_adapter.current_user.return_value = "spn-uuid-123" + + config = OwnershipConfig( + physical_owner_resolver=lambda adapter: adapter.current_user(), + ) + assert config.resolve_physical_owner(mock_adapter) == "spn-uuid-123" + mock_adapter.current_user.assert_called_once() + + +def test_ownership_config_resolve_physical_owner_static(): + mock_adapter = mock.MagicMock() + config = OwnershipConfig(physical_owner="group:data-platform") + assert config.resolve_physical_owner(mock_adapter) == "group:data-platform" + mock_adapter.current_user.assert_not_called() + + +def test_ownership_config_physical_owner_resolver_overrides_static(): + mock_adapter = mock.MagicMock() + config = OwnershipConfig( + physical_owner="static-owner", + physical_owner_resolver=lambda adapter: "dynamic-owner", + ) + assert config.resolve_physical_owner(mock_adapter) == "dynamic-owner" + + +def test_ownership_config_is_active(): + assert not OwnershipConfig().is_active + assert OwnershipConfig(environment_owner_mapping={".*": "grp"}).is_active + assert OwnershipConfig(environment_owner_resolver=lambda e, a: None).is_active + assert OwnershipConfig(physical_owner="grp").is_active + assert OwnershipConfig(physical_owner_resolver=lambda a: "grp").is_active def test_load_model_defaults_audits(tmp_path):