Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqlmesh/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
load_configs as load_configs,
)
from sqlmesh.core.config.migration import MigrationConfig as MigrationConfig
from sqlmesh.core.config.ownership import OwnershipConfig as OwnershipConfig
from sqlmesh.core.config.model import ModelDefaultsConfig as ModelDefaultsConfig
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
Expand Down
78 changes: 78 additions & 0 deletions sqlmesh/core/config/ownership.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

import re
import typing as t

from pydantic.functional_validators import BeforeValidator

from sqlmesh.core.config.base import BaseConfig
from sqlmesh.core.config.common import compile_regex_mapping

if t.TYPE_CHECKING:
from sqlmesh.core.engine_adapter.base import EngineAdapter
OwnershipMapping = t.Dict[re.Pattern, str]
EnvironmentOwnerResolver = t.Callable[[str, EngineAdapter], t.Optional[str]]
PhysicalOwnerResolver = t.Callable[[EngineAdapter], t.Optional[str]]
else:
OwnershipMapping = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping)]
EnvironmentOwnerResolver = t.Callable
PhysicalOwnerResolver = t.Callable


class OwnershipConfig(BaseConfig):
"""Configuration for object ownership rules applied at creation time.

For static YAML-based config, use ``environment_owner_mapping`` and
``physical_owner``. For programmatic config where the principal must be
resolved at plan-execution time (e.g. via ``adapter.current_user()`` or a
Databricks API call), supply ``environment_owner_resolver`` and/or
``physical_owner_resolver`` instead — callables take precedence over the
static fields.

Example (YAML)::

ownership:
environment_owner_mapping:
"^prod$": "svc_prod_spn"
".*": "group:shared-developers"
physical_owner: "group:shared-developers"

Example (Python)::

OwnershipConfig(
environment_owner_resolver=lambda env, adapter: (
adapter.current_user() if env == "prod" else "group:shared-developers"
),
physical_owner="group:shared-developers",
)
"""

environment_owner_mapping: OwnershipMapping = {}
environment_owner_resolver: t.Optional[EnvironmentOwnerResolver] = None
physical_owner: t.Optional[str] = None
physical_owner_resolver: t.Optional[PhysicalOwnerResolver] = None

@property
def is_active(self) -> bool:
"""True when any ownership rule is configured."""
return bool(
self.environment_owner_resolver is not None
or self.environment_owner_mapping
or self.physical_owner is not None
or self.physical_owner_resolver is not None
)

def resolve_owner(self, environment_name: str, adapter: "EngineAdapter") -> t.Optional[str]:
"""Return the configured owner for the given environment, or None."""
if self.environment_owner_resolver is not None:
return self.environment_owner_resolver(environment_name, adapter)
for pattern, owner in self.environment_owner_mapping.items():
if pattern.fullmatch(environment_name):
return owner
return None

def resolve_physical_owner(self, adapter: "EngineAdapter") -> t.Optional[str]:
"""Return the configured physical-layer owner, or None."""
if self.physical_owner_resolver is not None:
return self.physical_owner_resolver(adapter)
return self.physical_owner
4 changes: 4 additions & 0 deletions sqlmesh/core/config/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from sqlmesh.core.config.format import FormatConfig
from sqlmesh.core.config.gateway import GatewayConfig
from sqlmesh.core.config.janitor import JanitorConfig
from sqlmesh.core.config.ownership import OwnershipConfig
from sqlmesh.core.config.migration import MigrationConfig
from sqlmesh.core.config.model import ModelDefaultsConfig
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
Expand Down Expand Up @@ -118,6 +119,7 @@ class Config(BaseConfig):
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
ownership: Ownership rules applied at schema/view creation time. Maps environment name patterns to owner principals so objects are correctly owned even after a partial run.
default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
log_limit: The default number of logs to keep.
format: The formatting options for SQL code.
Expand Down Expand Up @@ -175,6 +177,7 @@ class Config(BaseConfig):
janitor: JanitorConfig = JanitorConfig()
cache_dir: t.Optional[str] = None
dbt: t.Optional[DbtConfig] = None
ownership: OwnershipConfig = Field(default_factory=OwnershipConfig)

_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
"gateways": UpdateStrategy.NESTED_UPDATE,
Expand All @@ -194,6 +197,7 @@ class Config(BaseConfig):
"after_all": UpdateStrategy.EXTEND,
"linter": UpdateStrategy.NESTED_UPDATE,
"dbt": UpdateStrategy.NESTED_UPDATE,
"ownership": UpdateStrategy.NESTED_UPDATE,
}

_connection_config_validator = connection_config_validator
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/config/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,15 @@ class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig)
type_: t.Literal["builtin"] = Field(alias="type", default="builtin")

def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
ownership = context.config.ownership
ownership_config = ownership if ownership.is_active else None
return BuiltInPlanEvaluator(
state_sync=context.state_sync,
snapshot_evaluator=context.snapshot_evaluator,
create_scheduler=context.create_scheduler,
default_catalog=context.default_catalog,
console=context.console,
ownership_config=ownership_config,
)

def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
Expand Down
32 changes: 32 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,38 @@ def _create_schema(
raise
logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e)

