Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Release workflow: run the test suite, then build a wheel and upload it to PyPI.
name: Release

# Triggered by tag pushes whose name matches the '*' pattern.
on:
push:
tags:
- '*'

jobs:
# Installs requirements.txt and runs pytest against tests/.
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.14'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Run tests
run: pytest tests/ -v

# Declares `needs: test`, so it is ordered after the test job.
# Builds a wheel only (`python -m build --wheel`) and uploads it using the
# PYPI_API_TOKEN repository secret.
publish:
runs-on: ubuntu-latest
needs: test

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.14'

- name: Build wheel
run: |
python -m pip install --upgrade pip
python -m pip install --upgrade build
python -m build --wheel

- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
6 changes: 5 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
name: Tests

on: [push]
on:
push:
branches:
- '**'
pull_request:

jobs:
test:
Expand Down
1 change: 1 addition & 0 deletions src/mock/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .local_workflow_runner import LocalWorkflowRunner
131 changes: 131 additions & 0 deletions src/mock/local_workflow_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import json
import os
from graphlib import CycleError, TopologicalSorter


class LocalWorkflowRunner:
    """Run the notebooks referenced by a workflow JSON file locally, in dependency order.

    The workflow file must be a JSON object with a ``tasks`` list. Each task
    carries a ``task_key``, a ``notebook_task.notebook_path`` and an optional
    ``depends_on`` list of ``{"task_key": ...}`` entries. The last path segment
    of ``notebook_path`` is the notebook name; ``run_workflow`` executes
    ``<source_dir>/<notebook name>.py`` for every task.

    Attributes:
        source_dir: Directory that holds the ``<notebook name>.py`` files.
        workflow_json_path: Path of the workflow JSON definition.
        dag: Maps each notebook name to the set of notebook names that depend on it.
        execution_order: Notebook names in an order that satisfies all dependencies.
    """

    def __init__(self, source_dir, workflow_json_path):
        """Load and validate the workflow, then build the DAG and execution order.

        Raises:
            ValueError: If the workflow is malformed (wrong top-level type,
                missing/invalid fields, duplicates, unknown dependencies, cycles).
            OSError: If the workflow file cannot be opened.
        """
        self.source_dir = source_dir
        self.workflow_json_path = workflow_json_path

        workflow = self._load_workflow()
        # json.load can return a list, string or number; only an object can
        # hold "tasks", so reject everything else with the same ValueError.
        if not isinstance(workflow, dict):
            raise ValueError("Workflow JSON must contain a 'tasks' list")
        tasks = workflow.get("tasks")
        if not isinstance(tasks, list):
            raise ValueError("Workflow JSON must contain a 'tasks' list")

        self._task_to_notebook = {}
        self._task_dependencies = {}
        self._notebook_insertion_order = []
        self._parse_tasks(tasks)

        self.dag = self._build_dag()
        self.execution_order = self._build_execution_order()

    def _load_workflow(self):
        """Return the parsed content of the workflow JSON file."""
        with open(self.workflow_json_path, "r", encoding="utf-8") as workflow_file:
            return json.load(workflow_file)

    def _parse_tasks(self, tasks):
        """Validate every task and record its notebook name and dependency keys.

        Raises:
            ValueError: On any malformed task, duplicate ``task_key`` or
                duplicate notebook name.
        """
        # A set keeps the duplicate check O(1); the list keeps the file order.
        seen_notebooks = set(self._notebook_insertion_order)

        for task in tasks:
            if not isinstance(task, dict):
                raise ValueError("Each task must be a JSON object")

            task_key = task.get("task_key")
            # Require a string: an unhashable key (list/dict) would otherwise
            # surface as a TypeError from the dict lookups below.
            if not isinstance(task_key, str) or not task_key:
                raise ValueError("Each task must include a non-empty 'task_key'")
            if task_key in self._task_to_notebook:
                raise ValueError(f"Duplicate task_key found: {task_key}")

            notebook_task = task.get("notebook_task")
            if not isinstance(notebook_task, dict):
                raise ValueError(f"Task '{task_key}' is missing 'notebook_task'")

            notebook_path = notebook_task.get("notebook_path")
            notebook_name = self._extract_notebook_name(notebook_path, task_key)
            if notebook_name in seen_notebooks:
                raise ValueError(f"Duplicate notebook name found: {notebook_name}")

            depends_on = task.get("depends_on", [])
            if not isinstance(depends_on, list):
                raise ValueError(f"Task '{task_key}' has invalid 'depends_on' format")

            dependency_keys = []
            for dependency in depends_on:
                dependency_key = (
                    dependency.get("task_key") if isinstance(dependency, dict) else None
                )
                if not isinstance(dependency_key, str) or not dependency_key:
                    raise ValueError(
                        f"Task '{task_key}' has malformed dependency entry"
                    )
                dependency_keys.append(dependency_key)

            self._task_to_notebook[task_key] = notebook_name
            self._task_dependencies[task_key] = dependency_keys
            self._notebook_insertion_order.append(notebook_name)
            seen_notebooks.add(notebook_name)

    def _extract_notebook_name(self, notebook_path, task_key):
        """Return the last ``/``-separated segment of ``notebook_path``.

        Raises:
            ValueError: If the path is not a non-blank string or has no final segment.
        """
        if not isinstance(notebook_path, str) or not notebook_path.strip():
            raise ValueError(f"Task '{task_key}' has an invalid notebook_path")

        notebook_name = notebook_path.rstrip("/").split("/")[-1]
        if not notebook_name:
            raise ValueError(f"Task '{task_key}' has an invalid notebook_path")
        return notebook_name

    def _build_dag(self):
        """Return ``{notebook: set of notebooks that depend on it}``.

        Raises:
            ValueError: If a task depends on a ``task_key`` that is not defined.
        """
        dag = {notebook_name: set() for notebook_name in self._notebook_insertion_order}

        for task_key, dependency_keys in self._task_dependencies.items():
            current_notebook = self._task_to_notebook[task_key]
            for dependency_key in dependency_keys:
                if dependency_key not in self._task_to_notebook:
                    raise ValueError(
                        f"Task '{task_key}' depends on unknown task '{dependency_key}'"
                    )

                dependency_notebook = self._task_to_notebook[dependency_key]
                # Edges point from a dependency to the notebook that needs it.
                dag[dependency_notebook].add(current_notebook)

        return dag

    def _build_execution_order(self):
        """Return the notebook names topologically sorted by their dependencies.

        Raises:
            ValueError: If the dependency graph contains a cycle.
        """
        # TopologicalSorter expects node -> predecessors, the inverse of self.dag.
        predecessors = {
            notebook_name: set() for notebook_name in self._notebook_insertion_order
        }
        for source_notebook, target_notebooks in self.dag.items():
            for target_notebook in target_notebooks:
                predecessors[target_notebook].add(source_notebook)

        try:
            return list(TopologicalSorter(predecessors).static_order())
        except CycleError as exc:
            raise ValueError("Workflow graph contains a cycle") from exc

    def format_dag(self):
        """Return a printable listing of the DAG edges followed by the execution order."""
        lines = []
        for notebook_name in self._notebook_insertion_order:
            outgoing = sorted(self.dag.get(notebook_name, set()))
            # An empty edge set joins to "", which renders as "[]".
            lines.append(f"- {notebook_name} -> [{', '.join(outgoing)}]")

        lines.append("Execution order:")
        lines.append(" -> ".join(self.execution_order))
        return "\n".join(lines)

    def _execfile(self, file_path, global_namespace, local_namespace):
        """Compile and execute the Python source at ``file_path`` in the given namespaces.

        NOTE(security): this runs the file with ``exec`` inside the current
        process; only point the runner at trusted notebook sources.
        """
        with open(file_path, "r", encoding="utf-8") as source_file:
            code = compile(source_file.read(), file_path, "exec")
        exec(code, global_namespace, local_namespace)

    def run_workflow(self):
        """Print the DAG, then execute every notebook file in execution order.

        All notebooks share one globals dictionary, so names defined by an
        earlier notebook are visible to later ones. Every notebook file is
        checked before the first one runs, so a missing file cannot leave the
        workflow half-executed.

        Raises:
            FileNotFoundError: If any ``<source_dir>/<notebook name>.py`` is missing.
        """
        execution_globals = {"__name__": "__main__"}
        print(f"\nExecuting workflow: {self.workflow_json_path}\n")
        print("==========================================")
        print(self.format_dag())

        notebook_paths = [
            os.path.join(self.source_dir, f"{notebook_name}.py")
            for notebook_name in self.execution_order
        ]
        for notebook_path in notebook_paths:
            if not os.path.isfile(notebook_path):
                raise FileNotFoundError(f"Notebook file not found: {notebook_path}")

        for notebook_path in notebook_paths:
            execution_globals["__file__"] = notebook_path
            self._execfile(notebook_path, execution_globals, execution_globals)
