From ba9cb1086fbc7ed6fbe6c5227b9414ebd41969b1 Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:05:00 +0900 Subject: [PATCH 1/6] feat(study): enforce hard evaluation budgets by default --- src/variopt/__init__.py | 4 + .../algorithms/population/csa/optimizer.py | 56 +++- src/variopt/execution.py | 81 ++++++ src/variopt/kernel.py | 14 +- src/variopt/methods/run.py | 18 ++ src/variopt/study/core.py | 23 +- src/variopt/study/execution.py | 270 +++++++++++++++--- src/variopt/study/stale_async.py | 125 ++++++-- tests/csa/test_csa_checkpoint.py | 55 +++- tests/study/test_study.py | 90 +++--- tests/study/test_study_stale_async.py | 79 +++-- 11 files changed, 678 insertions(+), 137 deletions(-) diff --git a/src/variopt/__init__.py b/src/variopt/__init__.py index 7bb827c..2e2664a 100644 --- a/src/variopt/__init__.py +++ b/src/variopt/__init__.py @@ -19,6 +19,8 @@ SEQUENTIAL_EXECUTION_MODEL, STALE_ASYNC_EXECUTION_MODEL, SYNC_BATCH_EXECUTION_MODEL, + EvaluationBudget, + EvaluationBudgetExhausted, ExecutionAssimilationMode, ExecutionCompletionMode, ExecutionModel, @@ -64,6 +66,8 @@ "CategoricalSpace", "DiversityMetric", "EvaluationOutcome", + "EvaluationBudget", + "EvaluationBudgetExhausted", "EvaluationProtocol", "EvaluationRecord", "EvaluationRequest", diff --git a/src/variopt/algorithms/population/csa/optimizer.py b/src/variopt/algorithms/population/csa/optimizer.py index 7988a4f..d77b048 100644 --- a/src/variopt/algorithms/population/csa/optimizer.py +++ b/src/variopt/algorithms/population/csa/optimizer.py @@ -85,7 +85,8 @@ @dataclass(frozen=True, slots=True) -class CSAOptimizer(FrozenGenericSlotsCompat, +class CSAOptimizer( + FrozenGenericSlotsCompat, RunMethod[ CSAEngineState[CandidateT], Proposal[CandidateT], @@ -237,7 +238,10 @@ def from_space_defaults( effective_profile = CSAProfile( perturbation_schedule=effective_schedule, ) - elif perturbation_schedule is not None or effective_profile.perturbation_schedule is None: + elif ( + perturbation_schedule is not None + or effective_profile.perturbation_schedule is None + ): effective_profile = replace( effective_profile, perturbation_schedule=effective_schedule, @@ -401,6 +405,23 @@ def state_to_dict( ) return state.to_dict(candidate_to_dict=serializer) + @override + def is_checkpoint_safe_state(self, state: CSAEngineState[CandidateT]) -> bool: + """Return whether a CSA state is at a safe checkpoint boundary. + + Parameters + ---------- + state : CSAEngineState[CandidateT] + Engine state to inspect. + + Returns + ------- + bool + ``True`` when no pending proposals or in-flight generation runtime + would be lost by checkpoint serialization. + """ + return state.pending_proposals.is_empty and not state.generation_state.is_active + def state_from_dict( self, data: Mapping[str, JSONValue], @@ -592,7 +613,9 @@ def tell( def tell_outcomes( self, state: CSAEngineState[CandidateT], - outcomes: Sequence[EvaluationOutcome[OutcomeCandidateT, Observation[CandidateT]]], + outcomes: Sequence[ + EvaluationOutcome[OutcomeCandidateT, Observation[CandidateT]] + ], ) -> CSAEngineState[CandidateT]: """Advance CSA state with full outcomes when refinement metadata exists. @@ -645,7 +668,10 @@ def _tell_with_explicit_local_displacements( Callable[[CandidateT, CandidateT], tuple[LeafPath, ...]] | None ) = None numeric_subspace_displacement_inference: ( - Callable[[ProposalAttribution, CandidateT], NumericSubspaceDisplacement | None] | None + Callable[ + [ProposalAttribution, CandidateT], NumericSubspaceDisplacement | None + ] + | None ) = None if is_structured_candidate_space(self.space): structured_space = self.space @@ -855,14 +881,16 @@ def propose_candidate( ) if ask_plan.kind == "materialize_generation": - materialized_generation, next_random_state = engine_state.random_state.advance( - lambda random_state: materialize_generation( - engine_state=engine_state, - resolved_profile=self.resolved_profile, - space=self.space, - diversity_metric=self.diversity_metric, - random_state=random_state, - ), + materialized_generation, next_random_state = ( + engine_state.random_state.advance( + lambda random_state: materialize_generation( + engine_state=engine_state, + resolved_profile=self.resolved_profile, + space=self.space, + diversity_metric=self.diversity_metric, + random_state=random_state, + ), + ) ) generated_candidate, next_engine_state = commit_materialized_generation( engine_state.replace_random_state(next_random_state), @@ -875,7 +903,9 @@ def propose_candidate( next_engine_state, ) - generated_candidate, next_engine_state = dequeue_generation_candidate(engine_state) + generated_candidate, next_engine_state = dequeue_generation_candidate( + engine_state + ) return ( generated_candidate.candidate, True, diff --git a/src/variopt/execution.py b/src/variopt/execution.py index 0573a25..02ce2e1 100644 --- a/src/variopt/execution.py +++ b/src/variopt/execution.py @@ -5,6 +5,87 @@ from typing import Literal +class EvaluationBudgetExhausted(RuntimeError): + """Raised when an execution path would exceed its evaluation budget.""" + + +@dataclass(slots=True) +class EvaluationBudget: + """Mutable runtime ledger for a hard evaluation budget. + + Parameters + ---------- + remaining : int + Number of evaluation units still available. + + Notes + ----- + Study orchestration consumes this ledger before submitting evaluator work. + Kernels that compute outcomes without calling the evaluator may consume it + directly, and the study layer reconciles any unmetered reported cost before + assimilating outcomes. + """ + + remaining: int + + def __post_init__(self) -> None: + """Validate the initial budget.""" + if self.remaining < 0: + msg = "remaining evaluation budget must be non-negative" + raise ValueError(msg) + + @property + def is_exhausted(self) -> bool: + """Return whether no evaluation units remain.""" + return self.remaining == 0 + + def can_consume(self, count: int = 1) -> bool: + """Return whether ``count`` evaluation units can be consumed. + + Parameters + ---------- + count : int, default=1 + Requested evaluation-unit count. + + Returns + ------- + bool + ``True`` when the ledger has at least ``count`` units remaining. + + Raises + ------ + ValueError + If ``count`` is negative. + """ + if count < 0: + msg = "evaluation budget count must be non-negative" + raise ValueError(msg) + return count <= self.remaining + + def consume(self, count: int = 1) -> None: + """Consume ``count`` evaluation units or raise. + + Parameters + ---------- + count : int, default=1 + Evaluation-unit count to consume. + + Raises + ------ + ValueError + If ``count`` is negative. + EvaluationBudgetExhausted + If consuming ``count`` would exceed the budget. + """ + if count < 0: + msg = "evaluation budget count must be non-negative" + raise ValueError(msg) + if count > self.remaining: + msg = "evaluation budget exhausted" + raise EvaluationBudgetExhausted(msg) + self.remaining -= count + + class ExecutionCompletionMode(Enum): """Completion-order axis for an execution model. diff --git a/src/variopt/kernel.py b/src/variopt/kernel.py index f2d20b3..56351dd 100644 --- a/src/variopt/kernel.py +++ b/src/variopt/kernel.py @@ -16,7 +16,7 @@ ProposalEvaluationSpec, RequestAlignedEvaluationRecord, ) -from .execution import ExecutionResources +from .execution import EvaluationBudget, ExecutionResources from .problem import Problem from .spaces import LeafPath @@ -137,7 +137,9 @@ def __post_init__(self) -> None: msg = "local_budget must be positive when provided" raise ValueError(msg) - normalized_leaf_paths = tuple(tuple(path) for path in self.prioritized_leaf_paths) + normalized_leaf_paths = tuple( + tuple(path) for path in self.prioritized_leaf_paths + ) if len(set(normalized_leaf_paths)) != len(normalized_leaf_paths): msg = "prioritized_leaf_paths must not contain duplicates" raise ValueError(msg) @@ -146,7 +148,9 @@ def __post_init__(self) -> None: @dataclass(frozen=True, slots=True) -class ProposalBatchQuery(FrozenGenericSlotsCompat, Generic[BoundaryT, CandidateT, QueryEvaluationRecordT]): +class ProposalBatchQuery( + FrozenGenericSlotsCompat, Generic[BoundaryT, CandidateT, QueryEvaluationRecordT] +): """Canonical kernel query over a proposal batch. Parameters @@ -162,6 +166,9 @@ class ProposalBatchQuery(FrozenGenericSlotsCompat, Generic[BoundaryT, CandidateT proposal_kernel_hints : tuple[ProposalKernelHint | None, ...] | None, optional Optional per-proposal kernel hints aligned one-to-one with ``proposals``. + evaluation_budget : EvaluationBudget | None, optional + Shared runtime ledger for hard evaluation budgeting. Kernels may inspect + or consume this ledger before issuing evaluator work. Notes ----- @@ -174,6 +181,7 @@ class ProposalBatchQuery(FrozenGenericSlotsCompat, Generic[BoundaryT, CandidateT execution_resources: ExecutionResources proposal_evaluation_specs: tuple[ProposalEvaluationSpec | None, ...] | None = None proposal_kernel_hints: tuple[ProposalKernelHint | None, ...] | None = None + evaluation_budget: EvaluationBudget | None = None def __post_init__(self) -> None: """Validate aligned per-proposal metadata. diff --git a/src/variopt/methods/run.py b/src/variopt/methods/run.py index de4d434..95e5bf6 100644 --- a/src/variopt/methods/run.py +++ b/src/variopt/methods/run.py @@ -191,3 +191,21 @@ def supported_execution_models(self) -> frozenset[ExecutionModel]: SYNC_BATCH_EXECUTION_MODEL, }, ) + + def is_checkpoint_safe_state(self, state: RunMethodStateT) -> bool: + """Return whether ``state`` can be persisted as a checkpoint. + + Parameters + ---------- + state : RunMethodStateT + Run-method state to inspect. + + Returns + ------- + bool + ``True`` when checkpointing this state preserves resumable optimizer + semantics. Stateless or always-serializable run methods use the + default ``True`` contract. + """ + _ = state + return True diff --git a/src/variopt/study/core.py b/src/variopt/study/core.py index 5954f3d..36a916d 100644 --- a/src/variopt/study/core.py +++ b/src/variopt/study/core.py @@ -43,8 +43,9 @@ @dataclass(frozen=True, slots=True, init=False) -class Study(FrozenGenericSlotsCompat, - Generic[BoundaryT, CandidateT, RunMethodStateT, StudyEvaluationRecordT] +class Study( + FrozenGenericSlotsCompat, + Generic[BoundaryT, CandidateT, RunMethodStateT, StudyEvaluationRecordT], ): """User-facing facade for running an optimization study. @@ -230,8 +231,9 @@ def run( batch_size: int = 1, *, execution_model: ExecutionModel = SYNC_BATCH_EXECUTION_MODEL, - count_evaluation_cost: bool = False, + count_evaluation_cost: bool = True, initial_state: RunMethodStateT | None = None, + stop_at_checkpoint_boundary: bool = False, ) -> tuple[RunReport[CandidateT, StudyEvaluationRecordT], RunMethodStateT]: """Run the study until the evaluation budget is exhausted. @@ -244,11 +246,13 @@ def run( execution_model : ExecutionModel, default=SYNC_BATCH_EXECUTION_MODEL Execution-model contract that controls completion and assimilation order. - count_evaluation_cost : bool, default=False + count_evaluation_cost : bool, default=True Whether to consume budget using each outcome's logical evaluation cost instead of simple record count. initial_state : RunMethodStateT | None, optional Optional run-method state to start from. + stop_at_checkpoint_boundary : bool, default=False + Whether to return a checkpoint-safe terminal state boundary. Returns ------- @@ -262,6 +266,7 @@ def run( batch_size=batch_size, count_evaluation_cost=count_evaluation_cost, initial_state=initial_state, + stop_at_checkpoint_boundary=stop_at_checkpoint_boundary, ) return run_study( @@ -271,6 +276,7 @@ def run( execution_model=execution_model, count_evaluation_cost=count_evaluation_cost, initial_state=initial_state, + stop_at_checkpoint_boundary=stop_at_checkpoint_boundary, ) def optimize( @@ -279,8 +285,9 @@ def optimize( batch_size: int = 1, *, execution_model: ExecutionModel = SYNC_BATCH_EXECUTION_MODEL, - count_evaluation_cost: bool = False, + count_evaluation_cost: bool = True, initial_state: RunMethodStateT | None = None, + stop_at_checkpoint_boundary: bool = False, ) -> tuple[RunResult[CandidateT], RunMethodStateT]: """Run the study and materialize the scalar best-result summary. @@ -293,11 +300,13 @@ def optimize( execution_model : ExecutionModel, default=SYNC_BATCH_EXECUTION_MODEL Execution-model contract that controls completion and assimilation order. - count_evaluation_cost : bool, default=False + count_evaluation_cost : bool, default=True Whether to consume budget using each outcome's logical evaluation cost instead of simple record count. initial_state : RunMethodStateT | None, optional Optional run-method state to start from. + stop_at_checkpoint_boundary : bool, default=False + Whether to return a checkpoint-safe terminal state boundary. Returns ------- @@ -311,6 +320,7 @@ def optimize( execution_model=execution_model, count_evaluation_cost=count_evaluation_cost, initial_state=initial_state, + stop_at_checkpoint_boundary=stop_at_checkpoint_boundary, ) return ( materialize_scalar_run_result( @@ -327,6 +337,7 @@ def optimize( execution_model=execution_model, count_evaluation_cost=count_evaluation_cost, initial_state=initial_state, + stop_at_checkpoint_boundary=stop_at_checkpoint_boundary, ) diff --git a/src/variopt/study/execution.py b/src/variopt/study/execution.py index 51eb3e7..d3216b3 100644 --- a/src/variopt/study/execution.py +++ b/src/variopt/study/execution.py @@ -1,6 +1,6 @@ """Generic study step and run orchestration.""" -from dataclasses import dataclass +from dataclasses import dataclass, replace from typing import Generic, Protocol, TypeGuard, cast from typing_extensions import TypeVar @@ -24,6 +24,7 @@ SEQUENTIAL_EXECUTION_MODEL, STALE_ASYNC_EXECUTION_MODEL, SYNC_BATCH_EXECUTION_MODEL, + EvaluationBudget, ExecutionAssimilationMode, ExecutionModel, ) @@ -47,7 +48,10 @@ @dataclass(frozen=True, slots=True) -class StudyStepResult(FrozenGenericSlotsCompat, Generic[CandidateT, RunMethodStateT, StudyEvaluationRecordT]): +class StudyStepResult( + FrozenGenericSlotsCompat, + Generic[CandidateT, RunMethodStateT, StudyEvaluationRecordT], +): """Canonical in-process result for one ask/evaluate/tell study step. Parameters @@ -56,10 +60,26 @@ class StudyStepResult(FrozenGenericSlotsCompat, Generic[CandidateT, RunMethodSta Evaluator outcomes returned for the step. state : RunMethodStateT Run-method state after assimilating ``outcomes``. + evaluation_count : int + Evaluation units consumed by the step after hard-budget reconciliation. """ outcomes: tuple[EvaluationOutcome[CandidateT, StudyEvaluationRecordT], ...] state: RunMethodStateT + evaluation_count: int + + +@dataclass(frozen=True, slots=True) +class _CheckpointSafeRunSnapshot( + Generic[CandidateT, RunMethodStateT, StudyEvaluationRecordT] +): + """Last known checkpoint-safe run projection.""" + + records: tuple[StudyEvaluationRecordT, ...] + refinements: tuple[CandidateRefinement[CandidateT] | None, ...] | None + trace: Trace + evaluation_count: int + state: RunMethodStateT class StudyExecutionOwner( @@ -158,7 +178,9 @@ def _supports_direct_scalar_sequential_path( ], *, execution_model: ExecutionModel, -) -> TypeGuard[DirectScalarSequentialStudyOwner[BoundaryT, CandidateT, RunMethodStateT]]: +) -> TypeGuard[ + DirectScalarSequentialStudyOwner[BoundaryT, CandidateT, RunMethodStateT] +]: if execution_model not in { SEQUENTIAL_EXECUTION_MODEL, SYNC_BATCH_EXECUTION_MODEL, @@ -185,6 +207,61 @@ def _uses_default_scalar_request_evaluation( return False +def _query_with_evaluation_budget( + query: ProposalBatchQuery[BoundaryT, CandidateT, StudyEvaluationRecordT], + evaluation_budget: EvaluationBudget | None, +) -> ProposalBatchQuery[BoundaryT, CandidateT, StudyEvaluationRecordT]: + """Return ``query`` with the active budget ledger attached.""" + if evaluation_budget is None or query.evaluation_budget is not None: + return query + return replace(query, evaluation_budget=evaluation_budget) + + +def _consume_reported_evaluation_cost( + *, + evaluation_budget: EvaluationBudget | None, + remaining_before: int | None, + reported_evaluation_count: int, +) -> int: + """Reconcile reported outcome cost against pre-consumed runner cost.""" + if evaluation_budget is None or remaining_before is None: + return reported_evaluation_count + + consumed_by_runner = remaining_before - evaluation_budget.remaining + if reported_evaluation_count > consumed_by_runner: + evaluation_budget.consume(reported_evaluation_count - consumed_by_runner) + return reported_evaluation_count + return consumed_by_runner + + +def _current_remaining_budget( + *, + evaluation_budget: EvaluationBudget | None, + record_budget_remaining: int, +) -> int: + """Return the active loop budget for evaluation- or record-count mode.""" + if evaluation_budget is None: + return record_budget_remaining + return evaluation_budget.remaining + + +def _current_step_batch_size( + _study: StudyExecutionOwner[ + BoundaryT, + CandidateT, + RunMethodStateT, + StudyEvaluationRecordT, + ], + *, + batch_size: int, + remaining: int, + evaluation_budget: EvaluationBudget | None, +) -> int: + """Return a batch size that preserves hard budget safety for kernels.""" + _ = evaluation_budget + return min(batch_size, remaining) + + def _optimize_direct_scalar_sequential( study: DirectScalarSequentialStudyOwner[ BoundaryT, @@ -215,14 +292,24 @@ def _optimize_direct_scalar_sequential( observations: list[Observation[CandidateT]] = [] trace_events: list[TraceEvent] = [] - remaining = max_evaluations + evaluation_budget = ( + EvaluationBudget(max_evaluations) if count_evaluation_cost else None + ) + record_budget_remaining = max_evaluations state = ( study.run_method.create_initial_state() if initial_state is None else initial_state ) - while remaining > 0 and not study.run_method.is_exhausted(state): + while _current_remaining_budget( + evaluation_budget=evaluation_budget, + record_budget_remaining=record_budget_remaining, + ) > 0 and not study.run_method.is_exhausted(state): + remaining = _current_remaining_budget( + evaluation_budget=evaluation_budget, + record_budget_remaining=record_budget_remaining, + ) current_batch_size = min(batch_size, remaining) proposals, next_state = study.run_method.ask( state, @@ -259,7 +346,9 @@ def _optimize_direct_scalar_sequential( raise ValueError(msg) batch_observations: list[Observation[CandidateT]] = [] - batch_outcomes: list[EvaluationOutcome[CandidateT, Observation[CandidateT]]] = [] + batch_outcomes: list[ + EvaluationOutcome[CandidateT, Observation[CandidateT]] + ] = [] for index, proposal in enumerate(proposals): candidate = proposal.candidate study.problem.space.validate(candidate) @@ -272,6 +361,8 @@ def _optimize_direct_scalar_sequential( proposal=proposal, proposal_evaluation_spec=proposal_evaluation_spec, ) + if evaluation_budget is not None: + evaluation_budget.consume() observation = Observation.from_objective_value( request=request, candidate=candidate, @@ -290,13 +381,8 @@ def _optimize_direct_scalar_sequential( batch_outcome_tuple = tuple(batch_outcomes) state = study.run_method.tell_outcomes(next_state, batch_outcome_tuple) observations.extend(batch_observation_tuple) - batch_evaluation_count = sum( - outcome.evaluation_count for outcome in batch_outcome_tuple - ) - if count_evaluation_cost: - remaining -= batch_evaluation_count - else: - remaining -= len(batch_observation_tuple) + if evaluation_budget is None: + record_budget_remaining -= len(batch_observation_tuple) trace_events.append( TraceEvent( @@ -309,7 +395,11 @@ def _optimize_direct_scalar_sequential( return ( RunResult[CandidateT].from_observations( observations=tuple(observations), - evaluation_count=max_evaluations - remaining, + evaluation_count=( + max_evaluations - record_budget_remaining + if evaluation_budget is None + else max_evaluations - evaluation_budget.remaining + ), trace=Trace(events=tuple(trace_events)), candidate_equal=study.problem.space.candidates_equal, ), @@ -352,6 +442,8 @@ def evaluate_batch_sync( if requests is None else requests ) + if query.evaluation_budget is not None: + query.evaluation_budget.consume(len(resolved_requests)) return tuple(study.evaluator.evaluate(query.problem, resolved_requests)) @@ -366,6 +458,7 @@ def evaluate_step( batch_size: int, *, execution_model: ExecutionModel, + evaluation_budget: EvaluationBudget | None = None, ) -> StudyStepResult[CandidateT, RunMethodStateT, StudyEvaluationRecordT]: """Run one ask/kernel/evaluate/tell step and return outcomes and state. @@ -381,6 +474,8 @@ def evaluate_step( execution_model : ExecutionModel Execution model controlling whether evaluation is synchronous or exact-async. + evaluation_budget : EvaluationBudget | None, default=None + Optional hard evaluation-budget ledger shared across kernel subqueries. Returns ------- @@ -426,6 +521,7 @@ def evaluate_step( execution_resources=study.evaluator.execution_resources(), proposal_evaluation_specs=proposal_evaluation_specs, proposal_kernel_hints=proposal_kernel_hints, + evaluation_budget=evaluation_budget, ) top_level_requests: tuple[EvaluationRequest[CandidateT], ...] | None = None @@ -449,6 +545,7 @@ def requests_for_query( return requests if execution_model == EXACT_ASYNC_EXECUTION_MODEL: + def batch_executor( query: ProposalBatchQuery[ BoundaryT, @@ -456,12 +553,17 @@ def batch_executor( StudyEvaluationRecordT, ], ) -> tuple[EvaluationOutcome[CandidateT, StudyEvaluationRecordT], ...]: + query = _query_with_evaluation_budget(query, evaluation_budget) + query_requests = requests_for_query(query) + if query.evaluation_budget is not None: + query.evaluation_budget.consume(len(query_requests)) return evaluate_batch_exact_async( require_async_evaluator(study), query.problem, - requests_for_query(query), + query_requests, ) else: + def batch_executor( query: ProposalBatchQuery[ BoundaryT, @@ -469,11 +571,16 @@ def batch_executor( StudyEvaluationRecordT, ], ) -> tuple[EvaluationOutcome[CandidateT, StudyEvaluationRecordT], ...]: + query = _query_with_evaluation_budget(query, evaluation_budget) return evaluate_batch_sync( study, query, requests=requests_for_query(query), ) + + remaining_before = ( + None if evaluation_budget is None else evaluation_budget.remaining + ) outcomes = study.kernel.run(top_level_query, batch_executor) requests = requests_for_query(top_level_query) validate_aligned_outcomes( @@ -481,8 +588,18 @@ def batch_executor( outcomes, candidate_equal=study.problem.space.candidates_equal, ) + reported_evaluation_count = sum(outcome.evaluation_count for outcome in outcomes) + step_evaluation_count = _consume_reported_evaluation_cost( + evaluation_budget=evaluation_budget, + remaining_before=remaining_before, + reported_evaluation_count=reported_evaluation_count, + ) next_state = study.run_method.tell_outcomes(next_state, outcomes) - return StudyStepResult(outcomes=outcomes, state=next_state) + return StudyStepResult( + outcomes=outcomes, + state=next_state, + evaluation_count=step_evaluation_count, + ) def step( @@ -521,10 +638,7 @@ def step( NotImplementedError If ``execution_model`` requests stale-async semantics. """ - if ( - execution_model.assimilation_mode - is ExecutionAssimilationMode.STALE_INCREMENTAL - ): + if execution_model.assimilation_mode is ExecutionAssimilationMode.STALE_INCREMENTAL: msg = ( "stale_async execution model is only supported by " "Study.run and Study.optimize" @@ -556,8 +670,9 @@ def run( batch_size: int = 1, *, execution_model: ExecutionModel = SYNC_BATCH_EXECUTION_MODEL, - count_evaluation_cost: bool = False, + count_evaluation_cost: bool = True, initial_state: RunMethodStateT | None = None, + stop_at_checkpoint_boundary: bool = False, ) -> tuple[RunReport[CandidateT, StudyEvaluationRecordT], RunMethodStateT]: """Run repeated ask/evaluate/tell steps and return one generic run report. @@ -572,11 +687,15 @@ def run( Maximum number of proposals requested per step. execution_model : ExecutionModel, default=SYNC_BATCH_EXECUTION_MODEL Execution model controlling evaluation behavior. - count_evaluation_cost : bool, default=False + count_evaluation_cost : bool, default=True Whether to debit the budget using evaluator-reported evaluation counts instead of completed record count. initial_state : RunMethodStateT | None, default=None Optional initial run-method state. ``None`` creates a fresh state. + stop_at_checkpoint_boundary : bool, default=False + Whether to return a checkpoint-safe state. If the budget ends inside an + unsafe run-method segment, the report and state are rolled back to the + most recent checkpoint-safe boundary reached during this call. Returns ------- @@ -606,20 +725,53 @@ def run( records: list[StudyEvaluationRecordT] = [] refinements: list[CandidateRefinement[CandidateT] | None] | None = None trace = Trace() - remaining = max_evaluations + evaluation_budget = ( + EvaluationBudget(max_evaluations) if count_evaluation_cost else None + ) + record_budget_remaining = max_evaluations state = ( study.run_method.create_initial_state() if initial_state is None else initial_state ) + safe_snapshot: ( + _CheckpointSafeRunSnapshot[ + CandidateT, + RunMethodStateT, + StudyEvaluationRecordT, + ] + | None + ) = None + unsafe_since_safe_snapshot = False + if stop_at_checkpoint_boundary and study.run_method.is_checkpoint_safe_state(state): + safe_snapshot = _CheckpointSafeRunSnapshot( + records=(), + refinements=None, + trace=trace, + evaluation_count=0, + state=state, + ) - while remaining > 0 and not study.run_method.is_exhausted(state): - current_batch_size = min(batch_size, remaining) + while _current_remaining_budget( + evaluation_budget=evaluation_budget, + record_budget_remaining=record_budget_remaining, + ) > 0 and not study.run_method.is_exhausted(state): + remaining = _current_remaining_budget( + evaluation_budget=evaluation_budget, + record_budget_remaining=record_budget_remaining, + ) + current_batch_size = _current_step_batch_size( + study, + batch_size=batch_size, + remaining=remaining, + evaluation_budget=evaluation_budget, + ) step_result = evaluate_step( study, state, batch_size=current_batch_size, execution_model=execution_model, + evaluation_budget=evaluation_budget, ) batch_records = tuple(outcome.record for outcome in step_result.outcomes) batch_refinements = tuple( @@ -631,18 +783,14 @@ def run( if refinements is not None: refinements.extend(batch_refinements) elif any(refinement is not None for refinement in batch_refinements): - refinement_history: list[ - CandidateRefinement[CandidateT] | None - ] = [None for _index in range(records_before_batch)] + refinement_history: list[CandidateRefinement[CandidateT] | None] = [ + None for _index in range(records_before_batch) + ] refinement_history.extend(batch_refinements) refinements = refinement_history - batch_evaluation_count = sum( - outcome.evaluation_count for outcome in step_result.outcomes - ) - if count_evaluation_cost: - remaining -= batch_evaluation_count - else: + if evaluation_budget is None: remaining -= len(batch_records) + record_budget_remaining = remaining trace = trace.append( TraceEvent( kind="study.step", @@ -650,11 +798,53 @@ def run( value=trace_value_for_records(batch_records), ), ) + if stop_at_checkpoint_boundary: + evaluation_count = ( + max_evaluations - record_budget_remaining + if evaluation_budget is None + else max_evaluations - evaluation_budget.remaining + ) + if study.run_method.is_checkpoint_safe_state(state): + safe_snapshot = _CheckpointSafeRunSnapshot( + records=tuple(records), + refinements=None if refinements is None else tuple(refinements), + trace=trace, + evaluation_count=evaluation_count, + state=state, + ) + if unsafe_since_safe_snapshot: + break + unsafe_since_safe_snapshot = False + else: + unsafe_since_safe_snapshot = True + + if stop_at_checkpoint_boundary and not study.run_method.is_checkpoint_safe_state( + state + ): + if safe_snapshot is None: + msg = ( + "run did not reach a checkpoint-safe state within the evaluation budget" + ) + raise RuntimeError(msg) + return ( + RunReport[CandidateT, StudyEvaluationRecordT].from_records( + records=safe_snapshot.records, + evaluation_count=safe_snapshot.evaluation_count, + trace=safe_snapshot.trace, + refinements=safe_snapshot.refinements, + candidate_equal=study.problem.space.candidates_equal, + ), + safe_snapshot.state, + ) return ( RunReport[CandidateT, StudyEvaluationRecordT].from_records( records=records, - evaluation_count=max_evaluations - remaining, + evaluation_count=( + max_evaluations - record_budget_remaining + if evaluation_budget is None + else max_evaluations - evaluation_budget.remaining + ), trace=trace, refinements=None if refinements is None else tuple(refinements), candidate_equal=study.problem.space.candidates_equal, @@ -719,8 +909,9 @@ def optimize( batch_size: int = 1, *, execution_model: ExecutionModel = SYNC_BATCH_EXECUTION_MODEL, - count_evaluation_cost: bool = False, + count_evaluation_cost: bool = True, initial_state: RunMethodStateT | None = None, + stop_at_checkpoint_boundary: bool = False, ) -> tuple[RunResult[CandidateT], RunMethodStateT]: """Run repeated ask/evaluate/tell steps until the budget is exhausted. @@ -735,11 +926,13 @@ def optimize( Maximum number of proposals requested per step. execution_model : ExecutionModel, default=SYNC_BATCH_EXECUTION_MODEL Execution model controlling evaluation behavior. - count_evaluation_cost : bool, default=False + count_evaluation_cost : bool, default=True Whether to debit the budget using evaluator-reported evaluation counts instead of completed record count. initial_state : RunMethodStateT | None, default=None Optional initial run-method state. ``None`` creates a fresh state. + stop_at_checkpoint_boundary : bool, default=False + Whether to return only checkpoint-safe terminal state boundaries. Returns ------- @@ -751,7 +944,7 @@ def optimize( TypeError If the study does not emit scalar :class:`Observation` records. """ - if _supports_direct_scalar_sequential_path( + if not stop_at_checkpoint_boundary and _supports_direct_scalar_sequential_path( study, execution_model=execution_model, ): @@ -771,6 +964,7 @@ def optimize( execution_model=execution_model, count_evaluation_cost=count_evaluation_cost, initial_state=initial_state, + stop_at_checkpoint_boundary=stop_at_checkpoint_boundary, ) return ( diff --git a/src/variopt/study/stale_async.py b/src/variopt/study/stale_async.py index 28a19af..e65e9b7 100644 --- a/src/variopt/study/stale_async.py +++ b/src/variopt/study/stale_async.py @@ -18,7 +18,7 @@ from ..evaluators.async_evaluator.contracts import AsyncEvaluator from ..evaluators.async_evaluator.sessions import EvaluationBatchSession from ..evaluators.base import Evaluator -from ..execution import STALE_ASYNC_EXECUTION_MODEL +from ..execution import STALE_ASYNC_EXECUTION_MODEL, EvaluationBudget from ..kernel import DirectKernel, Kernel, ProposalBatchQuery from ..methods import RunMethod from ..outcomes import EvaluationOutcome @@ -121,9 +121,7 @@ def poll_completed_groups( msg = "completion group exceeds logical batch bounds" raise ValueError(msg) - group_requests = self.requests[ - completion_group.start_index:end_index - ] + group_requests = self.requests[completion_group.start_index : end_index] validate_aligned_outcomes( group_requests, completion_group.outcomes, @@ -169,7 +167,10 @@ def open_stale_async_batch_session( ], state: RunMethodStateT, batch_size: int, -) -> tuple[StaleAsyncActiveBatchSession[CandidateT, StudyEvaluationRecordT], RunMethodStateT]: + evaluation_budget: EvaluationBudget | None = None, +) -> tuple[ + StaleAsyncActiveBatchSession[CandidateT, StudyEvaluationRecordT], RunMethodStateT +]: """Ask one batch and open its stale-async evaluator session. Parameters @@ -186,6 +187,8 @@ def open_stale_async_batch_session( Current run-method state. batch_size : int Requested logical batch size. + evaluation_budget : EvaluationBudget | None, default=None + Optional hard evaluation-budget ledger consumed before submitting work. Returns ------- @@ -220,6 +223,8 @@ def open_stale_async_batch_session( proposals, proposal_evaluation_specs=proposal_evaluation_specs, ) + if evaluation_budget is not None: + evaluation_budget.consume(len(requests)) return ( StaleAsyncActiveBatchSession( requests=requests, @@ -261,6 +266,7 @@ def _open_stale_async_batch_session_for_study( state: RunMethodStateT, *, batch_size: int, + evaluation_budget: EvaluationBudget | None = None, ) -> tuple[ StaleAsyncActiveBatchSession[CandidateT, StudyEvaluationRecordT], RunMethodStateT, @@ -281,6 +287,7 @@ def _open_stale_async_batch_session_for_study( proposal_evaluation_specs_for=study.run_method.proposal_evaluation_specs, state=state, batch_size=batch_size, + evaluation_budget=evaluation_budget, ) @@ -296,6 +303,7 @@ def run_stale_async( batch_size: int, count_evaluation_cost: bool, initial_state: RunMethodStateT | None, + stop_at_checkpoint_boundary: bool = False, ) -> tuple[RunReport[CandidateT, StudyEvaluationRecordT], RunMethodStateT]: """Run stale-incremental async orchestration with rolling batch refill. @@ -312,6 +320,9 @@ def run_stale_async( instead of completed record count. initial_state : RunMethodStateT | None Optional initial run-method state. + stop_at_checkpoint_boundary : bool, default=False + Whether to roll the returned state/report back to the latest + checkpoint-safe boundary if the budget ends inside an unsafe segment. Returns ------- @@ -323,20 +334,41 @@ def run_stale_async( records: list[StudyEvaluationRecordT] = [] refinements: list[CandidateRefinement[CandidateT] | None] | None = None trace = Trace() - remaining = max_evaluations + evaluation_budget = ( + EvaluationBudget(max_evaluations) if count_evaluation_cost else None + ) + record_budget_remaining = max_evaluations state = ( study.run_method.create_initial_state() if initial_state is None else initial_state ) + safe_records: tuple[StudyEvaluationRecordT, ...] | None = None + safe_refinements: tuple[CandidateRefinement[CandidateT] | None, ...] | None = None + safe_trace = trace + safe_evaluation_count = 0 + safe_state = state + if stop_at_checkpoint_boundary and study.run_method.is_checkpoint_safe_state(state): + safe_records = () active_sessions: list[ StaleAsyncActiveBatchSession[CandidateT, StudyEvaluationRecordT] ] = [] try: while active_sessions or ( - remaining > 0 and not study.run_method.is_exhausted(state) + ( + record_budget_remaining + if evaluation_budget is None + else evaluation_budget.remaining + ) + > 0 + and not study.run_method.is_exhausted(state) ): + remaining = ( + record_budget_remaining + if evaluation_budget is None + else evaluation_budget.remaining + ) if ( len(active_sessions) == 0 and remaining > 0 @@ -347,6 +379,7 @@ def run_stale_async( study, state, batch_size=current_batch_size, + evaluation_budget=evaluation_budget, ) active_sessions.append(active_session) @@ -362,12 +395,21 @@ def run_stale_async( for active_session in active_sessions: completed_groups = active_session.poll_completed_groups() for completed_group in completed_groups: - group_records = tuple( - outcome.record for outcome in completed_group - ) + group_records = tuple(outcome.record for outcome in completed_group) group_refinements = tuple( outcome.refinement for outcome in completed_group ) + group_evaluation_count = sum( + outcome.evaluation_count for outcome in completed_group + ) + if evaluation_budget is not None: + unmetered_evaluation_count = group_evaluation_count - len( + group_records, + ) + if unmetered_evaluation_count > 0: + evaluation_budget.consume(unmetered_evaluation_count) + else: + record_budget_remaining -= len(group_records) records_before_group = len(records) records.extend(group_records) if refinements is not None: @@ -384,13 +426,6 @@ def run_stale_async( state, completed_group, ) - group_evaluation_count = sum( - outcome.evaluation_count for outcome in completed_group - ) - if count_evaluation_cost: - remaining -= group_evaluation_count - else: - remaining -= len(group_records) trace = trace.append( TraceEvent( kind="study.step", @@ -398,13 +433,36 @@ def run_stale_async( value=trace_value_for_records(group_records), ), ) + if ( + stop_at_checkpoint_boundary + and study.run_method.is_checkpoint_safe_state(state) + ): + safe_records = tuple(records) + safe_refinements = ( + None if refinements is None else tuple(refinements) + ) + safe_trace = trace + safe_evaluation_count = ( + max_evaluations - record_budget_remaining + if evaluation_budget is None + else max_evaluations - evaluation_budget.remaining + ) + safe_state = state + remaining = ( + record_budget_remaining + if evaluation_budget is None + else evaluation_budget.remaining + ) if remaining > 0 and not study.run_method.is_exhausted(state): refill_batch_size = min(len(group_records), remaining) - refill_session, state = _open_stale_async_batch_session_for_study( - study, - state, - batch_size=refill_batch_size, + refill_session, state = ( + _open_stale_async_batch_session_for_study( + study, + state, + batch_size=refill_batch_size, + evaluation_budget=evaluation_budget, + ) ) refill_sessions.append(refill_session) @@ -417,10 +475,33 @@ def run_stale_async( active_session.cancel() raise + if stop_at_checkpoint_boundary and not study.run_method.is_checkpoint_safe_state( + state + ): + if safe_records is None: + msg = ( + "run did not reach a checkpoint-safe state within the evaluation budget" + ) + raise RuntimeError(msg) + return ( + RunReport[CandidateT, StudyEvaluationRecordT].from_records( + records=safe_records, + evaluation_count=safe_evaluation_count, + trace=safe_trace, + refinements=safe_refinements, + candidate_equal=study.problem.space.candidates_equal, + ), + safe_state, + ) + return ( RunReport[CandidateT, StudyEvaluationRecordT].from_records( records=records, - evaluation_count=max_evaluations - remaining, + evaluation_count=( + max_evaluations - record_budget_remaining + if evaluation_budget is None + else max_evaluations - evaluation_budget.remaining + ), trace=trace, refinements=None if refinements is None else tuple(refinements), candidate_equal=study.problem.space.candidates_equal, diff --git a/tests/csa/test_csa_checkpoint.py b/tests/csa/test_csa_checkpoint.py index 10fd200..2819bbc 100644 --- a/tests/csa/test_csa_checkpoint.py +++ b/tests/csa/test_csa_checkpoint.py @@ -4,13 +4,17 @@ from dataclasses import replace import pytest +from typing_extensions import override from variopt import ( IntegerSpace, + Objective, Observation, + Problem, Proposal, RealSpace, RecordSpace, + Study, TupleSpace, ) from variopt.algorithms.population.csa import CSAOptimizer, CSAProfile @@ -71,6 +75,7 @@ from variopt.algorithms.population.csa.selection.state import ( SeedSelectionState, ) +from variopt.evaluators import SequentialEvaluator from variopt.json_types import JSONValue from variopt.randomness import RandomStateSnapshot from variopt.spaces import RecordCandidate, SpaceBoundaryValue, SpaceCandidateValue @@ -101,6 +106,14 @@ def _evaluate_proposals( ) +class _SquareObjective(Objective[int]): + """Integer square objective used for CSA study checkpoint tests.""" + + @override + def evaluate(self, candidate: int) -> float: + return float(candidate * candidate) + + def _advance_to_safe_boundary( optimizer: CSAOptimizer[int, int], state: CSAEngineState[int], @@ -115,7 +128,10 @@ def _advance_to_safe_boundary( observations = _evaluate_proposals(proposals) next_state = optimizer.tell(next_state, observations) trace.append((proposals, observations)) - if next_state.pending_proposals.is_empty and not next_state.generation_state.is_active: + if ( + next_state.pending_proposals.is_empty + and not next_state.generation_state.is_active + ): return tuple(trace), next_state @@ -346,7 +362,9 @@ def test_rejects_checkpoint_when_pending_attributions_exist(self) -> None: class CSAOptimizerCheckpointTests: """Regression tests for public optimizer checkpoint helpers.""" - def test_structured_optimizer_checkpoint_round_trip_matches_uninterrupted_run(self) -> None: + def test_structured_optimizer_checkpoint_round_trip_matches_uninterrupted_run( + self, + ) -> None: optimizer = CSAOptimizer.from_space_defaults( space=IntegerSpace(-10, 10), bank_capacity=6, @@ -381,6 +399,39 @@ def test_structured_optimizer_checkpoint_round_trip_matches_uninterrupted_run(se uninterrupted_state, ) + def test_study_optimize_can_stop_at_checkpoint_safe_boundary(self) -> None: + optimizer = CSAOptimizer.from_space_defaults( + space=IntegerSpace(-10, 10), + bank_capacity=6, + profile=CSAProfile(seed_count=3), + random_state=7, + ) + study = Study( + problem=Problem( + space=IntegerSpace(-10, 10), + objective=_SquareObjective(), + ), + run_method=optimizer, + evaluator=SequentialEvaluator[int, int](), + ) + + full_result, unsafe_state = study.optimize(max_evaluations=7) + safe_result, safe_state = study.optimize( + max_evaluations=7, + stop_at_checkpoint_boundary=True, + ) + + assert full_result.evaluation_count == 7 + assert not optimizer.is_checkpoint_safe_state(unsafe_state) + with pytest.raises(ValueError): + _ = optimizer.state_to_dict(unsafe_state) + + assert 0 < safe_result.evaluation_count < full_result.evaluation_count + assert optimizer.is_checkpoint_safe_state(safe_state) + snapshot = optimizer.state_to_dict(safe_state) + restored_state = optimizer.state_from_dict(snapshot) + assert optimizer.state_to_dict(restored_state) == snapshot + def test_record_space_checkpoint_restore_preserves_record_candidates(self) -> None: space = RecordSpace( x=RealSpace(0.0, 1.0), diff --git a/tests/study/test_study.py b/tests/study/test_study.py index 77d8d5d..7636a20 100644 --- a/tests/study/test_study.py +++ b/tests/study/test_study.py @@ -32,6 +32,7 @@ ) from variopt import ( CandidateRefinement, + EvaluationBudgetExhausted, EvaluationOutcome, EvaluationRequest, IntegerSpace, @@ -94,8 +95,7 @@ def run( direction=query.problem.direction, ), evaluation_count=( - first_outcome.evaluation_count - + second_outcome.evaluation_count + first_outcome.evaluation_count + second_outcome.evaluation_count ), ), ) @@ -260,7 +260,9 @@ def test_step_propagates_run_method_kernel_hints_into_kernel_query( contexts = kernel.queries[0].proposal_kernel_hints assert contexts is not None assert contexts is not None - assert all(isinstance(context, ProposalLocalSearchContext) for context in contexts) + assert all( + isinstance(context, ProposalLocalSearchContext) for context in contexts + ) typed_contexts = cast(tuple[ProposalLocalSearchContext, ...], contexts) assert tuple(context.local_budget for context in typed_contexts) == (1, 2) @@ -513,7 +515,9 @@ def test_optimize_rejects_non_scalar_evaluation_records(self) -> None: with pytest.raises(TypeError): _ = study.optimize(max_evaluations=1) - def test_run_returns_terminal_report_for_non_scalar_evaluation_records(self) -> None: + def test_run_returns_terminal_report_for_non_scalar_evaluation_records( + self, + ) -> None: problem = Problem( space=IntegerSpace(low=0, high=10), evaluation_protocol=LabelProtocol(), @@ -532,10 +536,16 @@ def test_run_returns_terminal_report_for_non_scalar_evaluation_records(self) -> assert isinstance(report, RunReport) assert report.evaluation_count == 2 assert len(report.records) == 2 - assert tuple(record.label for record in report.records) == ("parity:1", "parity:0") + assert tuple(record.label for record in report.records) == ( + "parity:1", + "parity:0", + ) assert len(report.trace.events) == 2 assert all(event.value is None for event in report.trace.events) - assert final_state.tell_history == (tuple(report.records[:1]), tuple(report.records[1:])) + assert final_state.tell_history == ( + tuple(report.records[:1]), + tuple(report.records[1:]), + ) def test_optimize_returns_terminal_run_result(self) -> None: problem = Problem( @@ -596,7 +606,9 @@ def fail_evaluate_step(*args: object, **kwargs: object) -> object: result, final_state = study.optimize(max_evaluations=3, batch_size=2) - assert tuple(observation.proposal.proposal_id for observation in result.observations) == ( + assert tuple( + observation.proposal.proposal_id for observation in result.observations + ) == ( "p-1", "p-2", "p-3", @@ -634,7 +646,10 @@ def test_optimize_fast_path_preserves_outcome_aware_tell_hook(self) -> None: result, _ = study.optimize(max_evaluations=2, batch_size=2) - assert tuple(observation.candidate for observation in result.observations) == (4, 2) + assert tuple(observation.candidate for observation in result.observations) == ( + 4, + 2, + ) assert optimizer.seen_changed_leaf_paths == (None, None) def test_optimize_fast_path_carries_result_candidate_equality(self) -> None: @@ -900,7 +915,7 @@ def test_run_backfills_unrefined_history_when_late_refinement_appears(self) -> N assert late_refinement.source_candidate == 3 assert late_refinement.refined_candidate == report.records[1].candidate - def test_run_preserves_refinement_when_evaluation_cost_overshoots_budget( + def test_run_rejects_refinement_when_evaluation_cost_overshoots_budget( self, ) -> None: problem = Problem( @@ -918,18 +933,11 @@ def test_run_preserves_refinement_when_evaluation_cost_overshoots_budget( kernel=ScoringKernel(), ) - report, _ = study.run( - max_evaluations=3, - count_evaluation_cost=True, - ) - - assert report.evaluation_count == 7 - assert len(report.records) == 1 - assert len(report.refinements) == 1 - refinement = report.refinements[0] - assert refinement is not None - assert refinement.source_candidate == 4 - assert refinement.refined_candidate == report.records[0].candidate + with pytest.raises(EvaluationBudgetExhausted): + _ = study.run( + max_evaluations=3, + count_evaluation_cost=True, + ) def test_run_uses_space_candidate_equality_for_report_refinements(self) -> None: problem = Problem( @@ -953,7 +961,10 @@ def test_run_uses_space_candidate_equality_for_report_refinements(self) -> None: assert len(report.refinements) == 1 refinement = report.refinements[0] assert refinement is not None - assert refinement.refined_candidate.stable_id == report.records[0].candidate.stable_id + assert ( + refinement.refined_candidate.stable_id + == report.records[0].candidate.stable_id + ) def test_study_kernel_makes_local_optimization_visible(self) -> None: problem = Problem( @@ -998,7 +1009,6 @@ def test_study_kernel_can_supply_precomputed_objective_value_and_cost(self) -> N result, final_state = study.optimize( max_evaluations=7, - count_evaluation_cost=True, ) assert objective.evaluation_count == 0 @@ -1015,7 +1025,9 @@ def test_study_kernel_can_supply_precomputed_objective_value_and_cost(self) -> N assert refinement.refined_candidate == 2 assert len(final_state.tell_history) == 1 - def test_optimize_uses_space_candidate_equality_for_result_refinements(self) -> None: + def test_optimize_uses_space_candidate_equality_for_result_refinements( + self, + ) -> None: problem = Problem( space=SpaceOwnedEqualitySpace(), objective=SpaceOwnedEqualityObjective(), @@ -1037,7 +1049,10 @@ def test_optimize_uses_space_candidate_equality_for_result_refinements(self) -> assert len(result.refinements) == 1 refinement = result.refinements[0] assert refinement is not None - assert refinement.refined_candidate.stable_id == result.observations[0].candidate.stable_id + assert ( + refinement.refined_candidate.stable_id + == result.observations[0].candidate.stable_id + ) def test_optimize_backfills_unrefined_history_when_late_refinement_appears( self, @@ -1119,7 +1134,10 @@ def test_study_passes_evaluator_execution_resources_to_kernel(self) -> None: assert kernel.last_execution_resources is not None assert kernel.last_execution_resources.parallel_owner == "evaluator" - assert kernel.last_execution_resources.nested_parallelism_policy == NestedParallelismPolicy.FORBID + assert ( + kernel.last_execution_resources.nested_parallelism_policy + == NestedParallelismPolicy.FORBID + ) assert kernel.last_execution_resources.owner_worker_count == 1 assert kernel.last_execution_resources.owner_backend == "sequential" @@ -1138,7 +1156,9 @@ def test_step_rejects_non_positive_batch_size(self) -> None: with pytest.raises(ValueError): _ = study.step(state, batch_size=0) - def test_step_rejects_sequential_model_with_batch_size_greater_than_one(self) -> None: + def test_step_rejects_sequential_model_with_batch_size_greater_than_one( + self, + ) -> None: problem = Problem( space=IntegerSpace(low=0, high=10), objective=SquareObjective(), @@ -1155,7 +1175,9 @@ def test_step_rejects_sequential_model_with_batch_size_greater_than_one(self) -> study = Study(problem=problem, run_method=optimizer, evaluator=evaluator) state = optimizer.create_initial_state() - with pytest.raises(ValueError, match="sequential execution model requires batch_size == 1"): + with pytest.raises( + ValueError, match="sequential execution model requires batch_size == 1" + ): _ = study.step( state, batch_size=2, @@ -1174,7 +1196,10 @@ def test_step_rejects_unsupported_execution_model(self) -> None: study = Study(problem=problem, run_method=optimizer, evaluator=evaluator) state = optimizer.create_initial_state() - with pytest.raises(ValueError, match="run_method does not support the requested execution model: exact_async"): + with pytest.raises( + ValueError, + match="run_method does not support the requested execution model: exact_async", + ): _ = study.step( state, execution_model=EXACT_ASYNC_EXECUTION_MODEL, @@ -1200,7 +1225,9 @@ def test_study_rejects_misaligned_evaluator_outcomes(self) -> None: ) state = optimizer.create_initial_state() - with pytest.raises(ValueError, match="evaluator outcomes must align with input request order"): + with pytest.raises( + ValueError, match="evaluator outcomes must align with input request order" + ): _ = study.step(state, batch_size=2) def test_optimize_allows_zero_evaluations(self) -> None: @@ -1273,7 +1300,7 @@ def test_optimize_respects_maximize_direction(self) -> None: assert result.best_observation.value == 16.0 assert result.best_observation.score == -16.0 - def test_optimize_can_budget_by_local_optimization_cost(self) -> None: + def test_optimize_budgets_by_local_optimization_cost_by_default(self) -> None: problem = Problem( space=IntegerSpace(low=0, high=10), objective=SquareObjective(), @@ -1294,7 +1321,6 @@ def test_optimize_can_budget_by_local_optimization_cost(self) -> None: result, final_state = study.optimize( max_evaluations=7, - count_evaluation_cost=True, ) assert len(result.observations) == 1 diff --git a/tests/study/test_study_stale_async.py b/tests/study/test_study_stale_async.py index 37db1bd..5916519 100644 --- a/tests/study/test_study_stale_async.py +++ b/tests/study/test_study_stale_async.py @@ -6,6 +6,7 @@ OutOfOrderAsyncEvaluator, RecordingKernel, RollingStaleAsyncOptimizer, + SessionRecordingAsyncEvaluator, ShiftedObservationProtocol, SpaceOwnedEqualityAsyncEvaluator, SpaceOwnedEqualityObjective, @@ -32,7 +33,10 @@ def test_step_rejects_stale_async_model_outside_run_or_optimize(self) -> None: study = Study(problem=problem, run_method=optimizer, evaluator=evaluator) state = optimizer.create_initial_state() - with pytest.raises(NotImplementedError, match="stale_async execution model is only supported by Study.run and Study.optimize"): + with pytest.raises( + NotImplementedError, + match="stale_async execution model is only supported by Study.run and Study.optimize", + ): _ = study.step( state, execution_model=STALE_ASYNC_EXECUTION_MODEL, @@ -62,16 +66,44 @@ def test_run_stale_async_assimilates_incrementally_and_refills_frontier( assert final_state.ask_history == (2, 1, 1) assert tuple( - observation.proposal.proposal_id - for observation_batch in final_state.tell_history - for observation in observation_batch - ) == ("p-2", "p-1", "spawn-p-2", "spawn-p-1") + observation.proposal.proposal_id + for observation_batch in final_state.tell_history + for observation in observation_batch + ) == ("p-2", "p-1", "spawn-p-2", "spawn-p-1") assert tuple( - observation.proposal.proposal_id - for observation in report.records - ) == ("p-2", "p-1", "spawn-p-2", "spawn-p-1") + observation.proposal.proposal_id for observation in report.records + ) == ("p-2", "p-1", "spawn-p-2", "spawn-p-1") assert report.refinements == () + def test_run_stale_async_refill_respects_default_evaluation_budget(self) -> None: + problem = Problem( + space=IntegerSpace(low=0, high=20), + objective=SquareObjective(), + ) + optimizer = RollingStaleAsyncOptimizer( + proposals=( + Proposal(candidate=4, proposal_id="p-1"), + Proposal(candidate=2, proposal_id="p-2"), + ), + ) + evaluator = SessionRecordingAsyncEvaluator() + study = Study(problem=problem, run_method=optimizer, evaluator=evaluator) + + report, final_state = study.run( + max_evaluations=3, + batch_size=2, + execution_model=STALE_ASYNC_EXECUTION_MODEL, + ) + + assert evaluator.opened_batch_sizes == (2, 1) + assert final_state.ask_history == (2, 1) + assert report.evaluation_count == 3 + assert tuple(record.proposal.proposal_id for record in report.records) == ( + "p-2", + "p-1", + "spawn-p-2", + ) + def test_run_stale_async_preserves_refinement_completion_order(self) -> None: problem = Problem( space=IntegerSpace(low=0, high=20), @@ -107,10 +139,10 @@ def test_run_stale_async_preserves_refinement_completion_order(self) -> None: assert second_refinement.source_candidate == 14 assert second_refinement.refined_candidate == 13 assert tuple( - observation.proposal.proposal_id - for observation_batch in final_state.tell_history - for observation in observation_batch - ) == ("p-2", "p-1") + observation.proposal.proposal_id + for observation_batch in final_state.tell_history + for observation in observation_batch + ) == ("p-2", "p-1") def test_run_stale_async_uses_space_candidate_equality_for_refinement( self, @@ -131,7 +163,10 @@ def test_run_stale_async_uses_space_candidate_equality_for_refinement( assert len(report.refinements) == 1 refinement = report.refinements[0] assert refinement is not None - assert refinement.refined_candidate.stable_id == report.records[0].candidate.stable_id + assert ( + refinement.refined_candidate.stable_id + == report.records[0].candidate.stable_id + ) def test_optimize_stale_async_uses_space_candidate_equality_for_refinement( self, @@ -178,9 +213,8 @@ def test_optimize_stale_async_projects_completion_order_refinements(self) -> Non ) assert tuple( - observation.proposal.proposal_id - for observation in result.observations - ) == ("p-2", "p-1") + observation.proposal.proposal_id for observation in result.observations + ) == ("p-2", "p-1") assert tuple(observation.candidate for observation in result.observations) == ( 11, 13, @@ -195,10 +229,10 @@ def test_optimize_stale_async_projects_completion_order_refinements(self) -> Non assert second_refinement.source_candidate == 14 assert second_refinement.refined_candidate == 13 assert tuple( - observation.proposal.proposal_id - for observation_batch in final_state.tell_history - for observation in observation_batch - ) == ("p-2", "p-1") + observation.proposal.proposal_id + for observation_batch in final_state.tell_history + for observation in observation_batch + ) == ("p-2", "p-1") def test_run_stale_async_rejects_non_direct_kernel(self) -> None: problem = Problem( @@ -216,7 +250,10 @@ def test_run_stale_async_rejects_non_direct_kernel(self) -> None: kernel=RecordingKernel(), ) - with pytest.raises(ValueError, match="stale_async execution model currently requires DirectKernel"): + with pytest.raises( + ValueError, + match="stale_async execution model currently requires DirectKernel", + ): _ = study.run( max_evaluations=1, execution_model=STALE_ASYNC_EXECUTION_MODEL, From 88f09855f175d1a47451608b0c24759d4741e992 Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:05:10 +0900 Subject: [PATCH 2/6] fix(local-search): reserve batch budget during refinement --- .../algorithms/local_search/scipy/kernel.py | 132 +++++++++++++--- .../local_search/structured/hill_climb.py | 15 +- .../local_search/structured/iterated.py | 29 +++- .../structured/runtime/artifacts.py | 13 +- .../structured/runtime/prepared.py | 17 +- .../local_search/structured/runtime/search.py | 147 +++++++++++++----- .../local_search/structured/scheduled.py | 18 ++- .../local_search/structured/stochastic.py | 15 +- .../structured/variable_neighborhood.py | 22 ++- .../test_scipy_local_optimization.py | 119 ++++++++++++-- .../test_structured_local_optimization.py | 120 ++++++++++---- 11 files changed, 521 insertions(+), 126 deletions(-) diff --git a/src/variopt/algorithms/local_search/scipy/kernel.py b/src/variopt/algorithms/local_search/scipy/kernel.py index a12e99e..f8fc0bf 100644 --- a/src/variopt/algorithms/local_search/scipy/kernel.py +++ b/src/variopt/algorithms/local_search/scipy/kernel.py @@ -14,6 +14,7 @@ Proposal, ProposalEvaluationSpec, ) +from ....execution import EvaluationBudgetExhausted from ....kernel import ( Kernel, KernelDiagnostics, @@ -89,7 +90,8 @@ def _as_local_search_context( @dataclass(frozen=True, slots=True) -class ScipyMinimizeKernel(FrozenGenericSlotsCompat, +class ScipyMinimizeKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -228,6 +230,7 @@ def _evaluate_proposal( if proposal_evaluation_spec is None else (proposal_evaluation_spec,) ), + evaluation_budget=query.evaluation_budget, ), ) if len(local_outcomes) != 1: @@ -272,6 +275,7 @@ def _evaluate_candidate( if proposal_evaluation_spec is None else (proposal_evaluation_spec,) ), + evaluation_budget=query.evaluation_budget, ), ) if len(local_outcomes) != 1: @@ -293,6 +297,7 @@ def _optimize_proposal( [ProposalBatchQuery[BoundaryT, ContinuousCandidateT]], tuple[EvaluationOutcome[ContinuousCandidateT], ...], ], + reserved_count: int, ) -> EvaluationOutcome[ContinuousCandidateT]: """Run one local descent episode for one original proposal.""" context = self._proposal_context(query=query, proposal_index=proposal_index) @@ -317,10 +322,46 @@ def _optimize_proposal( EvaluationOutcome[ContinuousCandidateT], ] = {} + def can_evaluate_local_candidate() -> bool: + budget = query.evaluation_budget + return budget is None or budget.can_consume(1 + reserved_count) + + def budget_exhausted_outcome( + optimized_outcome: EvaluationOutcome[ContinuousCandidateT], + ) -> EvaluationOutcome[ContinuousCandidateT]: + optimized_candidate = optimized_outcome.record.candidate + refinement = _candidate_refinement_from_codec( + codec=codec, + source_candidate=proposal.candidate, + refined_candidate=optimized_candidate, + ) + return EvaluationOutcome( + record=Observation( + proposal=proposal, + proposal_evaluation_spec=proposal_evaluation_spec, + candidate=optimized_candidate, + value=optimized_outcome.record.value, + score=optimized_outcome.record.score, + elapsed_seconds=optimized_outcome.record.elapsed_seconds, + ), + evaluation_count=evaluation_count, + kernel_diagnostics=KernelDiagnostics( + backend="scipy.optimize.minimize", + method=self.method, + status=KernelStatus.STOPPED, + message="evaluation budget exhausted before local convergence", + ), + refinement=refinement, + candidate_equal=query.problem.space.candidates_equal, + ) + def objective_in_coordinate_space( coordinates: Sequence[float], ) -> float: nonlocal evaluation_count + if not can_evaluate_local_candidate(): + msg = "evaluation budget exhausted" + raise EvaluationBudgetExhausted(msg) coordinate_key = tuple(float(coordinate) for coordinate in coordinates) local_candidate = codec.candidate_from_coordinates( proposal.candidate, @@ -339,41 +380,71 @@ def objective_in_coordinate_space( evaluated_outcomes_by_coordinates[coordinate_key] = local_outcome return local_outcome.record.score - scipy_result = ScipyMinimizeResult.from_optimize_result( - run_scipy_minimize( - objective_in_coordinate_space=objective_in_coordinate_space, - initial_coordinates=initial_coordinates, - method=self.method, - coordinate_bounds=codec.coordinate_bounds, - tolerance=self.tolerance, - options=self._scipy_options(context=context), + try: + scipy_result = ScipyMinimizeResult.from_optimize_result( + run_scipy_minimize( + objective_in_coordinate_space=objective_in_coordinate_space, + initial_coordinates=initial_coordinates, + method=self.method, + coordinate_bounds=codec.coordinate_bounds, + tolerance=self.tolerance, + options=self._scipy_options(context=context), + ) ) - ) + except EvaluationBudgetExhausted: + if len(evaluated_outcomes_by_coordinates) == 0: + raise + optimized_outcome = min( + evaluated_outcomes_by_coordinates.values(), + key=lambda outcome: outcome.record.score, + ) + return budget_exhausted_outcome(optimized_outcome) if not scipy_result.has_finite_solution: original_outcome = evaluated_outcomes_by_coordinates.get( initial_coordinates, ) if original_outcome is None: - original_outcome = self._evaluate_proposal( - query=query, - proposal=proposal, - proposal_evaluation_spec=proposal_evaluation_spec, - runner=runner, + if ( + query.evaluation_budget is not None + and not can_evaluate_local_candidate() + and len(evaluated_outcomes_by_coordinates) > 0 + ): + original_outcome = min( + evaluated_outcomes_by_coordinates.values(), + key=lambda outcome: outcome.record.score, + ) + else: + original_outcome = self._evaluate_proposal( + query=query, + proposal=proposal, + proposal_evaluation_spec=proposal_evaluation_spec, + runner=runner, + ) + evaluation_count += original_outcome.evaluation_count + + fallback_candidate = original_outcome.record.candidate + refinement = None + if not query.problem.space.candidates_equal( + proposal.candidate, + fallback_candidate, + ): + refinement = _candidate_refinement_from_codec( + codec=codec, + source_candidate=proposal.candidate, + refined_candidate=fallback_candidate, ) - evaluation_count += original_outcome.evaluation_count - return EvaluationOutcome( record=Observation( proposal=proposal, proposal_evaluation_spec=proposal_evaluation_spec, - candidate=proposal.candidate, + candidate=fallback_candidate, value=original_outcome.record.value, score=original_outcome.record.score, elapsed_seconds=original_outcome.record.elapsed_seconds, ), evaluation_count=evaluation_count, kernel_diagnostics=scipy_result.diagnostics(method=self.method), - refinement=None, + refinement=refinement, candidate_equal=query.problem.space.candidates_equal, ) @@ -384,6 +455,17 @@ def objective_in_coordinate_space( ) optimized_outcome = evaluated_outcomes_by_coordinates.get(optimized_coordinates) if optimized_outcome is None: + if ( + query.evaluation_budget is not None + and not can_evaluate_local_candidate() + and len(evaluated_outcomes_by_coordinates) > 0 + ): + optimized_outcome = min( + evaluated_outcomes_by_coordinates.values(), + key=lambda outcome: outcome.record.score, + ) + return budget_exhausted_outcome(optimized_outcome) + optimized_outcome = self._evaluate_candidate( query=query, candidate=optimized_candidate, @@ -434,10 +516,13 @@ def run( tuple[EvaluationOutcome[ContinuousCandidateT], ...] Locally improved outcomes aligned to ``query.proposals``. """ - prepared_codec: ContinuousStructuredSpaceCodec[ - BoundaryT, - ContinuousCandidateT, - ] | None = None + prepared_codec: ( + ContinuousStructuredSpaceCodec[ + BoundaryT, + ContinuousCandidateT, + ] + | None + ) = None def codec_provider() -> ContinuousStructuredSpaceCodec[ BoundaryT, @@ -458,6 +543,7 @@ def codec_provider() -> ContinuousStructuredSpaceCodec[ proposal=proposal, codec_provider=codec_provider, runner=runner, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/src/variopt/algorithms/local_search/structured/hill_climb.py b/src/variopt/algorithms/local_search/structured/hill_climb.py index 4425f83..e029f96 100644 --- a/src/variopt/algorithms/local_search/structured/hill_climb.py +++ b/src/variopt/algorithms/local_search/structured/hill_climb.py @@ -30,7 +30,8 @@ @dataclass(frozen=True, slots=True) -class StructuredHillClimbKernel(FrozenGenericSlotsCompat, +class StructuredHillClimbKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -132,6 +133,7 @@ def _optimize_proposal( runtime: PreparedStructuredLocalSearchRuntime[BoundaryT, StructuredCandidateT], proposal_index: int, proposal: Proposal[StructuredCandidateT], + reserved_count: int, ) -> EvaluationOutcome[StructuredCandidateT]: """Run one local hill-climb episode for one original proposal.""" runtime.neighborhood.space.validate(proposal.candidate) @@ -164,6 +166,7 @@ def _optimize_proposal( context=context, ) + budget_exhausted = False while completed_steps < episode_max_steps: improved = False for path, leaf_space in leaf_schedule: @@ -175,6 +178,9 @@ def _optimize_proposal( leaf_space, current_leaf_value, ): + if not runtime.can_evaluate(reserved_count=reserved_count): + budget_exhausted = True + break proposed_candidate = runtime.neighborhood.space.replace_leaf_values( current_candidate, {path: replacement}, @@ -194,15 +200,19 @@ def _optimize_proposal( completed_steps += 1 improved = True break - if improved: + if improved or budget_exhausted: break + if budget_exhausted: + break if not improved: converged = True break status = KernelStatus.STOPPED message = "max_steps reached before local convergence" + if budget_exhausted: + message = "evaluation budget exhausted before local convergence" if converged: status = KernelStatus.CONVERGED message = "no improving leafwise move found" @@ -268,6 +278,7 @@ def run( runtime=runtime, proposal_index=proposal_index, proposal=proposal, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/src/variopt/algorithms/local_search/structured/iterated.py b/src/variopt/algorithms/local_search/structured/iterated.py index 61c3b1b..e391744 100644 --- a/src/variopt/algorithms/local_search/structured/iterated.py +++ b/src/variopt/algorithms/local_search/structured/iterated.py @@ -38,7 +38,8 @@ @dataclass(frozen=True, slots=True) -class StructuredIteratedLocalSearchKernel(FrozenGenericSlotsCompat, +class StructuredIteratedLocalSearchKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -147,6 +148,7 @@ def _optimize_proposal( proposal_index: int, proposal: Proposal[StructuredCandidateT], random_state: np.random.RandomState, + reserved_count: int, ) -> EvaluationOutcome[StructuredCandidateT]: """Run one iterated local-search episode for one original proposal.""" runtime.neighborhood.space.validate(proposal.candidate) @@ -173,6 +175,7 @@ def _optimize_proposal( proposal_evaluation_spec=proposal_evaluation_spec, leaf_schedule=leaf_schedule, max_steps=episode_max_steps, + reserved_count=reserved_count, ) incumbent_record = incumbent_result.record incumbent_candidate = incumbent_record.candidate @@ -183,8 +186,13 @@ def _optimize_proposal( accepted_refinement = completed_steps > 0 kick_count = 0 terminal_message = "max_kicks reached before iterated local-search termination" + budget_exhausted = incumbent_result.budget_exhausted - while completed_steps < episode_max_steps and kick_count < self.max_kicks: + while ( + not budget_exhausted + and completed_steps < episode_max_steps + and kick_count < self.max_kicks + ): kicked_candidate = sample_structured_kick_candidate( runtime=runtime, candidate=incumbent_candidate, @@ -193,7 +201,13 @@ def _optimize_proposal( random_state=random_state, ) if kicked_candidate is None: - terminal_message = "no admissible kick found for the configured kick policy" + terminal_message = ( + "no admissible kick found for the configured kick policy" + ) + break + + if not runtime.can_evaluate(reserved_count=reserved_count): + budget_exhausted = True break kick_count += 1 @@ -204,9 +218,11 @@ def _optimize_proposal( proposal_evaluation_spec=proposal_evaluation_spec, leaf_schedule=leaf_schedule, max_steps=episode_max_steps - completed_steps, + reserved_count=reserved_count, ) evaluation_count += kicked_result.evaluation_count completed_steps += kicked_result.completed_steps + budget_exhausted = kicked_result.budget_exhausted kicked_record = kicked_result.record if accepts_strict_improvement( incumbent_score=incumbent_score, @@ -218,8 +234,12 @@ def _optimize_proposal( accepted_refinement = True status = KernelStatus.STOPPED + if budget_exhausted: + terminal_message = "evaluation budget exhausted before local convergence" if completed_steps >= episode_max_steps: - terminal_message = "max_steps reached before iterated local-search termination" + terminal_message = ( + "max_steps reached before iterated local-search termination" + ) refinement = None if accepted_refinement: @@ -284,6 +304,7 @@ def run( proposal_index=proposal_index, proposal=proposal, random_state=random_state, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/src/variopt/algorithms/local_search/structured/runtime/artifacts.py b/src/variopt/algorithms/local_search/structured/runtime/artifacts.py index 33e3620..bdfe150 100644 --- a/src/variopt/algorithms/local_search/structured/runtime/artifacts.py +++ b/src/variopt/algorithms/local_search/structured/runtime/artifacts.py @@ -12,7 +12,8 @@ @dataclass(frozen=True, slots=True) -class StructuredVariableNeighborhoodStageAttempt(FrozenGenericSlotsCompat, +class StructuredVariableNeighborhoodStageAttempt( + FrozenGenericSlotsCompat, Generic[StructuredCandidateT], ): """One attempted neighborhood stage inside a variable-neighborhood episode. @@ -27,16 +28,21 @@ class StructuredVariableNeighborhoodStageAttempt(FrozenGenericSlotsCompat, Terminal kernel status after the stage. terminal_message : str Human-readable terminal status message. + budget_exhausted : bool, default=False + Whether the stage stopped because no evaluation budget remained. """ improved_outcome: EvaluationOutcome[StructuredCandidateT] | None evaluation_count: int terminal_status: KernelStatus terminal_message: str + budget_exhausted: bool = False @dataclass(frozen=True, slots=True) -class StructuredLocalImprovementResult(FrozenGenericSlotsCompat, Generic[StructuredCandidateT]): +class StructuredLocalImprovementResult( + FrozenGenericSlotsCompat, Generic[StructuredCandidateT] +): """One completed inner local-improvement episode over a fixed incumbent. Parameters @@ -49,9 +55,12 @@ class StructuredLocalImprovementResult(FrozenGenericSlotsCompat, Generic[Structu Number of completed neighborhood steps. converged : bool Whether the episode converged without finding further improvements. + budget_exhausted : bool, default=False + Whether the episode stopped because no evaluation budget remained. """ record: Observation[StructuredCandidateT] evaluation_count: int completed_steps: int converged: bool + budget_exhausted: bool = False diff --git a/src/variopt/algorithms/local_search/structured/runtime/prepared.py b/src/variopt/algorithms/local_search/structured/runtime/prepared.py index da7a8d0..16f22be 100644 --- a/src/variopt/algorithms/local_search/structured/runtime/prepared.py +++ b/src/variopt/algorithms/local_search/structured/runtime/prepared.py @@ -39,7 +39,8 @@ def _as_local_search_context( @dataclass(frozen=True, slots=True) -class PreparedStructuredLocalSearchRuntime(FrozenGenericSlotsCompat, +class PreparedStructuredLocalSearchRuntime( + FrozenGenericSlotsCompat, Generic[BoundaryT, StructuredCandidateT], ): """Prepared per-run helpers shared by structured local-search kernels. @@ -67,6 +68,18 @@ class PreparedStructuredLocalSearchRuntime(FrozenGenericSlotsCompat, default_schedule: tuple[tuple[LeafPath, DiscreteLeafSpace], ...] leaf_space_by_path: dict[LeafPath, DiscreteLeafSpace] + def can_evaluate(self, *, reserved_count: int = 0) -> bool: + """Return whether another evaluator call is allowed by the budget. + + Parameters + ---------- + reserved_count : int, default=0 + Evaluation units that must remain available for later proposals in + the same top-level batch. + """ + budget = self.query.evaluation_budget + return budget is None or budget.can_consume(1 + reserved_count) + def evaluate_candidate( self, *, @@ -102,6 +115,7 @@ def evaluate_candidate( if proposal_evaluation_spec is None else (proposal_evaluation_spec,) ), + evaluation_budget=self.query.evaluation_budget, ), ) if len(local_outcomes) != 1: @@ -148,6 +162,7 @@ def evaluate_original_proposal( if proposal_evaluation_spec is None else (proposal_evaluation_spec,) ), + evaluation_budget=self.query.evaluation_budget, ), ) if len(local_outcomes) != 1: diff --git a/src/variopt/algorithms/local_search/structured/runtime/search.py b/src/variopt/algorithms/local_search/structured/runtime/search.py index 108d57d..53d6cb0 100644 --- a/src/variopt/algorithms/local_search/structured/runtime/search.py +++ b/src/variopt/algorithms/local_search/structured/runtime/search.py @@ -64,9 +64,8 @@ def sample_structured_discrete_neighborhood( path, ) leaf_neighbors = discrete_leaf_neighbors(leaf_space, current_leaf_value) - if ( - max_categorical_neighbors_per_leaf is not None - and isinstance(leaf_space, CategoricalSpace) + if max_categorical_neighbors_per_leaf is not None and isinstance( + leaf_space, CategoricalSpace ): bounded_leaf_neighbors = sample_neighbors_without_replacement( neighbors=leaf_neighbors, @@ -107,7 +106,8 @@ def first_improving_single_leaf_outcome( current_score: float, leaf_schedule: tuple[tuple[LeafPath, DiscreteLeafSpace], ...], proposal_evaluation_spec: ProposalEvaluationSpec | None, -) -> tuple[EvaluationOutcome[StructuredCandidateT] | None, int]: + reserved_count: int = 0, +) -> tuple[EvaluationOutcome[StructuredCandidateT] | None, int, bool]: """Return the first improving single-leaf move, if any. Parameters @@ -122,12 +122,16 @@ def first_improving_single_leaf_outcome( Ordered editable leaves to scan for first improvement. proposal_evaluation_spec : ProposalEvaluationSpec | None Optional proposal metadata forwarded to evaluation. + reserved_count : int, default=0 + Evaluation units reserved for later proposals in the same top-level + batch. Returns ------- - tuple[EvaluationOutcome[StructuredCandidateT] | None, int] + tuple[EvaluationOutcome[StructuredCandidateT] | None, int, bool] Improving outcome when found and the total number of evaluations - consumed while scanning the neighborhood. + consumed while scanning the neighborhood, followed by whether the scan + stopped because no evaluation budget remained. """ evaluated_neighbor_count = 0 for path, leaf_space in leaf_schedule: @@ -136,6 +140,8 @@ def first_improving_single_leaf_outcome( path, ) for replacement in discrete_leaf_neighbors(leaf_space, current_leaf_value): + if not runtime.can_evaluate(reserved_count=reserved_count): + return None, evaluated_neighbor_count, True proposed_candidate = runtime.neighborhood.space.replace_leaf_values( candidate, {path: replacement}, @@ -146,9 +152,9 @@ def first_improving_single_leaf_outcome( ) evaluated_neighbor_count += proposed_outcome.evaluation_count if proposed_outcome.record.score < current_score: - return proposed_outcome, evaluated_neighbor_count + return proposed_outcome, evaluated_neighbor_count, False - return None, evaluated_neighbor_count + return None, evaluated_neighbor_count, False def first_improving_pair_move_outcome( @@ -159,7 +165,8 @@ def first_improving_pair_move_outcome( leaf_schedule: tuple[tuple[LeafPath, DiscreteLeafSpace], ...], proposal_evaluation_spec: ProposalEvaluationSpec | None, pair_move_leaf_limit: int, -) -> tuple[EvaluationOutcome[StructuredCandidateT] | None, int]: + reserved_count: int = 0, +) -> tuple[EvaluationOutcome[StructuredCandidateT] | None, int, bool]: """Return the first improving two-leaf move, if any. Parameters @@ -176,16 +183,20 @@ def first_improving_pair_move_outcome( Optional proposal metadata forwarded to evaluation. pair_move_leaf_limit : int Maximum prefix of ``leaf_schedule`` to consider for pair moves. + reserved_count : int, default=0 + Evaluation units reserved for later proposals in the same top-level + batch. Returns ------- - tuple[EvaluationOutcome[StructuredCandidateT] | None, int] + tuple[EvaluationOutcome[StructuredCandidateT] | None, int, bool] Improving outcome when found and the total number of evaluations - consumed while scanning pair moves. + consumed while scanning pair moves, followed by whether the scan + stopped because no evaluation budget remained. """ limited_schedule = leaf_schedule[:pair_move_leaf_limit] if len(limited_schedule) < 2: - return None, 0 + return None, 0, False evaluated_neighbor_count = 0 for left_index in range(len(limited_schedule) - 1): @@ -213,6 +224,8 @@ def first_improving_pair_move_outcome( for left_replacement in left_neighbors: for right_replacement in right_neighbors: + if not runtime.can_evaluate(reserved_count=reserved_count): + return None, evaluated_neighbor_count, True proposed_candidate = runtime.neighborhood.space.replace_leaf_values( candidate, { @@ -226,9 +239,9 @@ def first_improving_pair_move_outcome( ) evaluated_neighbor_count += proposed_outcome.evaluation_count if proposed_outcome.record.score < current_score: - return proposed_outcome, evaluated_neighbor_count + return proposed_outcome, evaluated_neighbor_count, False - return None, evaluated_neighbor_count + return None, evaluated_neighbor_count, False def run_structured_variable_neighborhood_stage_once( @@ -240,6 +253,7 @@ def run_structured_variable_neighborhood_stage_once( leaf_schedule: tuple[tuple[LeafPath, DiscreteLeafSpace], ...], proposal_evaluation_spec: ProposalEvaluationSpec | None, random_state: np.random.RandomState, + reserved_count: int = 0, ) -> StructuredVariableNeighborhoodStageAttempt[StructuredCandidateT]: """Execute one configured variable-neighborhood stage. @@ -259,6 +273,9 @@ def run_structured_variable_neighborhood_stage_once( Optional proposal metadata forwarded to evaluation. random_state : np.random.RandomState Random state used by sampled neighborhood stages. + reserved_count : int, default=0 + Evaluation units reserved for later proposals in the same top-level + batch. Returns ------- @@ -272,13 +289,24 @@ def run_structured_variable_neighborhood_stage_once( If the stage configuration is incomplete or unsupported. """ if stage.kind == "leafwise_first_improvement": - proposed_outcome, evaluation_count = first_improving_single_leaf_outcome( - runtime=runtime, - candidate=candidate, - current_score=current_score, - leaf_schedule=leaf_schedule, - proposal_evaluation_spec=proposal_evaluation_spec, + proposed_outcome, evaluation_count, budget_exhausted = ( + first_improving_single_leaf_outcome( + runtime=runtime, + candidate=candidate, + current_score=current_score, + leaf_schedule=leaf_schedule, + proposal_evaluation_spec=proposal_evaluation_spec, + reserved_count=reserved_count, + ) ) + if budget_exhausted: + return StructuredVariableNeighborhoodStageAttempt( + improved_outcome=None, + evaluation_count=evaluation_count, + terminal_status=KernelStatus.STOPPED, + terminal_message="evaluation budget exhausted before local convergence", + budget_exhausted=True, + ) return StructuredVariableNeighborhoodStageAttempt( improved_outcome=proposed_outcome, evaluation_count=evaluation_count, @@ -303,6 +331,14 @@ def run_structured_variable_neighborhood_stage_once( ) evaluation_count = 0 for move in sampled_neighborhood.moves: + if not runtime.can_evaluate(reserved_count=reserved_count): + return StructuredVariableNeighborhoodStageAttempt( + improved_outcome=None, + evaluation_count=evaluation_count, + terminal_status=KernelStatus.STOPPED, + terminal_message="evaluation budget exhausted before local convergence", + budget_exhausted=True, + ) proposed_candidate = runtime.neighborhood.space.replace_leaf_values( candidate, {move.path: move.replacement}, @@ -344,13 +380,24 @@ def run_structured_variable_neighborhood_stage_once( msg = "scheduled stage must define pair_move_leaf_limit" raise ValueError(msg) - proposed_outcome, evaluation_count = first_improving_single_leaf_outcome( - runtime=runtime, - candidate=candidate, - current_score=current_score, - leaf_schedule=leaf_schedule, - proposal_evaluation_spec=proposal_evaluation_spec, + proposed_outcome, evaluation_count, budget_exhausted = ( + first_improving_single_leaf_outcome( + runtime=runtime, + candidate=candidate, + current_score=current_score, + leaf_schedule=leaf_schedule, + proposal_evaluation_spec=proposal_evaluation_spec, + reserved_count=reserved_count, + ) ) + if budget_exhausted: + return StructuredVariableNeighborhoodStageAttempt( + improved_outcome=None, + evaluation_count=evaluation_count, + terminal_status=KernelStatus.STOPPED, + terminal_message="evaluation budget exhausted before local convergence", + budget_exhausted=True, + ) if proposed_outcome is not None: return StructuredVariableNeighborhoodStageAttempt( improved_outcome=proposed_outcome, @@ -361,14 +408,25 @@ def run_structured_variable_neighborhood_stage_once( ), ) - pair_outcome, pair_evaluation_count = first_improving_pair_move_outcome( - runtime=runtime, - candidate=candidate, - current_score=current_score, - leaf_schedule=leaf_schedule, - proposal_evaluation_spec=proposal_evaluation_spec, - pair_move_leaf_limit=stage.pair_move_leaf_limit, + pair_outcome, pair_evaluation_count, pair_budget_exhausted = ( + first_improving_pair_move_outcome( + runtime=runtime, + candidate=candidate, + current_score=current_score, + leaf_schedule=leaf_schedule, + proposal_evaluation_spec=proposal_evaluation_spec, + pair_move_leaf_limit=stage.pair_move_leaf_limit, + reserved_count=reserved_count, + ) ) + if pair_budget_exhausted: + return StructuredVariableNeighborhoodStageAttempt( + improved_outcome=None, + evaluation_count=evaluation_count + pair_evaluation_count, + terminal_status=KernelStatus.STOPPED, + terminal_message="evaluation budget exhausted before local convergence", + budget_exhausted=True, + ) return StructuredVariableNeighborhoodStageAttempt( improved_outcome=pair_outcome, evaluation_count=evaluation_count + pair_evaluation_count, @@ -390,6 +448,7 @@ def run_leafwise_local_search_episode( proposal_evaluation_spec: ProposalEvaluationSpec | None, leaf_schedule: tuple[tuple[LeafPath, DiscreteLeafSpace], ...], max_steps: int, + reserved_count: int = 0, ) -> StructuredLocalImprovementResult[StructuredCandidateT]: """Run one deterministic first-improvement local-search episode. @@ -407,6 +466,9 @@ def run_leafwise_local_search_episode( Ordered editable leaves scanned at each local-search step. max_steps : int Maximum number of successful improvement steps to execute. + reserved_count : int, default=0 + Evaluation units reserved for later proposals in the same top-level + batch. Returns ------- @@ -424,18 +486,22 @@ def run_leafwise_local_search_episode( evaluation_count = current_outcome.evaluation_count completed_steps = 0 converged = False + budget_exhausted = False while completed_steps < max_steps: - proposed_outcome, neighbor_evaluation_count = first_improving_single_leaf_outcome( - runtime=runtime, - candidate=current_candidate, - current_score=current_score, - leaf_schedule=leaf_schedule, - proposal_evaluation_spec=proposal_evaluation_spec, + proposed_outcome, neighbor_evaluation_count, budget_exhausted = ( + first_improving_single_leaf_outcome( + runtime=runtime, + candidate=current_candidate, + current_score=current_score, + leaf_schedule=leaf_schedule, + proposal_evaluation_spec=proposal_evaluation_spec, + reserved_count=reserved_count, + ) ) evaluation_count += neighbor_evaluation_count if proposed_outcome is None: - converged = True + converged = not budget_exhausted break proposed_record = proposed_outcome.record @@ -455,4 +521,5 @@ def run_leafwise_local_search_episode( evaluation_count=evaluation_count, completed_steps=completed_steps, converged=converged, + budget_exhausted=budget_exhausted, ) diff --git a/src/variopt/algorithms/local_search/structured/scheduled.py b/src/variopt/algorithms/local_search/structured/scheduled.py index 08ea57f..2e62232 100644 --- a/src/variopt/algorithms/local_search/structured/scheduled.py +++ b/src/variopt/algorithms/local_search/structured/scheduled.py @@ -30,7 +30,8 @@ @dataclass(frozen=True, slots=True) -class StructuredScheduledLocalSearchKernel(FrozenGenericSlotsCompat, +class StructuredScheduledLocalSearchKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -145,6 +146,7 @@ def _optimize_proposal( runtime: PreparedStructuredLocalSearchRuntime[BoundaryT, StructuredCandidateT], proposal_index: int, proposal: Proposal[StructuredCandidateT], + reserved_count: int, ) -> EvaluationOutcome[StructuredCandidateT]: """Run one scheduled local-search episode for one original proposal.""" runtime.neighborhood.space.validate(proposal.candidate) @@ -176,20 +178,22 @@ def _optimize_proposal( runtime=runtime, context=context, ) + budget_exhausted = False while completed_steps < episode_max_steps: - proposed_outcome, neighbor_evaluation_count = ( + proposed_outcome, neighbor_evaluation_count, budget_exhausted = ( first_improving_single_leaf_outcome( runtime=runtime, candidate=current_candidate, current_score=current_score, leaf_schedule=leaf_schedule, proposal_evaluation_spec=proposal_evaluation_spec, + reserved_count=reserved_count, ) ) evaluation_count += neighbor_evaluation_count - if proposed_outcome is None: - proposed_outcome, neighbor_evaluation_count = ( + if proposed_outcome is None and not budget_exhausted: + proposed_outcome, neighbor_evaluation_count, budget_exhausted = ( first_improving_pair_move_outcome( runtime=runtime, candidate=current_candidate, @@ -197,10 +201,13 @@ def _optimize_proposal( leaf_schedule=leaf_schedule, proposal_evaluation_spec=proposal_evaluation_spec, pair_move_leaf_limit=self.pair_move_leaf_limit, + reserved_count=reserved_count, ) ) evaluation_count += neighbor_evaluation_count + if budget_exhausted: + break if proposed_outcome is None: converged = True break @@ -213,6 +220,8 @@ def _optimize_proposal( status = KernelStatus.STOPPED message = "max_steps reached before local convergence" + if budget_exhausted: + message = "evaluation budget exhausted before local convergence" if converged: status = KernelStatus.CONVERGED message = "no improving scheduled move found" @@ -278,6 +287,7 @@ def run( runtime=runtime, proposal_index=proposal_index, proposal=proposal, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/src/variopt/algorithms/local_search/structured/stochastic.py b/src/variopt/algorithms/local_search/structured/stochastic.py index 3731ac2..9c9059b 100644 --- a/src/variopt/algorithms/local_search/structured/stochastic.py +++ b/src/variopt/algorithms/local_search/structured/stochastic.py @@ -29,7 +29,8 @@ @dataclass(frozen=True, slots=True) -class StructuredStochasticNeighborhoodKernel(FrozenGenericSlotsCompat, +class StructuredStochasticNeighborhoodKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -152,6 +153,7 @@ def _optimize_proposal( proposal_index: int, proposal: Proposal[StructuredCandidateT], random_state: np.random.RandomState, + reserved_count: int, ) -> EvaluationOutcome[StructuredCandidateT]: """Run one bounded stochastic local-search episode for one proposal.""" runtime.neighborhood.space.validate(proposal.candidate) @@ -183,6 +185,7 @@ def _optimize_proposal( context=context, ) found_full_neighborhood_stop = False + budget_exhausted = False while completed_steps < episode_max_steps: sampled_neighborhood = sample_structured_discrete_neighborhood( @@ -199,6 +202,9 @@ def _optimize_proposal( improved = False for move in sampled_neighborhood.moves: + if not runtime.can_evaluate(reserved_count=reserved_count): + budget_exhausted = True + break proposed_candidate = runtime.neighborhood.space.replace_leaf_values( current_candidate, {move.path: move.replacement}, @@ -219,12 +225,16 @@ def _optimize_proposal( improved = True break + if budget_exhausted: + break if not improved: break status = KernelStatus.STOPPED message = "max_steps reached before stochastic local-search termination" - if completed_steps < episode_max_steps: + if budget_exhausted: + message = "evaluation budget exhausted before local convergence" + elif completed_steps < episode_max_steps: if found_full_neighborhood_stop: status = KernelStatus.CONVERGED message = "no improving move found in the full discrete neighborhood" @@ -294,6 +304,7 @@ def run( proposal_index=proposal_index, proposal=proposal, random_state=random_state, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/src/variopt/algorithms/local_search/structured/variable_neighborhood.py b/src/variopt/algorithms/local_search/structured/variable_neighborhood.py index 0c31530..4527cef 100644 --- a/src/variopt/algorithms/local_search/structured/variable_neighborhood.py +++ b/src/variopt/algorithms/local_search/structured/variable_neighborhood.py @@ -34,7 +34,8 @@ @dataclass(frozen=True, slots=True) -class StructuredVariableNeighborhoodKernel(FrozenGenericSlotsCompat, +class StructuredVariableNeighborhoodKernel( + FrozenGenericSlotsCompat, Kernel[ ProposalBatchQuery[ BoundaryT, @@ -159,6 +160,7 @@ def _optimize_proposal( proposal_index: int, proposal: Proposal[StructuredCandidateT], random_state: np.random.RandomState, + reserved_count: int, ) -> EvaluationOutcome[StructuredCandidateT]: """Run one variable-neighborhood local-search episode for one proposal.""" runtime.neighborhood.space.validate(proposal.candidate) @@ -200,6 +202,7 @@ def _optimize_proposal( leaf_schedule=leaf_schedule, proposal_evaluation_spec=proposal_evaluation_spec, random_state=random_state, + reserved_count=reserved_count, ) evaluation_count += stage_attempt.evaluation_count @@ -212,13 +215,22 @@ def _optimize_proposal( current_stage_index = 0 continue - if current_stage_index == len(self.stages) - 1: + if ( + stage_attempt.budget_exhausted + or current_stage_index == len(self.stages) - 1 + ): refinement = None if completed_steps > 0: refinement = runtime.candidate_refinement( source_candidate=proposal.candidate, refined_candidate=current_candidate, ) + terminal_message = stage_attempt.terminal_message + if not stage_attempt.budget_exhausted: + terminal_message = ( + terminal_message + + " after exhausting the configured variable-neighborhood stages" + ) return EvaluationOutcome( record=Observation.from_objective_value( @@ -233,10 +245,7 @@ def _optimize_proposal( backend="structured.local_search", method="variable_neighborhood_search", status=stage_attempt.terminal_status, - message=( - stage_attempt.terminal_message - + " after exhausting the configured variable-neighborhood stages" - ), + message=terminal_message, ), refinement=refinement, candidate_equal=runtime.query.problem.space.candidates_equal, @@ -307,6 +316,7 @@ def run( proposal_index=proposal_index, proposal=proposal, random_state=random_state, + reserved_count=len(query.proposals) - proposal_index - 1, ) for proposal_index, proposal in enumerate(query.proposals) ) diff --git a/tests/local_search/test_scipy_local_optimization.py b/tests/local_search/test_scipy_local_optimization.py index 5df8881..ce0cda5 100644 --- a/tests/local_search/test_scipy_local_optimization.py +++ b/tests/local_search/test_scipy_local_optimization.py @@ -10,6 +10,7 @@ from tests.numeric_support import approx_equal from variopt import ( + EvaluationBudget, EvaluationOutcome, Objective, Observation, @@ -49,6 +50,9 @@ def evaluate_query_directly( query: ProposalBatchQuery[BoundaryRunnerT, CandidateRunnerT], ) -> tuple[EvaluationOutcome[CandidateRunnerT], ...]: """Evaluate one proposal batch directly through the problem objective.""" + if query.evaluation_budget is not None: + query.evaluation_budget.consume(len(query.proposals)) + return tuple( EvaluationOutcome( observation=Observation.from_objective_value( @@ -315,6 +319,78 @@ def fake_run_scipy_minimize( assert outcome.observation.value == 10.0 assert outcome.evaluation_count == 2 + def test_scipy_reserves_budget_for_later_batch_proposals( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + problem = Problem( + space=RealSpace(-5.0, 5.0), + objective=ShiftedSquareObjective(), + ) + kernel = ScipyMinimizeKernel[float | int, float](method="L-BFGS-B") + evaluation_budget = EvaluationBudget(2) + + def fake_run_scipy_minimize( + *, + objective_in_coordinate_space: Callable[[Sequence[float]], float], + initial_coordinates: tuple[float, ...], + method: str, + coordinate_bounds: tuple[tuple[float, float], ...], + tolerance: float | None, + options: dict[str, int], + ) -> FakeScipyOptimizeResult: + _ = ( + method, + coordinate_bounds, + tolerance, + options, + ) + objective_score = objective_in_coordinate_space(initial_coordinates) + _ = objective_in_coordinate_space((1.5,)) + return FakeScipyOptimizeResult( + x=(1.5,), + fun=objective_score, + nfev=2, + success=True, + message="ok", + ) + + monkeypatch.setattr( + scipy_kernel_module, + "run_scipy_minimize", + fake_run_scipy_minimize, + ) + query = ProposalBatchQuery( + problem=problem, + proposals=( + Proposal(candidate=4.0, proposal_id="p-1"), + Proposal(candidate=-2.0, proposal_id="p-2"), + ), + execution_resources=ExecutionResources( + parallel_owner="evaluator", + nested_parallelism_policy=NestedParallelismPolicy.FORBID, + owner_worker_count=1, + owner_backend="sequential", + ), + evaluation_budget=evaluation_budget, + ) + + outcomes = kernel.run(query, evaluate_query_directly) + + assert evaluation_budget.remaining == 0 + assert tuple(outcome.evaluation_count for outcome in outcomes) == (1, 1) + assert tuple(outcome.observation.candidate for outcome in outcomes) == ( + 4.0, + -2.0, + ) + assert all( + outcome.kernel_diagnostics is not None + and outcome.kernel_diagnostics.status == KernelStatus.STOPPED + and outcome.kernel_diagnostics.message + == "evaluation budget exhausted before local convergence" + for outcome in outcomes + ) + def test_sequence_coordinate_cache_preserves_elapsed_seconds( self, monkeypatch: pytest.MonkeyPatch, @@ -363,7 +439,9 @@ def elapsed_runner( observation=Observation.from_objective_value( proposal=proposal, candidate=proposal.candidate, - value=local_query.problem.objective.evaluate(proposal.candidate), + value=local_query.problem.objective.evaluate( + proposal.candidate + ), direction=local_query.problem.direction, elapsed_seconds=0.25, ), @@ -792,9 +870,7 @@ def test_context_can_disable_local_search(self) -> None: owner_worker_count=1, owner_backend="sequential", ), - proposal_kernel_hints=( - ProposalLocalSearchContext(enabled=False), - ), + proposal_kernel_hints=(ProposalLocalSearchContext(enabled=False),), ) outcomes = kernel.run(query, evaluate_query_directly) @@ -805,7 +881,10 @@ def test_context_can_disable_local_search(self) -> None: assert outcome.evaluation_count == 1 assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "local search disabled by run-method context" + assert ( + outcome.kernel_diagnostics.message + == "local search disabled by run-method context" + ) assert outcome.refinement is None def test_disabled_local_search_forwards_proposal_evaluation_spec(self) -> None: @@ -828,9 +907,7 @@ class LocalSpec(ProposalEvaluationSpec): owner_backend="sequential", ), proposal_evaluation_specs=(spec,), - proposal_kernel_hints=( - ProposalLocalSearchContext(enabled=False), - ), + proposal_kernel_hints=(ProposalLocalSearchContext(enabled=False),), ) def assert_spec_forwarded( @@ -844,7 +921,9 @@ def assert_spec_forwarded( proposal=proposal, proposal_evaluation_spec=spec, candidate=proposal.candidate, - value=local_query.problem.objective.evaluate(proposal.candidate), + value=local_query.problem.objective.evaluate( + proposal.candidate + ), direction=local_query.problem.direction, ), evaluation_count=1, @@ -917,7 +996,9 @@ def assert_spec_forwarded( proposal=proposal, proposal_evaluation_spec=spec, candidate=proposal.candidate, - value=local_query.problem.objective.evaluate(proposal.candidate), + value=local_query.problem.objective.evaluate( + proposal.candidate + ), direction=local_query.problem.direction, ), evaluation_count=1, @@ -957,9 +1038,7 @@ def test_context_can_override_scipy_iteration_budget( owner_worker_count=1, owner_backend="sequential", ), - proposal_kernel_hints=( - ProposalLocalSearchContext(local_budget=3), - ), + proposal_kernel_hints=(ProposalLocalSearchContext(local_budget=3),), ) captured_options: list[dict[str, int]] = [] @@ -1158,7 +1237,10 @@ def reject_from_space( outcomes = kernel.run(query, evaluate_query_directly) - assert tuple(outcome.observation.candidate for outcome in outcomes) == (4.0, -2.0) + assert tuple(outcome.observation.candidate for outcome in outcomes) == ( + 4.0, + -2.0, + ) def test_empty_local_search_batch_does_not_prepare_structured_codec( self, @@ -1203,7 +1285,9 @@ def reject_runner( assert outcomes == () - def test_empty_local_search_batch_allows_incompatible_space_without_codec(self) -> None: + def test_empty_local_search_batch_allows_incompatible_space_without_codec( + self, + ) -> None: problem = Problem( space=IntegerSpace(0, 10), objective=IntegerObjective(), @@ -1306,7 +1390,10 @@ def fake_run_scipy_minimize( outcomes = kernel.run(query, evaluate_query_directly) - assert tuple(outcome.observation.candidate for outcome in outcomes) == (4.0, 1.5) + assert tuple(outcome.observation.candidate for outcome in outcomes) == ( + 4.0, + 1.5, + ) assert codec_call_count == 1 def test_structured_codec_cache_is_query_local( diff --git a/tests/local_search/test_structured_local_optimization.py b/tests/local_search/test_structured_local_optimization.py index 63000e8..5e902af 100644 --- a/tests/local_search/test_structured_local_optimization.py +++ b/tests/local_search/test_structured_local_optimization.py @@ -11,6 +11,7 @@ from variopt import ( ArraySpace, CategoricalSpace, + EvaluationBudget, EvaluationOutcome, IntegerSpace, Objective, @@ -63,6 +64,9 @@ def evaluate_query_directly( query: ProposalBatchQuery[BoundaryRunnerT, CandidateRunnerT], ) -> tuple[EvaluationOutcome[CandidateRunnerT], ...]: """Evaluate one proposal batch directly through the problem objective.""" + if query.evaluation_budget is not None: + query.evaluation_budget.consume(len(query.proposals)) + return tuple( EvaluationOutcome( observation=Observation.from_objective_value( @@ -236,7 +240,9 @@ def validate(self, candidate: ConditionalDiscreteCandidate) -> None: self.tail_space.validate(candidate[1]) @override - def sample(self, random_state: np.random.RandomState) -> ConditionalDiscreteCandidate: + def sample( + self, random_state: np.random.RandomState + ) -> ConditionalDiscreteCandidate: return ( self.head_space.sample(random_state), self.tail_space.sample(random_state), @@ -295,13 +301,17 @@ def replace_leaf_values( if ("head",) in replacements: replacement = replacements[("head",)] if type(replacement) is not int: - msg = "conditional discrete head replacement must be a canonical integer" + msg = ( + "conditional discrete head replacement must be a canonical integer" + ) raise TypeError(msg) head_value = self.head_space.normalize(replacement) if ("tail",) in replacements: replacement = replacements[("tail",)] if type(replacement) is not int: - msg = "conditional discrete tail replacement must be a canonical integer" + msg = ( + "conditional discrete tail replacement must be a canonical integer" + ) raise TypeError(msg) tail_value = self.tail_space.normalize(replacement) return (head_value, tail_value) @@ -346,18 +356,54 @@ def test_integer_hill_climber_converges_to_local_optimum(self) -> None: assert outcome.refinement.refined_candidate == 2 assert outcome.refinement.changed_leaf_paths == ((),) + def test_hill_climber_reserves_budget_for_later_batch_proposals(self) -> None: + problem = Problem( + space=IntegerSpace(0, 5), + objective=IntegerObjective(), + ) + kernel = StructuredHillClimbKernel[int, int](max_steps=8) + evaluation_budget = EvaluationBudget(2) + query = ProposalBatchQuery( + problem=problem, + proposals=( + Proposal(candidate=5, proposal_id="p-1"), + Proposal(candidate=5, proposal_id="p-2"), + ), + execution_resources=self.make_execution_resources(), + evaluation_budget=evaluation_budget, + ) + + outcomes = kernel.run(query, evaluate_query_directly) + + assert evaluation_budget.remaining == 0 + assert tuple(outcome.evaluation_count for outcome in outcomes) == (1, 1) + assert tuple(outcome.observation.candidate for outcome in outcomes) == (5, 5) + assert all( + outcome.kernel_diagnostics is not None + and outcome.kernel_diagnostics.status == KernelStatus.STOPPED + and outcome.kernel_diagnostics.message + == "evaluation budget exhausted before local convergence" + for outcome in outcomes + ) + def test_categorical_hill_climber_moves_through_declared_alternatives(self) -> None: - color_space: CategoricalSpace[Color] = CategoricalSpace(("red", "green", "blue")) + color_space: CategoricalSpace[Color] = CategoricalSpace( + ("red", "green", "blue") + ) initial_candidate: Color = "red" problem: Problem[Color, Color, Observation[Color]] = Problem( space=color_space, objective=CategoricalObjective(), ) kernel = StructuredHillClimbKernel[Color, Color](max_steps=4) - query: ProposalBatchQuery[Color, Color, Observation[Color]] = ProposalBatchQuery( - problem=problem, - proposals=(Proposal[Color](candidate=initial_candidate, proposal_id="p-1"),), - execution_resources=self.make_execution_resources(), + query: ProposalBatchQuery[Color, Color, Observation[Color]] = ( + ProposalBatchQuery( + problem=problem, + proposals=( + Proposal[Color](candidate=initial_candidate, proposal_id="p-1"), + ), + execution_resources=self.make_execution_resources(), + ) ) outcomes = kernel.run(query, evaluate_query_directly) @@ -434,9 +480,7 @@ def test_context_can_disable_local_search(self) -> None: problem=problem, proposals=(Proposal(candidate=5, proposal_id="p-1"),), execution_resources=self.make_execution_resources(), - proposal_kernel_hints=( - ProposalLocalSearchContext(enabled=False), - ), + proposal_kernel_hints=(ProposalLocalSearchContext(enabled=False),), ) outcomes = kernel.run(query, evaluate_query_directly) @@ -447,7 +491,10 @@ def test_context_can_disable_local_search(self) -> None: assert outcome.evaluation_count == 1 assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "local search disabled by run-method context" + assert ( + outcome.kernel_diagnostics.message + == "local search disabled by run-method context" + ) assert outcome.refinement is None def test_context_can_override_step_budget(self) -> None: @@ -468,9 +515,7 @@ def test_context_can_override_step_budget(self) -> None: problem=problem, proposals=(Proposal(candidate=initial_candidate, proposal_id="p-1"),), execution_resources=self.make_execution_resources(), - proposal_kernel_hints=( - ProposalLocalSearchContext(local_budget=1), - ), + proposal_kernel_hints=(ProposalLocalSearchContext(local_budget=1),), ) outcomes = kernel.run(query, evaluate_query_directly) @@ -682,7 +727,10 @@ def test_stochastic_kernel_stops_after_sampled_neighborhood_without_improvement( assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.method == "sampled_leafwise_first_improvement" assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "no improving move found in the sampled discrete neighborhood" + assert ( + outcome.kernel_diagnostics.message + == "no improving move found in the sampled discrete neighborhood" + ) assert outcome.refinement is None def test_stochastic_kernel_converges_when_sampling_covers_full_neighborhood( @@ -709,7 +757,10 @@ def test_stochastic_kernel_converges_when_sampling_covers_full_neighborhood( assert outcome.evaluation_count == 3 assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.CONVERGED - assert outcome.kernel_diagnostics.message == "no improving move found in the full discrete neighborhood" + assert ( + outcome.kernel_diagnostics.message + == "no improving move found in the full discrete neighborhood" + ) assert outcome.refinement is None def test_stochastic_kernel_can_cap_categorical_neighbors_per_leaf(self) -> None: @@ -735,7 +786,10 @@ def test_stochastic_kernel_can_cap_categorical_neighbors_per_leaf(self) -> None: assert outcome.evaluation_count == 4 assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "no improving move found in the sampled discrete neighborhood" + assert ( + outcome.kernel_diagnostics.message + == "no improving move found in the sampled discrete neighborhood" + ) def test_stochastic_kernel_can_take_one_sampled_improving_move(self) -> None: problem = Problem( @@ -761,7 +815,10 @@ def test_stochastic_kernel_can_take_one_sampled_improving_move(self) -> None: assert outcome.evaluation_count == 2 assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "max_steps reached before stochastic local-search termination" + assert ( + outcome.kernel_diagnostics.message + == "max_steps reached before stochastic local-search termination" + ) assert outcome.refinement is not None assert outcome.refinement.source_candidate == 0 assert outcome.refinement.refined_candidate == outcome.observation.candidate @@ -850,7 +907,10 @@ def test_variable_neighborhood_kernel_resets_to_first_stage_after_improvement( assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.method == "variable_neighborhood_search" assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "max_steps reached before variable-neighborhood termination" + assert ( + outcome.kernel_diagnostics.message + == "max_steps reached before variable-neighborhood termination" + ) assert outcome.refinement is not None assert outcome.refinement.source_candidate == (0, 0, 0) assert outcome.refinement.refined_candidate == (1, 1, 1) @@ -886,9 +946,9 @@ def test_variable_neighborhood_kernel_uses_sampled_stage_terminal_status( assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED assert outcome.kernel_diagnostics.message == ( - "no improving move found in the sampled variable neighborhood " - "after exhausting the configured variable-neighborhood stages" - ) + "no improving move found in the sampled variable neighborhood " + "after exhausting the configured variable-neighborhood stages" + ) assert outcome.refinement is None def test_variable_neighborhood_kernel_rejects_invalid_stage_metadata( @@ -952,7 +1012,10 @@ def test_iterated_local_search_kernel_can_escape_pair_move_trap_via_kick( assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.method == "iterated_local_search" assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "max_kicks reached before iterated local-search termination" + assert ( + outcome.kernel_diagnostics.message + == "max_kicks reached before iterated local-search termination" + ) assert outcome.refinement is not None assert outcome.refinement.source_candidate == (0, 0) assert outcome.refinement.refined_candidate == (1, 1) @@ -989,7 +1052,10 @@ def test_iterated_local_search_kernel_requires_strict_improvement_to_accept( assert outcome.refinement is None assert outcome.kernel_diagnostics is not None assert outcome.kernel_diagnostics.status == KernelStatus.STOPPED - assert outcome.kernel_diagnostics.message == "max_kicks reached before iterated local-search termination" + assert ( + outcome.kernel_diagnostics.message + == "max_kicks reached before iterated local-search termination" + ) def test_iterated_local_search_kernel_rejects_invalid_kick_metadata(self) -> None: with pytest.raises(ValueError, match="max_kicks must be positive"): @@ -998,7 +1064,9 @@ def test_iterated_local_search_kernel_rejects_invalid_kick_metadata(self) -> Non with pytest.raises(ValueError, match="kick_leaf_count must be positive"): _ = StructuredKickPolicy(kick_leaf_count=0) - with pytest.raises(ValueError, match="max_categorical_alternatives_per_leaf must be positive"): + with pytest.raises( + ValueError, match="max_categorical_alternatives_per_leaf must be positive" + ): _ = StructuredKickPolicy(max_categorical_alternatives_per_leaf=0) From 6db4a3b7b93cbb32d0bcdb1de4c09b7f8c9a5743 Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:05:18 +0900 Subject: [PATCH 3/6] docs: align budget and checkpoint guidance --- docs/concepts/candidate-refinement.md | 9 ++++---- docs/guides/local-optimization-methods.md | 25 ++++++++++++----------- docs/reference/checkpointing.md | 7 +++++-- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/docs/concepts/candidate-refinement.md b/docs/concepts/candidate-refinement.md index 3f25c53..40a4749 100644 --- a/docs/concepts/candidate-refinement.md +++ b/docs/concepts/candidate-refinement.md @@ -55,10 +55,11 @@ problem's semantic evaluation rule. Refinement metadata does not count evaluations by itself. Logical evaluation cost is carried by `EvaluationOutcome.evaluation_count`. -`Study.optimize(..., count_evaluation_cost=True)` charges the reported -`evaluation_count` instead of only counting returned records. This matters when -a local-search kernel evaluates several inner candidates before returning one -refined result. +By default, `Study.optimize(...)` charges the reported `evaluation_count` instead +of only counting returned records. This matters when a local-search kernel +evaluates several inner candidates before returning one refined result. Set +`count_evaluation_cost=False` only when you deliberately want outer-record +counting. Terminal surfaces preserve provenance only as aligned metadata: diff --git a/docs/guides/local-optimization-methods.md b/docs/guides/local-optimization-methods.md index 56b7e90..9bff5cd 100644 --- a/docs/guides/local-optimization-methods.md +++ b/docs/guides/local-optimization-methods.md @@ -90,10 +90,9 @@ Practical trade-off: - but it may consume many inner objective evaluations due to finite-difference gradient estimation -If you care about actual objective-evaluation cost rather than just the number -of outer proposals, enable cost-aware budgeting in -[`Study.optimize(...)`](../reference/api/study.md) -with `count_evaluation_cost=True`. +`Study.optimize(...)` budgets by actual objective-evaluation cost by default, so +SciPy inner evaluations are charged against `max_evaluations` rather than hidden +behind one outer proposal. ### `Powell` @@ -177,15 +176,17 @@ The kernel path reports that cost through [`EvaluationOutcome.evaluation_count`](../reference/api/variopt.md). `Study.optimize(...)` then offers two modes: -- default: budget decreases by the number of returned observations -- `count_evaluation_cost=True`: budget decreases by the sum of inner objective - evaluations reported by the kernel/evaluator path +- default: budget decreases by the sum of objective evaluations reported by the + kernel/evaluator path +- `count_evaluation_cost=False`: budget decreases by the number of returned + observations Practical guidance: -- use default counting when you only care about outer search steps -- use `count_evaluation_cost=True` when comparing methods with and without local - optimization, or when the kernel itself is expensive +- keep the default when comparing methods with and without local optimization, + or when the objective itself is expensive +- use `count_evaluation_cost=False` only when you deliberately want an outer-step + budget rather than an objective-cost budget If a custom kernel already computed the objective value, it should return both that value and the true `evaluation_count` so that `Study` can reuse the value @@ -257,7 +258,7 @@ execution configuration, not as free throughput toggles. | All-discrete structured space that already has a justified staged neighborhood-widening story | `StructuredVariableNeighborhoodKernel(max_steps=..., stages=(...))` | | All-discrete structured space with a fixed stage sequence | `StructuredScheduledLocalSearchKernel(stages=(...))` | | Mixed real/integer/categorical space | no built-in generic mixed adapter yet; use a custom kernel, split the local search cleanly by domain, or skip local optimization | -| Comparing methods with and without local optimization | enable `count_evaluation_cost=True` | +| Comparing methods with and without local optimization | use the default objective-cost budget | | Batch-parallel evaluation with joblib | keep the kernel serial and let the evaluator own parallelism | | Early debugging or correctness validation | start with `SequentialEvaluator` | @@ -269,7 +270,7 @@ If you are unsure, start here: 2. If the space is continuous and local improvement is clearly valuable, try `L-BFGS-B`. 3. If `L-BFGS-B` behaves poorly on a rough objective, switch to `Powell`. -4. Turn on `count_evaluation_cost=True` before making any fairness claims about +4. Keep default objective-cost budgeting before making any fairness claims about efficiency. 5. Add `JoblibEvaluator` only after the kernel itself is behaving well in sequential execution. diff --git a/docs/reference/checkpointing.md b/docs/reference/checkpointing.md index 9a9aeb7..e2019ee 100644 --- a/docs/reference/checkpointing.md +++ b/docs/reference/checkpointing.md @@ -41,8 +41,11 @@ study = Study( evaluator=SequentialEvaluator[int, int](), ) -# Run partway and save. -result, state = study.optimize(max_evaluations=20) +# Run partway to a checkpoint-safe boundary and save. +result, state = study.optimize( + max_evaluations=20, + stop_at_checkpoint_boundary=True, +) checkpoint = optimizer.state_to_dict(state) with open("checkpoint.json", "w") as f: From 6e7bf0b954d77b123739a4535fdee0d3d72037d3 Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:14:47 +0900 Subject: [PATCH 4/6] fix(local-search): avoid scipy outcome type narrowing conflict --- .../algorithms/local_search/scipy/kernel.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/variopt/algorithms/local_search/scipy/kernel.py b/src/variopt/algorithms/local_search/scipy/kernel.py index f8fc0bf..7577d23 100644 --- a/src/variopt/algorithms/local_search/scipy/kernel.py +++ b/src/variopt/algorithms/local_search/scipy/kernel.py @@ -453,26 +453,28 @@ def objective_in_coordinate_space( proposal.candidate, optimized_coordinates, ) - optimized_outcome = evaluated_outcomes_by_coordinates.get(optimized_coordinates) - if optimized_outcome is None: + cached_optimized_outcome = evaluated_outcomes_by_coordinates.get( + optimized_coordinates, + ) + if cached_optimized_outcome is None: if ( query.evaluation_budget is not None and not can_evaluate_local_candidate() and len(evaluated_outcomes_by_coordinates) > 0 ): - optimized_outcome = min( + best_seen_outcome = min( evaluated_outcomes_by_coordinates.values(), key=lambda outcome: outcome.record.score, ) - return budget_exhausted_outcome(optimized_outcome) + return budget_exhausted_outcome(best_seen_outcome) - optimized_outcome = self._evaluate_candidate( + cached_optimized_outcome = self._evaluate_candidate( query=query, candidate=optimized_candidate, proposal_evaluation_spec=proposal_evaluation_spec, runner=runner, ) - evaluation_count += optimized_outcome.evaluation_count + evaluation_count += cached_optimized_outcome.evaluation_count refinement = _candidate_refinement_from_codec( codec=codec, source_candidate=proposal.candidate, @@ -483,9 +485,9 @@ def objective_in_coordinate_space( proposal=proposal, proposal_evaluation_spec=proposal_evaluation_spec, candidate=optimized_candidate, - value=optimized_outcome.record.value, - score=optimized_outcome.record.score, - elapsed_seconds=optimized_outcome.record.elapsed_seconds, + value=cached_optimized_outcome.record.value, + score=cached_optimized_outcome.record.score, + elapsed_seconds=cached_optimized_outcome.record.elapsed_seconds, ), evaluation_count=evaluation_count, kernel_diagnostics=scipy_result.diagnostics(method=self.method), From 3079b64dd7b5f76506016d6dfb16b2fedc01878e Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:14:53 +0900 Subject: [PATCH 5/6] docs: document hard budget migration impact --- CHANGELOG.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b125be0..45ed99f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,22 @@ format. Stability guarantees for the public surface are documented in the ## [Unreleased] -No unreleased changes yet. +### Breaking + +- `Study.run(...)` and `Study.optimize(...)` now default + `count_evaluation_cost=True`. Evaluation budgets are charged against reported + logical evaluation cost, including inner local-search evaluations, rather than + only the number of returned records. Code that intentionally wants outer-record + counting must pass `count_evaluation_cost=False`. +- Study execution now raises `EvaluationBudgetExhausted` instead of silently + assimilating a step whose reported evaluation cost exceeds the remaining hard + budget. + +### Added + +- Added `stop_at_checkpoint_boundary=True` for `Study.run(...)` and + `Study.optimize(...)` so CSA runs can return the latest checkpoint-safe state + when the budget ends inside an unsafe generation segment. ## [0.1.0] - 2026-06-15 From fa02a3db8e4058d574fa84b17376368c1dabcceb Mon Sep 17 00:00:00 2001 From: isty2e Date: Thu, 2 Jul 2026 22:15:51 +0900 Subject: [PATCH 6/6] refactor(study): inline budget batch sizing --- src/variopt/study/execution.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/variopt/study/execution.py b/src/variopt/study/execution.py index d3216b3..1633bf0 100644 --- a/src/variopt/study/execution.py +++ b/src/variopt/study/execution.py @@ -245,23 +245,6 @@ def _current_remaining_budget( return evaluation_budget.remaining -def _current_step_batch_size( - _study: StudyExecutionOwner[ - BoundaryT, - CandidateT, - RunMethodStateT, - StudyEvaluationRecordT, - ], - *, - batch_size: int, - remaining: int, - evaluation_budget: EvaluationBudget | None, -) -> int: - """Return a batch size that preserves hard budget safety for kernels.""" - _ = evaluation_budget - return min(batch_size, remaining) - - def _optimize_direct_scalar_sequential( study: DirectScalarSequentialStudyOwner[ BoundaryT, @@ -760,12 +743,7 @@ def run( evaluation_budget=evaluation_budget, record_budget_remaining=record_budget_remaining, ) - current_batch_size = _current_step_batch_size( - study, - batch_size=batch_size, - remaining=remaining, - evaluation_budget=evaluation_budget, - ) + current_batch_size = min(batch_size, remaining) step_result = evaluate_step( study, state,