Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/flyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ._resources import AMD_GPU, GPU, HABANA_GAUDI, TPU, Device, DeviceClass, Neuron, Resources
from ._retry import Backoff, RetryStrategy
from ._reusable_environment import ReusePolicy
from ._run import run, with_runcontext
from ._run import rerun, run, with_runcontext
from ._run_python_script import run_python_script
from ._secret import Secret, SecretRequest
from ._serve import AppHandle, serve, with_servecontext
Expand Down Expand Up @@ -106,6 +106,7 @@ def version() -> str:
"logger",
"map",
"new_condition",
"rerun",
"run",
"run_python_script",
"serve",
Expand Down
754 changes: 508 additions & 246 deletions src/flyte/_run.py

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions src/flyte/cli/_rerun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""``flyte rerun <run>`` — re-run an existing run with its own code + exact inputs.

Counterpart to ``flyte run``: where ``run`` launches *local* code (and can recover from a prior
run via ``--recover-from``), ``rerun`` re-launches an *existing* run — fetching its task + inputs
from the platform, no local code needed. ``--recover`` reuses that run's succeeded actions. To
re-run with *new* local code (reusing the prior run's inputs), use ``flyte run <file> <task>
--rerun-from <run>``.

v1 reuses the prior run's exact inputs; changing inputs from the CLI is a follow-up
(`flyte.rerun(run, x=2)` covers it programmatically today).
"""

from __future__ import annotations

import asyncio
from typing import Dict, Optional, Tuple

import rich_click as click

from . import _common as common


def _parse_kv(items: Tuple[str, ...], flag: str) -> Optional[Dict[str, str]]:
"""Parse repeated ``KEY=VALUE`` flag values into a dict (None if none given)."""
if not items:
return None
parsed: Dict[str, str] = {}
for item in items:
if "=" not in item:
raise click.BadParameter(f"Invalid {flag} value {item!r}: expected KEY=VALUE.")
key, value = item.split("=", 1)
if not key:
raise click.BadParameter(f"Invalid {flag} value {item!r}: key must not be empty.")
parsed[key] = value
return parsed


@click.command("rerun", cls=click.RichCommand)
@click.argument("run_name", required=True)
@click.option("-p", "--project", default=None, help="Project for the new run (defaults to config).")
@click.option("-d", "--domain", default=None, help="Domain for the new run (defaults to config).")
@click.option("--name", default=None, help="Name for the new run (a random name is generated if unset).")
@click.option("-e", "--env", "env", multiple=True, help="Env var KEY=VALUE for the new run. Repeatable.")
@click.option("--label", "label", multiple=True, help="Label KEY=VALUE for the new run. Repeatable.")
@click.option("--follow", "-f", is_flag=True, default=False, help="Stream the parent action logs after launch.")
@click.option(
"--recover",
is_flag=True,
default=False,
help="Recover from this run: reuse its succeeded actions, re-run only what failed or changed.",
)
@click.pass_context
def rerun(
ctx: click.Context,
run_name: str,
project: Optional[str],
domain: Optional[str],
name: Optional[str],
env: Tuple[str, ...],
label: Tuple[str, ...],
follow: bool,
recover: bool,
) -> None:
"""Re-run an existing run RUN_NAME with its original code and inputs.

Fetches the prior run's task + inputs from the platform (no local code needed) and launches a
new run that returns the same way ``flyte run`` does. ``--recover`` reuses the prior run's
succeeded actions (re-running only what failed or changed). To re-run with *new* local code
(reusing the prior run's inputs), use ``flyte run <file> <task> --rerun-from <run>``.

Examples:

$ flyte rerun ul56wcvgqrb9vzhzz5l2
$ flyte rerun ul56wcvgqrb9vzhzz5l2 --name retry-1 --follow
$ flyte rerun ul56wcvgqrb9vzhzz5l2 --recover
"""
config = common.initialize_config(ctx, project=project, domain=domain)
asyncio.run(_execute(run_name, name, env, label, follow, recover, config))


async def _execute(
run_name: str,
name: Optional[str],
env: Tuple[str, ...],
label: Tuple[str, ...],
follow: bool,
recover: bool,
config: common.CLIConfig,
) -> None:
import flyte
from flyte._status import status

console = common.get_console()
try:
status.step(f"Re-running {run_name}...")
runner = flyte.with_runcontext(
mode="remote",
name=name,
env_vars=_parse_kv(env, "--env"),
labels=_parse_kv(label, "--label"),
recover=recover,
)
result = await runner.rerun.aio(run_name)
except Exception as e:
console.print(f"[red]✕ Re-run failed:[/red] {e}")
return

if config.output_format in ("json", "table-simple"):
run_info = f"Created Run: {result.name}\nURL: {result.url}"
else:
run_info = (
f"[green bold]Created Run: {result.name}[/green bold]\n"
f"➡️ [blue bold][link={result.url}]{result.url}[/link][/blue bold]"
)
console.print(common.get_panel("Rerun", run_info, config.output_format))

if follow:
status.step("Waiting for log stream...")
await result.show_logs.aio(max_lines=30, show_ts=True, raw=False)
39 changes: 38 additions & 1 deletion src/flyte/cli/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,30 @@ class RunArguments:
)
},
)
recover_from: str | None = field(
default=None,
metadata={
"click.option": click.Option(
["--recover-from"],
type=str,
default=None,
help="Recover a fresh run from a prior run: reuse its succeeded actions and re-run "
"only what failed or changed. Remote-only.",
)
},
)
rerun_from: str | None = field(
default=None,
metadata={
"click.option": click.Option(
["--rerun-from"],
type=str,
default=None,
help="Re-run an existing run with THIS local code, reusing that run's inputs "
"(no per-task input flags are needed). Remote-only.",
)
},
)

@classmethod
def from_dict(cls, d: Dict[str, Any]) -> RunArguments:
Expand Down Expand Up @@ -401,8 +425,13 @@ async def _execute_and_render(self, ctx: click.Context, config: common.CLIConfig
env_vars=self.run_args.parsed_env_vars(),
max_action_concurrency=self.run_args.max_action_concurrency,
labels=self.run_args.parsed_labels(),
recover=self.run_args.recover_from,
)
result = await execution_context.run.aio(self.obj, **ctx.params)
if self.run_args.rerun_from:
# Re-run a prior run with THIS local code, reusing the prior run's inputs.
result = await execution_context.rerun.aio(self.run_args.rerun_from, task_template=self.obj)
else:
result = await execution_context.run.aio(self.obj, **ctx.params)
except Exception as e:
if isinstance(e, RuntimeSystemError):
capture_exception(e)
Expand Down Expand Up @@ -480,6 +509,8 @@ def invoke(self, ctx: click.Context):
tuple(self.run_args.image) or None,
not self.run_args.no_sync_local_sys_paths,
)
if self.run_args.rerun_from and self.run_args.local:
raise click.UsageError("--rerun-from requires remote mode (it cannot be combined with --local)")
self._validate_required_params(ctx)
if self.run_args.tui:
if not self.run_args.local:
Expand All @@ -491,6 +522,11 @@ def invoke(self, ctx: click.Context):

def get_params(self, ctx: click.Context) -> List[click.Parameter]:
# Note this function may be called multiple times by click.
# With --rerun-from, inputs come from the prior run, so don't expose (or require) per-task
# input options. (Overriding specific inputs alongside --rerun-from is a follow-up.)
if self.run_args.rerun_from:
return super().get_params(ctx)

task = self.obj
from .._internal.runtime.types_serde import transform_native_to_typed_interface

Expand Down Expand Up @@ -626,6 +662,7 @@ async def _execute_and_render(self, ctx: click.Context, config: common.CLIConfig
env_vars=self.run_args.parsed_env_vars(),
max_action_concurrency=self.run_args.max_action_concurrency,
labels=self.run_args.parsed_labels(),
recover=self.run_args.recover_from,
)
result = await execution_context.run.aio(task, **ctx.params)
except Exception as e:
Expand Down
4 changes: 3 additions & 1 deletion src/flyte/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ._get import get
from ._plugins import discover_and_register_plugins
from ._prefetch import prefetch
from ._rerun import rerun
from ._run import run
from ._serve import serve
from ._signal import signal
Expand All @@ -31,7 +32,7 @@
"flyte": [
{
"name": "Run and stop tasks",
"commands": ["run", "abort", "signal"],
"commands": ["run", "rerun", "abort", "signal"],
},
{
"name": "Serve Apps",
Expand Down Expand Up @@ -283,6 +284,7 @@ def main(


main.add_command(run)
main.add_command(rerun)
main.add_command(deploy)
main.add_command(get) # type: ignore
main.add_command(create) # type: ignore
Expand Down
63 changes: 63 additions & 0 deletions tests/cli/test_rerun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Tests for the `flyte rerun <run>` CLI command."""

from unittest import mock

from click.testing import CliRunner
from mock.mock import AsyncMock

from flyte.cli._rerun import _parse_kv, rerun
from flyte.cli.main import main


def test_rerun_registered_on_main():
assert "rerun" in main.commands


def test_rerun_has_recover_flag():
opts = {o for p in rerun.params for o in p.opts}
assert "--recover" in opts
# Takes the run name as a positional argument.
assert any(p.name == "run_name" for p in rerun.params)


def test_parse_kv():
assert _parse_kv((), "--env") is None
assert _parse_kv(("A=1", "B=2"), "--env") == {"A": "1", "B": "2"}


def test_rerun_delegates_to_runner_rerun():
"""`flyte rerun <run> --name n -e K=V` builds the run context and calls runner.rerun(run)."""
runner_obj = mock.MagicMock()
runner_obj.rerun = mock.MagicMock(return_value=mock.MagicMock())
runner_obj.rerun.aio = AsyncMock(return_value=mock.MagicMock(name="new", url="http://x"))

with (
mock.patch("flyte.cli._common.initialize_config") as init_cfg,
mock.patch("flyte.with_runcontext", return_value=runner_obj) as wrc,
):
init_cfg.return_value = mock.MagicMock(output_format="table")
result = CliRunner().invoke(rerun, ["my-run", "--name", "n", "-e", "K=V"])

assert result.exit_code == 0, result.output
# recover flag default False, env parsed, name forwarded.
_, kwargs = wrc.call_args
assert kwargs["recover"] is False
assert kwargs["name"] == "n"
assert kwargs["env_vars"] == {"K": "V"}
assert kwargs["mode"] == "remote"
runner_obj.rerun.aio.assert_awaited_once_with("my-run")


def test_rerun_recover_flag_passed_through():
runner_obj = mock.MagicMock()
runner_obj.rerun.aio = AsyncMock(return_value=mock.MagicMock(name="new", url="http://x"))

with (
mock.patch("flyte.cli._common.initialize_config") as init_cfg,
mock.patch("flyte.with_runcontext", return_value=runner_obj) as wrc,
):
init_cfg.return_value = mock.MagicMock(output_format="table")
result = CliRunner().invoke(rerun, ["my-run", "--recover"])

assert result.exit_code == 0, result.output
assert wrc.call_args.kwargs["recover"] is True
63 changes: 63 additions & 0 deletions tests/cli/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ def test_run_arguments_max_action_concurrency_from_dict():
assert RunArguments.from_dict({}).max_action_concurrency is None


def test_run_command_has_recover_from_option():
option_names = {decl for p in run.params for decl in p.opts}
assert "--recover-from" in option_names


def test_run_arguments_recover_from_from_dict():
from flyte.cli._run import RunArguments

assert RunArguments.from_dict({"recover_from": "r1"}).recover_from == "r1"
assert RunArguments.from_dict({}).recover_from is None


def test_run_max_action_concurrency_rejects_negative(runner):
result = runner.invoke(run, ["--max-action-concurrency", "-1", str(HELLO_WORLD_PY), "say_hello"])
assert result.exit_code != 0
Expand Down Expand Up @@ -85,6 +97,57 @@ def test_run_hello_world(runner):
raise ve


def test_run_command_has_rerun_from_option():
"""--rerun-from is a visible option on `flyte run` (not hidden — rerun works today)."""
opt_names = {decl for p in run.params for decl in p.opts}
assert "--rerun-from" in opt_names
rerun_opt = next(p for p in run.params if "--rerun-from" in p.opts)
assert rerun_opt.hidden is False


def test_run_rerun_from_routes_to_rerun(runner):
"""`flyte run <file> <task> --rerun-from r` routes to runner.rerun(r, task_template=task).

The required `name` input is NOT demanded — inputs come from the prior run.
"""
from unittest import mock

from mock.mock import AsyncMock

runner_obj = mock.MagicMock()
runner_obj.rerun.aio = AsyncMock(return_value=mock.MagicMock())
runner_obj.run.aio = AsyncMock()

with mock.patch("flyte.with_runcontext", return_value=runner_obj):
cmd = ["--rerun-from", "r1", "--project", "p", "--domain", "d", str(HELLO_WORLD_PY), "say_hello"]
try:
result = runner.invoke(run, cmd)
except ValueError as ve:
if "I/O operation on closed file" in str(ve):
return
raise

assert result.exit_code == 0, result.output
runner_obj.rerun.aio.assert_awaited_once()
args, kwargs = runner_obj.rerun.aio.call_args
assert args[0] == "r1"
assert "task_template" in kwargs # this local say_hello task is passed as the substitute code
runner_obj.run.aio.assert_not_awaited()


def test_run_rerun_from_rejects_local(runner):
"""--rerun-from cannot be combined with --local (rerun is remote-only)."""
cmd = ["--local", "--rerun-from", "r1", str(HELLO_WORLD_PY), "say_hello"]
try:
result = runner.invoke(run, cmd)
except ValueError as ve:
if "I/O operation on closed file" in str(ve):
return
raise
assert result.exit_code != 0
assert "requires remote" in result.output.lower()


@pytest.mark.integration
def test_run_complex_inputs(runner):
result = runner.invoke(
Expand Down
Loading
Loading