2 changes: 0 additions & 2 deletions src/mock/workflow_mock.py

This file was deleted.

59 changes: 59 additions & 0 deletions tests/test_local_workflow_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
import os

from src.mock.local_workflow_runner import LocalWorkflowRunner


ROOT_DIR = os.path.dirname(os.path.dirname(__file__))
WORKFLOW_SAMPLE_PATH = os.path.join(ROOT_DIR, "specs", "workflow_sample.json")


def test_builds_dag_using_notebook_names():
    """The sample workflow's DAG is keyed by notebook name and lists downstream notebooks."""
    expected_edges = {
        "auxillary_dims": {"data_quality"},
        "reviews_fact": {"data_quality"},
        "data_quality": {"semantic_layer"},
        "semantic_layer": set(),
    }

    dag = LocalWorkflowRunner("src", WORKFLOW_SAMPLE_PATH).dag

    for notebook_name, downstream in expected_edges.items():
        assert dag[notebook_name] == downstream


def test_execution_order_respects_dependencies():
    """Every notebook appears after the notebooks it depends on."""
    order = LocalWorkflowRunner("src", WORKFLOW_SAMPLE_PATH).execution_order

    # (upstream, downstream) pairs taken from the sample workflow.
    must_precede = [
        ("auxillary_dims", "data_quality"),
        ("reviews_fact", "data_quality"),
        ("data_quality", "semantic_layer"),
    ]
    for upstream, downstream in must_precede:
        assert order.index(downstream) > order.index(upstream)


