diff --git a/agents-api.php b/agents-api.php index 94ccde5..96d29ca 100644 --- a/agents-api.php +++ b/agents-api.php @@ -162,6 +162,7 @@ require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-runtime-tool-continuation.php'; require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-runtime-tool-lifecycle.php'; require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-conversation-result.php'; +require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-run-control.php'; require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-chat-run-control.php'; require_once AGENTS_API_PATH . 'src/Tasks/class-wp-agent-task-run-control.php'; require_once AGENTS_API_PATH . 'src/Runtime/class-wp-agent-conversation-loop.php'; diff --git a/src/Runtime/class-wp-agent-chat-run-control.php b/src/Runtime/class-wp-agent-chat-run-control.php index 045b0a9..7f0510b 100644 --- a/src/Runtime/class-wp-agent-chat-run-control.php +++ b/src/Runtime/class-wp-agent-chat-run-control.php @@ -42,16 +42,7 @@ public static function statuses(): array { * Generate an opaque client-addressable run ID. */ public static function generate_run_id(): string { - if ( function_exists( 'wp_generate_uuid4' ) ) { - return 'run_' . str_replace( '-', '', wp_generate_uuid4() ); - } - - try { - return 'run_' . bin2hex( random_bytes( 16 ) ); - } catch ( \Throwable $error ) { - unset( $error ); - return 'run_' . str_replace( '.', '', uniqid( '', true ) ); - } + return WP_Agent_Run_Control::generate_run_id(); } /** @@ -101,7 +92,7 @@ public static function normalize_run( array $run ): array { * Normalize status values while keeping the public vocabulary bounded. */ public static function normalize_status( mixed $status ): string { - $status = is_string( $status ) ? strtolower( trim( $status ) ) : ''; + $status = WP_Agent_Run_Control::normalize_status( $status ); return in_array( $status, self::statuses(), true ) ? $status : self::STATUS_RUNNING; } @@ -114,21 +105,14 @@ public static function normalize_status( mixed $status ): string { * @return array Normalized run. */ public static function start_run( string $run_id, string $session_id, array $metadata = array() ): array { - $now = self::now(); - $run = array( - 'run_id' => $run_id, - 'session_id' => $session_id, - 'status' => self::STATUS_RUNNING, - 'started_at' => $metadata['started_at'] ?? $now, - 'updated_at' => $now, - 'metadata' => $metadata, - ); - - $state = self::state(); - $state['runs'][ $run_id ] = $run; - self::save_state( $state ); - - return self::normalize_run( $run ); + return self::normalize_run( WP_Agent_Run_Control::start_run( + self::OPTION_KEY, + $run_id, + array( + 'session_id' => $session_id, + 'metadata' => $metadata, + ) + ) ); } /** @@ -139,22 +123,8 @@ public static function start_run( string $run_id, string $session_id, array $met * @return array|null Normalized run, or null when absent. */ public static function finish_run( string $run_id, string $status = self::STATUS_COMPLETED ): ?array { - $state = self::state(); - if ( ! isset( $state['runs'][ $run_id ] ) ) { - return null; - } - - $run = $state['runs'][ $run_id ]; - $run['status'] = self::normalize_status( $status ); - $run['updated_at'] = self::now(); - if ( self::STATUS_CANCELLED === $run['status'] ) { - $run['cancelled'] = true; - } - - $state['runs'][ $run_id ] = $run; - self::save_state( $state ); - - return self::normalize_run( $run ); + $run = WP_Agent_Run_Control::finish_run( self::OPTION_KEY, $run_id, $status ); + return null === $run ? null : self::normalize_run( $run ); } /** @@ -164,9 +134,8 @@ public static function finish_run( string $run_id, string $status = self::STATUS * @return array|null Normalized run, or null when absent. */ public static function get_run( string $run_id ): ?array { - $state = self::state(); - $run = $state['runs'][ $run_id ] ?? null; - return is_array( $run ) ? self::normalize_run( $run ) : null; + $run = WP_Agent_Run_Control::get_run( self::OPTION_KEY, $run_id ); + return null === $run ? null : self::normalize_run( $run ); } /** @@ -176,21 +145,8 @@ public static function get_run( string $run_id ): ?array { * @return array|null Normalized run, or null when absent. */ public static function request_cancel( string $run_id ): ?array { - $state = self::state(); - if ( ! isset( $state['runs'][ $run_id ] ) ) { - return null; - } - - $run = $state['runs'][ $run_id ]; - $terminal = in_array( self::normalize_status( $run['status'] ?? '' ), array( self::STATUS_COMPLETED, self::STATUS_FAILED, self::STATUS_CANCELLED ), true ); - $run['status'] = $terminal ? self::normalize_status( $run['status'] ?? '' ) : self::STATUS_CANCELLING; - $run['cancelled'] = ! $terminal; - $run['updated_at'] = self::now(); - - $state['runs'][ $run_id ] = $run; - self::save_state( $state ); - - return self::normalize_run( $run ); + $run = WP_Agent_Run_Control::request_cancel( self::OPTION_KEY, $run_id ); + return null === $run ? null : self::normalize_run( $run ); } /** diff --git a/src/Runtime/class-wp-agent-run-control.php b/src/Runtime/class-wp-agent-run-control.php new file mode 100644 index 0000000..cea29d6 --- /dev/null +++ b/src/Runtime/class-wp-agent-run-control.php @@ -0,0 +1,310 @@ + $run Raw run status. + * @return array + */ + public static function normalize_run( array $run ): array { + $run_id = trim( self::string_value( $run['run_id'] ?? null ) ); + $status = self::normalize_status( $run['status'] ?? self::STATUS_RUNNING ); + + if ( '' === $run_id ) { + throw new \InvalidArgumentException( 'run_id must be a non-empty string' ); + } + + $normalized = array( + 'run_id' => $run_id, + 'status' => $status, + 'started_at' => self::string_value( $run['started_at'] ?? null ), + 'updated_at' => self::string_value( $run['updated_at'] ?? null ), + 'metadata' => isset( $run['metadata'] ) && is_array( $run['metadata'] ) ? $run['metadata'] : array(), + ); + + foreach ( array( 'session_id', 'workflow_id', 'executor_id', 'queued_message_id' ) as $field ) { + if ( isset( $run[ $field ] ) ) { + $normalized[ $field ] = self::string_value( $run[ $field ] ); + } + } + + if ( isset( $run['position'] ) ) { + $normalized['position'] = max( 0, self::int_value( $run['position'] ) ); + } + + if ( isset( $run['cancelled'] ) ) { + $normalized['cancelled'] = (bool) $run['cancelled']; + } + + return $normalized; + } + + /** + * Start or update an addressable run in the selected store. + * + * @param string $store_key Option key used by the backing store. + * @param string $run_id Run ID. + * @param array $run Run fields. + * @return array + */ + public static function start_run( string $store_key, string $run_id, array $run = array() ): array { + $now = self::now(); + $run = array_merge( + $run, + array( + 'run_id' => $run_id, + 'status' => self::STATUS_RUNNING, + 'started_at' => $run['started_at'] ?? $now, + 'updated_at' => $now, + 'metadata' => isset( $run['metadata'] ) && is_array( $run['metadata'] ) ? $run['metadata'] : array(), + ) + ); + + $state = self::state( $store_key ); + $state['runs'][ $run_id ] = $run; + self::save_state( $store_key, $state ); + + return self::normalize_run( $run ); + } + + /** + * Store a normalized run result. + * + * @param string $store_key Option key used by the backing store. + * @param array $run Run payload. + * @return array + */ + public static function save_run( string $store_key, array $run ): array { + $normalized = self::normalize_run( $run ); + $normalized['updated_at'] = '' !== $normalized['updated_at'] ? $normalized['updated_at'] : self::now(); + $run_id = self::string_value( $normalized['run_id'] ); + + $state = self::state( $store_key ); + $state['runs'][ $run_id ] = $normalized; + self::save_state( $store_key, $state ); + + return $normalized; + } + + /** + * Finish a stored run. + * + * @param string $store_key Option key used by the backing store. + * @param string $run_id Run ID. + * @param string $status Terminal status. + * @return array|null + */ + public static function finish_run( string $store_key, string $run_id, string $status = self::STATUS_COMPLETED ): ?array { + $state = self::state( $store_key ); + if ( ! isset( $state['runs'][ $run_id ] ) ) { + return null; + } + + $run = $state['runs'][ $run_id ]; + $run['status'] = self::normalize_status( $status ); + $run['updated_at'] = self::now(); + if ( self::STATUS_CANCELLED === $run['status'] ) { + $run['cancelled'] = true; + } + + $state['runs'][ $run_id ] = $run; + self::save_state( $store_key, $state ); + + return self::normalize_run( $run ); + } + + /** + * Read a stored run. + * + * @return array|null + */ + public static function get_run( string $store_key, string $run_id ): ?array { + $state = self::state( $store_key ); + $run = $state['runs'][ $run_id ] ?? null; + return is_array( $run ) ? self::normalize_run( $run ) : null; + } + + /** + * Request cancellation of a stored run. + * + * @return array|null + */ + public static function request_cancel( string $store_key, string $run_id ): ?array { + $state = self::state( $store_key ); + if ( ! isset( $state['runs'][ $run_id ] ) ) { + return null; + } + + $run = $state['runs'][ $run_id ]; + $terminal = in_array( self::normalize_status( $run['status'] ?? '' ), array( self::STATUS_COMPLETED, self::STATUS_SUCCEEDED, self::STATUS_FAILED, self::STATUS_CANCELLED ), true ); + $run['status'] = $terminal ? self::normalize_status( $run['status'] ?? '' ) : self::STATUS_CANCELLING; + $run['cancelled'] = ! $terminal; + $run['updated_at'] = self::now(); + + $state['runs'][ $run_id ] = $run; + self::save_state( $store_key, $state ); + + return self::normalize_run( $run ); + } + + public static function cancel_requested( string $store_key, string $run_id ): bool { + $run = self::get_run( $store_key, $run_id ); + return null !== $run && self::STATUS_CANCELLING === ( $run['status'] ?? '' ); + } + + /** @return array{runs:array>,queues:array>>} */ + public static function state( string $store_key ): array { + $state = function_exists( 'get_option' ) ? get_option( $store_key, array() ) : array(); + if ( ! is_array( $state ) ) { + $state = array(); + } + + return array( + 'runs' => self::stored_runs( $state['runs'] ?? array() ), + 'queues' => self::stored_queues( $state['queues'] ?? array() ), + ); + } + + /** + * @param array{runs:array>,queues:array>>} $state + */ + public static function save_state( string $store_key, array $state ): void { + if ( function_exists( 'update_option' ) ) { + update_option( $store_key, $state, false ); + } + } + + public static function now(): string { + return gmdate( 'c' ); + } + + private static function string_value( mixed $value ): string { + return is_int( $value ) || is_float( $value ) || is_string( $value ) || is_bool( $value ) ? (string) $value : ''; + } + + private static function int_value( mixed $value ): int { + return is_int( $value ) || is_float( $value ) || is_string( $value ) || is_bool( $value ) ? (int) $value : 0; + } + + /** + * @param mixed $runs Raw stored runs. + * @return array> + */ + private static function stored_runs( mixed $runs ): array { + if ( ! is_array( $runs ) ) { + return array(); + } + + $stored = array(); + foreach ( $runs as $run_id => $run ) { + if ( is_string( $run_id ) && is_array( $run ) ) { + $stored[ $run_id ] = self::assoc_array( $run ); + } + } + + return $stored; + } + + /** + * @param mixed $queues Raw stored queues. + * @return array>> + */ + private static function stored_queues( mixed $queues ): array { + if ( ! is_array( $queues ) ) { + return array(); + } + + $stored = array(); + foreach ( $queues as $scope => $items ) { + if ( ! is_string( $scope ) || ! is_array( $items ) ) { + continue; + } + + $stored[ $scope ] = array(); + foreach ( $items as $item ) { + if ( is_array( $item ) ) { + $stored[ $scope ][] = self::assoc_array( $item ); + } + } + } + + return $stored; + } + + /** + * @param array $value + * @return array + */ + private static function assoc_array( array $value ): array { + $result = array(); + foreach ( $value as $key => $item ) { + if ( is_string( $key ) ) { + $result[ $key ] = $item; + } + } + return $result; + } +} diff --git a/src/Workflows/class-wp-agent-workflow-run-result.php b/src/Workflows/class-wp-agent-workflow-run-result.php index a38d3e4..b6dc0b5 100644 --- a/src/Workflows/class-wp-agent-workflow-run-result.php +++ b/src/Workflows/class-wp-agent-workflow-run-result.php @@ -33,6 +33,7 @@ final class WP_Agent_Workflow_Run_Result { public const STATUS_SUCCEEDED = 'succeeded'; public const STATUS_FAILED = 'failed'; public const STATUS_SKIPPED = 'skipped'; + public const STATUS_CANCELLED = 'cancelled'; /** * @since 0.103.0 diff --git a/src/Workflows/class-wp-agent-workflow-runner.php b/src/Workflows/class-wp-agent-workflow-runner.php index 537fe71..6556513 100644 --- a/src/Workflows/class-wp-agent-workflow-runner.php +++ b/src/Workflows/class-wp-agent-workflow-runner.php @@ -40,12 +40,15 @@ namespace AgentsAPI\AI\Workflows; use AgentsAPI\AI\Abilities\WP_Agent_Ability_Dispatcher; +use AgentsAPI\AI\WP_Agent_Run_Control; use WP_Error; defined( 'ABSPATH' ) || exit; class WP_Agent_Workflow_Runner { + public const RUN_CONTROL_STORE = 'agents_api_workflow_run_control'; + /** * @var array Step type → handler candidate. Each callable handler * receives ( array $resolved_step, array $context ) @@ -148,6 +151,15 @@ public function run( WP_Agent_Workflow_Spec $spec, array $inputs = array(), arra } } + WP_Agent_Run_Control::start_run( + self::RUN_CONTROL_STORE, + $result->get_run_id(), + array( + 'workflow_id' => $spec->get_id(), + 'metadata' => $metadata, + ) + ); + // Validate inputs against the spec's input declarations. $input_error = self::validate_inputs( $spec, $inputs ); if ( null !== $input_error ) { @@ -161,10 +173,19 @@ public function run( WP_Agent_Workflow_Spec $spec, array $inputs = array(), arra if ( $this->recorder ) { $this->recorder->update( $terminal ); } + WP_Agent_Run_Control::finish_run( self::RUN_CONTROL_STORE, $result->get_run_id(), WP_Agent_Run_Control::STATUS_FAILED ); return $terminal; } - $context = WP_Agent_Workflow_Run_Context::from_inputs( $inputs ); + $context = new WP_Agent_Workflow_Run_Context( + array( + 'inputs' => $inputs, + 'steps' => array(), + 'vars' => array(), + '_workflow_run_id' => $result->get_run_id(), + '_workflow_store_key' => self::RUN_CONTROL_STORE, + ) + ); $executor = new WP_Agent_Workflow_Step_Executor( $this->step_handlers ); $step_records = array(); @@ -173,9 +194,25 @@ public function run( WP_Agent_Workflow_Spec $spec, array $inputs = array(), arra $failure_error = array(); foreach ( $spec->get_steps() as $step ) { + if ( self::is_cancel_requested( $result->get_run_id() ) ) { + $result = self::cancelled_result( $result, $step_records ); + if ( $this->recorder ) { + $this->recorder->update( $result ); + } + return $result; + } + $record = $executor->execute( $step, $context ); $step_records[] = $record; + if ( self::is_cancel_requested( $result->get_run_id() ) ) { + $result = self::cancelled_result( $result, $step_records ); + if ( $this->recorder ) { + $this->recorder->update( $result ); + } + return $result; + } + if ( WP_Agent_Workflow_Run_Result::STATUS_SUCCEEDED !== $record['status'] ) { $failed = true; $failure_error = $record['error']; @@ -225,9 +262,38 @@ public function run( WP_Agent_Workflow_Spec $spec, array $inputs = array(), arra if ( $this->recorder ) { $this->recorder->update( $result ); } + WP_Agent_Run_Control::finish_run( + self::RUN_CONTROL_STORE, + $result->get_run_id(), + $failed ? WP_Agent_Run_Control::STATUS_FAILED : WP_Agent_Run_Control::STATUS_SUCCEEDED + ); return $result; } + /** @phpstan-impure */ + private static function is_cancel_requested( string $run_id ): bool { + return WP_Agent_Run_Control::cancel_requested( self::RUN_CONTROL_STORE, $run_id ); + } + + /** + * @param array $step_records Step records completed before cancellation was observed. + */ + private static function cancelled_result( WP_Agent_Workflow_Run_Result $result, array $step_records ): WP_Agent_Workflow_Run_Result { + WP_Agent_Run_Control::finish_run( self::RUN_CONTROL_STORE, $result->get_run_id(), WP_Agent_Run_Control::STATUS_CANCELLED ); + + return $result->with( + array( + 'status' => WP_Agent_Workflow_Run_Result::STATUS_CANCELLED, + 'steps' => $step_records, + 'error' => array( + 'code' => 'cancel_requested', + 'message' => 'Workflow run cancellation was requested.', + ), + 'ended_at' => time(), + ) + ); + } + /** * Validate inputs against the spec's declared input schemas. * @@ -405,6 +471,10 @@ public static function default_foreach_handler( array $step, array $context ) { $iterations = array(); foreach ( array_values( $items ) as $index => $item ) { + if ( self::foreach_cancel_requested( $context ) ) { + return new \WP_Error( 'cancel_requested', 'Workflow run cancellation was requested.' ); + } + $iteration_context = ( new WP_Agent_Workflow_Run_Context( $context ) )->with_vars( array( $as => $item, @@ -415,6 +485,10 @@ public static function default_foreach_handler( array $step, array $context ) { $last_output = null; foreach ( $steps as $nested_step ) { + if ( self::foreach_cancel_requested( $context ) ) { + return new \WP_Error( 'cancel_requested', 'Workflow run cancellation was requested.' ); + } + if ( ! is_array( $nested_step ) ) { return new \WP_Error( 'workflow_foreach_step_invalid', @@ -477,6 +551,16 @@ public static function default_foreach_handler( array $step, array $context ) { ); } + /** + * @param array $context Resolution context. + * @phpstan-impure + */ + private static function foreach_cancel_requested( array $context ): bool { + $run_id = self::string_value( $context['_workflow_run_id'] ?? null ); + $store_key = self::string_value( $context['_workflow_store_key'] ?? null ); + return '' !== $run_id && '' !== $store_key && WP_Agent_Run_Control::cancel_requested( $store_key, $run_id ); + } + /** * Return the filtered default handler map for nested step execution. * diff --git a/tests/workflow-runner-smoke.php b/tests/workflow-runner-smoke.php index a21c99e..c754d34 100644 --- a/tests/workflow-runner-smoke.php +++ b/tests/workflow-runner-smoke.php @@ -45,8 +45,9 @@ public function execute( $input = null ) { } } -$GLOBALS['__filters'] = array(); -$GLOBALS['__abilities'] = array(); +$GLOBALS['__filters'] = array(); +$GLOBALS['__abilities'] = array(); +$GLOBALS['__options'] = array(); if ( ! function_exists( 'add_filter' ) ) { function add_filter( string $hook, callable $cb, int $priority = 10, int $accepted_args = 1 ): void { @@ -87,6 +88,18 @@ function wp_get_ability( string $name ) { return $GLOBALS['__abilities'][ $name ] ?? null; } } +if ( ! function_exists( 'get_option' ) ) { + function get_option( string $option, $default = false ) { + return $GLOBALS['__options'][ $option ] ?? $default; + } +} +if ( ! function_exists( 'update_option' ) ) { + function update_option( string $option, $value, $autoload = null ): bool { + unset( $autoload ); + $GLOBALS['__options'][ $option ] = $value; + return true; + } +} /** * Register a stub ability so the runner can resolve it through wp_get_ability() @@ -170,10 +183,12 @@ function smoke_assert( $expected, $actual, string $name, array &$failures, int & require_once __DIR__ . '/../src/Workflows/class-wp-agent-workflow-store.php'; require_once __DIR__ . '/../src/Workflows/class-wp-agent-workflow-run-recorder.php'; require_once __DIR__ . '/../src/Abilities/class-wp-agent-ability-dispatcher.php'; +require_once __DIR__ . '/../src/Runtime/class-wp-agent-run-control.php'; require_once __DIR__ . '/../src/Workflows/class-wp-agent-workflow-run-context.php'; require_once __DIR__ . '/../src/Workflows/class-wp-agent-workflow-step-executor.php'; require_once __DIR__ . '/../src/Workflows/class-wp-agent-workflow-runner.php'; +use AgentsAPI\AI\WP_Agent_Run_Control; use AgentsAPI\AI\Workflows\WP_Agent_Workflow_Run_Recorder; use AgentsAPI\AI\Workflows\WP_Agent_Workflow_Run_Result; use AgentsAPI\AI\Workflows\WP_Agent_Workflow_Runner; @@ -233,6 +248,13 @@ static function ( array $input ): \WP_Error { return new \WP_Error( 'demo_bang', 'something broke' ); } ); +workflow_runner_smoke_register_ability( + 'demo/request-cancel', + static function ( array $input ): array { + WP_Agent_Run_Control::request_cancel( WP_Agent_Workflow_Runner::RUN_CONTROL_STORE, (string) ( $input['run_id'] ?? '' ) ); + return array( 'cancel_requested' => true ); + } +); workflow_runner_smoke_register_ability( 'agents/chat', static function ( array $input ): \WP_Error { @@ -286,6 +308,36 @@ static function ( array $input ): \WP_Error { smoke_assert( true, 64 === strlen( $result->get_replay_metadata()['workflow_spec_hash'] ?? '' ), 'replay metadata includes sha256 spec hash', $failures, $passes ); smoke_assert( $spec->to_array(), $result->get_replay_metadata()['workflow_spec_snapshot'] ?? array(), 'replay metadata includes workflow spec snapshot', $failures, $passes ); +// ─── Generic run-control cancellation stops before the next step ────── + +$cancel_spec = WP_Agent_Workflow_Spec::from_array( + array( + 'id' => 'demo/cancel-before-next-step', + 'steps' => array( + array( + 'id' => 'request_cancel', + 'type' => 'ability', + 'ability' => 'demo/request-cancel', + 'args' => array( 'run_id' => 'cancel-run-1' ), + ), + array( + 'id' => 'should_not_run', + 'type' => 'ability', + 'ability' => 'demo/uppercase', + 'args' => array( 'text' => 'never reached' ), + ), + ), + ) +); + +$cancel_result = ( new WP_Agent_Workflow_Runner( null ) )->run( $cancel_spec, array(), array( 'run_id' => 'cancel-run-1' ) ); +$cancel_stored = WP_Agent_Run_Control::get_run( WP_Agent_Workflow_Runner::RUN_CONTROL_STORE, 'cancel-run-1' ); + +smoke_assert( WP_Agent_Workflow_Run_Result::STATUS_CANCELLED, $cancel_result->get_status(), 'cancel_requested cancels workflow run', $failures, $passes ); +smoke_assert( 1, count( $cancel_result->get_steps() ), 'cancel_requested stops before next workflow step', $failures, $passes ); +smoke_assert( 'cancel_requested', $cancel_result->get_error()['code'] ?? '', 'cancelled workflow exposes cancel_requested error code', $failures, $passes ); +smoke_assert( WP_Agent_Run_Control::STATUS_CANCELLED, $cancel_stored['status'] ?? '', 'workflow run-control record is marked cancelled', $failures, $passes ); + // ─── Evidence refs stay first-class through results and recorders ───── $evidence_refs = array(