From ec3ba6f3e7ccec47ed2c58df04e360b30e517fd6 Mon Sep 17 00:00:00 2001 From: Caspian Zhao Date: Fri, 12 Jun 2026 20:12:49 -0700 Subject: [PATCH 1/4] feat: add scheduled automations --- .announcements/automations.json | 7 + .changeset/calm-clocks-tick.md | 9 + src-tauri/src/agents.rs | 64 ++- src-tauri/src/agents/persistence.rs | 41 +- src-tauri/src/agents/streaming/cleanup.rs | 1 + src-tauri/src/agents/streaming/mod.rs | 13 +- src-tauri/src/agents/streaming/stream_hub.rs | 1 + src-tauri/src/automations/dispatch.rs | 132 ++++++ src-tauri/src/automations/mod.rs | 20 + src-tauri/src/automations/ops.rs | 279 +++++++++++++ src-tauri/src/automations/schedule.rs | 393 ++++++++++++++++++ src-tauri/src/automations/scheduler.rs | 185 +++++++++ src-tauri/src/cli/args.rs | 58 +++ src-tauri/src/cli/automation.rs | 292 +++++++++++++ src-tauri/src/cli/mod.rs | 2 + src-tauri/src/commands/automation_commands.rs | 113 +++++ src-tauri/src/commands/mod.rs | 1 + src-tauri/src/lib.rs | 12 + src-tauri/src/models/automations.rs | 365 ++++++++++++++++ src-tauri/src/models/mod.rs | 1 + src-tauri/src/models/sessions.rs | 22 + src-tauri/src/pipeline/adapter/codex_items.rs | 11 + src-tauri/src/pipeline/adapter/grouping.rs | 1 + src-tauri/src/pipeline/adapter/labels.rs | 3 + src-tauri/src/pipeline/adapter/mod.rs | 10 + src-tauri/src/pipeline/collapse.rs | 1 + src-tauri/src/pipeline/types.rs | 5 + src-tauri/src/schema.rs | 22 + src-tauri/src/ui_sync/events.rs | 5 + src-tauri/tests/common/builders.rs | 11 + src-tauri/tests/common/normalize.rs | 5 + src-tauri/tests/pipeline_scenarios.rs | 12 + ...s__user_prompt_with_automation_source.snap | 13 + .../automations/automation-detail.tsx | 256 ++++++++++++ .../automations/create-automation-dialog.tsx | 216 ++++++++++ src/features/automations/index.test.tsx | 93 +++++ src/features/automations/index.tsx | 295 +++++++++++++ src/features/automations/interval-picker.tsx | 249 +++++++++++ src/features/automations/schedule.test.ts | 102 +++++ src/features/automations/schedule.ts | 99 +++++ .../automations/use-automation-mutations.ts | 96 +++++ .../automations/use-automation-targets.ts | 53 +++ .../panel/message-components/user-message.tsx | 7 + src/lib/api.ts | 86 ++++ src/lib/query-client.ts | 1 + src/router/index.tsx | 7 + src/router/location-mapping.test.ts | 36 +- src/router/location-mapping.ts | 25 +- src/router/navigate-selection.ts | 11 +- src/shell/components/app-shell.tsx | 2 + src/shell/components/shell-sidebar-pane.tsx | 30 +- .../components/workspace-pane-surface.tsx | 28 +- .../controllers/use-selection-controller.ts | 2 +- src/shell/hooks/use-ui-sync-bridge.ts | 5 + 54 files changed, 3773 insertions(+), 36 deletions(-) create mode 100644 .announcements/automations.json create mode 100644 .changeset/calm-clocks-tick.md create mode 100644 src-tauri/src/automations/dispatch.rs create mode 100644 src-tauri/src/automations/mod.rs create mode 100644 src-tauri/src/automations/ops.rs create mode 100644 src-tauri/src/automations/schedule.rs create mode 100644 src-tauri/src/automations/scheduler.rs create mode 100644 src-tauri/src/cli/automation.rs create mode 100644 src-tauri/src/commands/automation_commands.rs create mode 100644 src-tauri/src/models/automations.rs create mode 100644 src-tauri/tests/snapshots/pipeline_scenarios__user_prompt_with_automation_source.snap create mode 100644 src/features/automations/automation-detail.tsx create mode 100644 src/features/automations/create-automation-dialog.tsx create mode 100644 src/features/automations/index.test.tsx create mode 100644 src/features/automations/index.tsx create mode 100644 src/features/automations/interval-picker.tsx create mode 100644 src/features/automations/schedule.test.ts create mode 100644 src/features/automations/schedule.ts create mode 100644 src/features/automations/use-automation-mutations.ts create mode 100644 src/features/automations/use-automation-targets.ts diff --git a/.announcements/automations.json b/.announcements/automations.json new file mode 100644 index 000000000..8ea1801c2 --- /dev/null +++ b/.announcements/automations.json @@ -0,0 +1,7 @@ +{ + "items": [ + { + "text": "New: Automations — schedule a prompt to run in a chat on an interval (hourly, daily, weekly, or custom). Find it via the clock icon at the bottom of the sidebar, or ask an agent to set one up for you." + } + ] +} diff --git a/.changeset/calm-clocks-tick.md b/.changeset/calm-clocks-tick.md new file mode 100644 index 000000000..1070a23da --- /dev/null +++ b/.changeset/calm-clocks-tick.md @@ -0,0 +1,9 @@ +--- +"helmor": minor +--- + +Add Automations — scheduled prompts that run on an interval, like Codex's: +- New Automations page (clock icon at the bottom of the sidebar): create automations manually or via chat, pause/resume, edit the interval, or run one immediately. +- Each run sends the saved prompt into its chat as a normal agent turn, labeled "Sent via automation" — the chat itself is the run history. Runs can target an existing chat or create a fresh session per run in a workspace. +- Intervals: hourly, daily, weekly, or every N minutes/hours. Schedules survive restarts and sleep — a slot missed while Helmor was closed catches up exactly once on next launch, and never double-fires. +- New `helmor automation` CLI (list/create/show/pause/resume/run/delete), so agents can set up automations for you straight from a conversation. diff --git a/src-tauri/src/agents.rs b/src-tauri/src/agents.rs index be76580fd..d9ec0e070 100644 --- a/src-tauri/src/agents.rs +++ b/src-tauri/src/agents.rs @@ -191,6 +191,13 @@ pub struct AgentSendRequest { /// round-trip without regex re-extraction. #[serde(default)] pub images: Option>, + /// Who initiated this turn. `None` = the user (frontend/CLI never set + /// it); `Some("automation")` = the automations scheduler. Persisted into + /// the `user_prompt` payload so the chat renders a "Sent via automation" + /// badge, and background turns skip the mark-read / active-session + /// side effects a human send performs. + #[serde(default)] + pub source: Option, } #[cfg(test)] @@ -202,6 +209,11 @@ pub(crate) struct ExchangeContext { pub(crate) model_id: String, pub(crate) model_provider: String, pub(crate) user_message_id: String, + /// True for scheduler-initiated turns (`request.source` set). Finalize + /// then skips marking the session read and stealing the workspace's + /// active session — a background run must surface as unread, not + /// rearrange the user's UI. + pub(crate) is_background: bool, } #[tauri::command] @@ -335,6 +347,46 @@ fn resolve_stream_working_directory( resolve_working_directory(request.working_directory.as_deref()) } +/// Start an agent turn with no owning IPC channel — used by the automations +/// scheduler. Identical to `send_agent_message_stream` minus the triage +/// priming: persistence, ActiveStreams busy-locking, and watcher fan-out +/// (`SessionStreamHub`) all run as for a frontend-initiated send, so an open +/// conversation still streams the turn live. The no-op channel only drops the +/// initiator-facing copy nobody is listening to. +pub(crate) fn start_background_turn(app: &AppHandle, request: AgentSendRequest) -> CmdResult<()> { + let prompt = request.prompt.trim().to_string(); + if prompt.is_empty() { + return Err(anyhow::anyhow!("Prompt cannot be empty.").into()); + } + + let model = resolve_model(&request.model_id, Some(request.provider.as_str())); + if request.provider != model.provider { + return Err(anyhow::anyhow!( + "Model {} does not belong to provider {}.", + request.model_id, + request.provider + ) + .into()); + } + + let working_directory = resolve_stream_working_directory(&request)?; + let stream_id = Uuid::new_v4().to_string(); + let sidecar = app.state::(); + let active_streams = app.state::(); + + stream_via_sidecar( + app.clone(), + Channel::new(|_| Ok(())), + &sidecar, + &active_streams, + &stream_id, + &model, + &prompt, + &request, + &working_directory, + ) +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AgentStopRequest { @@ -840,10 +892,11 @@ mod tests { model_id: "opus-1m".to_string(), model_provider: "claude".to_string(), user_message_id: Uuid::new_v4().to_string(), + is_background: false, }; // 1. Persist user message - persist_user_message(&conn, &ctx, "Hello", &[], &[]).unwrap(); + persist_user_message(&conn, &ctx, "Hello", &[], &[], None).unwrap(); persist_result_and_finalize( &conn, @@ -913,9 +966,10 @@ mod tests { model_id: "opus-1m".to_string(), model_provider: "claude".to_string(), user_message_id: Uuid::new_v4().to_string(), + is_background: false, }; - persist_user_message(&conn, &ctx, "Hi", &[], &[]).unwrap(); + persist_user_message(&conn, &ctx, "Hi", &[], &[], None).unwrap(); persist_result_and_finalize( &conn, &ctx, @@ -971,10 +1025,11 @@ mod tests { model_id: "opus-1m".to_string(), model_provider: "claude".to_string(), user_message_id: Uuid::new_v4().to_string(), + is_background: false, }; // Persist user message - persist_user_message(&conn, &ctx, "Do something", &[], &[]).unwrap(); + persist_user_message(&conn, &ctx, "Do something", &[], &[], None).unwrap(); // Persist two intermediate turns let turn1 = CollectedTurn { @@ -1042,10 +1097,11 @@ mod tests { model_id: "opus-1m".to_string(), model_provider: "claude".to_string(), user_message_id: "user-initial".to_string(), + is_background: false, }; // 1. Initial prompt persisted via the normal path. - persist_user_message(&conn, &ctx, "investigate the bug", &[], &[]).unwrap(); + persist_user_message(&conn, &ctx, "investigate the bug", &[], &[], None).unwrap(); // 2. Drive the accumulator the same way the streaming loop does: // assistant deltas, steer event, more assistant deltas, result. diff --git a/src-tauri/src/agents/persistence.rs b/src-tauri/src/agents/persistence.rs index d9e6b5bc3..1725405ca 100644 --- a/src-tauri/src/agents/persistence.rs +++ b/src-tauri/src/agents/persistence.rs @@ -9,13 +9,15 @@ use super::ExchangeContext; /// Persist the user's prompt as the first message of the exchange. /// Wraps as `{"type":"user_prompt","text":"...","files":[...],"images":[...]}`. -/// Empty arrays are omitted from the JSON. +/// Empty arrays are omitted from the JSON; `source` (e.g. `"automation"`) is +/// only written when present so pre-existing rows keep their exact shape. pub(super) fn persist_user_message( conn: &Connection, ctx: &ExchangeContext, prompt: &str, files: &[String], images: &[String], + source: Option<&str>, ) -> Result<()> { let now = current_timestamp_string()?; let user_message_id = ctx.user_message_id.clone(); @@ -23,6 +25,9 @@ pub(super) fn persist_user_message( "type": "user_prompt", "text": prompt, }); + if let Some(source) = source { + payload["source"] = serde_json::Value::String(source.to_string()); + } if !files.is_empty() { payload["files"] = serde_json::Value::Array( files @@ -285,17 +290,22 @@ fn finalize_session_metadata_in_transaction( ], )?; - transaction.execute( - r#" - UPDATE workspaces - SET - active_session_id = ?2 - WHERE id = (SELECT workspace_id FROM sessions WHERE id = ?1) - "#, - params![ctx.helmor_session_id, ctx.helmor_session_id], - )?; - - mark_session_read_in_transaction(transaction, &ctx.helmor_session_id)?; + // Background (automation) turns must not rearrange the user's UI: the + // result should surface as *unread*, and the workspace's active session + // should stay wherever the user left it. + if !ctx.is_background { + transaction.execute( + r#" + UPDATE workspaces + SET + active_session_id = ?2 + WHERE id = (SELECT workspace_id FROM sessions WHERE id = ?1) + "#, + params![ctx.helmor_session_id, ctx.helmor_session_id], + )?; + + mark_session_read_in_transaction(transaction, &ctx.helmor_session_id)?; + } Ok(()) } @@ -313,6 +323,7 @@ mod tests { model_id: "gpt-5.4".to_string(), model_provider: "codex".to_string(), user_message_id: "user-1".to_string(), + is_background: false, } } @@ -376,7 +387,7 @@ mod tests { let conn = Connection::open_in_memory().unwrap(); make_messages_table(&conn); let ctx = test_exchange_context(); - persist_user_message(&conn, &ctx, "fix bug X", &[], &[]).unwrap(); + persist_user_message(&conn, &ctx, "fix bug X", &[], &[], None).unwrap(); let (role, content, id): (String, String, String) = conn .query_row( @@ -401,7 +412,7 @@ mod tests { make_messages_table(&conn); let ctx = test_exchange_context(); let files = vec!["a.rs".to_string(), "b.rs".to_string()]; - persist_user_message(&conn, &ctx, "refactor", &files, &[]).unwrap(); + persist_user_message(&conn, &ctx, "refactor", &files, &[], None).unwrap(); let content: String = conn .query_row( @@ -426,7 +437,7 @@ mod tests { let images = vec![ "/Users/me/Library/Application Support/CleanShot/CleanShot 2026-04-29 at 08.24.35@2x.jpg".to_string(), ]; - persist_user_message(&conn, &ctx, "look at this", &[], &images).unwrap(); + persist_user_message(&conn, &ctx, "look at this", &[], &images, None).unwrap(); let content: String = conn .query_row( diff --git a/src-tauri/src/agents/streaming/cleanup.rs b/src-tauri/src/agents/streaming/cleanup.rs index 9ff495b4a..4216e08cb 100644 --- a/src-tauri/src/agents/streaming/cleanup.rs +++ b/src-tauri/src/agents/streaming/cleanup.rs @@ -151,6 +151,7 @@ mod tests { model_id: "opus".to_string(), model_provider: "claude".to_string(), user_message_id: "user-1".to_string(), + is_background: false, } } diff --git a/src-tauri/src/agents/streaming/mod.rs b/src-tauri/src/agents/streaming/mod.rs index 94caac381..989773aba 100644 --- a/src-tauri/src/agents/streaming/mod.rs +++ b/src-tauri/src/agents/streaming/mod.rs @@ -256,6 +256,7 @@ pub(super) fn stream_via_sidecar( let user_message_id_copy = request.user_message_id.clone(); let files_copy = request.files.clone().unwrap_or_default(); let images_copy = request.images.clone().unwrap_or_default(); + let source_copy = request.source.clone(); let sidecar_session_id_copy = sidecar_session_id.clone(); let rid = request_id.clone(); @@ -312,6 +313,7 @@ pub(super) fn stream_via_sidecar( user_message_id: user_message_id_copy .clone() .unwrap_or_else(|| Uuid::new_v4().to_string()), + is_background: source_copy.is_some(), }; match crate::models::db::write_conn() { @@ -323,8 +325,14 @@ pub(super) fn stream_via_sidecar( tracing::error!(rid = %rid, "Failed to update fast_mode: {e}"); } - match persist_user_message(&conn, &ctx, &prompt_copy, &files_copy, &images_copy) - { + match persist_user_message( + &conn, + &ctx, + &prompt_copy, + &files_copy, + &images_copy, + source_copy.as_deref(), + ) { Ok(()) => { tracing::debug!(rid = %rid, "User message persisted to DB"); exchange_ctx = Some(ctx); @@ -1366,6 +1374,7 @@ fn build_exit_plan_review_message( })], status: None, streaming: None, + source: None, } } diff --git a/src-tauri/src/agents/streaming/stream_hub.rs b/src-tauri/src/agents/streaming/stream_hub.rs index 3528cc508..4918ac56c 100644 --- a/src-tauri/src/agents/streaming/stream_hub.rs +++ b/src-tauri/src/agents/streaming/stream_hub.rs @@ -152,6 +152,7 @@ mod tests { })], status: None, streaming: None, + source: None, }], } } diff --git a/src-tauri/src/automations/dispatch.rs b/src-tauri/src/automations/dispatch.rs new file mode 100644 index 000000000..cd15b55e1 --- /dev/null +++ b/src-tauri/src/automations/dispatch.rs @@ -0,0 +1,132 @@ +//! Turn dispatch for automation runs. +//! +//! Shared by the scheduler tick and the "Run now" command. A run is a +//! completely normal agent turn started through +//! `agents::start_background_turn` (no-op IPC channel): persistence, +//! watcher fan-out, busy-locking, and shutdown handling are all the regular +//! streaming machinery. This module only resolves *where* the turn goes. + +use anyhow::anyhow; +use tauri::AppHandle; + +use crate::agents::AgentSendRequest; +use crate::models::automations::{AutomationRecord, RUNS_IN_CHAT, RUNS_IN_WORKSPACE}; +use crate::models::sessions; + +pub struct StartedRun { + pub session_id: String, +} + +pub enum RunError { + /// The bound session already has a turn in flight (claim/dispatch raced a + /// user send). The scheduler rolls the claim back and retries next tick. + SessionBusy, + /// The bound session/workspace no longer exists (or never resolved). The + /// scheduler pauses the automation instead of retrying forever. + TargetMissing(String), + /// Anything else. Deliberately not retried — the next scheduled slot is + /// the recovery path. + Other(anyhow::Error), +} + +/// Resolve the automation's target session and start a background turn with +/// its prompt. Returns once the sidecar accepted the turn (streaming +/// continues on the event-loop thread). +pub fn run_automation_now( + app: &AppHandle, + automation: &AutomationRecord, +) -> Result { + let (session_id, workspace_id, permission_mode) = resolve_target(automation)?; + + let workspace = crate::workspaces::get_workspace(&workspace_id) + .map_err(|error| RunError::TargetMissing(format!("workspace {workspace_id}: {error:#}")))?; + let root_path = workspace.root_path.ok_or_else(|| { + RunError::TargetMissing(format!("workspace {workspace_id} has no root_path")) + })?; + + // Model: session row > "default", with the session's agent_type as the + // provider hint — same resolution as `helmor send` (service.rs). + let (session_model, session_provider) = + sessions::get_session_model_and_provider(&session_id).unwrap_or((None, None)); + let model_id = session_model.unwrap_or_else(|| "default".to_string()); + let model = crate::agents::resolve_model(&model_id, session_provider.as_deref()); + + let request = AgentSendRequest { + provider: model.provider.to_string(), + model_id: model.id.to_string(), + prompt: automation.prompt.clone(), + prompt_prefix: None, + session_id: None, + helmor_session_id: Some(session_id.clone()), + working_directory: Some(root_path), + effort_level: None, + permission_mode, + fast_mode: None, + user_message_id: None, + files: None, + images: None, + source: Some("automation".to_string()), + }; + + crate::agents::start_background_turn(app, request).map_err(|error| { + // CommandError exposes the chain via Debug; the busy rejection is the + // one failure we must distinguish (roll back + retry next tick). + let message = format!("{error:?}"); + if message.contains("still running for this session") { + RunError::SessionBusy + } else { + RunError::Other(anyhow!(message)) + } + })?; + + Ok(StartedRun { session_id }) +} + +/// (session_id, workspace_id, permission_mode override) for this run. +/// `chat` mode targets the bound session; `workspace` mode creates a fresh +/// session named after the automation. +fn resolve_target( + automation: &AutomationRecord, +) -> Result<(String, String, Option), RunError> { + match automation.runs_in.as_str() { + RUNS_IN_CHAT => { + let session_id = automation.session_id.clone().ok_or_else(|| { + RunError::TargetMissing("chat automation has no bound session".to_string()) + })?; + let row = sessions::get_session_workspace_and_permission(&session_id) + .map_err(RunError::Other)? + .ok_or_else(|| { + RunError::TargetMissing(format!("bound session {session_id} no longer exists")) + })?; + let workspace_id = row.0.ok_or_else(|| { + RunError::TargetMissing(format!("session {session_id} has no workspace")) + })?; + Ok((session_id, workspace_id, Some(row.1))) + } + RUNS_IN_WORKSPACE => { + let workspace_id = automation.workspace_id.clone().ok_or_else(|| { + RunError::TargetMissing("workspace automation has no bound workspace".to_string()) + })?; + let created = sessions::create_session( + &workspace_id, + None, + None, + sessions::CreateSessionOverrides::default(), + ) + .map_err(RunError::Other)?; + // Best-effort: name the fresh session after the automation so the + // sidebar reads "Target order monitor", not "Untitled". + if let Err(error) = sessions::rename_session(&created.session_id, &automation.title) { + tracing::warn!( + session_id = %created.session_id, + error = %format!("{error:#}"), + "automations: failed to title run session" + ); + } + Ok((created.session_id, workspace_id, None)) + } + other => Err(RunError::TargetMissing(format!( + "unknown runs_in value {other:?}" + ))), + } +} diff --git a/src-tauri/src/automations/mod.rs b/src-tauri/src/automations/mod.rs new file mode 100644 index 000000000..0b814c80a --- /dev/null +++ b/src-tauri/src/automations/mod.rs @@ -0,0 +1,20 @@ +//! Scheduled automations — Codex-style recurring prompts. +//! +//! An automation periodically injects a fixed prompt into a session and runs +//! a normal agent turn; the chat itself is the run history. Reliability model: +//! +//! - SQLite is the single source of truth (`models::automations`); the +//! scheduler thread holds no durable state. +//! - The scheduler is a stateless 30s poll loop (`scheduler`); a tick that +//! arrives late (app restart, machine sleep) simply sees overdue rows. +//! - Claim-before-dispatch: a CAS UPDATE on `next_run_at` makes each slot +//! fire at most once, with `next_run_at` always recomputed from "now" +//! (`schedule`) so long offline gaps produce exactly one catch-up run. +//! - Dispatch (`dispatch`) reuses the regular streaming engine with a no-op +//! IPC channel — watchers and persistence behave exactly like a +//! user-initiated send, and no UI focus is stolen. + +pub mod dispatch; +pub mod ops; +pub mod schedule; +pub mod scheduler; diff --git a/src-tauri/src/automations/ops.rs b/src-tauri/src/automations/ops.rs new file mode 100644 index 000000000..ea16409cc --- /dev/null +++ b/src-tauri/src/automations/ops.rs @@ -0,0 +1,279 @@ +//! Domain operations shared by the Tauri commands and the `helmor +//! automation` CLI. Pure validation + persistence — UI notification +//! (`ui_sync::publish` vs `notify_running_app`) stays with the callers. + +use anyhow::{bail, Context, Result}; +use chrono::Utc; +use serde_json::Value; + +use super::schedule::{format_utc, next_run_after, Schedule}; +use crate::models::automations::{ + self, AutomationRecord, NewAutomation, RUNS_IN_CHAT, RUNS_IN_WORKSPACE, STATUS_ACTIVE, + STATUS_PAUSED, +}; + +pub struct CreateAutomationInput { + pub title: String, + pub prompt: String, + pub runs_in: String, + pub session_id: Option, + pub workspace_id: Option, + pub schedule: Value, +} + +#[derive(Default)] +pub struct UpdateAutomationInput { + pub title: Option, + pub prompt: Option, + pub runs_in: Option, + pub session_id: Option, + pub workspace_id: Option, + pub schedule: Option, +} + +pub fn create_automation(input: CreateAutomationInput) -> Result { + let title = input.title.trim(); + let prompt = input.prompt.trim(); + if title.is_empty() { + bail!("Automation title cannot be empty"); + } + if prompt.is_empty() { + bail!("Automation prompt cannot be empty"); + } + let schedule = parse_schedule(&input.schedule)?; + validate_target( + &input.runs_in, + input.session_id.as_deref(), + input.workspace_id.as_deref(), + )?; + let next_run_at = format_utc(next_run_after(&schedule, Utc::now())?); + // Store the canonical serde form, not caller-provided JSON verbatim. + let canonical = serde_json::to_value(&schedule)?; + automations::insert_automation(&NewAutomation { + title, + prompt, + runs_in: &input.runs_in, + session_id: input.session_id.as_deref(), + workspace_id: input.workspace_id.as_deref(), + schedule: &canonical, + next_run_at: &next_run_at, + }) +} + +/// Read-modify-write edit. A schedule change recomputes `next_run_at` from +/// now; binding changes are re-validated as a whole. +pub fn update_automation(id: &str, input: UpdateAutomationInput) -> Result { + let mut record = + automations::get_automation(id)?.with_context(|| format!("Automation {id} not found"))?; + + if let Some(title) = input.title { + let title = title.trim().to_string(); + if title.is_empty() { + bail!("Automation title cannot be empty"); + } + record.title = title; + } + if let Some(prompt) = input.prompt { + let prompt = prompt.trim().to_string(); + if prompt.is_empty() { + bail!("Automation prompt cannot be empty"); + } + record.prompt = prompt; + } + if let Some(runs_in) = input.runs_in { + record.runs_in = runs_in; + } + if let Some(session_id) = input.session_id { + record.session_id = Some(session_id); + } + if let Some(workspace_id) = input.workspace_id { + record.workspace_id = Some(workspace_id); + } + validate_target( + &record.runs_in, + record.session_id.as_deref(), + record.workspace_id.as_deref(), + )?; + + if let Some(schedule_json) = input.schedule { + let schedule = parse_schedule(&schedule_json)?; + record.schedule = serde_json::to_value(&schedule)?; + record.next_run_at = format_utc(next_run_after(&schedule, Utc::now())?); + } + + automations::update_automation_record(&record)?; + automations::get_automation(id)?.with_context(|| format!("Automation {id} not found")) +} + +/// Pause or resume. Resume recomputes `next_run_at` from now so a +/// long-paused automation never fires immediately. +pub fn set_status(id: &str, status: &str) -> Result { + let record = + automations::get_automation(id)?.with_context(|| format!("Automation {id} not found"))?; + match status { + STATUS_PAUSED => automations::set_automation_status(id, STATUS_PAUSED, None)?, + STATUS_ACTIVE => { + let schedule: Schedule = serde_json::from_value(record.schedule.clone()) + .context("Automation has an unparseable schedule")?; + let next_run_at = format_utc(next_run_after(&schedule, Utc::now())?); + automations::set_automation_status(id, STATUS_ACTIVE, Some(&next_run_at))?; + } + other => bail!("Invalid status {other:?} — expected active or paused"), + } + automations::get_automation(id)?.with_context(|| format!("Automation {id} not found")) +} + +/// Manual "Run now": dispatch immediately, record `last_run_at`, leave the +/// schedule (`next_run_at`) untouched. Returns the session the run landed in. +pub fn run_now(app: &tauri::AppHandle, id: &str) -> Result { + let record = + automations::get_automation(id)?.with_context(|| format!("Automation {id} not found"))?; + let started = + super::dispatch::run_automation_now(app, &record).map_err(|error| match error { + super::dispatch::RunError::SessionBusy => anyhow::anyhow!( + "A turn is already running in this automation's chat. Wait for it to finish." + ), + super::dispatch::RunError::TargetMissing(reason) => { + anyhow::anyhow!("Automation target is gone: {reason}") + } + super::dispatch::RunError::Other(error) => error, + })?; + automations::set_last_run_at(id, &crate::models::db::current_timestamp()?)?; + Ok(started.session_id) +} + +pub fn parse_schedule(value: &Value) -> Result { + let schedule: Schedule = serde_json::from_value(value.clone()) + .context("Invalid schedule — expected {kind: hourly|daily|weekly|every, ...}")?; + schedule.validate()?; + Ok(schedule) +} + +fn validate_target( + runs_in: &str, + session_id: Option<&str>, + workspace_id: Option<&str>, +) -> Result<()> { + match runs_in { + RUNS_IN_CHAT => { + let session_id = + session_id.context("runs_in=chat requires a bound session (sessionId)")?; + let exists = crate::models::sessions::get_session_workspace_and_permission(session_id)? + .is_some(); + if !exists { + bail!("Session {session_id} does not exist"); + } + } + RUNS_IN_WORKSPACE => { + let workspace_id = + workspace_id.context("runs_in=workspace requires a workspace (workspaceId)")?; + crate::workspaces::get_workspace(workspace_id) + .with_context(|| format!("Workspace {workspace_id} does not exist"))?; + } + other => bail!("Invalid runs_in {other:?} — expected chat or workspace"), + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::automations::STATUS_ACTIVE; + + fn workspace_fixture() -> (String, String) { + // Minimal repo+workspace+session rows so target validation passes. + let conn = crate::models::db::write_conn().unwrap(); + conn.execute_batch( + r#" + INSERT INTO repos (id, name, root_path) VALUES ('repo-1', 'demo', '/tmp/demo'); + INSERT INTO workspaces (id, repository_id, directory_name) VALUES ('ws-1', 'repo-1', 'demo-ws'); + INSERT INTO sessions (id, workspace_id, title) VALUES ('sess-1', 'ws-1', 'Chat'); + "#, + ) + .unwrap(); + ("ws-1".into(), "sess-1".into()) + } + + fn chat_input(session_id: &str) -> CreateAutomationInput { + CreateAutomationInput { + title: "Order monitor".into(), + prompt: "check the thing".into(), + runs_in: RUNS_IN_CHAT.into(), + session_id: Some(session_id.to_string()), + workspace_id: None, + schedule: serde_json::json!({"kind": "hourly"}), + } + } + + #[test] + fn create_validates_and_computes_next_run() { + let _env = crate::testkit::TestEnv::new("automation-ops-create"); + let (_ws, session) = workspace_fixture(); + + let record = create_automation(chat_input(&session)).unwrap(); + assert_eq!(record.status, STATUS_ACTIVE); + assert!(record.next_run_at > crate::models::db::current_timestamp().unwrap()); + + // Missing session → rejected. + let bad = create_automation(chat_input("nope")); + assert!(bad.is_err()); + + // Garbage schedule → rejected. + let mut input = chat_input(&session); + input.schedule = serde_json::json!({"kind": "fortnightly"}); + assert!(create_automation(input).is_err()); + } + + #[test] + fn resume_recomputes_next_run_from_now() { + let _env = crate::testkit::TestEnv::new("automation-ops-resume"); + let (_ws, session) = workspace_fixture(); + let record = create_automation(chat_input(&session)).unwrap(); + + set_status(&record.id, STATUS_PAUSED).unwrap(); + // Make the stored next_run_at stale, as after a long pause. + crate::models::automations::set_automation_status( + &record.id, + STATUS_PAUSED, + Some("2000-01-01T00:00:00.000Z"), + ) + .unwrap(); + + let resumed = set_status(&record.id, STATUS_ACTIVE).unwrap(); + assert_eq!(resumed.status, STATUS_ACTIVE); + // No immediate fire: next_run_at is in the future again. + assert!(resumed.next_run_at > crate::models::db::current_timestamp().unwrap()); + } + + #[test] + fn update_schedule_recomputes_but_title_edit_does_not() { + let _env = crate::testkit::TestEnv::new("automation-ops-update"); + let (_ws, session) = workspace_fixture(); + let record = create_automation(chat_input(&session)).unwrap(); + let original_next = record.next_run_at.clone(); + + let renamed = update_automation( + &record.id, + UpdateAutomationInput { + title: Some("Renamed".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(renamed.title, "Renamed"); + assert_eq!(renamed.next_run_at, original_next); + + let rescheduled = update_automation( + &record.id, + UpdateAutomationInput { + schedule: Some( + serde_json::json!({"kind": "every", "amount": 5, "unit": "minutes"}), + ), + ..Default::default() + }, + ) + .unwrap(); + assert_ne!(rescheduled.next_run_at, original_next); + assert_eq!(rescheduled.schedule["kind"], "every"); + } +} diff --git a/src-tauri/src/automations/schedule.rs b/src-tauri/src/automations/schedule.rs new file mode 100644 index 000000000..51f51875c --- /dev/null +++ b/src-tauri/src/automations/schedule.rs @@ -0,0 +1,393 @@ +//! Schedule spec + next-run computation. +//! +//! `next_run_after` is a pure function of (schedule, now) so it needs no +//! clock mocking in tests. Interval kinds are plain UTC arithmetic; daily and +//! weekly resolve the next *local wall-clock* occurrence (generic over +//! `chrono::TimeZone` — production passes `Local`, tests pass `FixedOffset`). +//! Crucially the result is always computed from "now", never incremented from +//! the previous value, so a machine that slept through N slots schedules +//! exactly one catch-up run and falls back into cadence. + +use anyhow::{bail, Context, Result}; +use chrono::{ + DateTime, Datelike, Duration, LocalResult, NaiveDateTime, NaiveTime, SecondsFormat, TimeZone, + Utc, Weekday, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "camelCase")] +pub enum Schedule { + /// Every hour, anchored to the moment of (re)computation. + Hourly, + /// Every day at a local wall-clock time, `"HH:MM"`. + Daily { time: String }, + /// Every week on `weekday` (0 = Sunday … 6 = Saturday, JS convention) + /// at a local wall-clock time, `"HH:MM"`. + Weekly { weekday: u8, time: String }, + /// Every N minutes/hours, anchored to the moment of (re)computation. + Every { amount: u32, unit: EveryUnit }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EveryUnit { + Minutes, + Hours, +} + +impl Schedule { + pub fn validate(&self) -> Result<()> { + match self { + Schedule::Hourly => Ok(()), + Schedule::Daily { time } => parse_hhmm(time).map(|_| ()), + Schedule::Weekly { weekday, time } => { + weekday_from_index(*weekday)?; + parse_hhmm(time).map(|_| ()) + } + Schedule::Every { amount, .. } => { + if *amount == 0 { + bail!("Custom interval must be at least 1"); + } + Ok(()) + } + } + } + + /// Human summary for list rows and chat badges ("Hourly", "Daily at 09:00"). + pub fn summary(&self) -> String { + match self { + Schedule::Hourly => "Hourly".to_string(), + Schedule::Daily { time } => format!("Daily at {time}"), + Schedule::Weekly { weekday, time } => { + let day = weekday_from_index(*weekday) + .map(weekday_label) + .unwrap_or("?"); + format!("Weekly on {day} at {time}") + } + Schedule::Every { amount, unit } => match unit { + EveryUnit::Minutes => format!("Every {amount}m"), + EveryUnit::Hours => format!("Every {amount}h"), + }, + } + } +} + +/// Next fire instant strictly after `now`, in the local timezone. +pub fn next_run_after(schedule: &Schedule, now: DateTime) -> Result> { + next_run_after_in(schedule, now, &chrono::Local) +} + +/// Timezone-generic core of [`next_run_after`] — unit-testable with +/// `FixedOffset`. +pub fn next_run_after_in( + schedule: &Schedule, + now: DateTime, + tz: &Tz, +) -> Result> { + schedule.validate()?; + Ok(match schedule { + Schedule::Hourly => now + Duration::hours(1), + Schedule::Every { amount, unit } => { + now + match unit { + EveryUnit::Minutes => Duration::minutes(i64::from(*amount)), + EveryUnit::Hours => Duration::hours(i64::from(*amount)), + } + } + Schedule::Daily { time } => next_local_occurrence(now, tz, parse_hhmm(time)?, None), + Schedule::Weekly { weekday, time } => next_local_occurrence( + now, + tz, + parse_hhmm(time)?, + Some(weekday_from_index(*weekday)?), + ), + }) +} + +/// Storage format for `automations.next_run_at` — matches +/// `db::current_timestamp()` (RFC3339 UTC millis) so strings compare +/// chronologically. +pub fn format_utc(instant: DateTime) -> String { + instant.to_rfc3339_opts(SecondsFormat::Millis, true) +} + +/// Next local wall-clock occurrence of `time` (optionally constrained to a +/// weekday) strictly after `now`. Recomputed from scratch every call, so DST +/// shifts self-correct on the following cycle. +fn next_local_occurrence( + now: DateTime, + tz: &Tz, + time: NaiveTime, + weekday: Option, +) -> DateTime { + let local_now = now.with_timezone(tz); + let mut date = local_now.date_naive(); + // 8 days covers a full weekday cycle plus the "today's slot already + // passed" case; the unreachable fallback below is pure defense. + for _ in 0..=8 { + let weekday_matches = weekday.is_none_or(|w| date.weekday() == w); + if weekday_matches { + if let Some(candidate) = resolve_local(tz, date.and_time(time)) { + if candidate > now { + return candidate; + } + } + } + match date.succ_opt() { + Some(next) => date = next, + None => break, + } + } + now + Duration::days(1) +} + +/// Resolve a naive local datetime to UTC. DST fold (ambiguous) takes the +/// earliest instant; DST gap (nonexistent) advances minute-by-minute to the +/// first valid instant after the gap. +fn resolve_local(tz: &Tz, naive: NaiveDateTime) -> Option> { + let mut probe = naive; + // Bounded walk: real DST gaps are ≤ 2h; 240 minutes is generous. + for _ in 0..240 { + match tz.from_local_datetime(&probe) { + LocalResult::Single(dt) => return Some(dt.with_timezone(&Utc)), + LocalResult::Ambiguous(earliest, _) => return Some(earliest.with_timezone(&Utc)), + LocalResult::None => probe += Duration::minutes(1), + } + } + None +} + +fn parse_hhmm(time: &str) -> Result { + NaiveTime::parse_from_str(time, "%H:%M") + .with_context(|| format!("Invalid time {time:?} — expected HH:MM")) +} + +fn weekday_from_index(weekday: u8) -> Result { + Ok(match weekday { + 0 => Weekday::Sun, + 1 => Weekday::Mon, + 2 => Weekday::Tue, + 3 => Weekday::Wed, + 4 => Weekday::Thu, + 5 => Weekday::Fri, + 6 => Weekday::Sat, + other => bail!("Invalid weekday {other} — expected 0 (Sunday) … 6 (Saturday)"), + }) +} + +fn weekday_label(weekday: Weekday) -> &'static str { + match weekday { + Weekday::Sun => "Sunday", + Weekday::Mon => "Monday", + Weekday::Tue => "Tuesday", + Weekday::Wed => "Wednesday", + Weekday::Thu => "Thursday", + Weekday::Fri => "Friday", + Weekday::Sat => "Saturday", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::FixedOffset; + + fn utc(s: &str) -> DateTime { + s.parse().unwrap() + } + + /// UTC+8 — no DST, deterministic across test machines. + fn tz_plus8() -> FixedOffset { + FixedOffset::east_opt(8 * 3600).unwrap() + } + + #[test] + fn hourly_and_every_are_anchored_to_now() { + let now = utc("2026-06-11T10:00:00Z"); + assert_eq!( + next_run_after_in(&Schedule::Hourly, now, &tz_plus8()).unwrap(), + utc("2026-06-11T11:00:00Z") + ); + let every = Schedule::Every { + amount: 5, + unit: EveryUnit::Minutes, + }; + assert_eq!( + next_run_after_in(&every, now, &tz_plus8()).unwrap(), + utc("2026-06-11T10:05:00Z") + ); + let every_hours = Schedule::Every { + amount: 3, + unit: EveryUnit::Hours, + }; + assert_eq!( + next_run_after_in(&every_hours, now, &tz_plus8()).unwrap(), + utc("2026-06-11T13:00:00Z") + ); + } + + #[test] + fn daily_today_when_time_not_yet_passed() { + // 2026-06-11 10:00 UTC = 18:00 local (+8). Daily at 21:00 local → + // today 21:00 local = 13:00 UTC. + let now = utc("2026-06-11T10:00:00Z"); + let schedule = Schedule::Daily { + time: "21:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-11T13:00:00Z") + ); + } + + #[test] + fn daily_tomorrow_when_time_already_passed() { + // 18:00 local, daily at 09:00 → tomorrow 09:00 local = 01:00 UTC. + let now = utc("2026-06-11T10:00:00Z"); + let schedule = Schedule::Daily { + time: "09:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-12T01:00:00Z") + ); + } + + #[test] + fn daily_exact_boundary_rolls_to_next_day() { + // Exactly 09:00 local: "strictly after now" → tomorrow. + let now = utc("2026-06-11T01:00:00Z"); // 09:00 local (+8) + let schedule = Schedule::Daily { + time: "09:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-12T01:00:00Z") + ); + } + + #[test] + fn weekly_same_day_later_time_fires_today() { + // 2026-06-11 is a Thursday. 18:00 local, weekly Thu 20:00 → today. + let now = utc("2026-06-11T10:00:00Z"); + let schedule = Schedule::Weekly { + weekday: 4, + time: "20:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-11T12:00:00Z") + ); + } + + #[test] + fn weekly_wraps_to_next_week() { + // Thursday 18:00 local, weekly Thu 09:00 → next Thursday. + let now = utc("2026-06-11T10:00:00Z"); + let schedule = Schedule::Weekly { + weekday: 4, + time: "09:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-18T01:00:00Z") + ); + } + + #[test] + fn weekly_crosses_into_earlier_weekday_next_week() { + // Thursday, weekly Monday 08:00 → the coming Monday (Jun 15). + let now = utc("2026-06-11T10:00:00Z"); + let schedule = Schedule::Weekly { + weekday: 1, + time: "08:00".into(), + }; + assert_eq!( + next_run_after_in(&schedule, now, &tz_plus8()).unwrap(), + utc("2026-06-15T00:00:00Z") + ); + } + + #[test] + fn result_is_always_strictly_after_now() { + let schedules = [ + Schedule::Hourly, + Schedule::Daily { + time: "00:00".into(), + }, + Schedule::Weekly { + weekday: 0, + time: "23:59".into(), + }, + Schedule::Every { + amount: 1, + unit: EveryUnit::Minutes, + }, + ]; + let nows = [ + utc("2026-06-11T00:00:00Z"), + utc("2026-06-11T23:59:00Z"), + utc("2026-12-31T23:59:59Z"), + ]; + for schedule in &schedules { + for now in nows { + let next = next_run_after_in(schedule, now, &tz_plus8()).unwrap(); + assert!(next > now, "{schedule:?} at {now} produced {next}"); + } + } + } + + #[test] + fn validation_rejects_bad_inputs() { + assert!(next_run_after_in( + &Schedule::Daily { + time: "25:00".into() + }, + utc("2026-06-11T00:00:00Z"), + &tz_plus8(), + ) + .is_err()); + assert!(Schedule::Weekly { + weekday: 7, + time: "09:00".into() + } + .validate() + .is_err()); + assert!(Schedule::Every { + amount: 0, + unit: EveryUnit::Hours + } + .validate() + .is_err()); + } + + #[test] + fn schedule_json_shape_is_stable() { + // The JSON tag shape is the storage + IPC contract. + let weekly = Schedule::Weekly { + weekday: 4, + time: "09:30".into(), + }; + assert_eq!( + serde_json::to_value(&weekly).unwrap(), + serde_json::json!({"kind": "weekly", "weekday": 4, "time": "09:30"}) + ); + let every: Schedule = serde_json::from_value( + serde_json::json!({"kind": "every", "amount": 15, "unit": "minutes"}), + ) + .unwrap(); + assert_eq!( + every, + Schedule::Every { + amount: 15, + unit: EveryUnit::Minutes + } + ); + } + + #[test] + fn format_utc_matches_db_timestamp_shape() { + let formatted = format_utc(utc("2026-06-11T10:00:00Z")); + assert_eq!(formatted, "2026-06-11T10:00:00.000Z"); + } +} diff --git a/src-tauri/src/automations/scheduler.rs b/src-tauri/src/automations/scheduler.rs new file mode 100644 index 000000000..4afb6c50f --- /dev/null +++ b/src-tauri/src/automations/scheduler.rs @@ -0,0 +1,185 @@ +//! Stateless poll-loop scheduler for automations. +//! +//! Same shape as `triage::fetcher::spawn_scheduler`: a dedicated std::thread +//! that ticks every 30s. All durable state lives in SQLite — a tick reads due +//! rows, CAS-claims each slot (advancing `next_run_at` computed from *now*), +//! and only then dispatches. Consequences, by construction: +//! +//! - App quit / crash / machine sleep: the next tick (whenever it happens) +//! sees overdue rows and fires each exactly once — one catch-up run, no +//! backlog, no double-fire. +//! - Busy target session: skipped *without claiming*; the 30s cadence is the +//! retry loop, which serializes runs naturally. +//! - Dispatch failure after a claim: the claim stands (run lost, next slot +//! recovers) — except the busy race, which rolls the claim back, and a +//! missing target, which pauses the automation. + +use std::collections::{HashMap, HashSet}; +use std::thread; +use std::time::{Duration, Instant}; + +use chrono::Utc; +use tauri::{AppHandle, Manager}; + +use super::dispatch::{self, RunError}; +use super::schedule::{format_utc, next_run_after, Schedule}; +use crate::agents::ActiveStreams; +use crate::models::automations::{self, AutomationRecord, RUNS_IN_CHAT, STATUS_PAUSED}; +use crate::models::db; + +const STARTUP_DELAY_SEC: u64 = 20; +const TICK_INTERVAL_SEC: u64 = 30; + +pub fn spawn_scheduler(app: AppHandle) { + if let Err(error) = thread::Builder::new() + .name("automations-scheduler".into()) + .spawn(move || scheduler_loop(app)) + { + tracing::error!(error = %error, "automations: failed to spawn scheduler thread"); + } +} + +fn scheduler_loop(app: AppHandle) { + // Defer the first tick so startup catch-up runs don't compete with the + // boot sequence for the single-writer DB pool. + thread::sleep(Duration::from_secs(STARTUP_DELAY_SEC)); + // automation_id → session_id of a run this process started and believes + // is still streaming. Purely an overlap guard; pruned against + // ActiveStreams each tick, so it self-heals and survives nothing. + let mut in_flight: HashMap = HashMap::new(); + loop { + let start = Instant::now(); + if let Err(error) = tick(&app, &mut in_flight) { + tracing::warn!(error = %format!("{error:#}"), "automations: tick failed"); + } + let elapsed = start.elapsed(); + thread::sleep(Duration::from_secs(TICK_INTERVAL_SEC).saturating_sub(elapsed)); + } +} + +fn tick(app: &AppHandle, in_flight: &mut HashMap) -> anyhow::Result<()> { + let active_sessions: HashSet = app + .state::() + .snapshot_for_ui() + .into_iter() + .map(|stream| stream.session_id) + .collect(); + in_flight.retain(|_, session_id| active_sessions.contains(session_id)); + + let now = db::current_timestamp()?; + let due = automations::due_automations(&now)?; + if due.is_empty() { + return Ok(()); + } + + let mut changed = false; + for automation in due { + match fire(app, &automation, &active_sessions, in_flight) { + Ok(row_changed) => changed |= row_changed, + Err(error) => tracing::warn!( + automation_id = %automation.id, + error = %format!("{error:#}"), + "automations: firing failed" + ), + } + } + if changed { + crate::ui_sync::publish(app, crate::ui_sync::UiMutationEvent::AutomationsChanged); + } + Ok(()) +} + +/// Claim-then-dispatch one due automation. Returns true when the row changed +/// (claimed, fired, or paused) so the tick knows to publish a UI event. +fn fire( + app: &AppHandle, + automation: &AutomationRecord, + active_sessions: &HashSet, + in_flight: &mut HashMap, +) -> anyhow::Result { + // Overlap guards — skip WITHOUT claiming; the next tick retries. + if in_flight.contains_key(&automation.id) { + return Ok(false); + } + if automation.runs_in == RUNS_IN_CHAT { + if let Some(session_id) = automation.session_id.as_deref() { + if active_sessions.contains(session_id) { + return Ok(false); + } + } + } + + let schedule: Schedule = match serde_json::from_value(automation.schedule.clone()) { + Ok(schedule) => schedule, + Err(error) => { + // A corrupt schedule would stay due forever — pause loudly + // instead of warning every 30s. + tracing::error!( + automation_id = %automation.id, + error = %error, + "automations: unparseable schedule — pausing" + ); + automations::set_automation_status(&automation.id, STATUS_PAUSED, None)?; + return Ok(true); + } + }; + + // Claim before dispatch: CAS on the observed `next_run_at` makes this + // slot fire at most once, and computing the new value from *now* means a + // long-offline automation catches up exactly once. + let new_next = format_utc(next_run_after(&schedule, Utc::now())?); + let claim_now = db::current_timestamp()?; + if !automations::claim_automation( + &automation.id, + &automation.next_run_at, + &new_next, + &claim_now, + )? { + // Someone else won (concurrent edit / second claimer). Not ours. + return Ok(false); + } + + match dispatch::run_automation_now(app, automation) { + Ok(started) => { + tracing::info!( + automation_id = %automation.id, + session_id = %started.session_id, + next_run_at = %new_next, + "automations: run dispatched" + ); + in_flight.insert(automation.id.clone(), started.session_id); + Ok(true) + } + Err(RunError::SessionBusy) => { + // The pre-check raced a user send. Roll the claim back so the + // slot retries on the next tick instead of losing the run. + automations::unclaim_automation( + &automation.id, + &new_next, + &automation.next_run_at, + automation.last_run_at.as_deref(), + )?; + Ok(false) + } + Err(RunError::TargetMissing(reason)) => { + tracing::warn!( + automation_id = %automation.id, + reason, + "automations: target missing — pausing" + ); + automations::set_automation_status(&automation.id, STATUS_PAUSED, None)?; + Ok(true) + } + Err(RunError::Other(error)) => { + // Keep the claim — deliberate no-retry policy. Unclaiming would + // hot-retry a persistent failure every 30s; the next scheduled + // slot is the recovery path. + tracing::error!( + automation_id = %automation.id, + error = %format!("{error:#}"), + "automations: dispatch failed — run skipped until next slot" + ); + Ok(true) + } + } +} diff --git a/src-tauri/src/cli/args.rs b/src-tauri/src/cli/args.rs index 72a990bb6..c6c3f8150 100644 --- a/src-tauri/src/cli/args.rs +++ b/src-tauri/src/cli/args.rs @@ -165,6 +165,11 @@ pub enum Commands { #[command(subcommand)] action: SessionAction, }, + /// Scheduled automations — recurring prompts on an interval. + Automation { + #[command(subcommand)] + action: AutomationAction, + }, /// File listing, reading, writing, staging (editor surface). Files { #[command(subcommand)] @@ -573,6 +578,59 @@ pub enum ReadState { Unread, } +// --------------------------------------------------------------------------- +// automation +// --------------------------------------------------------------------------- + +#[derive(Subcommand)] +pub enum AutomationAction { + /// List all automations. + List, + /// Show one automation in full (prompt, schedule, next/last run). + Show { automation: String }, + /// Create an automation. + /// + /// Examples: + /// helmor automation create --title "Order monitor" --prompt "check order status" \ + /// --chat --hourly + /// helmor automation create --title "Daily digest" --prompt "summarize inbox" \ + /// --workspace my-repo/main --daily 09:00 + /// helmor automation create ... --weekly mon:09:30 + /// helmor automation create ... --every 15m + Create { + #[arg(long)] + title: String, + #[arg(long)] + prompt: String, + /// Bind to an existing chat session (each run appends a turn there). + #[arg(long, conflicts_with = "workspace")] + chat: Option, + /// Bind to a workspace (each run creates a fresh session there). + #[arg(long)] + workspace: Option, + /// Run every hour. + #[arg(long, group = "interval")] + hourly: bool, + /// Run every day at a local wall-clock time (HH:MM). + #[arg(long, group = "interval", value_name = "HH:MM")] + daily: Option, + /// Run weekly: sun|mon|…|sat followed by HH:MM (e.g. mon:09:30). + #[arg(long, group = "interval", value_name = "DAY:HH:MM")] + weekly: Option, + /// Run every N minutes/hours (e.g. 15m, 2h). + #[arg(long, group = "interval", value_name = "Nm|Nh")] + every: Option, + }, + /// Pause an automation (keeps it listed; stops firing). + Pause { automation: String }, + /// Resume a paused automation. Next run is computed from now. + Resume { automation: String }, + /// Delete an automation. The chats it wrote into are untouched. + Delete { automation: String }, + /// Fire an automation on the app's next scheduler tick (≤30s). + Run { automation: String }, +} + // --------------------------------------------------------------------------- // session // --------------------------------------------------------------------------- diff --git a/src-tauri/src/cli/automation.rs b/src-tauri/src/cli/automation.rs new file mode 100644 index 000000000..9db7e0314 --- /dev/null +++ b/src-tauri/src/cli/automation.rs @@ -0,0 +1,292 @@ +//! `helmor automation` — scheduled automations (recurring prompts). +//! +//! Mutations write the shared SQLite DB directly and nudge a running app via +//! `AutomationsChanged`. Note `run`: the CLI never dispatches a turn itself — +//! it marks the automation due (`next_run_at = now`) and lets the app's +//! scheduler tick (≤30s) fire it, so runs always stream through the app's +//! shared sidecar without stealing UI focus. With the app closed, the due +//! row simply fires on next launch — same catch-up path as a missed slot. + +use anyhow::{bail, Context, Result}; + +use crate::automations::ops; +use crate::automations::schedule::Schedule; +use crate::models::automations::{ + self, AutomationRecord, RUNS_IN_CHAT, RUNS_IN_WORKSPACE, STATUS_ACTIVE, STATUS_PAUSED, +}; +use crate::service; +use crate::ui_sync::UiMutationEvent; + +use super::args::{AutomationAction, Cli}; +use super::notify_ui_event; +use super::output; + +pub fn dispatch(action: &AutomationAction, cli: &Cli) -> Result<()> { + match action { + AutomationAction::List => list(cli), + AutomationAction::Show { automation } => show(automation, cli), + AutomationAction::Create { + title, + prompt, + chat, + workspace, + hourly, + daily, + weekly, + every, + } => create( + title, + prompt, + chat.as_deref(), + workspace.as_deref(), + *hourly, + daily.as_deref(), + weekly.as_deref(), + every.as_deref(), + cli, + ), + AutomationAction::Pause { automation } => set_status(automation, STATUS_PAUSED, cli), + AutomationAction::Resume { automation } => set_status(automation, STATUS_ACTIVE, cli), + AutomationAction::Delete { automation } => delete(automation, cli), + AutomationAction::Run { automation } => run(automation, cli), + } +} + +fn list(cli: &Cli) -> Result<()> { + let records = automations::list_automations()?; + output::print(cli, &records, |records| { + if records.is_empty() { + return "No automations. Create one with `helmor automation create`.".to_string(); + } + records + .iter() + .map(|r| { + format!( + "{} [{}] {} — {} · next {}", + r.id, + r.status, + r.title, + schedule_summary(r), + r.next_run_at + ) + }) + .collect::>() + .join("\n") + }) +} + +fn show(automation: &str, cli: &Cli) -> Result<()> { + let record = get(automation)?; + output::print(cli, &record, |r| { + format!( + "{}\n id: {}\n status: {}\n runs in: {}\n schedule: {}\n next run: {}\n last run: {}\n prompt:\n{}", + r.title, + r.id, + r.status, + match r.runs_in.as_str() { + RUNS_IN_CHAT => format!("chat {}", r.session_id.as_deref().unwrap_or("?")), + _ => format!("workspace {}", r.workspace_id.as_deref().unwrap_or("?")), + }, + schedule_summary(r), + r.next_run_at, + r.last_run_at.as_deref().unwrap_or("never"), + indent(&r.prompt), + ) + }) +} + +#[allow(clippy::too_many_arguments)] +fn create( + title: &str, + prompt: &str, + chat: Option<&str>, + workspace: Option<&str>, + hourly: bool, + daily: Option<&str>, + weekly: Option<&str>, + every: Option<&str>, + cli: &Cli, +) -> Result<()> { + let schedule = parse_schedule_flags(hourly, daily, weekly, every)?; + let (runs_in, session_id, workspace_id) = match (chat, workspace) { + (Some(session), None) => (RUNS_IN_CHAT, Some(session.to_string()), None), + (None, Some(reference)) => { + let workspace_id = service::resolve_workspace_ref(reference)?; + (RUNS_IN_WORKSPACE, None, Some(workspace_id)) + } + _ => bail!("Pass exactly one of --chat or --workspace "), + }; + + let record = ops::create_automation(ops::CreateAutomationInput { + title: title.to_string(), + prompt: prompt.to_string(), + runs_in: runs_in.to_string(), + session_id, + workspace_id, + schedule: serde_json::to_value(&schedule)?, + })?; + notify_ui_event(UiMutationEvent::AutomationsChanged); + output::print_id(cli, "automation_id", &record.id); + Ok(()) +} + +fn set_status(automation: &str, status: &str, cli: &Cli) -> Result<()> { + let record = get(automation)?; + ops::set_status(&record.id, status)?; + notify_ui_event(UiMutationEvent::AutomationsChanged); + output::print_ok( + cli, + &format!("Automation {} is now {status}.", record.title), + ); + Ok(()) +} + +fn delete(automation: &str, cli: &Cli) -> Result<()> { + let record = get(automation)?; + automations::delete_automation(&record.id)?; + notify_ui_event(UiMutationEvent::AutomationsChanged); + output::print_ok(cli, &format!("Deleted automation {}.", record.title)); + Ok(()) +} + +/// Mark the automation due now. The app's scheduler tick claims and fires it +/// (≤30s) without any UI focus change; if the app isn't running, it fires on +/// next launch via the normal catch-up path. +fn run(automation: &str, cli: &Cli) -> Result<()> { + let record = get(automation)?; + if record.status != STATUS_ACTIVE { + bail!( + "Automation {} is paused — `helmor automation resume {}` first.", + record.title, + record.id + ); + } + let now = crate::models::db::current_timestamp()?; + automations::set_automation_status(&record.id, STATUS_ACTIVE, Some(&now))?; + notify_ui_event(UiMutationEvent::AutomationsChanged); + let human = if service::is_app_running() { + format!( + "Automation {} will run within ~30s (next scheduler tick).", + record.title + ) + } else { + format!( + "Automation {} is due now — Helmor isn't running, so it runs on next launch.", + record.title + ) + }; + output::print_ok(cli, &human); + Ok(()) +} + +fn get(reference: &str) -> Result { + automations::get_automation(reference)? + .with_context(|| format!("Automation {reference} not found — try `helmor automation list`")) +} + +fn schedule_summary(record: &AutomationRecord) -> String { + serde_json::from_value::(record.schedule.clone()) + .map(|s| s.summary()) + .unwrap_or_else(|_| "invalid schedule".to_string()) +} + +fn parse_schedule_flags( + hourly: bool, + daily: Option<&str>, + weekly: Option<&str>, + every: Option<&str>, +) -> Result { + if hourly { + return Ok(Schedule::Hourly); + } + if let Some(time) = daily { + return Ok(Schedule::Daily { + time: time.to_string(), + }); + } + if let Some(spec) = weekly { + // "mon:09:30" — day prefix, rest is HH:MM. + let (day, time) = spec + .split_once(':') + .context("Invalid --weekly — expected DAY:HH:MM (e.g. mon:09:30)")?; + let weekday = match day.to_ascii_lowercase().as_str() { + "sun" | "sunday" => 0, + "mon" | "monday" => 1, + "tue" | "tuesday" => 2, + "wed" | "wednesday" => 3, + "thu" | "thursday" => 4, + "fri" | "friday" => 5, + "sat" | "saturday" => 6, + other => bail!("Invalid weekday {other:?} — expected sun|mon|tue|wed|thu|fri|sat"), + }; + return Ok(Schedule::Weekly { + weekday, + time: time.to_string(), + }); + } + if let Some(spec) = every { + let spec = spec.trim().to_ascii_lowercase(); + let (digits, unit) = spec.split_at(spec.len().saturating_sub(1)); + let amount: u32 = digits + .parse() + .with_context(|| format!("Invalid --every {spec:?} — expected e.g. 15m or 2h"))?; + let unit = match unit { + "m" => crate::automations::schedule::EveryUnit::Minutes, + "h" => crate::automations::schedule::EveryUnit::Hours, + _ => bail!("Invalid --every {spec:?} — unit must be m or h"), + }; + return Ok(Schedule::Every { amount, unit }); + } + bail!("Pass exactly one of --hourly, --daily HH:MM, --weekly DAY:HH:MM, --every Nm|Nh") +} + +fn indent(text: &str) -> String { + text.lines() + .map(|line| format!(" {line}")) + .collect::>() + .join("\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::automations::schedule::EveryUnit; + + #[test] + fn schedule_flags_parse() { + assert_eq!( + parse_schedule_flags(true, None, None, None).unwrap(), + Schedule::Hourly + ); + assert_eq!( + parse_schedule_flags(false, Some("09:00"), None, None).unwrap(), + Schedule::Daily { + time: "09:00".into() + } + ); + assert_eq!( + parse_schedule_flags(false, None, Some("mon:09:30"), None).unwrap(), + Schedule::Weekly { + weekday: 1, + time: "09:30".into() + } + ); + assert_eq!( + parse_schedule_flags(false, None, None, Some("15m")).unwrap(), + Schedule::Every { + amount: 15, + unit: EveryUnit::Minutes + } + ); + assert_eq!( + parse_schedule_flags(false, None, None, Some("2h")).unwrap(), + Schedule::Every { + amount: 2, + unit: EveryUnit::Hours + } + ); + assert!(parse_schedule_flags(false, None, None, None).is_err()); + assert!(parse_schedule_flags(false, None, Some("noday"), None).is_err()); + assert!(parse_schedule_flags(false, None, None, Some("15x")).is_err()); + } +} diff --git a/src-tauri/src/cli/mod.rs b/src-tauri/src/cli/mod.rs index d865e4610..7445ec15c 100644 --- a/src-tauri/src/cli/mod.rs +++ b/src-tauri/src/cli/mod.rs @@ -12,6 +12,7 @@ //! / human formatting) and `refs` (UUID / name disambiguation). pub mod args; +mod automation; mod conductor; mod data; mod files; @@ -153,6 +154,7 @@ fn dispatch(cli: &Cli) -> Result<()> { C::Repo { action } => repo::dispatch(action, cli), C::Workspace { action } => workspace::dispatch(action, cli), C::Session { action } => session::dispatch(action, cli), + C::Automation { action } => automation::dispatch(action, cli), C::Files { action } => files::dispatch(action, cli), C::Send(opts) => send::send(opts, cli), C::Models { action } => send::dispatch_models(action, cli), diff --git a/src-tauri/src/commands/automation_commands.rs b/src-tauri/src/commands/automation_commands.rs new file mode 100644 index 000000000..b79c08529 --- /dev/null +++ b/src-tauri/src/commands/automation_commands.rs @@ -0,0 +1,113 @@ +//! Tauri commands for automations — IPC glue over `automations::ops`. +//! Every mutation publishes `UiMutationEvent::AutomationsChanged` so the +//! frontend invalidates the `automations` query through the global bridge. + +use serde::Deserialize; +use tauri::AppHandle; + +use super::common::{run_blocking, CmdResult}; +use crate::automations::ops; +use crate::models::automations::AutomationRecord; +use crate::ui_sync::{self, UiMutationEvent}; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateAutomationRequest { + pub title: String, + pub prompt: String, + pub runs_in: String, + pub session_id: Option, + pub workspace_id: Option, + pub schedule: serde_json::Value, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateAutomationRequest { + pub id: String, + pub title: Option, + pub prompt: Option, + pub runs_in: Option, + pub session_id: Option, + pub workspace_id: Option, + pub schedule: Option, +} + +#[tauri::command] +pub async fn list_automations() -> CmdResult> { + run_blocking(crate::models::automations::list_automations).await +} + +#[tauri::command] +pub async fn create_automation( + app: AppHandle, + request: CreateAutomationRequest, +) -> CmdResult { + let record = run_blocking(move || { + ops::create_automation(ops::CreateAutomationInput { + title: request.title, + prompt: request.prompt, + runs_in: request.runs_in, + session_id: request.session_id, + workspace_id: request.workspace_id, + schedule: request.schedule, + }) + }) + .await?; + ui_sync::publish(&app, UiMutationEvent::AutomationsChanged); + Ok(record) +} + +#[tauri::command] +pub async fn update_automation( + app: AppHandle, + request: UpdateAutomationRequest, +) -> CmdResult { + let record = run_blocking(move || { + ops::update_automation( + &request.id, + ops::UpdateAutomationInput { + title: request.title, + prompt: request.prompt, + runs_in: request.runs_in, + session_id: request.session_id, + workspace_id: request.workspace_id, + schedule: request.schedule, + }, + ) + }) + .await?; + ui_sync::publish(&app, UiMutationEvent::AutomationsChanged); + Ok(record) +} + +#[tauri::command] +pub async fn delete_automation(app: AppHandle, automation_id: String) -> CmdResult<()> { + run_blocking(move || crate::models::automations::delete_automation(&automation_id)).await?; + ui_sync::publish(&app, UiMutationEvent::AutomationsChanged); + Ok(()) +} + +/// Pause (`paused`) or resume (`active`). Resume recomputes `next_run_at` +/// from now — no immediate fire. +#[tauri::command] +pub async fn set_automation_status( + app: AppHandle, + automation_id: String, + status: String, +) -> CmdResult { + let record = run_blocking(move || ops::set_status(&automation_id, &status)).await?; + ui_sync::publish(&app, UiMutationEvent::AutomationsChanged); + Ok(record) +} + +/// Dispatch immediately; records `last_run_at` only — the schedule is +/// untouched. Returns the session id the run landed in so the frontend can +/// offer a "view chat" jump. +#[tauri::command] +pub async fn run_automation_now(app: AppHandle, automation_id: String) -> CmdResult { + let handle = app.clone(); + let session_id = run_blocking(move || ops::run_now(&handle, &automation_id)).await?; + ui_sync::publish(&app, UiMutationEvent::AutomationsChanged); + Ok(session_id) +} diff --git a/src-tauri/src/commands/mod.rs b/src-tauri/src/commands/mod.rs index d8918683c..20601cb5d 100644 --- a/src-tauri/src/commands/mod.rs +++ b/src-tauri/src/commands/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod automation_commands; mod common; pub(crate) mod companion_commands; pub(crate) mod conductor_commands; diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index bc80fca5d..80a0c5059 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,4 +1,5 @@ pub mod agents; +pub mod automations; pub mod cli; pub(crate) mod codex_config; pub(crate) mod commands; @@ -485,6 +486,11 @@ pub fn run() { // Triage: fetcher + auto-fire tick on the same 5-min thread. triage::fetcher::spawn_scheduler(app.handle().clone()); + // Automations: stateless 30s poll over `automations.next_run_at`. + // Overdue rows (app was closed / machine slept) catch up once on + // the first tick after the startup delay. + automations::scheduler::spawn_scheduler(app.handle().clone()); + // Mobile browser companion (experimental, opt-in via env). Starts a // loopback-bound HTTP/SSE server that mirrors the IPC surface so the // same frontend can be served to a phone browser. Default app @@ -578,6 +584,12 @@ pub fn run() { agents::list_slash_commands, agents::prewarm_slash_commands_for_workspace, agents::prewarm_slash_commands_for_repo, + commands::automation_commands::list_automations, + commands::automation_commands::create_automation, + commands::automation_commands::update_automation, + commands::automation_commands::delete_automation, + commands::automation_commands::set_automation_status, + commands::automation_commands::run_automation_now, commands::workspace_commands::prepare_archive_workspace, commands::workspace_commands::start_archive_workspace, commands::workspace_commands::validate_archive_workspace, diff --git a/src-tauri/src/models/automations.rs b/src-tauri/src/models/automations.rs new file mode 100644 index 000000000..a080a4e41 --- /dev/null +++ b/src-tauri/src/models/automations.rs @@ -0,0 +1,365 @@ +//! Persistence for scheduled automations. +//! +//! SQLite is the single source of truth for scheduling: `next_run_at` and +//! `status` live here, and the in-process scheduler is a stateless poll loop +//! over this table. The two scheduler primitives (`due_automations`, +//! `claim_automation`) implement claim-before-dispatch: a CAS-style UPDATE on +//! `next_run_at` guarantees at-most-once firing per slot across restarts, +//! crashes, and racing ticks. Timestamps use the `db::current_timestamp()` +//! RFC3339-UTC-millis format, which orders chronologically as plain strings. + +use anyhow::{Context, Result}; +use rusqlite::params; +use serde::Serialize; + +use crate::models::db; + +pub const RUNS_IN_CHAT: &str = "chat"; +pub const RUNS_IN_WORKSPACE: &str = "workspace"; +pub const STATUS_ACTIVE: &str = "active"; +pub const STATUS_PAUSED: &str = "paused"; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AutomationRecord { + pub id: String, + pub title: String, + pub prompt: String, + /// `chat` (append runs to the bound session) or `workspace` (create a new + /// session per run in the bound workspace). + pub runs_in: String, + pub session_id: Option, + pub workspace_id: Option, + /// Schedule spec as JSON (see `automations::schedule::Schedule`). Stored + /// opaque here so the persistence layer stays independent of domain types. + pub schedule: serde_json::Value, + pub status: String, + pub next_run_at: String, + pub last_run_at: Option, + pub created_at: String, + pub updated_at: String, +} + +pub struct NewAutomation<'a> { + pub title: &'a str, + pub prompt: &'a str, + pub runs_in: &'a str, + pub session_id: Option<&'a str>, + pub workspace_id: Option<&'a str>, + pub schedule: &'a serde_json::Value, + pub next_run_at: &'a str, +} + +const SELECT_COLUMNS: &str = "id, title, prompt, runs_in, session_id, workspace_id, schedule, \ + status, next_run_at, last_run_at, created_at, updated_at"; + +fn record_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(AutomationRecord, String)> { + let schedule_raw: String = row.get(6)?; + Ok(( + AutomationRecord { + id: row.get(0)?, + title: row.get(1)?, + prompt: row.get(2)?, + runs_in: row.get(3)?, + session_id: row.get(4)?, + workspace_id: row.get(5)?, + schedule: serde_json::Value::Null, + status: row.get(7)?, + next_run_at: row.get(8)?, + last_run_at: row.get(9)?, + created_at: row.get(10)?, + updated_at: row.get(11)?, + }, + schedule_raw, + )) +} + +fn parse_schedule( + (mut record, schedule_raw): (AutomationRecord, String), +) -> Result { + record.schedule = serde_json::from_str(&schedule_raw).with_context(|| { + format!( + "automation {} has unparseable schedule JSON: {schedule_raw}", + record.id + ) + })?; + Ok(record) +} + +/// List all automations, newest first. +pub fn list_automations() -> Result> { + let conn = db::read_conn()?; + let mut stmt = conn.prepare(&format!( + "SELECT {SELECT_COLUMNS} FROM automations ORDER BY created_at DESC" + ))?; + let rows = stmt + .query_map([], record_from_row)? + .collect::>>()?; + rows.into_iter().map(parse_schedule).collect() +} + +pub fn get_automation(id: &str) -> Result> { + let conn = db::read_conn()?; + let mut stmt = conn.prepare(&format!( + "SELECT {SELECT_COLUMNS} FROM automations WHERE id = ?1" + ))?; + let mut rows = stmt + .query_map(params![id], record_from_row)? + .collect::>>()?; + match rows.pop() { + Some(raw) => Ok(Some(parse_schedule(raw)?)), + None => Ok(None), + } +} + +pub fn insert_automation(new: &NewAutomation<'_>) -> Result { + let id = uuid::Uuid::new_v4().to_string(); + let now = db::current_timestamp()?; + let schedule_raw = new.schedule.to_string(); + let conn = db::write_conn()?; + conn.execute( + "INSERT INTO automations \ + (id, title, prompt, runs_in, session_id, workspace_id, schedule, status, next_run_at, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)", + params![ + id, + new.title, + new.prompt, + new.runs_in, + new.session_id, + new.workspace_id, + schedule_raw, + STATUS_ACTIVE, + new.next_run_at, + now, + ], + )?; + Ok(AutomationRecord { + id, + title: new.title.to_string(), + prompt: new.prompt.to_string(), + runs_in: new.runs_in.to_string(), + session_id: new.session_id.map(str::to_string), + workspace_id: new.workspace_id.map(str::to_string), + schedule: new.schedule.clone(), + status: STATUS_ACTIVE.to_string(), + next_run_at: new.next_run_at.to_string(), + last_run_at: None, + created_at: now.clone(), + updated_at: now, + }) +} + +/// Write back every editable field of a (read-modify-write) record. +/// Callers recompute `next_run_at` before saving when the schedule changed. +pub fn update_automation_record(record: &AutomationRecord) -> Result<()> { + let now = db::current_timestamp()?; + let conn = db::write_conn()?; + conn.execute( + "UPDATE automations SET title = ?2, prompt = ?3, runs_in = ?4, session_id = ?5, \ + workspace_id = ?6, schedule = ?7, status = ?8, next_run_at = ?9, updated_at = ?10 \ + WHERE id = ?1", + params![ + record.id, + record.title, + record.prompt, + record.runs_in, + record.session_id, + record.workspace_id, + record.schedule.to_string(), + record.status, + record.next_run_at, + now, + ], + )?; + Ok(()) +} + +/// Pause/resume. Resume passes a freshly computed `next_run_at` (from now) so +/// a long-paused automation never fires immediately on resume. +pub fn set_automation_status(id: &str, status: &str, next_run_at: Option<&str>) -> Result<()> { + let now = db::current_timestamp()?; + let conn = db::write_conn()?; + match next_run_at { + Some(next) => conn.execute( + "UPDATE automations SET status = ?2, next_run_at = ?3, updated_at = ?4 WHERE id = ?1", + params![id, status, next, now], + )?, + None => conn.execute( + "UPDATE automations SET status = ?2, updated_at = ?3 WHERE id = ?1", + params![id, status, now], + )?, + }; + Ok(()) +} + +/// Record a manual "Run now" without touching the schedule. +pub fn set_last_run_at(id: &str, last_run_at: &str) -> Result<()> { + let conn = db::write_conn()?; + conn.execute( + "UPDATE automations SET last_run_at = ?2, updated_at = ?2 WHERE id = ?1", + params![id, last_run_at], + )?; + Ok(()) +} + +pub fn delete_automation(id: &str) -> Result<()> { + let conn = db::write_conn()?; + conn.execute("DELETE FROM automations WHERE id = ?1", params![id])?; + Ok(()) +} + +// ── Scheduler primitives ──────────────────────────────────────────────────── + +/// Active automations whose `next_run_at` is due at `now`, oldest first. +pub fn due_automations(now: &str) -> Result> { + let conn = db::read_conn()?; + let mut stmt = conn.prepare(&format!( + "SELECT {SELECT_COLUMNS} FROM automations \ + WHERE status = ?1 AND next_run_at <= ?2 ORDER BY next_run_at ASC" + ))?; + let rows = stmt + .query_map(params![STATUS_ACTIVE, now], record_from_row)? + .collect::>>()?; + rows.into_iter().map(parse_schedule).collect() +} + +/// Claim a due slot: CAS on `next_run_at` so exactly one claimer wins, even +/// across racing ticks or instances. Sets `last_run_at = now` as part of the +/// claim. Returns false when someone else already claimed (or the automation +/// was edited/paused since it was read). +pub fn claim_automation( + id: &str, + old_next_run_at: &str, + new_next_run_at: &str, + now: &str, +) -> Result { + let conn = db::write_conn()?; + let changed = conn.execute( + "UPDATE automations SET next_run_at = ?3, last_run_at = ?4, updated_at = ?4 \ + WHERE id = ?1 AND next_run_at = ?2 AND status = ?5", + params![id, old_next_run_at, new_next_run_at, now, STATUS_ACTIVE], + )?; + Ok(changed == 1) +} + +/// Roll back a claim whose dispatch was rejected (e.g. the bound session was +/// concurrently busy). CAS-guarded on the claimed value so a concurrent edit +/// is never stomped; `last_run_at` is restored because the run never happened. +pub fn unclaim_automation( + id: &str, + claimed_next_run_at: &str, + previous_next_run_at: &str, + previous_last_run_at: Option<&str>, +) -> Result<()> { + let now = db::current_timestamp()?; + let conn = db::write_conn()?; + conn.execute( + "UPDATE automations SET next_run_at = ?3, last_run_at = ?4, updated_at = ?5 \ + WHERE id = ?1 AND next_run_at = ?2", + params![ + id, + claimed_next_run_at, + previous_next_run_at, + previous_last_run_at, + now + ], + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn insert_sample(title: &str, next_run_at: &str) -> AutomationRecord { + insert_automation(&NewAutomation { + title, + prompt: "check the thing", + runs_in: RUNS_IN_CHAT, + session_id: Some("session-1"), + workspace_id: None, + schedule: &serde_json::json!({"kind": "hourly"}), + next_run_at, + }) + .unwrap() + } + + #[test] + fn crud_roundtrip() { + let _env = crate::testkit::TestEnv::new("automations-crud"); + + let created = insert_sample("Order monitor", "2026-01-01T00:00:00.000Z"); + let listed = list_automations().unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].id, created.id); + assert_eq!(listed[0].schedule, serde_json::json!({"kind": "hourly"})); + assert_eq!(listed[0].status, STATUS_ACTIVE); + + let mut record = get_automation(&created.id).unwrap().unwrap(); + record.title = "Renamed".into(); + record.schedule = serde_json::json!({"kind": "daily", "time": "09:00"}); + update_automation_record(&record).unwrap(); + let reloaded = get_automation(&created.id).unwrap().unwrap(); + assert_eq!(reloaded.title, "Renamed"); + assert_eq!(reloaded.schedule["kind"], "daily"); + + delete_automation(&created.id).unwrap(); + assert!(get_automation(&created.id).unwrap().is_none()); + } + + #[test] + fn due_query_excludes_paused_and_future() { + let _env = crate::testkit::TestEnv::new("automations-due"); + + let due = insert_sample("due", "2020-01-01T00:00:00.000Z"); + let paused = insert_sample("paused", "2020-01-01T00:00:00.000Z"); + set_automation_status(&paused.id, STATUS_PAUSED, None).unwrap(); + insert_sample("future", "2999-01-01T00:00:00.000Z"); + + let now = db::current_timestamp().unwrap(); + let found = due_automations(&now).unwrap(); + assert_eq!(found.len(), 1); + assert_eq!(found[0].id, due.id); + } + + #[test] + fn claim_is_exactly_once_and_unclaim_restores() { + let _env = crate::testkit::TestEnv::new("automations-claim"); + + let old_next = "2020-01-01T00:00:00.000Z"; + let record = insert_sample("claimable", old_next); + let now = db::current_timestamp().unwrap(); + let new_next = "2999-01-01T00:00:00.000Z"; + + // Two racing claims with the same observed value: exactly one wins. + assert!(claim_automation(&record.id, old_next, new_next, &now).unwrap()); + assert!(!claim_automation(&record.id, old_next, new_next, &now).unwrap()); + + let claimed = get_automation(&record.id).unwrap().unwrap(); + assert_eq!(claimed.next_run_at, new_next); + assert_eq!(claimed.last_run_at.as_deref(), Some(now.as_str())); + + // Rolling back a rejected dispatch restores both fields. + unclaim_automation(&record.id, new_next, old_next, None).unwrap(); + let restored = get_automation(&record.id).unwrap().unwrap(); + assert_eq!(restored.next_run_at, old_next); + assert_eq!(restored.last_run_at, None); + + // Unclaim is CAS-guarded: a stale rollback never stomps a newer value. + unclaim_automation(&record.id, new_next, "1999-01-01T00:00:00.000Z", None).unwrap(); + let unchanged = get_automation(&record.id).unwrap().unwrap(); + assert_eq!(unchanged.next_run_at, old_next); + } + + #[test] + fn paused_claim_is_rejected() { + let _env = crate::testkit::TestEnv::new("automations-claim-paused"); + + let old_next = "2020-01-01T00:00:00.000Z"; + let record = insert_sample("paused-claim", old_next); + set_automation_status(&record.id, STATUS_PAUSED, None).unwrap(); + let now = db::current_timestamp().unwrap(); + assert!(!claim_automation(&record.id, old_next, "2999-01-01T00:00:00.000Z", &now).unwrap()); + } +} diff --git a/src-tauri/src/models/mod.rs b/src-tauri/src/models/mod.rs index 4f241b92b..1291b9473 100644 --- a/src-tauri/src/models/mod.rs +++ b/src-tauri/src/models/mod.rs @@ -1,3 +1,4 @@ +pub mod automations; pub mod db; pub mod paired_devices; pub mod repos; diff --git a/src-tauri/src/models/sessions.rs b/src-tauri/src/models/sessions.rs index 52518f881..e54d1a6a3 100644 --- a/src-tauri/src/models/sessions.rs +++ b/src-tauri/src/models/sessions.rs @@ -652,6 +652,28 @@ pub fn get_session_model(session_id: &str) -> Result> { Ok(model.filter(|s| !s.is_empty())) } +/// (workspace_id, permission_mode) for dispatching a background turn into an +/// existing session (automations). `Ok(None)` when the session row is gone — +/// callers treat that as "target missing", not an error. +pub fn get_session_workspace_and_permission( + session_id: &str, +) -> Result, String)>> { + let conn = db::read_conn()?; + conn.query_row( + "SELECT workspace_id, permission_mode FROM sessions WHERE id = ?1", + [session_id], + |row| { + Ok(( + row.get::<_, Option>(0)?, + row.get::<_, Option>(1)? + .unwrap_or_else(|| "default".to_string()), + )) + }, + ) + .optional() + .with_context(|| format!("Failed to read workspace+permission for session {session_id}")) +} + /// (model, agent_type) for a session — provider hint for `resolve_model` /// when ids like `"default"` are ambiguous. pub fn get_session_model_and_provider( diff --git a/src-tauri/src/pipeline/adapter/codex_items.rs b/src-tauri/src/pipeline/adapter/codex_items.rs index 6fc148edd..9653b25c6 100644 --- a/src-tauri/src/pipeline/adapter/codex_items.rs +++ b/src-tauri/src/pipeline/adapter/codex_items.rs @@ -49,6 +49,7 @@ pub(super) fn render_item_completed( reason: Some("stop".to_string()), }), streaming: None, + source: None, }); } } @@ -109,6 +110,7 @@ fn render_command_execution( reason: Some("stop".to_string()), }), streaming: None, + source: None, }); } @@ -136,6 +138,7 @@ fn render_context_compaction( })], status: None, streaming: None, + source: None, }); } @@ -154,6 +157,7 @@ fn render_todo_list(msg: &IntermediateMessage, item: &Value, result: &mut Vec ThreadMessag })], status: None, streaming: None, + source: None, } } @@ -40,6 +41,7 @@ pub(super) fn make_turn_result_system(msg: &IntermediateMessage, text: &str) -> })], status: None, streaming: None, + source: None, } } @@ -54,6 +56,7 @@ pub(super) fn make_system_notice( content: vec![ExtendedMessagePart::Basic(part)], status: None, streaming: None, + source: None, } } diff --git a/src-tauri/src/pipeline/adapter/mod.rs b/src-tauri/src/pipeline/adapter/mod.rs index de662ff15..29857269b 100644 --- a/src-tauri/src/pipeline/adapter/mod.rs +++ b/src-tauri/src/pipeline/adapter/mod.rs @@ -265,6 +265,7 @@ fn convert_flat(messages: &[IntermediateMessage]) -> (Vec, Wo })], status: None, streaming: None, + source: None, }); } i += 1; @@ -308,6 +309,7 @@ fn convert_flat(messages: &[IntermediateMessage]) -> (Vec, Wo content, status: None, streaming: if msg.is_streaming { Some(true) } else { None }, + source: None, }); } i += 1; @@ -391,6 +393,7 @@ fn convert_flat(messages: &[IntermediateMessage]) -> (Vec, Wo content: parts.into_iter().map(ExtendedMessagePart::Basic).collect(), status: Some(map_stop_reason(parsed)), streaming: if is_streaming { Some(true) } else { None }, + source: None, }); // Re-emit any system messages we skipped over so they still @@ -440,6 +443,10 @@ fn convert_flat(messages: &[IntermediateMessage]) -> (Vec, Wo }; let files = extract_strs("files"); let images = extract_strs("images"); + let source = parsed + .and_then(|p| p.get("source")) + .and_then(Value::as_str) + .map(str::to_string); let parts = grouping::split_user_text_with_files(&text, &files, &images, &msg.id); result.push(ThreadMessageLike { role: MessageRole::User, @@ -448,6 +455,7 @@ fn convert_flat(messages: &[IntermediateMessage]) -> (Vec, Wo content: parts.into_iter().map(ExtendedMessagePart::Basic).collect(), status: None, streaming: None, + source, }); i += 1; continue; @@ -627,6 +635,7 @@ fn convert_user_type_msg( })], status: None, streaming: None, + source: None, }); } return; @@ -799,5 +808,6 @@ fn convert_exit_plan_mode_msg( })], status: None, streaming: None, + source: None, } } diff --git a/src-tauri/src/pipeline/collapse.rs b/src-tauri/src/pipeline/collapse.rs index 4e64d9c75..c2891777b 100644 --- a/src-tauri/src/pipeline/collapse.rs +++ b/src-tauri/src/pipeline/collapse.rs @@ -908,6 +908,7 @@ mod tests { ], status: None, streaming: None, + source: None, }]; collapse_pass(&mut messages); assert_eq!(messages[0].content.len(), 3); // text + Agent + text diff --git a/src-tauri/src/pipeline/types.rs b/src-tauri/src/pipeline/types.rs index c85d2a01c..6beebb91d 100644 --- a/src-tauri/src/pipeline/types.rs +++ b/src-tauri/src/pipeline/types.rs @@ -456,6 +456,11 @@ pub struct ThreadMessageLike { /// True when this message is still being streamed from an agent. #[serde(skip_serializing_if = "Option::is_none")] pub streaming: Option, + /// Who initiated a user message: `None` = human, `Some("automation")` = + /// the automations scheduler ("Sent via automation" badge). Absent from + /// the wire when unset so historical snapshots stay byte-identical. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub source: Option, } // --------------------------------------------------------------------------- diff --git a/src-tauri/src/schema.rs b/src-tauri/src/schema.rs index 4e5d52756..47c2b966e 100644 --- a/src-tauri/src/schema.rs +++ b/src-tauri/src/schema.rs @@ -1212,6 +1212,27 @@ CREATE TABLE IF NOT EXISTS paired_devices ( revoked_at TEXT ); +-- Scheduled automations: periodically inject a fixed prompt into a session +-- and run a normal agent turn. SQLite is the single source of truth for the +-- schedule — the in-process scheduler is a stateless poll loop over +-- `next_run_at`, so restarts/sleep just mean a late tick sees overdue rows. +-- Timestamps use the db::current_timestamp() RFC3339-UTC-millis format, +-- which compares chronologically as plain strings. +CREATE TABLE IF NOT EXISTS automations ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + prompt TEXT NOT NULL, + runs_in TEXT NOT NULL, + session_id TEXT, + workspace_id TEXT, + schedule TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + next_run_at TEXT NOT NULL, + last_run_at TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + -- Indexes CREATE INDEX IF NOT EXISTS idx_session_messages_sent_at ON session_messages(session_id, sent_at); CREATE INDEX IF NOT EXISTS idx_sessions_workspace_id ON sessions(workspace_id); @@ -1219,6 +1240,7 @@ CREATE INDEX IF NOT EXISTS idx_workspaces_repository_id ON workspaces(repository CREATE INDEX IF NOT EXISTS idx_runtime_processes_ended_at ON runtime_processes(ended_at); CREATE INDEX IF NOT EXISTS idx_triage_candidate_open ON triage_candidate(source_time DESC) WHERE decision IS NULL; CREATE INDEX IF NOT EXISTS idx_triage_candidate_source ON triage_candidate(source, source_time DESC); +CREATE INDEX IF NOT EXISTS idx_automations_due ON automations(status, next_run_at); -- idx_workspaces_kind + idx_workspaces_triage_source are created in -- `run_migrations` (after the ALTERs on upgraded DBs). diff --git a/src-tauri/src/ui_sync/events.rs b/src-tauri/src/ui_sync/events.rs index c59c2497b..2c35c15dc 100644 --- a/src-tauri/src/ui_sync/events.rs +++ b/src-tauri/src/ui_sync/events.rs @@ -122,6 +122,10 @@ pub enum UiMutationEvent { /// The mobile-companion paired-device list changed (paired or revoked). /// Frontends invalidate the `pairedDevices` query. PairedDevicesChanged, + /// An automation was created/edited/deleted/paused, or the scheduler + /// fired a run (shifting next/last run). Frontends invalidate the + /// `automations` query. + AutomationsChanged, /// "Open in Helmor" from the quick panel. Only the MAIN window acts on /// this (navigates to the workspace/session); the quick panel ignores it. WorkspaceRevealRequested { @@ -278,6 +282,7 @@ mod tests { UiMutationEvent::ActiveStreamsChanged, "activeStreamsChanged", ), + (UiMutationEvent::AutomationsChanged, "automationsChanged"), ]; for (event, expected) in cases { let json = serde_json::to_value(&event).unwrap(); diff --git a/src-tauri/tests/common/builders.rs b/src-tauri/tests/common/builders.rs index 4bb988fa0..829233119 100644 --- a/src-tauri/tests/common/builders.rs +++ b/src-tauri/tests/common/builders.rs @@ -75,6 +75,17 @@ pub fn user_prompt_with_files_and_images( make_record(id, "user", &serde_json::to_string(&parsed).unwrap()) } +/// Automation-initiated prompt. Same shape as `user_prompt` but with the +/// `source` marker written by `persist_user_message` for scheduler turns. +pub fn user_prompt_from_automation(id: &str, text: &str) -> HistoricalRecord { + let parsed = json!({ + "type": "user_prompt", + "text": text, + "source": "automation", + }); + make_record(id, "user", &serde_json::to_string(&parsed).unwrap()) +} + /// Mid-turn steer prompt. Same shape as `user_prompt` but with the /// `steer: true` marker written by `persist_steer_message`. pub fn user_prompt_steer(id: &str, text: &str) -> HistoricalRecord { diff --git a/src-tauri/tests/common/normalize.rs b/src-tauri/tests/common/normalize.rs index 5431bfbca..669022a7c 100644 --- a/src-tauri/tests/common/normalize.rs +++ b/src-tauri/tests/common/normalize.rs @@ -18,6 +18,10 @@ pub struct NormThreadMessage { pub content: Vec, pub status: Option, pub streaming: Option, + /// Initiator marker (`"automation"`). Skipped when absent so the + /// pre-existing snapshot corpus stays byte-identical. + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, } #[derive(Debug, Serialize)] @@ -292,6 +296,7 @@ pub fn normalize_message(msg: &ThreadMessageLike) -> NormThreadMessage { reason: s.reason.clone(), }), streaming: msg.streaming, + source: msg.source.clone(), } } diff --git a/src-tauri/tests/pipeline_scenarios.rs b/src-tauri/tests/pipeline_scenarios.rs index e1c486b6b..2b9de1bfe 100644 --- a/src-tauri/tests/pipeline_scenarios.rs +++ b/src-tauri/tests/pipeline_scenarios.rs @@ -107,6 +107,18 @@ fn user_prompt_wrapped() { assert_yaml_snapshot!(run_normalized(msgs)); } +#[test] +fn user_prompt_with_automation_source() { + // Scheduler-initiated prompt: persist_user_message adds + // `"source":"automation"`, which must surface as + // `ThreadMessageLike.source` so the chat renders the + // "Sent via automation" badge. Human prompts (no `source` key) must + // keep their exact wire shape — covered by every other snapshot in + // this file staying byte-identical. + let msgs = vec![user_prompt_from_automation("u1", "check the order status")]; + assert_yaml_snapshot!(run_normalized(msgs)); +} + #[test] fn user_prompt_with_brace_content() { // Latent-bug regression: prompts that happened to start with `{` were diff --git a/src-tauri/tests/snapshots/pipeline_scenarios__user_prompt_with_automation_source.snap b/src-tauri/tests/snapshots/pipeline_scenarios__user_prompt_with_automation_source.snap new file mode 100644 index 000000000..0f42e1a3e --- /dev/null +++ b/src-tauri/tests/snapshots/pipeline_scenarios__user_prompt_with_automation_source.snap @@ -0,0 +1,13 @@ +--- +source: tests/pipeline_scenarios.rs +expression: run_normalized(msgs) +--- +- role: user + id: msg-1 + content_length: 1 + content: + - type: text + text: check the order status + status: ~ + streaming: ~ + source: automation diff --git a/src/features/automations/automation-detail.tsx b/src/features/automations/automation-detail.tsx new file mode 100644 index 000000000..591b49a79 --- /dev/null +++ b/src/features/automations/automation-detail.tsx @@ -0,0 +1,256 @@ +import { ChevronRight, Pause, Play, Trash2 } from "lucide-react"; +import { type ReactNode, useState } from "react"; +import { Button } from "@/components/ui/button"; +import { ConfirmDialog } from "@/components/ui/confirm-dialog"; +import { Textarea } from "@/components/ui/textarea"; +import type { Automation } from "@/lib/api"; +import { cn } from "@/lib/utils"; +import { IntervalPicker } from "./interval-picker"; +import { formatRunTime, statusDotClass, statusLabel } from "./schedule"; +import { useAutomationMutations } from "./use-automation-mutations"; +import { + useSessionOptions, + useWorkspaceOptions, +} from "./use-automation-targets"; + +function SidebarGroup({ + title, + children, +}: { + title: string; + children: ReactNode; +}) { + return ( +
+