def current_user(self) -> str:
"""Return the identity of the currently-connected principal.

Uses SQL ``CURRENT_USER()`` which is supported by Spark/Databricks and
DuckDB. Override in adapters where a different mechanism is required.
"""
row = self.fetchone("SELECT CURRENT_USER()")
if not row:
raise SQLMeshError("Could not determine current user: CURRENT_USER() returned no rows")
return row[0]

def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None:
"""Set the owner of a schema.

No-op by default. Override in dialect-specific adapters that support ownership control
(e.g. Spark/Databricks Unity Catalog: ALTER SCHEMA ... OWNER TO ...).
"""

def alter_view_owner(self, view_name: TableName, owner: str) -> None:
"""Set the owner of a view.

No-op by default. Override in dialect-specific adapters that support ownership control
(e.g. Spark/Databricks Unity Catalog: ALTER VIEW ... OWNER TO ...).
"""

def alter_table_owner(self, table_name: TableName, owner: str) -> None:
"""Set the owner of a table.

No-op by default. Override in dialect-specific adapters that support ownership control
(e.g. Spark/Databricks Unity Catalog: ALTER TABLE ... OWNER TO ...).
"""

def drop_schema(
self,
schema_name: SchemaName,
Expand Down
21 changes: 21 additions & 0 deletions sqlmesh/core/engine_adapter/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,27 @@ def _build_create_comment_column_exp(

return f"ALTER TABLE {table_sql} ALTER COLUMN {column_sql} COMMENT {comment_sql}"

def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None:
schema_sql = exp.to_table(schema_name, dialect=self.dialect).sql(
dialect=self.dialect, identify=True
)
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
self.execute(f"ALTER SCHEMA {schema_sql} OWNER TO {owner_sql}")

def alter_view_owner(self, view_name: TableName, owner: str) -> None:
view_sql = exp.to_table(view_name, dialect=self.dialect).sql(
dialect=self.dialect, identify=True
)
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
self.execute(f"ALTER VIEW {view_sql} OWNER TO {owner_sql}")

def alter_table_owner(self, table_name: TableName, owner: str) -> None:
table_sql = exp.to_table(table_name, dialect=self.dialect).sql(
dialect=self.dialect, identify=True
)
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
self.execute(f"ALTER TABLE {table_sql} OWNER TO {owner_sql}")

@classmethod
def _wap_branch_name(cls, wap_id: str) -> str:
return f"{cls.WAP_PREFIX}{wap_id}"
22 changes: 21 additions & 1 deletion sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sqlmesh.core.plan.common import identify_restatement_intervals_across_snapshot_versions
from sqlmesh.utils import CorrelationId
from sqlmesh.utils.concurrency import NodeExecutionFailedError
from sqlmesh.core.config.ownership import OwnershipConfig
from sqlmesh.utils.errors import PlanError, ConflictingPlanError, SQLMeshError
from sqlmesh.utils.date import now, to_timestamp

Expand Down Expand Up @@ -74,12 +75,14 @@ def __init__(
create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
default_catalog: t.Optional[str],
console: t.Optional[Console] = None,
ownership_config: t.Optional[OwnershipConfig] = None,
):
self.state_sync = state_sync
self.snapshot_evaluator = snapshot_evaluator
self.create_scheduler = create_scheduler
self.default_catalog = default_catalog
self.console = console or get_console()
self.ownership_config = ownership_config
self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None

def evaluate(
Expand Down Expand Up @@ -172,6 +175,11 @@ def visit_physical_layer_update_stage(
self.console.log_success(skip_message)
return

physical_owner = (
self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter)
if self.ownership_config
else None
)
completion_status = None
progress_stopped = False
try:
Expand All @@ -185,6 +193,7 @@ def visit_physical_layer_update_stage(
x, plan.environment, self.default_catalog
),
on_complete=self.console.update_creation_progress,
owner=physical_owner,
)
if completion_status.is_nothing_to_do:
self.console.log_success(skip_message)
Expand All @@ -209,9 +218,14 @@ def visit_physical_layer_update_stage(
def visit_physical_layer_schema_creation_stage(
self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
) -> None:
physical_owner = (
self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter)
if self.ownership_config
else None
)
try:
self.snapshot_evaluator.create_physical_schemas(
stage.snapshots, stage.deployability_index
stage.snapshots, stage.deployability_index, owner=physical_owner
)
except Exception as ex:
raise PlanError("Plan application failed.") from ex
Expand Down Expand Up @@ -434,6 +448,11 @@ def _promote_snapshots(
deployability_index: t.Optional[DeployabilityIndex] = None,
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
) -> None:
owner: t.Optional[str] = None
if self.ownership_config:
owner = self.ownership_config.resolve_owner(
environment_naming_info.name, self.snapshot_evaluator.adapter
)
self.snapshot_evaluator.promote(
target_snapshots,
start=plan.start,
Expand All @@ -449,6 +468,7 @@ def _promote_snapshots(
environment_naming_info=environment_naming_info,
deployability_index=deployability_index,
on_complete=on_complete,
owner=owner,
)

def _demote_snapshots(
Expand Down
Loading