From 1f514038f19384e0202fa302a3ac9ab1e19d586a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 17 May 2026 14:03:23 -0400 Subject: [PATCH 01/10] feat: user-defined OptimizerRule and AnalyzerRule from Python Expose `SessionContext.add_optimizer_rule` and `SessionContext.add_analyzer_rule` symmetric with the existing `remove_optimizer_rule`. Each accepts a Python subclass of the new `datafusion.optimizer.OptimizerRule` / `AnalyzerRule` ABCs. Implementation: * New `crates/core/src/optimizer_rules.rs` wraps user Python instances in `PyOptimizerRuleAdapter` / `PyAnalyzerRuleAdapter`, which implement the upstream `OptimizerRule` / `AnalyzerRule` traits. * `OptimizerRule.rewrite(plan)` returns `None` for "no change" or a new `LogicalPlan`. The adapter maps that to `Transformed::no` / `Transformed::yes` so the upstream optimizer's fixed-point loop terminates correctly. * `AnalyzerRule.analyze(plan)` must always return a `LogicalPlan`; returning `None` surfaces a `DataFusionError::Execution` naming the offending rule. * The upstream `&dyn OptimizerConfig` / `&ConfigOptions` arguments are not surfaced to Python in this MVP; rules that need configuration should capture it at construction time (for example by holding a `SessionContext` reference) or be implemented in Rust. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/context.rs | 12 +++ crates/core/src/lib.rs | 1 + crates/core/src/optimizer_rules.rs | 168 +++++++++++++++++++++++++++++ python/datafusion/context.py | 47 ++++++++ python/datafusion/optimizer.py | 144 +++++++++++++++++++++++++ python/tests/test_optimizer.py | 111 +++++++++++++++++++ 6 files changed, 483 insertions(+) create mode 100644 crates/core/src/optimizer_rules.rs create mode 100644 python/datafusion/optimizer.py create mode 100644 python/tests/test_optimizer.py diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 642afeef7..67f8e001a 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -1145,6 +1145,18 @@ impl PySessionContext { self.ctx.remove_optimizer_rule(name) } + pub fn add_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> { + let adapter = crate::optimizer_rules::build_optimizer_rule(rule)?; + self.ctx.add_optimizer_rule(adapter); + Ok(()) + } + + pub fn add_analyzer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> { + let adapter = crate::optimizer_rules::build_analyzer_rule(rule)?; + self.ctx.add_analyzer_rule(adapter); + Ok(()) + } + pub fn table_provider(&self, name: &str, py: Python) -> PyResult { let provider = wait_for_future(py, self.ctx.table_provider(name)) // Outer error: runtime/async failure diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 8b622d344..1c1227ce2 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -45,6 +45,7 @@ pub mod expr; #[allow(clippy::borrow_deref_ref)] mod functions; pub mod metrics; +pub mod optimizer_rules; mod options; pub mod physical_plan; mod pyarrow_filter_expression; diff --git a/crates/core/src/optimizer_rules.rs b/crates/core/src/optimizer_rules.rs new file mode 100644 index 000000000..a281272ed --- /dev/null +++ b/crates/core/src/optimizer_rules.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bridges between user-provided Python rule classes and the upstream +//! [`OptimizerRule`] / [`AnalyzerRule`] traits. +//! +//! The Python side defines abstract base classes ``OptimizerRule`` and +//! ``AnalyzerRule`` with ``name()`` plus, respectively, ``rewrite(plan)`` +//! and ``analyze(plan)``. Instances are wrapped in +//! [`PyOptimizerRuleAdapter`] / [`PyAnalyzerRuleAdapter`] before being +//! handed to [`SessionContext::add_optimizer_rule`] / +//! [`SessionContext::add_analyzer_rule`]. +//! +//! `rewrite` may return ``None`` to signal "no transformation" — the +//! adapter maps that to [`Transformed::no`]. Any returned +//! :class:`LogicalPlan` becomes [`Transformed::yes`]. `analyze` is +//! mandatory-rewrite (must return a plan); returning ``None`` is an +//! error. +//! +//! The upstream ``&dyn OptimizerConfig`` / ``&ConfigOptions`` arguments +//! are not surfaced to Python in this MVP. Rules that need configuration +//! access should be implemented in Rust today; Python rules read state +//! from the plan and from any captured ``SessionContext`` they were +//! constructed with. + +use std::fmt; +use std::sync::Arc; + +use datafusion::common::config::ConfigOptions; +use datafusion::common::tree_node::Transformed; +use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::logical_expr::LogicalPlan; +use datafusion::optimizer::analyzer::AnalyzerRule; +use datafusion::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use pyo3::prelude::*; + +use crate::errors::to_datafusion_err; +use crate::sql::logical::PyLogicalPlan; + +/// Wraps a Python ``OptimizerRule`` instance so that it can be registered +/// with the upstream optimizer pipeline. +pub struct PyOptimizerRuleAdapter { + rule: Py, + name: String, +} + +impl PyOptimizerRuleAdapter { + pub fn new(rule: Bound<'_, PyAny>) -> PyResult { + let name = rule.call_method0("name")?.extract::()?; + Ok(Self { + rule: rule.unbind(), + name, + }) + } +} + +impl fmt::Debug for PyOptimizerRuleAdapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PyOptimizerRuleAdapter") + .field("name", &self.name) + .finish() + } +} + +impl OptimizerRule for PyOptimizerRuleAdapter { + fn name(&self) -> &str { + &self.name + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> DataFusionResult> { + Python::attach(|py| { + let py_plan = PyLogicalPlan::from(plan.clone()); + let result = self + .rule + .bind(py) + .call_method1("rewrite", (py_plan,)) + .map_err(to_datafusion_err)?; + if result.is_none() { + return Ok(Transformed::no(plan)); + } + let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?; + Ok(Transformed::yes(LogicalPlan::from(rewritten))) + }) + } +} + +/// Wraps a Python ``AnalyzerRule`` instance so that it can be registered +/// with the upstream analyzer pipeline. +pub struct PyAnalyzerRuleAdapter { + rule: Py, + name: String, +} + +impl PyAnalyzerRuleAdapter { + pub fn new(rule: Bound<'_, PyAny>) -> PyResult { + let name = rule.call_method0("name")?.extract::()?; + Ok(Self { + rule: rule.unbind(), + name, + }) + } +} + +impl fmt::Debug for PyAnalyzerRuleAdapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PyAnalyzerRuleAdapter") + .field("name", &self.name) + .finish() + } +} + +impl AnalyzerRule for PyAnalyzerRuleAdapter { + fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult { + Python::attach(|py| { + let py_plan = PyLogicalPlan::from(plan); + let result = self + .rule + .bind(py) + .call_method1("analyze", (py_plan,)) + .map_err(to_datafusion_err)?; + if result.is_none() { + return Err(DataFusionError::Execution(format!( + "AnalyzerRule {} returned None from analyze(); analyzer rules \ + must return a LogicalPlan", + self.name + ))); + } + let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?; + Ok(LogicalPlan::from(rewritten)) + }) + } + + fn name(&self) -> &str { + &self.name + } +} + +/// Construct an adapter from a Python ``OptimizerRule`` instance. +pub(crate) fn build_optimizer_rule( + rule: Bound<'_, PyAny>, +) -> PyResult> { + Ok(Arc::new(PyOptimizerRuleAdapter::new(rule)?)) +} + +/// Construct an adapter from a Python ``AnalyzerRule`` instance. +pub(crate) fn build_analyzer_rule( + rule: Bound<'_, PyAny>, +) -> PyResult> { + Ok(Arc::new(PyAnalyzerRuleAdapter::new(rule)?)) +} diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 5c3501941..e2a4305c4 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -90,6 +90,7 @@ from datafusion.catalog import CatalogProvider, Table from datafusion.common import DFSchema from datafusion.expr import Expr, SortKey + from datafusion.optimizer import AnalyzerRule, OptimizerRule from datafusion.plan import ExecutionPlan, LogicalPlan from datafusion.user_defined import ( AggregateUDF, @@ -1260,6 +1261,52 @@ def register_udwf(self, udwf: WindowUDF) -> None: """Register a user-defined window function (UDWF) with the context.""" self.ctx.register_udwf(udwf._udwf) + def add_optimizer_rule(self, rule: OptimizerRule) -> None: + """Append a user-defined :class:`OptimizerRule` to the session. + + The rule's :py:meth:`OptimizerRule.rewrite` method is invoked + during query planning. Returning ``None`` from ``rewrite`` + signals no change; returning a new + :class:`~datafusion.plan.LogicalPlan` signals a rewrite. + + Args: + rule: An instance of a class that implements + :class:`datafusion.optimizer.OptimizerRule`. + + Examples: + >>> from datafusion.optimizer import OptimizerRule + >>> class NoopRule(OptimizerRule): + ... def name(self) -> str: return "noop" + ... def rewrite(self, plan): return None + >>> ctx = dfn.SessionContext() + >>> ctx.add_optimizer_rule(NoopRule()) + >>> ctx.remove_optimizer_rule("noop") + True + """ + self.ctx.add_optimizer_rule(rule) + + def add_analyzer_rule(self, rule: AnalyzerRule) -> None: + """Append a user-defined :class:`AnalyzerRule` to the session. + + The rule's :py:meth:`AnalyzerRule.analyze` method is invoked + during the analysis phase of query planning. Analyzer rules + must always return a :class:`~datafusion.plan.LogicalPlan` + (return the input plan unchanged when no rewrite applies). + + Args: + rule: An instance of a class that implements + :class:`datafusion.optimizer.AnalyzerRule`. + + Examples: + >>> from datafusion.optimizer import AnalyzerRule + >>> class Identity(AnalyzerRule): + ... def name(self) -> str: return "identity" + ... def analyze(self, plan): return plan + >>> ctx = dfn.SessionContext() + >>> ctx.add_analyzer_rule(Identity()) + """ + self.ctx.add_analyzer_rule(rule) + def deregister_udwf(self, name: str) -> None: """Remove a user-defined window function from the session. diff --git a/python/datafusion/optimizer.py b/python/datafusion/optimizer.py new file mode 100644 index 000000000..84beca001 --- /dev/null +++ b/python/datafusion/optimizer.py @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Abstract base classes for user-defined optimizer and analyzer rules. + +DataFusion's planner is built from two pipelines: + +* The :class:`Analyzer ` runs first and is responsible for + semantic rewrites — type coercion, function lookup, and rewrites that + cannot leave the plan structurally unchanged. Analyzer rules must + return a fully rewritten :class:`~datafusion.plan.LogicalPlan` every + time they run. +* The :class:`Optimizer ` runs afterwards and applies + cost-driven or semantics-preserving transformations until a fixed + point is reached. Optimizer rules may return ``None`` to signal "no + change," letting the optimizer terminate as soon as no rule mutates + the plan. + +Both ABCs are registered against a :class:`~datafusion.SessionContext` +through :py:meth:`~datafusion.SessionContext.add_optimizer_rule` / +:py:meth:`~datafusion.SessionContext.add_analyzer_rule`. + +The upstream rule traits also receive an +``OptimizerConfig`` / ``ConfigOptions`` reference. Those are not +surfaced to Python here; rules that need configuration access should +capture state at construction time (for example a +:class:`~datafusion.SessionContext` reference) or be implemented in +Rust. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from datafusion.plan import LogicalPlan + +__all__ = ["AnalyzerRule", "OptimizerRule"] + + +class OptimizerRule(ABC): + """Abstract base class for a user-defined optimizer rule. + + Subclasses must implement :py:meth:`name` and :py:meth:`rewrite`. + + Examples: + >>> import datafusion as dfn + >>> from datafusion.optimizer import OptimizerRule + >>> from datafusion.plan import LogicalPlan + >>> + >>> class TaggingRule(OptimizerRule): + ... # Mark each plan we see; never actually mutate it. + ... def __init__(self) -> None: + ... self.seen = 0 + ... + ... def name(self) -> str: + ... return "tagging_rule" + ... + ... def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: + ... self.seen += 1 + ... return None + >>> + >>> ctx = dfn.SessionContext() + >>> rule = TaggingRule() + >>> ctx.add_optimizer_rule(rule) + >>> ctx.from_pydict({"a": [1]}).count() + 1 + >>> rule.seen > 0 + True + """ + + @abstractmethod + def name(self) -> str: + """Return a unique name for this rule. + + DataFusion uses the name to deduplicate rules and to support + removal via :py:meth:`~datafusion.SessionContext.remove_optimizer_rule`. + """ + + @abstractmethod + def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: + """Attempt to rewrite ``plan``. + + Return a new :class:`~datafusion.plan.LogicalPlan` if the rule + produced one, or ``None`` to indicate no change. The optimizer + calls each rule repeatedly until no rule reports a change, so + returning ``None`` when nothing was rewritten is important for + termination. + """ + + +class AnalyzerRule(ABC): + """Abstract base class for a user-defined analyzer rule. + + Subclasses must implement :py:meth:`name` and :py:meth:`analyze`. + Unlike optimizer rules, analyzer rules must always return a + :class:`~datafusion.plan.LogicalPlan` (return the input plan + unmodified when nothing applies). + + Examples: + >>> import datafusion as dfn + >>> from datafusion.optimizer import AnalyzerRule + >>> from datafusion.plan import LogicalPlan + >>> + >>> class IdentityAnalyzer(AnalyzerRule): + ... def name(self) -> str: + ... return "identity_analyzer" + ... + ... def analyze(self, plan: LogicalPlan) -> LogicalPlan: + ... return plan + >>> + >>> ctx = dfn.SessionContext() + >>> ctx.add_analyzer_rule(IdentityAnalyzer()) + >>> ctx.from_pydict({"a": [1, 2, 3]}).count() + 3 + """ + + @abstractmethod + def name(self) -> str: + """Return a unique name for this rule.""" + + @abstractmethod + def analyze(self, plan: LogicalPlan) -> LogicalPlan: + """Rewrite ``plan`` and return the new plan. + + Analyzer rules must always return a + :class:`~datafusion.plan.LogicalPlan`. Return the input plan + unchanged when there is nothing to rewrite. + """ diff --git a/python/tests/test_optimizer.py b/python/tests/test_optimizer.py new file mode 100644 index 000000000..5c161573b --- /dev/null +++ b/python/tests/test_optimizer.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +from datafusion import SessionContext +from datafusion.optimizer import AnalyzerRule, OptimizerRule +from datafusion.plan import LogicalPlan + + +def test_optimizer_rule_is_invoked_during_planning() -> None: + """A registered OptimizerRule is called as the plan is optimized.""" + seen_plans: list[str] = [] + + class TracingRule(OptimizerRule): + def name(self) -> str: + return "tracing_rule" + + def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: + seen_plans.append(plan.display()) + return None + + ctx = SessionContext() + ctx.add_optimizer_rule(TracingRule()) + df = ctx.from_pydict({"a": [1, 2, 3]}) + result = df.collect() + + # The rule sees the plan at least once during optimization, and + # since it returns None each time the optimizer terminates cleanly. + assert seen_plans, "optimizer rule was not invoked during planning" + assert result[0].column(0).to_pylist() == [1, 2, 3] + + +def test_optimizer_rule_can_be_removed_by_name() -> None: + """remove_optimizer_rule deregisters a user-supplied rule by name.""" + + class NoopRule(OptimizerRule): + def name(self) -> str: + return "noop_for_removal" + + def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: + return None + + ctx = SessionContext() + ctx.add_optimizer_rule(NoopRule()) + assert ctx.remove_optimizer_rule("noop_for_removal") is True + # Second remove returns False — already gone. + assert ctx.remove_optimizer_rule("noop_for_removal") is False + + +def test_analyzer_rule_is_invoked_during_analysis() -> None: + """A registered AnalyzerRule is called and must return a plan.""" + invocations: list[str] = [] + + class IdentityAnalyzer(AnalyzerRule): + def name(self) -> str: + return "identity_analyzer" + + def analyze(self, plan: LogicalPlan) -> LogicalPlan: + invocations.append(plan.display()) + return plan + + ctx = SessionContext() + ctx.add_analyzer_rule(IdentityAnalyzer()) + df = ctx.from_pydict({"a": [1, 2, 3]}) + result = df.collect() + + assert invocations, "analyzer rule was not invoked" + assert result[0].column(0).to_pylist() == [1, 2, 3] + + +def test_analyzer_rule_returning_none_errors() -> None: + """Analyzer rules must return a LogicalPlan; None surfaces as an error.""" + + class BadAnalyzer(AnalyzerRule): + def name(self) -> str: + return "bad_analyzer" + + def analyze(self, plan: LogicalPlan): # type: ignore[override] + return None + + ctx = SessionContext() + ctx.add_analyzer_rule(BadAnalyzer()) + df = ctx.from_pydict({"a": [1]}) + with pytest.raises(Exception, match="bad_analyzer"): + df.collect() + + +def test_optimizer_rule_abc_cannot_be_instantiated() -> None: + """OptimizerRule is abstract — direct instantiation must fail.""" + with pytest.raises(TypeError): + OptimizerRule() # type: ignore[abstract] + + +def test_analyzer_rule_abc_cannot_be_instantiated() -> None: + """AnalyzerRule is abstract — direct instantiation must fail.""" + with pytest.raises(TypeError): + AnalyzerRule() # type: ignore[abstract] From daa8da94b513d68911ac33bd6786055654587233 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 15:57:19 -0400 Subject: [PATCH 02/10] feat: import FFI physical optimizer rules; drop Python logical rules Replace the Python-defined OptimizerRule/AnalyzerRule approach with FFI-imported physical optimizer rules. The Python logical-rule approach could observe plans but not transform them: there are no Python constructors for LogicalPlan node variants, so a rule could only return None or the input plan unchanged. The audience for custom rules also overlaps strongly with people who can write Rust. DataFusion exposes no FFI bridge for the logical OptimizerRule/AnalyzerRule traits, but it does export FFI_PhysicalOptimizerRule for the physical PhysicalOptimizerRule trait. This commit imports those instead. Changes: * Remove crates/core/src/optimizer_rules.rs, python/datafusion/optimizer.py, python/tests/test_optimizer.py, and the SessionContext.add_optimizer_rule / add_analyzer_rule methods. remove_optimizer_rule is unchanged (pre-existing). * New crates/core/src/physical_optimizer.rs reads a __datafusion_physical_optimizer_rule__ capsule and converts it via Arc::from(&FFI_PhysicalOptimizerRule). * SessionContext gains a physical_optimizer_rules constructor argument. Upstream offers no API to add physical rules to a live context, so they are appended to the builder at construction time only. * The datafusion-ffi-example crate gains MyPhysicalOptimizerRule, a counter-backed rule used by _test_physical_optimizer_rule.py to prove the rule fires over FFI during physical planning. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/context.rs | 30 ++-- crates/core/src/lib.rs | 2 +- crates/core/src/optimizer_rules.rs | 168 ------------------ crates/core/src/physical_optimizer.rs | 70 ++++++++ .../tests/_test_physical_optimizer_rule.py | 56 ++++++ examples/datafusion-ffi-example/src/lib.rs | 3 + .../src/physical_optimizer.rs | 98 ++++++++++ python/datafusion/context.py | 68 +++---- python/datafusion/optimizer.py | 144 --------------- python/tests/test_optimizer.py | 111 ------------ 10 files changed, 262 insertions(+), 488 deletions(-) delete mode 100644 crates/core/src/optimizer_rules.rs create mode 100644 crates/core/src/physical_optimizer.rs create mode 100644 examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py create mode 100644 examples/datafusion-ffi-example/src/physical_optimizer.rs delete mode 100644 python/datafusion/optimizer.py delete mode 100644 python/tests/test_optimizer.py diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 67f8e001a..2d399476f 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -375,11 +375,12 @@ pub struct PySessionContext { #[pymethods] impl PySessionContext { - #[pyo3(signature = (config=None, runtime=None))] + #[pyo3(signature = (config=None, runtime=None, physical_optimizer_rules=None))] #[new] pub fn new( config: Option, runtime: Option, + physical_optimizer_rules: Option>>, ) -> PyDataFusionResult { let config = if let Some(c) = config { c.config @@ -392,11 +393,20 @@ impl PySessionContext { RuntimeEnvBuilder::default() }; let runtime = Arc::new(runtime_env_builder.build()?); - let session_state = SessionStateBuilder::new() + let mut state_builder = SessionStateBuilder::new() .with_config(config) .with_runtime_env(runtime) - .with_default_features() - .build(); + .with_default_features(); + // DataFusion exposes no FFI bridge for the logical optimizer or + // analyzer, so only physical optimizer rules can be supplied from + // another library. They are appended after the default rules at + // construction time; there is no upstream API to add them to a live + // `SessionContext`. + for rule in physical_optimizer_rules.unwrap_or_default() { + let rule = crate::physical_optimizer::physical_optimizer_rule_from_pyobject(&rule)?; + state_builder = state_builder.with_physical_optimizer_rule(rule); + } + let session_state = state_builder.build(); let ctx = Arc::new(SessionContext::new_with_state(session_state)); Ok(PySessionContext { ctx, @@ -1145,18 +1155,6 @@ impl PySessionContext { self.ctx.remove_optimizer_rule(name) } - pub fn add_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> { - let adapter = crate::optimizer_rules::build_optimizer_rule(rule)?; - self.ctx.add_optimizer_rule(adapter); - Ok(()) - } - - pub fn add_analyzer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> { - let adapter = crate::optimizer_rules::build_analyzer_rule(rule)?; - self.ctx.add_analyzer_rule(adapter); - Ok(()) - } - pub fn table_provider(&self, name: &str, py: Python) -> PyResult { let provider = wait_for_future(py, self.ctx.table_provider(name)) // Outer error: runtime/async failure diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 1c1227ce2..2ca4237e5 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -45,8 +45,8 @@ pub mod expr; #[allow(clippy::borrow_deref_ref)] mod functions; pub mod metrics; -pub mod optimizer_rules; mod options; +pub mod physical_optimizer; pub mod physical_plan; mod pyarrow_filter_expression; pub mod pyarrow_util; diff --git a/crates/core/src/optimizer_rules.rs b/crates/core/src/optimizer_rules.rs deleted file mode 100644 index a281272ed..000000000 --- a/crates/core/src/optimizer_rules.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Bridges between user-provided Python rule classes and the upstream -//! [`OptimizerRule`] / [`AnalyzerRule`] traits. -//! -//! The Python side defines abstract base classes ``OptimizerRule`` and -//! ``AnalyzerRule`` with ``name()`` plus, respectively, ``rewrite(plan)`` -//! and ``analyze(plan)``. Instances are wrapped in -//! [`PyOptimizerRuleAdapter`] / [`PyAnalyzerRuleAdapter`] before being -//! handed to [`SessionContext::add_optimizer_rule`] / -//! [`SessionContext::add_analyzer_rule`]. -//! -//! `rewrite` may return ``None`` to signal "no transformation" — the -//! adapter maps that to [`Transformed::no`]. Any returned -//! :class:`LogicalPlan` becomes [`Transformed::yes`]. `analyze` is -//! mandatory-rewrite (must return a plan); returning ``None`` is an -//! error. -//! -//! The upstream ``&dyn OptimizerConfig`` / ``&ConfigOptions`` arguments -//! are not surfaced to Python in this MVP. Rules that need configuration -//! access should be implemented in Rust today; Python rules read state -//! from the plan and from any captured ``SessionContext`` they were -//! constructed with. - -use std::fmt; -use std::sync::Arc; - -use datafusion::common::config::ConfigOptions; -use datafusion::common::tree_node::Transformed; -use datafusion::error::{DataFusionError, Result as DataFusionResult}; -use datafusion::logical_expr::LogicalPlan; -use datafusion::optimizer::analyzer::AnalyzerRule; -use datafusion::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; -use pyo3::prelude::*; - -use crate::errors::to_datafusion_err; -use crate::sql::logical::PyLogicalPlan; - -/// Wraps a Python ``OptimizerRule`` instance so that it can be registered -/// with the upstream optimizer pipeline. -pub struct PyOptimizerRuleAdapter { - rule: Py, - name: String, -} - -impl PyOptimizerRuleAdapter { - pub fn new(rule: Bound<'_, PyAny>) -> PyResult { - let name = rule.call_method0("name")?.extract::()?; - Ok(Self { - rule: rule.unbind(), - name, - }) - } -} - -impl fmt::Debug for PyOptimizerRuleAdapter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PyOptimizerRuleAdapter") - .field("name", &self.name) - .finish() - } -} - -impl OptimizerRule for PyOptimizerRuleAdapter { - fn name(&self) -> &str { - &self.name - } - - fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> DataFusionResult> { - Python::attach(|py| { - let py_plan = PyLogicalPlan::from(plan.clone()); - let result = self - .rule - .bind(py) - .call_method1("rewrite", (py_plan,)) - .map_err(to_datafusion_err)?; - if result.is_none() { - return Ok(Transformed::no(plan)); - } - let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?; - Ok(Transformed::yes(LogicalPlan::from(rewritten))) - }) - } -} - -/// Wraps a Python ``AnalyzerRule`` instance so that it can be registered -/// with the upstream analyzer pipeline. -pub struct PyAnalyzerRuleAdapter { - rule: Py, - name: String, -} - -impl PyAnalyzerRuleAdapter { - pub fn new(rule: Bound<'_, PyAny>) -> PyResult { - let name = rule.call_method0("name")?.extract::()?; - Ok(Self { - rule: rule.unbind(), - name, - }) - } -} - -impl fmt::Debug for PyAnalyzerRuleAdapter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PyAnalyzerRuleAdapter") - .field("name", &self.name) - .finish() - } -} - -impl AnalyzerRule for PyAnalyzerRuleAdapter { - fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult { - Python::attach(|py| { - let py_plan = PyLogicalPlan::from(plan); - let result = self - .rule - .bind(py) - .call_method1("analyze", (py_plan,)) - .map_err(to_datafusion_err)?; - if result.is_none() { - return Err(DataFusionError::Execution(format!( - "AnalyzerRule {} returned None from analyze(); analyzer rules \ - must return a LogicalPlan", - self.name - ))); - } - let rewritten: PyLogicalPlan = result.extract().map_err(to_datafusion_err)?; - Ok(LogicalPlan::from(rewritten)) - }) - } - - fn name(&self) -> &str { - &self.name - } -} - -/// Construct an adapter from a Python ``OptimizerRule`` instance. -pub(crate) fn build_optimizer_rule( - rule: Bound<'_, PyAny>, -) -> PyResult> { - Ok(Arc::new(PyOptimizerRuleAdapter::new(rule)?)) -} - -/// Construct an adapter from a Python ``AnalyzerRule`` instance. -pub(crate) fn build_analyzer_rule( - rule: Bound<'_, PyAny>, -) -> PyResult> { - Ok(Arc::new(PyAnalyzerRuleAdapter::new(rule)?)) -} diff --git a/crates/core/src/physical_optimizer.rs b/crates/core/src/physical_optimizer.rs new file mode 100644 index 000000000..8d2e687c8 --- /dev/null +++ b/crates/core/src/physical_optimizer.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Imports physical optimizer rules supplied by another library over FFI. +//! +//! DataFusion has no FFI bridge for the logical [`OptimizerRule`] / +//! [`AnalyzerRule`] traits, but it does export +//! [`FFI_PhysicalOptimizerRule`] for the physical +//! [`PhysicalOptimizerRule`] trait. A producer crate (typically a separate +//! compiled extension) exposes an object with a +//! ``__datafusion_physical_optimizer_rule__`` method returning a +//! :class:`PyCapsule` that wraps an [`FFI_PhysicalOptimizerRule`]. This +//! module reads that capsule and converts it into an +//! ``Arc`` so it can be registered with a +//! [`SessionContext`](datafusion::prelude::SessionContext) at construction +//! time. +//! +//! [`OptimizerRule`]: datafusion::optimizer::optimizer::OptimizerRule +//! [`AnalyzerRule`]: datafusion::optimizer::analyzer::AnalyzerRule + +use std::ptr::NonNull; +use std::sync::Arc; + +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule; +use pyo3::prelude::*; +use pyo3::types::PyCapsule; + +use crate::errors::{PyDataFusionError, PyDataFusionResult, to_datafusion_err}; + +/// Convert a Python object exposing ``__datafusion_physical_optimizer_rule__`` +/// into an ``Arc`` by reading its FFI capsule. +pub(crate) fn physical_optimizer_rule_from_pyobject( + obj: &Bound<'_, PyAny>, +) -> PyDataFusionResult> { + if !obj.hasattr("__datafusion_physical_optimizer_rule__")? { + return Err(PyDataFusionError::Common( + "Expected physical optimizer rule object to define \ + __datafusion_physical_optimizer_rule__()" + .to_string(), + )); + } + + let capsule = obj + .getattr("__datafusion_physical_optimizer_rule__")? + .call0()?; + let capsule = capsule.cast::().map_err(to_datafusion_err)?; + let data: NonNull = capsule + .pointer_checked(Some(c"datafusion_physical_optimizer_rule"))? + .cast(); + let ffi_rule = unsafe { data.as_ref() }; + + Ok(Arc::::from( + ffi_rule, + )) +} diff --git a/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py new file mode 100644 index 000000000..6e566c0f4 --- /dev/null +++ b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext +from datafusion_ffi_example import MyPhysicalOptimizerRule + + +def _setup_session_with_rule() -> tuple[SessionContext, MyPhysicalOptimizerRule]: + rule = MyPhysicalOptimizerRule() + ctx = SessionContext(physical_optimizer_rules=[rule]) + batch = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3])], + names=["a"], + ) + ctx.register_record_batches("t", [[batch]]) + return ctx, rule + + +def test_ffi_physical_optimizer_rule_runs_during_planning(): + """A rule supplied via physical_optimizer_rules is invoked while the + physical plan is built, and the query still returns correct results.""" + ctx, rule = _setup_session_with_rule() + + before = rule.optimize_calls() + result = ctx.sql("SELECT a FROM t").collect() + after = rule.optimize_calls() + + assert after > before, ( + f"Expected user FFI physical optimizer rule to fire, " + f"before={before} after={after}" + ) + assert result[0].column(0).to_pylist() == [1, 2, 3] + + +def test_ffi_physical_optimizer_rule_export(): + """The rule object exposes the FFI capsule entry point.""" + rule = MyPhysicalOptimizerRule() + capsule = rule.__datafusion_physical_optimizer_rule__() + assert capsule is not None diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs index 3323ac982..eccf7b81a 100644 --- a/examples/datafusion-ffi-example/src/lib.rs +++ b/examples/datafusion-ffi-example/src/lib.rs @@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP use crate::config::MyConfig; use crate::logical_extension_codec::MyLogicalExtensionCodec; use crate::physical_extension_codec::MyPhysicalExtensionCodec; +use crate::physical_optimizer::MyPhysicalOptimizerRule; use crate::scalar_udf::IsNullUDF; use crate::table_function::MyTableFunction; use crate::table_provider::MyTableProvider; @@ -33,6 +34,7 @@ pub(crate) mod catalog_provider; pub(crate) mod config; pub(crate) mod logical_extension_codec; pub(crate) mod physical_extension_codec; +pub(crate) mod physical_optimizer; pub(crate) mod scalar_udf; pub(crate) mod table_function; pub(crate) mod table_provider; @@ -55,5 +57,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/examples/datafusion-ffi-example/src/physical_optimizer.rs b/examples/datafusion-ffi-example/src/physical_optimizer.rs new file mode 100644 index 000000000..0acd1bb4a --- /dev/null +++ b/examples/datafusion-ffi-example/src/physical_optimizer.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use datafusion::common::Result; +use datafusion::common::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule; +use datafusion_python_util::get_tokio_runtime; +use pyo3::prelude::*; +use pyo3::types::PyCapsule; + +/// A physical optimizer rule that leaves every plan unchanged but bumps a +/// shared counter each time it runs. Tests use the counter to prove that a +/// session built with this rule actually routed physical planning through a +/// user-supplied [`PhysicalOptimizerRule`] over FFI. +#[derive(Debug)] +struct CountingPhysicalOptimizerRule { + optimize_calls: Arc, +} + +impl PhysicalOptimizerRule for CountingPhysicalOptimizerRule { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + self.optimize_calls.fetch_add(1, Ordering::SeqCst); + Ok(plan) + } + + fn name(&self) -> &str { + "counting_physical_optimizer_rule" + } + + fn schema_check(&self) -> bool { + // The plan is returned unchanged, so the schema is preserved. + true + } +} + +/// Python-visible handle that produces an [`FFI_PhysicalOptimizerRule`] and +/// exposes the shared call counter. +#[pyclass( + from_py_object, + name = "MyPhysicalOptimizerRule", + module = "datafusion_ffi_example", + subclass +)] +#[derive(Debug, Default, Clone)] +pub(crate) struct MyPhysicalOptimizerRule { + optimize_calls: Arc, +} + +#[pymethods] +impl MyPhysicalOptimizerRule { + #[new] + fn new() -> Self { + Self::default() + } + + fn optimize_calls(&self) -> usize { + self.optimize_calls.load(Ordering::SeqCst) + } + + fn __datafusion_physical_optimizer_rule__<'py>( + &self, + py: Python<'py>, + ) -> PyResult> { + let rule: Arc = + Arc::new(CountingPhysicalOptimizerRule { + optimize_calls: Arc::clone(&self.optimize_calls), + }); + + let runtime = get_tokio_runtime().handle().clone(); + let ffi = FFI_PhysicalOptimizerRule::new(rule, Some(runtime)); + + let name = cr"datafusion_physical_optimizer_rule".into(); + PyCapsule::new(py, ffi, Some(name)) + } +} diff --git a/python/datafusion/context.py b/python/datafusion/context.py index e2a4305c4..695783055 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -90,7 +90,6 @@ from datafusion.catalog import CatalogProvider, Table from datafusion.common import DFSchema from datafusion.expr import Expr, SortKey - from datafusion.optimizer import AnalyzerRule, OptimizerRule from datafusion.plan import ExecutionPlan, LogicalPlan from datafusion.user_defined import ( AggregateUDF, @@ -525,6 +524,7 @@ def __init__( self, config: SessionConfig | None = None, runtime: RuntimeEnvBuilder | None = None, + physical_optimizer_rules: list[Any] | None = None, ) -> None: """Main interface for executing queries with DataFusion. @@ -535,6 +535,14 @@ def __init__( Args: config: Session configuration options. runtime: Runtime configuration options. + physical_optimizer_rules: User-defined physical optimizer rules to + append to the default set. Each item is an object that exposes a + ``__datafusion_physical_optimizer_rule__`` method returning a + PyCapsule around a ``FFI_PhysicalOptimizerRule`` (typically built + in a separate compiled extension). DataFusion provides no FFI + bridge for logical optimizer or analyzer rules, and there is no + upstream API to add physical rules to a live context, so these + can only be supplied at construction time. Example usage: @@ -545,11 +553,21 @@ def __init__( ctx = SessionContext() df = ctx.read_csv("data.csv") + + To register a physical optimizer rule supplied by a compiled + extension, pass it via ``physical_optimizer_rules``:: + + from datafusion import SessionContext + from my_extension import MyPhysicalOptimizerRule + + ctx = SessionContext( + physical_optimizer_rules=[MyPhysicalOptimizerRule()] + ) """ config = config.config_internal if config is not None else None runtime = runtime.config_internal if runtime is not None else None - self.ctx = SessionContextInternal(config, runtime) + self.ctx = SessionContextInternal(config, runtime, physical_optimizer_rules) def __repr__(self) -> str: """Print a string representation of the Session Context.""" @@ -1261,52 +1279,6 @@ def register_udwf(self, udwf: WindowUDF) -> None: """Register a user-defined window function (UDWF) with the context.""" self.ctx.register_udwf(udwf._udwf) - def add_optimizer_rule(self, rule: OptimizerRule) -> None: - """Append a user-defined :class:`OptimizerRule` to the session. - - The rule's :py:meth:`OptimizerRule.rewrite` method is invoked - during query planning. Returning ``None`` from ``rewrite`` - signals no change; returning a new - :class:`~datafusion.plan.LogicalPlan` signals a rewrite. - - Args: - rule: An instance of a class that implements - :class:`datafusion.optimizer.OptimizerRule`. - - Examples: - >>> from datafusion.optimizer import OptimizerRule - >>> class NoopRule(OptimizerRule): - ... def name(self) -> str: return "noop" - ... def rewrite(self, plan): return None - >>> ctx = dfn.SessionContext() - >>> ctx.add_optimizer_rule(NoopRule()) - >>> ctx.remove_optimizer_rule("noop") - True - """ - self.ctx.add_optimizer_rule(rule) - - def add_analyzer_rule(self, rule: AnalyzerRule) -> None: - """Append a user-defined :class:`AnalyzerRule` to the session. - - The rule's :py:meth:`AnalyzerRule.analyze` method is invoked - during the analysis phase of query planning. Analyzer rules - must always return a :class:`~datafusion.plan.LogicalPlan` - (return the input plan unchanged when no rewrite applies). - - Args: - rule: An instance of a class that implements - :class:`datafusion.optimizer.AnalyzerRule`. - - Examples: - >>> from datafusion.optimizer import AnalyzerRule - >>> class Identity(AnalyzerRule): - ... def name(self) -> str: return "identity" - ... def analyze(self, plan): return plan - >>> ctx = dfn.SessionContext() - >>> ctx.add_analyzer_rule(Identity()) - """ - self.ctx.add_analyzer_rule(rule) - def deregister_udwf(self, name: str) -> None: """Remove a user-defined window function from the session. diff --git a/python/datafusion/optimizer.py b/python/datafusion/optimizer.py deleted file mode 100644 index 84beca001..000000000 --- a/python/datafusion/optimizer.py +++ /dev/null @@ -1,144 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Abstract base classes for user-defined optimizer and analyzer rules. - -DataFusion's planner is built from two pipelines: - -* The :class:`Analyzer ` runs first and is responsible for - semantic rewrites — type coercion, function lookup, and rewrites that - cannot leave the plan structurally unchanged. Analyzer rules must - return a fully rewritten :class:`~datafusion.plan.LogicalPlan` every - time they run. -* The :class:`Optimizer ` runs afterwards and applies - cost-driven or semantics-preserving transformations until a fixed - point is reached. Optimizer rules may return ``None`` to signal "no - change," letting the optimizer terminate as soon as no rule mutates - the plan. - -Both ABCs are registered against a :class:`~datafusion.SessionContext` -through :py:meth:`~datafusion.SessionContext.add_optimizer_rule` / -:py:meth:`~datafusion.SessionContext.add_analyzer_rule`. - -The upstream rule traits also receive an -``OptimizerConfig`` / ``ConfigOptions`` reference. Those are not -surfaced to Python here; rules that need configuration access should -capture state at construction time (for example a -:class:`~datafusion.SessionContext` reference) or be implemented in -Rust. -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from datafusion.plan import LogicalPlan - -__all__ = ["AnalyzerRule", "OptimizerRule"] - - -class OptimizerRule(ABC): - """Abstract base class for a user-defined optimizer rule. - - Subclasses must implement :py:meth:`name` and :py:meth:`rewrite`. - - Examples: - >>> import datafusion as dfn - >>> from datafusion.optimizer import OptimizerRule - >>> from datafusion.plan import LogicalPlan - >>> - >>> class TaggingRule(OptimizerRule): - ... # Mark each plan we see; never actually mutate it. - ... def __init__(self) -> None: - ... self.seen = 0 - ... - ... def name(self) -> str: - ... return "tagging_rule" - ... - ... def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: - ... self.seen += 1 - ... return None - >>> - >>> ctx = dfn.SessionContext() - >>> rule = TaggingRule() - >>> ctx.add_optimizer_rule(rule) - >>> ctx.from_pydict({"a": [1]}).count() - 1 - >>> rule.seen > 0 - True - """ - - @abstractmethod - def name(self) -> str: - """Return a unique name for this rule. - - DataFusion uses the name to deduplicate rules and to support - removal via :py:meth:`~datafusion.SessionContext.remove_optimizer_rule`. - """ - - @abstractmethod - def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: - """Attempt to rewrite ``plan``. - - Return a new :class:`~datafusion.plan.LogicalPlan` if the rule - produced one, or ``None`` to indicate no change. The optimizer - calls each rule repeatedly until no rule reports a change, so - returning ``None`` when nothing was rewritten is important for - termination. - """ - - -class AnalyzerRule(ABC): - """Abstract base class for a user-defined analyzer rule. - - Subclasses must implement :py:meth:`name` and :py:meth:`analyze`. - Unlike optimizer rules, analyzer rules must always return a - :class:`~datafusion.plan.LogicalPlan` (return the input plan - unmodified when nothing applies). - - Examples: - >>> import datafusion as dfn - >>> from datafusion.optimizer import AnalyzerRule - >>> from datafusion.plan import LogicalPlan - >>> - >>> class IdentityAnalyzer(AnalyzerRule): - ... def name(self) -> str: - ... return "identity_analyzer" - ... - ... def analyze(self, plan: LogicalPlan) -> LogicalPlan: - ... return plan - >>> - >>> ctx = dfn.SessionContext() - >>> ctx.add_analyzer_rule(IdentityAnalyzer()) - >>> ctx.from_pydict({"a": [1, 2, 3]}).count() - 3 - """ - - @abstractmethod - def name(self) -> str: - """Return a unique name for this rule.""" - - @abstractmethod - def analyze(self, plan: LogicalPlan) -> LogicalPlan: - """Rewrite ``plan`` and return the new plan. - - Analyzer rules must always return a - :class:`~datafusion.plan.LogicalPlan`. Return the input plan - unchanged when there is nothing to rewrite. - """ diff --git a/python/tests/test_optimizer.py b/python/tests/test_optimizer.py deleted file mode 100644 index 5c161573b..000000000 --- a/python/tests/test_optimizer.py +++ /dev/null @@ -1,111 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import pytest -from datafusion import SessionContext -from datafusion.optimizer import AnalyzerRule, OptimizerRule -from datafusion.plan import LogicalPlan - - -def test_optimizer_rule_is_invoked_during_planning() -> None: - """A registered OptimizerRule is called as the plan is optimized.""" - seen_plans: list[str] = [] - - class TracingRule(OptimizerRule): - def name(self) -> str: - return "tracing_rule" - - def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: - seen_plans.append(plan.display()) - return None - - ctx = SessionContext() - ctx.add_optimizer_rule(TracingRule()) - df = ctx.from_pydict({"a": [1, 2, 3]}) - result = df.collect() - - # The rule sees the plan at least once during optimization, and - # since it returns None each time the optimizer terminates cleanly. - assert seen_plans, "optimizer rule was not invoked during planning" - assert result[0].column(0).to_pylist() == [1, 2, 3] - - -def test_optimizer_rule_can_be_removed_by_name() -> None: - """remove_optimizer_rule deregisters a user-supplied rule by name.""" - - class NoopRule(OptimizerRule): - def name(self) -> str: - return "noop_for_removal" - - def rewrite(self, plan: LogicalPlan) -> LogicalPlan | None: - return None - - ctx = SessionContext() - ctx.add_optimizer_rule(NoopRule()) - assert ctx.remove_optimizer_rule("noop_for_removal") is True - # Second remove returns False — already gone. - assert ctx.remove_optimizer_rule("noop_for_removal") is False - - -def test_analyzer_rule_is_invoked_during_analysis() -> None: - """A registered AnalyzerRule is called and must return a plan.""" - invocations: list[str] = [] - - class IdentityAnalyzer(AnalyzerRule): - def name(self) -> str: - return "identity_analyzer" - - def analyze(self, plan: LogicalPlan) -> LogicalPlan: - invocations.append(plan.display()) - return plan - - ctx = SessionContext() - ctx.add_analyzer_rule(IdentityAnalyzer()) - df = ctx.from_pydict({"a": [1, 2, 3]}) - result = df.collect() - - assert invocations, "analyzer rule was not invoked" - assert result[0].column(0).to_pylist() == [1, 2, 3] - - -def test_analyzer_rule_returning_none_errors() -> None: - """Analyzer rules must return a LogicalPlan; None surfaces as an error.""" - - class BadAnalyzer(AnalyzerRule): - def name(self) -> str: - return "bad_analyzer" - - def analyze(self, plan: LogicalPlan): # type: ignore[override] - return None - - ctx = SessionContext() - ctx.add_analyzer_rule(BadAnalyzer()) - df = ctx.from_pydict({"a": [1]}) - with pytest.raises(Exception, match="bad_analyzer"): - df.collect() - - -def test_optimizer_rule_abc_cannot_be_instantiated() -> None: - """OptimizerRule is abstract — direct instantiation must fail.""" - with pytest.raises(TypeError): - OptimizerRule() # type: ignore[abstract] - - -def test_analyzer_rule_abc_cannot_be_instantiated() -> None: - """AnalyzerRule is abstract — direct instantiation must fail.""" - with pytest.raises(TypeError): - AnalyzerRule() # type: ignore[abstract] From 13dcdd483f54db21de8fac67168d8da7ede5f030 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:01:38 -0400 Subject: [PATCH 03/10] refactor: type physical_optimizer_rules with an Exportable Protocol Replace the `list[Any]` hint on the SessionContext `physical_optimizer_rules` argument with a `PhysicalOptimizerRuleExportable` Protocol, matching the existing `TableProviderExportable` / `*Exportable` pattern used for other FFI-capsule objects. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 695783055..bd6a9ce74 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -130,6 +130,16 @@ class TableProviderExportable(Protocol): def __datafusion_table_provider__(self, session: Any) -> object: ... # noqa: D105 +class PhysicalOptimizerRuleExportable(Protocol): + """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule. + + The capsule wraps an ``FFI_PhysicalOptimizerRule``, typically produced by a + separate compiled extension. + """ + + def __datafusion_physical_optimizer_rule__(self) -> object: ... # noqa: D105 + + class SessionConfig: """Session configuration options.""" @@ -524,7 +534,7 @@ def __init__( self, config: SessionConfig | None = None, runtime: RuntimeEnvBuilder | None = None, - physical_optimizer_rules: list[Any] | None = None, + physical_optimizer_rules: list[PhysicalOptimizerRuleExportable] | None = None, ) -> None: """Main interface for executing queries with DataFusion. From 0a7e45d284d67bf0606b9da9e11feea1ee1e2848 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:03:07 -0400 Subject: [PATCH 04/10] docs: reference PhysicalOptimizerRuleExportable in SessionContext docstring Point the `physical_optimizer_rules` argument docs at the new `PhysicalOptimizerRuleExportable` Protocol instead of describing the duck type inline. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index bd6a9ce74..9727f7ce8 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -546,8 +546,9 @@ def __init__( config: Session configuration options. runtime: Runtime configuration options. physical_optimizer_rules: User-defined physical optimizer rules to - append to the default set. Each item is an object that exposes a - ``__datafusion_physical_optimizer_rule__`` method returning a + append to the default set. Each item is a + :class:`PhysicalOptimizerRuleExportable` — an object exposing a + ``__datafusion_physical_optimizer_rule__`` method that returns a PyCapsule around a ``FFI_PhysicalOptimizerRule`` (typically built in a separate compiled extension). DataFusion provides no FFI bridge for logical optimizer or analyzer rules, and there is no From 108cd136b520dc4208fdfda38a83c47eed7bcb20 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:04:26 -0400 Subject: [PATCH 05/10] docs: move FFI capsule detail to PhysicalOptimizerRuleExportable The PyCapsule / FFI_PhysicalOptimizerRule mechanics describe the Protocol, not the SessionContext constructor. Move that detail onto PhysicalOptimizerRuleExportable and leave the constructor argument docs focused on behavior. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 9727f7ce8..a531c89dc 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -133,8 +133,10 @@ def __datafusion_table_provider__(self, session: Any) -> object: ... # noqa: D1 class PhysicalOptimizerRuleExportable(Protocol): """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule. - The capsule wraps an ``FFI_PhysicalOptimizerRule``, typically produced by a - separate compiled extension. + The method returns a PyCapsule wrapping an ``FFI_PhysicalOptimizerRule``, + typically produced by a separate compiled extension. DataFusion provides no + FFI bridge for logical optimizer or analyzer rules, so only physical + optimizer rules can be supplied this way. """ def __datafusion_physical_optimizer_rule__(self) -> object: ... # noqa: D105 @@ -546,14 +548,10 @@ def __init__( config: Session configuration options. runtime: Runtime configuration options. physical_optimizer_rules: User-defined physical optimizer rules to - append to the default set. Each item is a - :class:`PhysicalOptimizerRuleExportable` — an object exposing a - ``__datafusion_physical_optimizer_rule__`` method that returns a - PyCapsule around a ``FFI_PhysicalOptimizerRule`` (typically built - in a separate compiled extension). DataFusion provides no FFI - bridge for logical optimizer or analyzer rules, and there is no - upstream API to add physical rules to a live context, so these - can only be supplied at construction time. + append to the default set, each a + :class:`PhysicalOptimizerRuleExportable`. There is no upstream + API to add physical rules to a live context, so these can only + be supplied at construction time. Example usage: From 3909faf1485c0d2e3f01c485b35608d404fca315 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:12:14 -0400 Subject: [PATCH 06/10] docs: drop redundant comment in SessionContext constructor Remove the explanatory comment about FFI bridge availability; the same information already lives on PhysicalOptimizerRuleExportable. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/context.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 2d399476f..5d0f6045d 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -397,11 +397,6 @@ impl PySessionContext { .with_config(config) .with_runtime_env(runtime) .with_default_features(); - // DataFusion exposes no FFI bridge for the logical optimizer or - // analyzer, so only physical optimizer rules can be supplied from - // another library. They are appended after the default rules at - // construction time; there is no upstream API to add them to a live - // `SessionContext`. for rule in physical_optimizer_rules.unwrap_or_default() { let rule = crate::physical_optimizer::physical_optimizer_rule_from_pyobject(&rule)?; state_builder = state_builder.with_physical_optimizer_rule(rule); From 734a8131d92fb844fe14ae12d5f00a4387754b6e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:13:07 -0400 Subject: [PATCH 07/10] docs: drop module-level doc comment from physical_optimizer Sibling FFI-import modules (udf, udaf, catalog, table) carry no module-level docs, and the rst-style markup did not match Rust conventions. The function doc comment already states intent. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/physical_optimizer.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/crates/core/src/physical_optimizer.rs b/crates/core/src/physical_optimizer.rs index 8d2e687c8..98b88c41c 100644 --- a/crates/core/src/physical_optimizer.rs +++ b/crates/core/src/physical_optimizer.rs @@ -15,23 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Imports physical optimizer rules supplied by another library over FFI. -//! -//! DataFusion has no FFI bridge for the logical [`OptimizerRule`] / -//! [`AnalyzerRule`] traits, but it does export -//! [`FFI_PhysicalOptimizerRule`] for the physical -//! [`PhysicalOptimizerRule`] trait. A producer crate (typically a separate -//! compiled extension) exposes an object with a -//! ``__datafusion_physical_optimizer_rule__`` method returning a -//! :class:`PyCapsule` that wraps an [`FFI_PhysicalOptimizerRule`]. This -//! module reads that capsule and converts it into an -//! ``Arc`` so it can be registered with a -//! [`SessionContext`](datafusion::prelude::SessionContext) at construction -//! time. -//! -//! [`OptimizerRule`]: datafusion::optimizer::optimizer::OptimizerRule -//! [`AnalyzerRule`]: datafusion::optimizer::analyzer::AnalyzerRule - use std::ptr::NonNull; use std::sync::Arc; From a35f6f13cc9f05b5bcfca7dc1bba643c2924c3b2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:18:33 -0400 Subject: [PATCH 08/10] refactor: import physical optimizer rule via from_pycapsule! macro Replace the hand-written crates/core/src/physical_optimizer.rs with a `from_pycapsule!` invocation in the util crate, matching `physical_codec_from_pycapsule` and the other FFI capsule importers. The macro already handles the hasattr/getattr/cast/validate/pointer_checked sequence and the infallible `Arc::from(&FFI)` conversion, so the dedicated module is no longer needed. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/context.rs | 5 ++- crates/core/src/lib.rs | 1 - crates/core/src/physical_optimizer.rs | 53 --------------------------- crates/util/src/lib.rs | 9 +++++ 4 files changed, 12 insertions(+), 56 deletions(-) delete mode 100644 crates/core/src/physical_optimizer.rs diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 5d0f6045d..3a3adb3e7 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -59,7 +59,8 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_python_util::{ create_logical_extension_capsule, create_physical_extension_capsule, ffi_logical_codec_from_pycapsule, get_global_ctx, get_tokio_runtime, - physical_codec_from_pycapsule, spawn_future, wait_for_future, + physical_codec_from_pycapsule, physical_optimizer_rule_from_pycapsule, spawn_future, + wait_for_future, }; use object_store::ObjectStore; use pyo3::IntoPyObjectExt; @@ -398,7 +399,7 @@ impl PySessionContext { .with_runtime_env(runtime) .with_default_features(); for rule in physical_optimizer_rules.unwrap_or_default() { - let rule = crate::physical_optimizer::physical_optimizer_rule_from_pyobject(&rule)?; + let rule = physical_optimizer_rule_from_pycapsule(&rule)?; state_builder = state_builder.with_physical_optimizer_rule(rule); } let session_state = state_builder.build(); diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 2ca4237e5..8b622d344 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -46,7 +46,6 @@ pub mod expr; mod functions; pub mod metrics; mod options; -pub mod physical_optimizer; pub mod physical_plan; mod pyarrow_filter_expression; pub mod pyarrow_util; diff --git a/crates/core/src/physical_optimizer.rs b/crates/core/src/physical_optimizer.rs deleted file mode 100644 index 98b88c41c..000000000 --- a/crates/core/src/physical_optimizer.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::ptr::NonNull; -use std::sync::Arc; - -use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule; -use pyo3::prelude::*; -use pyo3::types::PyCapsule; - -use crate::errors::{PyDataFusionError, PyDataFusionResult, to_datafusion_err}; - -/// Convert a Python object exposing ``__datafusion_physical_optimizer_rule__`` -/// into an ``Arc`` by reading its FFI capsule. -pub(crate) fn physical_optimizer_rule_from_pyobject( - obj: &Bound<'_, PyAny>, -) -> PyDataFusionResult> { - if !obj.hasattr("__datafusion_physical_optimizer_rule__")? { - return Err(PyDataFusionError::Common( - "Expected physical optimizer rule object to define \ - __datafusion_physical_optimizer_rule__()" - .to_string(), - )); - } - - let capsule = obj - .getattr("__datafusion_physical_optimizer_rule__")? - .call0()?; - let capsule = capsule.cast::().map_err(to_datafusion_err)?; - let data: NonNull = capsule - .pointer_checked(Some(c"datafusion_physical_optimizer_rule"))? - .cast(); - let ffi_rule = unsafe { data.as_ref() }; - - Ok(Arc::::from( - ffi_rule, - )) -} diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs index 72dc9aafc..28c8834e9 100644 --- a/crates/util/src/lib.rs +++ b/crates/util/src/lib.rs @@ -24,7 +24,9 @@ use datafusion::datasource::TableProvider; use datafusion::execution::TaskContext; use datafusion::execution::context::SessionContext; use datafusion::logical_expr::Volatility; +use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion_ffi::execution::FFI_TaskContextProvider; +use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use datafusion_ffi::proto::physical_extension_codec::FFI_PhysicalExtensionCodec; use datafusion_ffi::table_provider::FFI_TableProvider; @@ -332,6 +334,13 @@ from_pycapsule!( dyn PhysicalExtensionCodec ); +from_pycapsule!( + physical_optimizer_rule_from_pycapsule, + "datafusion_physical_optimizer_rule", + FFI_PhysicalOptimizerRule, + dyn PhysicalOptimizerRule + Send + Sync +); + try_from_pycapsule!( task_context_from_pycapsule, "datafusion_task_context_provider", From c3211dbf885f0fb06e8ea204bee285d952d0c84e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:22:37 -0400 Subject: [PATCH 09/10] docs: trim PhysicalOptimizerRuleExportable docstring Drop the sentence about logical-rule FFI availability; it is background, not type-hint information, and keeps the Protocol docstring in line with the other *Exportable hints. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index a531c89dc..f8fb016d7 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -134,9 +134,7 @@ class PhysicalOptimizerRuleExportable(Protocol): """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule. The method returns a PyCapsule wrapping an ``FFI_PhysicalOptimizerRule``, - typically produced by a separate compiled extension. DataFusion provides no - FFI bridge for logical optimizer or analyzer rules, so only physical - optimizer rules can be supplied this way. + typically produced by a separate compiled extension. """ def __datafusion_physical_optimizer_rule__(self) -> object: ... # noqa: D105 From a9621ee5b4db927c77ec072f7acdc929feef80aa Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 27 May 2026 16:31:43 -0400 Subject: [PATCH 10/10] Minor refactor --- .../python/tests/_test_physical_optimizer_rule.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py index 6e566c0f4..1eee07dcb 100644 --- a/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py +++ b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py @@ -22,7 +22,9 @@ from datafusion_ffi_example import MyPhysicalOptimizerRule -def _setup_session_with_rule() -> tuple[SessionContext, MyPhysicalOptimizerRule]: +def test_ffi_physical_optimizer_rule_runs_during_planning(): + """A rule supplied via physical_optimizer_rules is invoked while the + physical plan is built, and the query still returns correct results.""" rule = MyPhysicalOptimizerRule() ctx = SessionContext(physical_optimizer_rules=[rule]) batch = pa.RecordBatch.from_arrays( @@ -30,13 +32,6 @@ def _setup_session_with_rule() -> tuple[SessionContext, MyPhysicalOptimizerRule] names=["a"], ) ctx.register_record_batches("t", [[batch]]) - return ctx, rule - - -def test_ffi_physical_optimizer_rule_runs_during_planning(): - """A rule supplied via physical_optimizer_rules is invoked while the - physical plan is built, and the query still returns correct results.""" - ctx, rule = _setup_session_with_rule() before = rule.optimize_calls() result = ctx.sql("SELECT a FROM t").collect()