def test_run_workflow_executes_in_dependency_order(tmp_path):
    """run_workflow executes each notebook once, with dependencies first.

    Each generated notebook appends its own name to a shared log file, so the
    log records the real execution order and can be asserted on.
    """
    source_dir = tmp_path / "local_src"
    source_dir.mkdir()
    log_file = tmp_path / "execution.log"

    workflow = {
        "tasks": [
            {
                "task_key": "first_task",
                "notebook_task": {"notebook_path": "/Workspace/any/first"},
            },
            {
                "task_key": "second_task",
                "depends_on": [{"task_key": "first_task"}],
                "notebook_task": {"notebook_path": "/Workspace/any/second"},
            },
            {
                "task_key": "third_task",
                "depends_on": [{"task_key": "first_task"}],
                "notebook_task": {"notebook_path": "/Workspace/any/third"},
            },
        ]
    }
    workflow_path = tmp_path / "workflow.json"
    workflow_path.write_text(json.dumps(workflow), encoding="utf-8")
    for notebook_name in ["first", "second", "third"]:
        script_path = source_dir / f"{notebook_name}.py"
        script_path.write_text(
            f'print("Hello from notebook {notebook_name}")\n'
            f"with open({str(log_file)!r}, 'a', encoding='utf-8') as log:\n"
            f"    log.write({notebook_name!r} + '\\n')\n",
            encoding="utf-8",
        )

    runner = LocalWorkflowRunner(str(source_dir), str(workflow_path))
    runner.run_workflow()

    executed = log_file.read_text(encoding="utf-8").splitlines()
    # "first" has no dependencies and must run before both dependants; the
    # relative order of "second" and "third" is not constrained.
    assert executed[0] == "first"
    assert sorted(executed[1:]) == ["second", "third"]
Loading