+ {title} +

+
{children}
+
+ ); +} + +function SidebarRow({ + label, + children, +}: { + label: string; + children: ReactNode; +}) { + return ( +
+ {label} + + {children} + +
+ ); +} + +/** In-page detail view (no route). Mounted with `key={automation.id}` so the + * title/prompt drafts reset whenever a different automation opens. */ +export function AutomationDetail({ + automation, + onBack, + onOpenSession, +}: { + automation: Automation; + onBack: () => void; + onOpenSession: (workspaceId: string, sessionId: string) => void; +}) { + const [titleDraft, setTitleDraft] = useState(automation.title); + const [promptDraft, setPromptDraft] = useState(automation.prompt); + const [confirmDelete, setConfirmDelete] = useState(false); + + const { update, remove, setStatus, runNow } = useAutomationMutations(); + const workspaces = useWorkspaceOptions(); + const sessions = useSessionOptions( + automation.runsIn === "chat" ? automation.workspaceId : null, + ); + + const workspaceName = automation.workspaceId + ? (workspaces.find((workspace) => workspace.id === automation.workspaceId) + ?.title ?? "Unknown workspace") + : "—"; + const sessionName = automation.sessionId + ? (sessions.find((session) => session.id === automation.sessionId)?.title ?? + "Unknown chat") + : "—"; + + const promptDirty = promptDraft !== automation.prompt; + const paused = automation.status === "paused"; + + const saveTitle = () => { + const next = titleDraft.trim(); + if (next === "" || next === automation.title) { + setTitleDraft(automation.title); + return; + } + update.mutate({ id: automation.id, title: next }); + }; + + const savePrompt = () => { + if (!promptDirty || promptDraft.trim() === "") return; + update.mutate({ id: automation.id, prompt: promptDraft }); + }; + + const handleRunNow = () => { + runNow.mutate(automation.id, { + onSuccess: (sessionId) => { + if (automation.workspaceId) { + onOpenSession(automation.workspaceId, sessionId); + } + }, + }); + }; + + return ( +
+
+
+ +
+ + + +
+
+ +
+
+ setTitleDraft(event.target.value)} + onBlur={saveTitle} + aria-label="Automation title" + className="w-full bg-transparent text-heading font-semibold text-foreground outline-none placeholder:text-muted-foreground/60" + /> +