diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
new file mode 100644
index 0000000..f6d1246
--- /dev/null
+++ b/.github/workflows/release.yml
@@ -0,0 +1,50 @@
+name: Release
+
+on:
+  push:
+    tags:
+      - '*'
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.14'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+
+      - name: Run tests
+        run: pytest tests/ -v
+
+  publish:
+    runs-on: ubuntu-latest
+    needs: test
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.14'
+
+      - name: Build wheel
+        run: |
+          python -m pip install --upgrade pip
+          python -m pip install --upgrade build
+          python -m build --wheel
+
+      - name: Publish package to PyPI
+        uses: pypa/gh-action-pypi-publish@release/v1
+        with:
+          user: __token__
+          password: ${{ secrets.PYPI_API_TOKEN }}
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 7e80239..2850f80 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -1,6 +1,10 @@
 name: Tests
 
-on: [push]
+on:
+  push:
+    branches:
+      - '**'
+  pull_request:
 
 jobs:
   test:
diff --git a/src/mock/__init__.py b/src/mock/__init__.py
index e69de29..41350bb 100644
--- a/src/mock/__init__.py
+++ b/src/mock/__init__.py
@@ -0,0 +1 @@
+from .local_workflow_runner import LocalWorkflowRunner
diff --git a/src/mock/local_workflow_runner.py b/src/mock/local_workflow_runner.py
new file mode 100644
index 0000000..4c5f545
--- /dev/null
+++ b/src/mock/local_workflow_runner.py
@@ -0,0 +1,200 @@
+"""Run the notebooks of a workflow definition locally, in dependency order."""
+
+import json
+import os
+from graphlib import CycleError, TopologicalSorter
+
+
+class LocalWorkflowRunner:
+    """Execute the notebooks of a workflow JSON file as local Python scripts.
+
+    The workflow JSON must contain a ``tasks`` list. Each task names a
+    notebook through ``notebook_task.notebook_path`` and may list upstream
+    tasks in ``depends_on``. A notebook is identified by the last segment of
+    its path and resolved to ``<source_dir>/<name>.py`` when the workflow runs.
+    """
+
+    def __init__(self, source_dir, workflow_json_path):
+        """Load the workflow and derive its DAG and execution order.
+
+        Args:
+            source_dir: Directory that holds the ``<notebook_name>.py`` files.
+            workflow_json_path: Path of the workflow JSON file.
+
+        Raises:
+            ValueError: If the workflow is malformed or contains a cycle.
+        """
+        self.source_dir = source_dir
+        self.workflow_json_path = workflow_json_path
+
+        workflow = self._load_workflow()
+        # Valid JSON can also be a list or a scalar, which has no ``.get``.
+        tasks = workflow.get("tasks") if isinstance(workflow, dict) else None
+        if not isinstance(tasks, list):
+            raise ValueError("Workflow JSON must contain a 'tasks' list")
+
+        self._task_to_notebook = {}
+        self._task_dependencies = {}
+        self._notebook_insertion_order = []
+        self._parse_tasks(tasks)
+
+        self.dag = self._build_dag()
+        self.execution_order = self._build_execution_order()
+
+    def _load_workflow(self):
+        """Return the parsed content of the workflow JSON file."""
+        with open(self.workflow_json_path, "r", encoding="utf-8") as workflow_file:
+            return json.load(workflow_file)
+
+    def _parse_tasks(self, tasks):
+        """Validate every task and record its notebook and dependencies.
+
+        Raises:
+            ValueError: On a malformed task, a duplicate task key or a
+                duplicate notebook name.
+        """
+        for task in tasks:
+            if not isinstance(task, dict):
+                raise ValueError("Each task must be a JSON object")
+
+            task_key = task.get("task_key")
+            # Keys are used as dict keys below, so they must be strings; an
+            # unhashable value would otherwise surface as a TypeError.
+            if not isinstance(task_key, str) or not task_key:
+                raise ValueError("Each task must include a non-empty 'task_key'")
+            if task_key in self._task_to_notebook:
+                raise ValueError(f"Duplicate task_key found: {task_key}")
+
+            notebook_task = task.get("notebook_task")
+            if not isinstance(notebook_task, dict):
+                raise ValueError(f"Task '{task_key}' is missing 'notebook_task'")
+
+            notebook_path = notebook_task.get("notebook_path")
+            notebook_name = self._extract_notebook_name(notebook_path, task_key)
+            if notebook_name in self._notebook_insertion_order:
+                raise ValueError(f"Duplicate notebook name found: {notebook_name}")
+
+            depends_on = task.get("depends_on", [])
+            if not isinstance(depends_on, list):
+                raise ValueError(f"Task '{task_key}' has invalid 'depends_on' format")
+            dependency_keys = []
+            for dependency in depends_on:
+                dependency_key = (
+                    dependency.get("task_key")
+                    if isinstance(dependency, dict)
+                    else None
+                )
+                if not isinstance(dependency_key, str) or not dependency_key:
+                    raise ValueError(
+                        f"Task '{task_key}' has malformed dependency entry"
+                    )
+                dependency_keys.append(dependency_key)
+
+            self._task_to_notebook[task_key] = notebook_name
+            self._task_dependencies[task_key] = dependency_keys
+            self._notebook_insertion_order.append(notebook_name)
+
+    def _extract_notebook_name(self, notebook_path, task_key):
+        """Return the last path segment of ``notebook_path``.
+
+        Raises:
+            ValueError: If the path is not a string or has no usable name.
+        """
+        if not isinstance(notebook_path, str):
+            raise ValueError(f"Task '{task_key}' has an invalid notebook_path")
+
+        # Strip first so surrounding whitespace cannot leak into the file
+        # name that run_workflow() looks up.
+        notebook_name = notebook_path.strip().rstrip("/").split("/")[-1].strip()
+        if not notebook_name:
+            raise ValueError(f"Task '{task_key}' has an invalid notebook_path")
+        return notebook_name
+
+    def _build_dag(self):
+        """Map each notebook to the set of notebooks that depend on it.
+
+        Raises:
+            ValueError: If a task depends on a task key that is not defined.
+        """
+        dag = {notebook_name: set() for notebook_name in self._notebook_insertion_order}
+
+        for task_key, dependency_keys in self._task_dependencies.items():
+            current_notebook = self._task_to_notebook[task_key]
+            for dependency_key in dependency_keys:
+                if dependency_key not in self._task_to_notebook:
+                    raise ValueError(
+                        f"Task '{task_key}' depends on unknown task '{dependency_key}'"
+                    )
+
+                dependency_notebook = self._task_to_notebook[dependency_key]
+                dag[dependency_notebook].add(current_notebook)
+
+        return dag
+
+    def _build_execution_order(self):
+        """Return the notebooks in an order that honours every dependency.
+
+        Raises:
+            ValueError: If the workflow graph contains a cycle.
+        """
+        # Register nodes and edges in a fixed order (insertion order, then
+        # sorted targets). Feeding the sets directly could let string hash
+        # randomisation reorder independent notebooks between runs.
+        sorter = TopologicalSorter()
+        for notebook_name in self._notebook_insertion_order:
+            sorter.add(notebook_name)
+        for source_notebook, target_notebooks in self.dag.items():
+            for target_notebook in sorted(target_notebooks):
+                sorter.add(target_notebook, source_notebook)
+
+        try:
+            return list(sorter.static_order())
+        except CycleError as exc:
+            raise ValueError("Workflow graph contains a cycle") from exc
+
+    def format_dag(self):
+        """Return a printable summary of the DAG and the execution order."""
+        lines = []
+        for notebook_name in self._notebook_insertion_order:
+            outgoing = sorted(self.dag.get(notebook_name, set()))
+            lines.append(f"- {notebook_name} -> [{', '.join(outgoing)}]")
+
+        lines.append("Execution order:")
+        lines.append(" -> ".join(self.execution_order))
+        return "\n".join(lines)
+
+    def _execfile(self, file_path, global_namespace, local_namespace):
+        """Compile and execute the Python file at ``file_path``."""
+        with open(file_path, "r", encoding="utf-8") as file:
+            code = compile(file.read(), file_path, "exec")
+        # NOTE: exec() runs the file with full privileges; only point the
+        # runner at trusted notebook sources.
+        exec(code, global_namespace, local_namespace)
+
+    def run_workflow(self):
+        """Print the DAG, then execute every notebook in execution order.
+
+        All notebooks share one globals dict, so names defined by an earlier
+        notebook are visible to later ones.
+
+        Raises:
+            FileNotFoundError: If a notebook file is missing. This is checked
+                for every notebook before the first one is executed.
+        """
+        notebook_paths = [
+            os.path.join(self.source_dir, f"{notebook_name}.py")
+            for notebook_name in self.execution_order
+        ]
+        # Fail before running anything so a missing file cannot leave the
+        # workflow half executed.
+        for notebook_path in notebook_paths:
+            if not os.path.isfile(notebook_path):
+                raise FileNotFoundError(f"Notebook file not found: {notebook_path}")
+
+        execution_globals = {"__name__": "__main__"}
+        print(f"\nExecuting workflow: {self.workflow_json_path}\n")
+        print("==========================================")
+        print(self.format_dag())
+        for notebook_path in notebook_paths:
+            execution_globals["__file__"] = notebook_path
+            self._execfile(notebook_path, execution_globals, execution_globals)
diff --git a/src/mock/workflow_mock.py b/src/mock/workflow_mock.py
deleted file mode 100644
index dadf372..0000000
--- a/src/mock/workflow_mock.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class WorkflowMock:
-    pass
\ No newline at end of file
diff --git a/tests/test_local_workflow_runner.py b/tests/test_local_workflow_runner.py
new file mode 100644
index 0000000..12f54d3
--- /dev/null
+++ b/tests/test_local_workflow_runner.py
@@ -0,0 +1,68 @@
+import json
+import os
+
+from src.mock.local_workflow_runner import LocalWorkflowRunner
+
+
+ROOT_DIR = os.path.dirname(os.path.dirname(__file__))
+WORKFLOW_SAMPLE_PATH = os.path.join(ROOT_DIR, "specs", "workflow_sample.json")
+
+
+def test_builds_dag_using_notebook_names():
+    runner = LocalWorkflowRunner("src", WORKFLOW_SAMPLE_PATH)
+
+    assert runner.dag["auxillary_dims"] == {"data_quality"}
+    assert runner.dag["reviews_fact"] == {"data_quality"}
+    assert runner.dag["data_quality"] == {"semantic_layer"}
+    assert runner.dag["semantic_layer"] == set()
+
+
+def test_execution_order_respects_dependencies():
+    runner = LocalWorkflowRunner("src", WORKFLOW_SAMPLE_PATH)
+    order = runner.execution_order
+
+    assert order.index("data_quality") > order.index("auxillary_dims")
+    assert order.index("data_quality") > order.index("reviews_fact")
+    assert order.index("semantic_layer") > order.index("data_quality")
+
+
+def test_run_workflow_executes_in_dependency_order(tmp_path):
+    source_dir = tmp_path / "local_src"
+    source_dir.mkdir()
+    log_file = tmp_path / "execution.log"
+
+    workflow = {
+        "tasks": [
+            {
+                "task_key": "first_task",
+                "notebook_task": {"notebook_path": "/Workspace/any/first"},
+            },
+            {
+                "task_key": "second_task",
+                "depends_on": [{"task_key": "first_task"}],
+                "notebook_task": {"notebook_path": "/Workspace/any/second"},
+            },
+            {
+                "task_key": "third_task",
+                "depends_on": [{"task_key": "first_task"}],
+                "notebook_task": {"notebook_path": "/Workspace/any/third"},
+            },
+        ]
+    }
+    workflow_path = tmp_path / "workflow.json"
+    workflow_path.write_text(json.dumps(workflow), encoding="utf-8")
+    for notebook_name in ["first", "second", "third"]:
+        script_path = source_dir / f"{notebook_name}.py"
+        # Each notebook appends its own name so the run order can be asserted.
+        script_path.write_text(
+            f"with open({str(log_file)!r}, 'a', encoding='utf-8') as log:\n"
+            f"    log.write({notebook_name!r} + '\\n')\n",
+            encoding="utf-8",
+        )
+
+    runner = LocalWorkflowRunner(str(source_dir), str(workflow_path))
+    runner.run_workflow()
+
+    executed = log_file.read_text(encoding="utf-8").split()
+    assert executed[0] == "first"
+    assert sorted(executed[1:]) == ["second", "third"]