-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path_harness.py
More file actions
172 lines (137 loc) · 5.63 KB
/
_harness.py
File metadata and controls
172 lines (137 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from __future__ import annotations
import sys
import json
import time
import asyncio
from typing import Any, TypeVar, Callable, Awaitable
from dataclasses import asdict
from .example_types import (
ExampleCheck,
RecipeOutput,
ExampleResult,
RecipeContext,
ExampleCleanupStatus,
ExampleCleanupFailure,
)
T = TypeVar("T")
def unique_name(prefix: str) -> str:
    """Generate a unique name with a timestamp and a random suffix.

    Args:
        prefix: Text placed at the start of the generated name.

    Returns:
        A string of the form ``"{prefix}-{unix_seconds}-{hex_suffix}"`` where
        ``hex_suffix`` is 8 random hexadecimal characters.
    """
    import secrets

    # The suffix must not be derived from the clock: calls landing in the
    # same millisecond would otherwise all produce the same name.
    return f"{prefix}-{int(time.time())}-{secrets.token_hex(4)}"
class _CleanupTracker:
    """Tracks cleanup actions and executes them in LIFO order.

    Outcomes are recorded on the status object given to the constructor:
    each resource name is appended to ``attempted`` and then to either
    ``succeeded`` or, wrapped in ``ExampleCleanupFailure``, to ``failed``.
    """

    def __init__(self, status: ExampleCleanupStatus) -> None:
        self._status = status
        self._actions: list[tuple[str, Callable[[], Any]]] = []

    def add(self, resource: str, action: Callable[[], Any]) -> None:
        """Register a cleanup action for a resource.

        Args:
            resource: Name recorded in the cleanup status for this action.
            action: Zero-argument callable; may return an awaitable.
        """
        self._actions.append((resource, action))

    async def run(self) -> None:
        """Execute all cleanup actions in reverse registration order.

        Actions are popped as they run, so a second call does not repeat
        them. An ``Exception`` raised by an action is recorded as a failure
        and does not stop the remaining actions. A one-line summary is
        printed when at least one resource has been attempted.
        """
        import inspect

        while self._actions:
            resource, action = self._actions.pop()
            self._status.attempted.append(resource)
            try:
                result = action()
                # Await any awaitable (coroutine, Task, Future, object with
                # __await__), not just coroutine objects; otherwise the
                # cleanup would be marked as succeeded without ever running.
                if inspect.isawaitable(result):
                    await result
                self._status.succeeded.append(resource)
            except Exception as e:
                self._status.failed.append(ExampleCleanupFailure(resource, str(e)))

        if self._status.attempted:
            if not self._status.failed:
                print("Cleanup completed.")  # noqa: T201
            else:
                print("Cleanup finished with errors.")  # noqa: T201
def _should_fail_process(result: ExampleResult) -> bool:
    """Decide whether the process should exit with a failure status.

    Returns True when the result was skipped, when any check did not pass,
    or when at least one cleanup failure was recorded.
    """
    every_check_passed = all(check.passed for check in result.checks)
    if result.skipped:
        return True
    if not every_check_passed:
        return True
    return len(result.cleanup_status.failed) > 0
def _run_recipe_impl(
    recipe_call: Callable[[], RecipeOutput | Awaitable[RecipeOutput]],
    cleanup: _CleanupTracker,
    cleanup_status: ExampleCleanupStatus,
) -> ExampleResult:
    """Shared implementation for running recipes with cleanup.

    Args:
        recipe_call: Zero-argument callable that returns the recipe output
            directly or an awaitable resolving to it.
        cleanup: Tracker whose ``run()`` is awaited after the recipe, whether
            the recipe returned or raised.
        cleanup_status: Status object placed on the returned result.

    Returns:
        An ``ExampleResult`` built from the recipe output's
        ``resources_created`` and ``checks`` plus ``cleanup_status``.

    Raises:
        Any exception raised by the recipe propagates to the caller after
        cleanup has run.
    """
    import inspect

    async def _run_async() -> RecipeOutput:
        try:
            result = recipe_call()
            # The signature allows any Awaitable, so check for awaitables in
            # general (Task, Future, __await__ objects) rather than only
            # coroutine objects; otherwise the un-awaited object would be
            # returned as if it were the recipe output.
            if inspect.isawaitable(result):
                output: RecipeOutput = await result
                return output
            return result  # type: ignore[return-value]
        finally:
            await cleanup.run()

    # A dedicated loop is created for this run and closed afterwards.
    loop = asyncio.new_event_loop()
    try:
        output = loop.run_until_complete(_run_async())
        return ExampleResult(
            resources_created=output.resources_created,
            checks=output.checks,
            cleanup_status=cleanup_status,
        )
    finally:
        loop.close()
def wrap_recipe(
    recipe: Callable[[RecipeContext], RecipeOutput] | Callable[[RecipeContext], Awaitable[RecipeOutput]],
    validate_env: Callable[[], tuple[bool, list[ExampleCheck]]] | None = None,
) -> Callable[[], ExampleResult]:
    """Build a zero-argument runner around a recipe.

    Args:
        recipe: Sync or async recipe function receiving a ``RecipeContext``.
        validate_env: Optional environment check called before the recipe.
            It returns a ``(skip, checks)`` tuple; when ``skip`` is true the
            recipe is not run.

    Returns:
        A callable that runs the recipe (with cleanup tracking) and returns
        an ``ExampleResult``.
    """

    def run() -> ExampleResult:
        # Fresh status and tracker per invocation.
        status = ExampleCleanupStatus()
        tracker = _CleanupTracker(status)

        if validate_env is not None:
            should_skip, env_checks = validate_env()
            if should_skip:
                # Skipped: report the validation checks without running.
                return ExampleResult(
                    resources_created=[],
                    checks=env_checks,
                    cleanup_status=status,
                    skipped=True,
                )

        context = RecipeContext(cleanup=tracker)
        return _run_recipe_impl(lambda: recipe(context), tracker, status)

    return run
def wrap_recipe_with_options(
    recipe: Callable[[RecipeContext, T], RecipeOutput] | Callable[[RecipeContext, T], Awaitable[RecipeOutput]],
    validate_env: Callable[[T], tuple[bool, list[ExampleCheck]]] | None = None,
) -> Callable[[T], ExampleResult]:
    """Build a one-argument runner around a recipe that accepts options.

    Args:
        recipe: Sync or async recipe function receiving a ``RecipeContext``
            and an options value.
        validate_env: Optional environment check called with the options
            before the recipe. It returns a ``(skip, checks)`` tuple; when
            ``skip`` is true the recipe is not run.

    Returns:
        A callable that takes options, runs the recipe (with cleanup
        tracking) and returns an ``ExampleResult``.
    """

    def run(options: T) -> ExampleResult:
        # Fresh status and tracker per invocation.
        status = ExampleCleanupStatus()
        tracker = _CleanupTracker(status)

        if validate_env is not None:
            should_skip, env_checks = validate_env(options)
            if should_skip:
                # Skipped: report the validation checks without running.
                return ExampleResult(
                    resources_created=[],
                    checks=env_checks,
                    cleanup_status=status,
                    skipped=True,
                )

        context = RecipeContext(cleanup=tracker)
        return _run_recipe_impl(lambda: recipe(context, options), tracker, status)

    return run
def run_as_cli(run: Callable[[], ExampleResult]) -> None:
    """Run an example, print its result as JSON, and exit non-zero on failure.

    The process exits with status 1 when the result is judged a failure by
    ``_should_fail_process`` or when running or printing raises an
    ``Exception``; in the latter case ``Error: <message>`` is printed.
    """
    try:
        outcome = run()
        rendered = json.dumps(asdict(outcome), indent=2)
        print(rendered)  # noqa: T201
        must_fail = _should_fail_process(outcome)
        if must_fail:
            # SystemExit is not an Exception, so the handler below lets it through.
            sys.exit(1)
    except Exception as err:
        message = f"Error: {err}"
        print(message)  # noqa: T201
        sys.exit(1)