From 6c0574966d95d5912e692f844068b8e11fa21852 Mon Sep 17 00:00:00 2001 From: Qian Xie <90579251+JaneQianXie@users.noreply.github.com> Date: Sun, 14 Jun 2026 22:56:06 +0800 Subject: [PATCH 1/2] clean up algorithms provided (remove bad algorithms) --- docs/api/selectors.md | 41 +- docs/concepts/algorithms.md | 143 +---- .../selection/local/advanced_algorithms.py | 76 +-- src/agentopt/__init__.py | 25 +- src/agentopt/model_selection/__init__.py | 10 - src/agentopt/model_selection/base.py | 5 - src/agentopt/model_selection/epsilon_lucb.py | 341 ----------- src/agentopt/model_selection/hill_climbing.py | 559 ------------------ src/agentopt/model_selection/lm_proposal.py | 314 ---------- src/agentopt/model_selection/matrix_ucb.py | 4 +- .../matrix_ucb_factorization.py | 69 --- src/agentopt/model_selection/random_search.py | 197 ------ .../threshold_successive_elimination.py | 416 ------------- 13 files changed, 24 insertions(+), 2176 deletions(-) delete mode 100644 src/agentopt/model_selection/epsilon_lucb.py delete mode 100644 src/agentopt/model_selection/hill_climbing.py delete mode 100644 src/agentopt/model_selection/lm_proposal.py delete mode 100644 src/agentopt/model_selection/matrix_ucb_factorization.py delete mode 100644 src/agentopt/model_selection/random_search.py delete mode 100644 src/agentopt/model_selection/threshold_successive_elimination.py diff --git a/docs/api/selectors.md b/docs/api/selectors.md index d990203..09b007a 100644 --- a/docs/api/selectors.md +++ b/docs/api/selectors.md @@ -27,7 +27,6 @@ results.print_summary() | `model_prices` | `Dict`, optional | Custom pricing overrides: `{"model": {"input_price": x, "output_price": y}}` in $/MTok. Required for cost terms when `lambda_cost > 0`. | | `lambda_cost` | `float`, optional | Weight on **normalized** per-sample cost in the combined objective. Default `0.0` (disabled). See [Combined objective](#combined-objective-optional-costlatency-weights) below. 
| | `lambda_latency` | `float`, optional | Weight on **normalized** per-sample latency in the combined objective. Default `0.0` (disabled). | -| `node_descriptions` | `Dict[str, str]`, optional | Human-readable descriptions per node — surfaced in `LMProposalModelSelector`. | | `tracker` | `LLMTracker`, optional | Bring your own. Defaults to a fresh `LLMTracker()` started in the constructor. Pass one in to share a cache across runs, route via a daemon (`AGENTOPT_GATEWAY_URL`), or post-process records after `select_best()` returns. | The selector calls `tracker.start()` in the constructor and `tracker.stop()` when `select_best()` returns or raises. Record queries on the tracker remain valid after `stop()`, so post-run analysis works: @@ -93,16 +92,12 @@ results.print_summary() # ranks by combined_objective when lambdas are set | Methods | During search | Final `is_best` | |:---|:---|:---| | `matrix_ucb`, `matrix_ucb_lrf` | UCB rewards use per-cell combined objective | `_find_best` on `combined_objective` | -| `arm_elimination`, `epsilon_lucb`, `threshold` | Elimination / LUCB stats on combined per-sample objectives | same | -| `hill_climbing`, `bayesian` | Move / surrogate target uses combined objective | same | -| `brute_force`, `random` | Does not steer *which* combos to try | same | -| `lm_proposal` | Proposer uses `objective=` **text**, not these lambdas | `combined_objective` on the one evaluated combo only | +| `arm_elimination` | Elimination stats on combined per-sample objectives | same | +| `bayesian` | Surrogate target uses combined objective | same | +| `brute_force` | Does not steer *which* combos to try | same | After `select_best()`, a final pass recomputes every result’s `combined_objective` against the **full-run** normalizer so rankings are comparable. -!!! note "`lm_proposal` vs lambdas" - `LMProposalModelSelector(objective="...")` is a natural-language hint to the **proposer LLM**. 
It is separate from `lambda_cost` / `lambda_latency`, which only affect the scalar reward used for ranking and bandit methods. - ## `select_best()` ```python @@ -123,13 +118,8 @@ Returns a [`SelectionResults`](results.md). `parallel=True` requires `agent.run` |:---|:---|:---| | `"auto"` (default) | Arm elimination | Strong best-arm identification at lower search cost than brute force. Same impl as `"arm_elimination"`. | | `"brute_force"` | Evaluate every combo on the full dataset | Small search space; ground-truth comparison. | -| `"random"` | Random search | Cheap baseline. | -| `"hill_climbing"` | Greedy per-node | Large combinatorial spaces with weak coupling between nodes. | | `"arm_elimination"` | Successive elimination | Best-arm identification with PAC-style guarantees. | -| `"epsilon_lucb"` | LUCB with tolerance | Stop once a combo is within ε of the best. | | `"matrix_ucb"` / `"matrix_ucb_lrf"` | UCB exploiting cross-combo structure | Large model x datapoint matrices; `lrf` adds low-rank factorization. | -| `"threshold"` | Threshold bandit successive elimination | "Find all combos above accuracy θ" rather than the single best. | -| `"lm_proposal"` | LM-guided | Uses `node_descriptions` to propose combinations. | | `"bayesian"` | Bayesian optimization | Optional extra: `pip install "agentopt-py[bayesian]"`. | --- @@ -141,26 +131,11 @@ Returns a [`SelectionResults`](results.md). 
`parallel=True` requires `agent.run` members: false show_bases: false -::: agentopt.model_selection.random_search.RandomSearchModelSelector - options: - members: false - show_bases: false - -::: agentopt.model_selection.hill_climbing.HillClimbingModelSelector - options: - members: false - show_bases: false - ::: agentopt.model_selection.arm_elimination.ArmEliminationModelSelector options: members: false show_bases: false -::: agentopt.model_selection.epsilon_lucb.EpsilonLUCBModelSelector - options: - members: false - show_bases: false - ::: agentopt.model_selection.matrix_ucb.MatrixUCBModelSelector options: members: false @@ -171,16 +146,6 @@ Returns a [`SelectionResults`](results.md). `parallel=True` requires `agent.run` members: false show_bases: false -::: agentopt.model_selection.threshold_successive_elimination.ThresholdBanditSEModelSelector - options: - members: false - show_bases: false - -::: agentopt.model_selection.lm_proposal.LMProposalModelSelector - options: - members: false - show_bases: false - ::: agentopt.model_selection.bayesian_optimization.BayesianOptimizationModelSelector options: members: false diff --git a/docs/concepts/algorithms.md b/docs/concepts/algorithms.md index 40a3f6e..58858fa 100644 --- a/docs/concepts/algorithms.md +++ b/docs/concepts/algorithms.md @@ -1,18 +1,14 @@ # Selection Algorithms -AgentOpt provides 8 selection algorithms. Choose based on your search space size and evaluation budget. +AgentOpt provides 5 selection algorithms. Choose based on your search space size and evaluation budget. 
## At a Glance | Algorithm | Strategy | Evaluations | Best For | |:----------|:---------|:------------|:---------| | [Brute Force](#brute-force) | Exhaustive | All | Small spaces (< 50 combos) | -| [Random Search](#random-search) | Sampling | Configurable fraction | Quick baselines | -| [Hill Climbing](#hill-climbing) | Greedy + restarts | Guided neighbors | Medium spaces | | [Arm Elimination](#arm-elimination) | Progressive pruning | Adaptive | Statistical early stopping | -| [Epsilon LUCB](#epsilon-lucb) | ε-optimal LUCB | Adaptive | Cost savings when ε-optimal is enough | -| [Threshold SE](#threshold-successive-elimination) | Threshold classification | Adaptive | Filtering above/below a performance target | -| [LM Proposal](#lm-proposal) | LLM-guided | Shortlist | Leveraging model knowledge | +| [Matrix UCB](#matrix-ucb) | UCB over combo × datapoint grid | Budgeted | Large spaces with selective datapoint sampling | | [Bayesian Optimization](#bayesian-optimization) | GP surrogate | Sequential | Expensive evaluations | !!! tip "Common interface" @@ -55,62 +51,6 @@ selector = BruteForceModelSelector( --- -## Random Search - -Samples a random fraction of all combinations. - -```python -from agentopt import RandomSearchModelSelector - -selector = RandomSearchModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - sample_fraction=0.25, # evaluate 25% of combinations - seed=42, -) -``` - -| Parameter | Default | Description | -|:----------|:--------|:------------| -| `sample_fraction` | `0.25` | Fraction of combinations to evaluate | -| `seed` | `None` | Random seed for reproducibility | - -!!! success "When to use" - Quick exploration to establish a baseline before committing to a thorough search. - ---- - -## Hill Climbing - -Greedy local search with random restarts. Defines "neighbors" using model quality and speed rankings, so each step is an informed single-model swap. 
- -```python -from agentopt import HillClimbingModelSelector - -selector = HillClimbingModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - max_iterations=20, - num_restarts=3, - patience=3, -) -``` - -| Parameter | Default | Description | -|:----------|:--------|:------------| -| `max_iterations` | `20` | Max steps per restart | -| `num_restarts` | `3` | Number of random restarts | -| `patience` | `3` | Steps without improvement before restart | - -!!! success "When to use" - Medium-sized spaces where you want to exploit model topology — cheaper models are neighbors of expensive ones. - ---- - ## Arm Elimination Progressively eliminates statistically dominated combinations. Starts with a small batch of datapoints, then grows the batch while eliminating underperformers. @@ -135,92 +75,37 @@ selector = ArmEliminationModelSelector( | `confidence` | `1.0` | Elimination confidence threshold | !!! success "When to use" - When bad combinations should be eliminated early to save budget. Particularly effective when there are clearly weak options. - ---- - -## Epsilon LUCB - -Identifies an ε-optimal best arm using Lower and Upper Confidence Bounds. Each round, it compares the current leader's lower confidence bound against the best challenger's upper bound. When the gap closes below epsilon, the algorithm stops with statistical confidence that the selected arm is within epsilon of optimal. - -```python -from agentopt import EpsilonLUCBModelSelector - -selector = EpsilonLUCBModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - epsilon=0.01, - confidence=1.0, -) -``` - -| Parameter | Default | Description | -|:----------|:--------|:------------| -| `epsilon` | `0.01` | Acceptable gap from the true best | -| `n_initial` | `1` | Initial datapoints per combination | -| `confidence` | `1.0` | Confidence level for bound computation | - -!!! 
success "When to use" - When finding the *exact* best combo isn't necessary and you can tolerate a small accuracy gap (epsilon) in exchange for significant search cost savings. Particularly effective when many combos are close in performance. + When bad combinations should be eliminated early to save budget. Particularly effective when there are clearly weak options. This is the default (`method="auto"`). --- -## Threshold Successive Elimination +## Matrix UCB -Instead of finding the single best combination, Threshold SE classifies each combination as above or below a user-defined performance threshold. Each round, it evaluates all surviving combos on one more datapoint and checks their confidence intervals. Once a combo's interval no longer straddles the threshold (entirely above or entirely below), it's classified and removed from the active set. +UCB exploration over the combination × datapoint matrix. Instead of evaluating every combo on every datapoint, it adaptively picks which cells to observe next. ```python -from agentopt import ThresholdBanditSEModelSelector +from agentopt import MatrixUCBModelSelector -selector = ThresholdBanditSEModelSelector( +selector = MatrixUCBModelSelector( agent=MyAgent, models=models, eval_fn=eval_fn, dataset=dataset, - threshold=0.75, - confidence=1.0, + a=1.0, + sample_fraction=0.25, ) ``` | Parameter | Default | Description | |:----------|:--------|:------------| -| `threshold` | `0.75` | Performance threshold to classify against | -| `confidence` | `1.0` | Confidence level for bound computation | - -!!! success "When to use" - When you have a minimum acceptable accuracy in mind (e.g., "I need at least 75%") and want to quickly identify which combinations meet it. Useful for filtering rather than ranking. - ---- - -## LM Proposal - -Uses a proposer LLM to shortlist promising combinations before evaluation. The proposer sees the candidate models and a dataset preview, then suggests which combinations to try. 
- -```python -from agentopt import LMProposalModelSelector - -selector = LMProposalModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - proposer_model="gpt-4.1", - objective="maximize accuracy and then minimize latency and cost", - dataset_preview_size=10, -) -``` +| `a` | `1.0` | UCB exploration coefficient | +| `sample_fraction` | `None` | Fraction of the combo × datapoint grid to observe (alias for `observation_budget_fraction`) | +| `seed` | `None` | Random seed for reproducibility | -| Parameter | Default | Description | -|:----------|:--------|:------------| -| `proposer_model` | `"gpt-4.1"` | Model used for proposal generation | -| `proposer_client` | `None` | Custom OpenAI-compatible client; auto-creates `OpenAI()` if omitted | -| `objective` | `"maximize accuracy and then minimize latency and cost"` | Natural-language objective passed to the proposer | -| `dataset_preview_size` | `10` | Number of dataset examples shown to the proposer | +A low-rank factorization variant is available via `MatrixUCBLRFModelSelector` (`method="matrix_ucb_lrf"`). It adds parameters like `rank`, `ensemble_size`, and `warmup_fraction` for structured uncertainty over the matrix. !!! success "When to use" - When you want to leverage an LLM's knowledge about model capabilities to skip obviously bad combinations. + Large search spaces where you want to sample both combinations and datapoints intelligently rather than running the full grid. --- diff --git a/examples/selection/local/advanced_algorithms.py b/examples/selection/local/advanced_algorithms.py index 0f9540b..f090428 100644 --- a/examples/selection/local/advanced_algorithms.py +++ b/examples/selection/local/advanced_algorithms.py @@ -15,7 +15,7 @@ 4. For matrix UCB-LRF: pip install "agentopt-py[ucb_lrf]" The matrix UCB demos use ``sample_fraction=0.1`` (~10% of the combination × datapoint -grid), like ``random`` / ``bayesian``. 
Matrix UCB-LRF also accepts ``warmup_fraction`` +grid), like ``bayesian``. Matrix UCB-LRF also accepts ``warmup_fraction`` (alias for ``warmup_percentage``). Use ``1.0`` for a full grid; tune ``max_concurrent`` for step size. """ @@ -103,32 +103,6 @@ def run_auto(): return selector.select_best(parallel=True) -def run_random(): - """method="random" — evaluate a random subset of combinations.""" - selector = ModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - method="random", - sample_fraction=0.25, # evaluate 25% of all combinations - ) - return selector.select_best(parallel=True) - - -def run_hill_climbing(): - """method="hill_climbing" — greedy search using model quality/speed rankings.""" - selector = ModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - method="hill_climbing", - batch_size=4, # number of neighbors to evaluate per step - ) - return selector.select_best(parallel=True) - - def run_arm_elimination(): """method="arm_elimination" — eliminates statistically dominated combinations early.""" selector = ModelSelector( @@ -141,44 +115,6 @@ def run_arm_elimination(): return selector.select_best(parallel=True) -def run_epsilon_lucb(): - """method="epsilon_lucb" — stops when the best arm is identified within epsilon.""" - selector = ModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - method="epsilon_lucb", - epsilon=0.01, # acceptable gap from the true best - ) - return selector.select_best(parallel=True) - - -def run_threshold(): - """method="threshold" — classify combinations as above/below a quality threshold.""" - selector = ModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - method="threshold", - threshold=0.75, # minimum acceptable accuracy - ) - return selector.select_best(parallel=True) - - -def run_lm_proposal(): - """method="lm_proposal" — use a proposer LLM to shortlist promising combinations.""" - 
selector = ModelSelector( - agent=MyAgent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - method="lm_proposal", - ) - return selector.select_best(parallel=True) - - def run_bayesian(): """method="bayesian" — GP-based Bayesian optimization (requires agentopt[bayesian]).""" selector = ModelSelector( @@ -233,12 +169,7 @@ def run_matrix_ucb_lrf(): METHODS = { "auto": run_auto, - "random": run_random, - "hill_climbing": run_hill_climbing, "arm_elimination": run_arm_elimination, - "epsilon_lucb": run_epsilon_lucb, - "threshold": run_threshold, - "lm_proposal": run_lm_proposal, "bayesian": run_bayesian, "matrix_ucb": run_matrix_ucb, "matrix_ucb_lrf": run_matrix_ucb_lrf, @@ -253,12 +184,7 @@ def run_matrix_ucb_lrf(): epilog=""" Available methods: auto Automatically finds the best combination (wired to arm_elimination; lower search cost than brute_force) (default) - random Evaluate a random subset of combinations - hill_climbing Greedy search using model quality/speed rankings arm_elimination Eliminate statistically dominated combinations early - epsilon_lucb Stop when best arm is identified within epsilon - threshold Classify combinations above/below a quality threshold - lm_proposal Use a proposer LLM to shortlist promising combinations bayesian GP-based Bayesian optimization (requires agentopt[bayesian]) matrix_ucb UCB on the combination × datapoint matrix (demo: 10%% budget) matrix_ucb_lrf Same with low-rank uncertainty (requires agentopt[ucb_lrf]) diff --git a/src/agentopt/__init__.py b/src/agentopt/__init__.py index 0790714..8ce8162 100644 --- a/src/agentopt/__init__.py +++ b/src/agentopt/__init__.py @@ -80,16 +80,11 @@ def get_current_session_proxy() -> Optional[SessionProxy]: ArmEliminationModelSelector, BaseModelSelector, BruteForceModelSelector, - EpsilonLUCBModelSelector, - HillClimbingModelSelector, - LMProposalModelSelector, DatapointResult, MatrixUCBLRFModelSelector, MatrixUCBModelSelector, ModelResult, - RandomSearchModelSelector, SelectionResults, 
- ThresholdBanditSEModelSelector, ) # Bayesian is optional (requires torch/botorch) @@ -101,14 +96,9 @@ def get_current_session_proxy() -> Optional[SessionProxy]: _METHODS = { "auto": ArmEliminationModelSelector, "brute_force": BruteForceModelSelector, - "random": RandomSearchModelSelector, - "hill_climbing": HillClimbingModelSelector, "arm_elimination": ArmEliminationModelSelector, - "epsilon_lucb": EpsilonLUCBModelSelector, "matrix_ucb": MatrixUCBModelSelector, "matrix_ucb_lrf": MatrixUCBLRFModelSelector, - "threshold": ThresholdBanditSEModelSelector, - "lm_proposal": LMProposalModelSelector, "bayesian": BayesianOptimizationModelSelector, } @@ -131,13 +121,11 @@ def ModelSelector( the best combination (same implementation as ``"arm_elimination"`` — strong best-arm identification with lower search cost than ``"brute_force"``). Other options: ``"brute_force"``, - ``"random"``, ``"hill_climbing"``, ``"arm_elimination"``, - ``"epsilon_lucb"``, ``"matrix_ucb"``, ``"matrix_ucb_lrf"``, - ``"threshold"``, - ``"lm_proposal"``, ``"bayesian"``. + ``"arm_elimination"``, ``"matrix_ucb"``, ``"matrix_ucb_lrf"``, + ``"bayesian"``. **kwargs: Additional arguments passed to the selector - (e.g. ``epsilon``, ``threshold``, ``sample_fraction``, ``warmup_fraction`` - for matrix UCB-LRF; ``lambda_cost``, ``lambda_latency`` for the optional + (e.g. ``sample_fraction``, ``warmup_fraction`` for matrix UCB-LRF; + ``lambda_cost``, ``lambda_latency`` for the optional combined objective ``score - lambda_cost*norm_cost - lambda_latency*norm_latency`` — both default to ``0.0`` / accuracy-only). 
@@ -167,14 +155,9 @@ def ModelSelector( "CallRecord", # Selectors "BruteForceModelSelector", - "RandomSearchModelSelector", - "HillClimbingModelSelector", "ArmEliminationModelSelector", - "EpsilonLUCBModelSelector", "MatrixUCBModelSelector", "MatrixUCBLRFModelSelector", - "ThresholdBanditSEModelSelector", - "LMProposalModelSelector", "BayesianOptimizationModelSelector", # Result types "DatapointResult", diff --git a/src/agentopt/model_selection/__init__.py b/src/agentopt/model_selection/__init__.py index 56feae6..cec5306 100644 --- a/src/agentopt/model_selection/__init__.py +++ b/src/agentopt/model_selection/__init__.py @@ -3,11 +3,6 @@ from .arm_elimination import ArmEliminationModelSelector from .base import BaseModelSelector, DatapointResult, ModelResult, SelectionResults from .brute_force import BruteForceModelSelector -from .epsilon_lucb import EpsilonLUCBModelSelector -from .hill_climbing import HillClimbingModelSelector -from .lm_proposal import LMProposalModelSelector -from .random_search import RandomSearchModelSelector -from .threshold_successive_elimination import ThresholdBanditSEModelSelector from .matrix_ucb import MatrixUCBLRFModelSelector, MatrixUCBModelSelector # Bayesian is optional (requires torch/botorch) @@ -19,12 +14,7 @@ __all__ = [ "BaseModelSelector", "BruteForceModelSelector", - "RandomSearchModelSelector", - "HillClimbingModelSelector", "ArmEliminationModelSelector", - "EpsilonLUCBModelSelector", - "ThresholdBanditSEModelSelector", - "LMProposalModelSelector", "MatrixUCBModelSelector", "MatrixUCBLRFModelSelector", "BayesianOptimizationModelSelector", diff --git a/src/agentopt/model_selection/base.py b/src/agentopt/model_selection/base.py index 177487f..157e747 100644 --- a/src/agentopt/model_selection/base.py +++ b/src/agentopt/model_selection/base.py @@ -716,7 +716,6 @@ def __init__( eval_fn: EvalFn = None, dataset: Dataset = None, model_prices: Optional[Dict[str, Dict[str, float]]] = None, - node_descriptions: Optional[Dict[str, str]] = 
None, tracker: Optional[LLMTracker] = None, lambda_cost: float = 0.0, lambda_latency: float = 0.0, @@ -739,9 +738,6 @@ def __init__( model_prices: Optional custom pricing overrides. Maps model names to dicts with ``'input_price'`` and ``'output_price'`` keys ($/MTok). - node_descriptions: Optional dict mapping node names to human-readable - descriptions of what each node does, e.g. - ``{"planner": "Decomposes queries into sub-tasks"}``. tracker: Optional :class:`LLMTracker` instance. If not provided, one is created and started automatically. lambda_cost: Weight on normalized per-sample cost in the combined @@ -776,7 +772,6 @@ def __init__( self._models = models self._node_names = list(models.keys()) self.model_prices = model_prices - self.node_descriptions = node_descriptions self.lambda_cost = float(lambda_cost) self.lambda_latency = float(lambda_latency) diff --git a/src/agentopt/model_selection/epsilon_lucb.py b/src/agentopt/model_selection/epsilon_lucb.py deleted file mode 100644 index 52cd05c..0000000 --- a/src/agentopt/model_selection/epsilon_lucb.py +++ /dev/null @@ -1,341 +0,0 @@ -""" -Epsilon-optimal LUCB model selector. - -Identifies an epsilon-optimal best model combination using confidence bounds. 
-""" - -import asyncio -import logging -import math -from typing import Any, Callable, Dict, List, Optional, Set, Tuple - -from ..base_models import Dataset, EvalFn, ModelCandidate -from .base import BaseModelSelector, ModelResult, SelectionResults - -logger = logging.getLogger(__name__) - - -class EpsilonLUCBModelSelector(BaseModelSelector): - """Select models via epsilon-optimal LUCB.""" - - def __init__( - self, - agent: Any = None, - models: Dict[str, List[ModelCandidate]] = None, - eval_fn: EvalFn = None, - dataset: Dataset = None, - epsilon: float = 0.01, - n_initial: int = 1, - confidence: float = 1.0, - model_prices: Optional[Dict[str, Dict[str, float]]] = None, - tracker=None, - lambda_cost: float = 0.0, - lambda_latency: float = 0.0, - ) -> None: - super().__init__( - agent=agent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - model_prices=model_prices, - tracker=tracker, - lambda_cost=lambda_cost, - lambda_latency=lambda_latency, - ) - self.epsilon = max(0.0, float(epsilon)) - self.n_initial = max(1, int(n_initial)) - self.confidence = confidence - - def _run_selection( - self, parallel: bool = False, max_concurrent: int = 20, - ) -> SelectionResults: - if parallel: - return asyncio.run(self._select_async(max_concurrent)) - return self._select_sequential() - - def _select_sequential(self) -> SelectionResults: - all_combos = self._all_combos() - dataset_list = list(self.dataset) - n_total = len(dataset_list) - n_arms = len(all_combos) - - combo_scores: Dict[int, List[float]] = {i: [] for i in range(n_arms)} - combo_latencies: Dict[int, List[float]] = {i: [] for i in range(n_arms)} - combo_costs: Dict[int, List[Optional[float]]] = {i: [] for i in range(n_arms)} - combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(n_arms)} - active: Set[int] = set(range(n_arms)) - - print(f"\n{'='*60}") - print( - f"Epsilon-LUCB (sequential): {n_arms} combinations, " - f"{n_total} samples, epsilon={self.epsilon}" - ) - print(f"{'='*60}") - - offset = 0 - 
init_batch_size = min(self.n_initial, n_total) - assert init_batch_size > 0 - init_batch = dataset_list[offset : offset + init_batch_size] - for idx in range(n_arms): - combo = all_combos[idx] - combo_name = self._combo_name(combo) - scores, latencies, dp_ids = self._evaluate_combo( - combo, init_batch, label=combo_name, dp_offset=offset - ) - costs = self._observe_combo(scores, latencies, dp_ids) - combo_scores[idx].extend(scores) - combo_latencies[idx].extend(latencies) - combo_costs[idx].extend(costs) - combo_dp_ids[idx].extend(dp_ids) - offset += init_batch_size - - round_num = 1 - while active and offset < n_total: - h_idx, l_idx, h_lcb, l_ucb = self._choose_lucb_pair( - active, combo_scores, combo_latencies, combo_costs - ) - gap = l_ucb - h_lcb - if gap <= self.epsilon or l_idx is None: - break - - batch = [dataset_list[offset]] - offset += 1 - sample_idxs = [h_idx] if l_idx == h_idx else [h_idx, l_idx] - - print( - f"\nRound {round_num} [sample {offset}/{n_total}] " - f"h={self._combo_name(all_combos[h_idx])}, " - f"l={self._combo_name(all_combos[l_idx])}, gap={gap:.4f}" - ) - - for idx in sample_idxs: - combo = all_combos[idx] - combo_name = self._combo_name(combo) - scores, latencies, dp_ids = self._evaluate_combo( - combo, batch, label=combo_name, dp_offset=offset - 1 - ) - costs = self._observe_combo(scores, latencies, dp_ids) - combo_scores[idx].extend(scores) - combo_latencies[idx].extend(latencies) - combo_costs[idx].extend(costs) - combo_dp_ids[idx].extend(dp_ids) - objs = self._compute_objectives( - combo_scores[idx], combo_latencies[idx], combo_costs[idx] - ) - mu, _ = self._compute_stats(objs) - print(f" {combo_name}: mu={mu:.3f} (n={len(objs)})") - - round_num += 1 - - return self._build_results( - all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids - ) - - async def _select_async(self, max_concurrent: int = 20) -> SelectionResults: - all_combos = self._all_combos() - dataset_list = list(self.dataset) - n_total = 
len(dataset_list) - n_arms = len(all_combos) - - combo_scores: Dict[int, List[float]] = {i: [] for i in range(n_arms)} - combo_latencies: Dict[int, List[float]] = {i: [] for i in range(n_arms)} - combo_costs: Dict[int, List[Optional[float]]] = {i: [] for i in range(n_arms)} - combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(n_arms)} - active: Set[int] = set(range(n_arms)) - - print(f"\n{'='*60}") - print( - f"Epsilon-LUCB (async): {n_arms} combinations, " - f"{n_total} samples, epsilon={self.epsilon}, " - f"max {max_concurrent} total concurrent" - ) - print(f"{'='*60}") - - offset = 0 - init_batch_size = min(self.n_initial, n_total) - assert init_batch_size > 0 - init_batch = dataset_list[offset : offset + init_batch_size] - n_combo_init, dp_concurrent_init = self._compute_concurrency( - max_concurrent, init_batch_size - ) - init_combo_sem = asyncio.Semaphore(n_combo_init) - - async def _eval_initial( - idx: int, - ) -> Tuple[int, List[float], List[float], List[str]]: - async with init_combo_sem: - combo = all_combos[idx] - combo_name = self._combo_name(combo) - scores, latencies, dp_ids = await self._evaluate_combo_async( - combo, - init_batch, - label=combo_name, - max_concurrent=dp_concurrent_init, - dp_offset=offset, - ) - return idx, scores, latencies, dp_ids - - round_results = await asyncio.gather( - *[_eval_initial(idx) for idx in range(n_arms)], return_exceptions=True, - ) - for res in round_results: - if isinstance(res, Exception): - logger.warning("Initial LUCB batch evaluation error: %s", res) - continue - idx, scores, latencies, dp_ids = res - costs = self._observe_combo(scores, latencies, dp_ids) - combo_scores[idx].extend(scores) - combo_latencies[idx].extend(latencies) - combo_costs[idx].extend(costs) - combo_dp_ids[idx].extend(dp_ids) - offset += init_batch_size - - round_num = 1 - # Per-round batch_size is always 1 - n_combo_round, dp_concurrent_round = self._compute_concurrency( - max_concurrent, 1 - ) - round_combo_sem = 
asyncio.Semaphore(n_combo_round) - - while active and offset < n_total: - h_idx, l_idx, h_lcb, l_ucb = self._choose_lucb_pair( - active, combo_scores, combo_latencies, combo_costs - ) - gap = l_ucb - h_lcb - if gap <= self.epsilon or l_idx is None: - break - - batch = [dataset_list[offset]] - offset += 1 - sample_idxs = [h_idx] if l_idx == h_idx else [h_idx, l_idx] - - print( - f"\nRound {round_num} [sample {offset}/{n_total}] " - f"h={self._combo_name(all_combos[h_idx])}, " - f"l={self._combo_name(all_combos[l_idx])}, gap={gap:.4f}" - ) - - async def _eval_pair( - idx: int, - ) -> Tuple[int, List[float], List[float], List[str]]: - async with round_combo_sem: - combo = all_combos[idx] - combo_name = self._combo_name(combo) - scores, latencies, dp_ids = await self._evaluate_combo_async( - combo, - batch, - label=combo_name, - max_concurrent=dp_concurrent_round, - dp_offset=offset - 1, - ) - return idx, scores, latencies, dp_ids - - round_results = await asyncio.gather( - *[_eval_pair(idx) for idx in sample_idxs], return_exceptions=True, - ) - for res in round_results: - if isinstance(res, Exception): - logger.warning("LUCB pair evaluation error: %s", res) - continue - idx, scores, latencies, dp_ids = res - costs = self._observe_combo(scores, latencies, dp_ids) - combo_scores[idx].extend(scores) - combo_latencies[idx].extend(latencies) - combo_costs[idx].extend(costs) - combo_dp_ids[idx].extend(dp_ids) - objs = self._compute_objectives( - combo_scores[idx], combo_latencies[idx], combo_costs[idx] - ) - mu, _ = self._compute_stats(objs) - print( - f" {self._combo_name(all_combos[idx])}: " - f"mu={mu:.3f} (n={len(objs)})" - ) - - round_num += 1 - - return self._build_results( - all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids - ) - - def _choose_lucb_pair( - self, - active: Set[int], - combo_scores: Dict[int, List[float]], - combo_latencies: Dict[int, List[float]], - combo_costs: Dict[int, List[Optional[float]]], - ) -> Tuple[int, Optional[int], 
float, float]: - stats: Dict[int, Tuple[float, float, float]] = {} - for idx in active: - objs = self._compute_objectives( - combo_scores[idx], combo_latencies[idx], combo_costs[idx] - ) - stats[idx] = self._confidence_bounds(objs) - - h_idx = max(active, key=lambda i: stats[i][0]) # highest empirical mean - if len(active) == 1: - _, h_lcb, _ = stats[h_idx] - return h_idx, None, h_lcb, h_lcb - - competitors = [i for i in active if i != h_idx] - l_idx = max(competitors, key=lambda i: stats[i][2]) # highest UCB - h_lcb = stats[h_idx][1] - l_ucb = stats[l_idx][2] - return h_idx, l_idx, h_lcb, l_ucb - - def _confidence_bounds(self, values: List[float]) -> Tuple[float, float, float]: - n = len(values) - if n == 0: - return 0.0, float("-inf"), float("inf") - mu, std = self._compute_stats(values) - se = std / math.sqrt(n) - radius = self.confidence * se - return mu, mu - radius, mu + radius - - def _build_results( - self, - all_combos: List[Dict[str, ModelCandidate]], - combo_scores: Dict[int, List[float]], - combo_latencies: Dict[int, List[float]], - combo_costs: Dict[int, List[Optional[float]]], - combo_dp_ids: Dict[int, List[str]], - ) -> SelectionResults: - all_results: List[ModelResult] = [] - for idx, combo in enumerate(all_combos): - combo_name = self._combo_name(combo) - scores = combo_scores[idx] - if scores: - all_results.append( - self._build_combo_result( - combo_name, - scores, - combo_latencies[idx], - combo_dp_ids[idx], - costs=combo_costs[idx], - ) - ) - else: - all_results.append( - self._make_result( - model_name=combo_name, - accuracy=0.0, - latency_seconds=0.0, - input_tokens={}, - output_tokens={}, - attribute="combination", - is_best=False, - ) - ) - - self._finalize_combined_objectives(all_results) - best_info = self._find_best(all_results) - if best_info is not None: - best_name, _ = best_info - for result in all_results: - if result.model_name == best_name: - result.is_best = True - break - else: - print("\n No combinations succeeded.") - - return 
SelectionResults(results=all_results) diff --git a/src/agentopt/model_selection/hill_climbing.py b/src/agentopt/model_selection/hill_climbing.py deleted file mode 100644 index b6b8ab9..0000000 --- a/src/agentopt/model_selection/hill_climbing.py +++ /dev/null @@ -1,559 +0,0 @@ -""" -Hill-climbing model selector with random restarts. - -Uses the model topology (quality / speed rankings) to define -neighbours so that each iteration makes an informed single-step move. -""" - -import asyncio -import random -from typing import Any, Callable, Dict, List, Optional, Set, Tuple - -from ..base_models import Dataset, EvalFn, ModelCandidate -from ..model_price import compute_price -from ..model_topology import get_faster_neighbor, get_higher_quality_neighbor -from .base import BaseModelSelector, DatapointResult, ModelResult, SelectionResults - - -class HillClimbingModelSelector(BaseModelSelector): - """Select models via stochastic hill climbing with random restarts.""" - - def __init__( - self, - agent: Any = None, - models: Dict[str, List[ModelCandidate]] = None, - eval_fn: EvalFn = None, - dataset: Dataset = None, - max_iterations: int = 20, - num_restarts: int = 3, - patience: int = 3, - seed: Optional[int] = None, - batch_size: int = 1, - model_prices: Optional[Dict[str, Dict[str, float]]] = None, - tracker=None, - lambda_cost: float = 0.0, - lambda_latency: float = 0.0, - ) -> None: - super().__init__( - agent=agent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - model_prices=model_prices, - tracker=tracker, - lambda_cost=lambda_cost, - lambda_latency=lambda_latency, - ) - self.max_iterations = max_iterations - self.num_restarts = num_restarts - self.patience = patience - self.batch_size = max(1, int(batch_size)) - - if seed is not None: - random.seed(seed) - - # Pre-compute all combinations for random starts. - self._all_combo_list = self._all_combos() - - # Cache: combo_name -> (accuracy, latency, input_tokens, output_tokens, datapoint_results). 
- self._eval_cache: Dict[ - str, - Tuple[float, float, Dict[str, int], Dict[str, int], List[DatapointResult]], - ] = {} - - def _objective_from_dp(self, dp_results: List[DatapointResult]) -> Optional[float]: - """Recompute the mean combined objective from cached datapoint results.""" - if not self._has_combined_objective or not dp_results: - return None - scores = [dp.score for dp in dp_results] - lats = [dp.latency_seconds for dp in dp_results] - costs = [ - compute_price( - dp.input_tokens, dp.output_tokens, custom_prices=self._custom_prices, - ) - for dp in dp_results - ] - return self._mean_objective(scores, lats, costs) - - def _primary_value( - self, accuracy: float, dp_results: List[DatapointResult], - ) -> float: - """Ranking key for tiebreaks: combined objective if configured, else accuracy.""" - obj = self._objective_from_dp(dp_results) - return obj if obj is not None else accuracy - - # ------------------------------------------------------------------ - # Shared helpers - # ------------------------------------------------------------------ - - def _random_combination( - self, seen: Set[str] - ) -> Optional[Dict[str, ModelCandidate]]: - """Pick a random unseen combination, or ``None`` if all exhausted.""" - unseen = [c for c in self._all_combo_list if self._combo_name(c) not in seen] - if unseen: - return dict(random.choice(unseen)) - return None - - def _process_eval_result( - self, - combo_name: str, - scores: List[float], - latencies: List[float], - dp_ids: List[str], - ) -> Tuple[ - str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool - ]: - """Compute stats, absorb cost samples, cache, and return the eval tuple.""" - self._observe_combo(scores, latencies, dp_ids) - input_tokens, output_tokens = self._fetch_tokens(combo_name) - accuracy, _ = self._compute_stats(scores) - latency = sum(latencies) / len(latencies) if latencies else 0.0 - dp_results = self._build_datapoint_results(scores, latencies, dp_ids) - 
self._eval_cache[combo_name] = ( - accuracy, - latency, - input_tokens, - output_tokens, - dp_results, - ) - return ( - combo_name, - accuracy, - latency, - input_tokens, - output_tokens, - dp_results, - False, - ) - - def _evaluate_cached( - self, combo: Dict[str, ModelCandidate], - ) -> Tuple[ - str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool - ]: - """Evaluate a combo synchronously, using an in-memory cache.""" - combo_name = self._combo_name(combo) - if combo_name in self._eval_cache: - acc, lat, in_tok, out_tok, dp_results = self._eval_cache[combo_name] - return combo_name, acc, lat, in_tok, out_tok, dp_results, True - scores, latencies, dp_ids = self._evaluate_combo( - combo, self.dataset, label=combo_name - ) - return self._process_eval_result(combo_name, scores, latencies, dp_ids) - - async def _evaluate_cached_async( - self, combo: Dict[str, ModelCandidate], max_concurrent: int - ) -> Tuple[ - str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool - ]: - """Evaluate a combo asynchronously, using an in-memory cache.""" - combo_name = self._combo_name(combo) - if combo_name in self._eval_cache: - acc, lat, in_tok, out_tok, dp_results = self._eval_cache[combo_name] - return combo_name, acc, lat, in_tok, out_tok, dp_results, True - scores, latencies, dp_ids = await self._evaluate_combo_async( - combo, self.dataset, label=combo_name, max_concurrent=max_concurrent - ) - return self._process_eval_result(combo_name, scores, latencies, dp_ids) - - def _get_neighbors( - self, combo: Dict[str, ModelCandidate], seen: Set[str], accuracy: float, - ) -> List[Dict[str, ModelCandidate]]: - """Generate neighbors with quality/speed fallback logic.""" - if accuracy < 1.0: - neighbors = self._generate_neighbors( - combo, seen, max_neighbors=self.batch_size, improve_quality=True, - ) - if not neighbors: - neighbors = self._generate_neighbors( - combo, seen, max_neighbors=self.batch_size, improve_quality=False, - ) - else: - 
neighbors = self._generate_neighbors( - combo, seen, max_neighbors=self.batch_size, improve_quality=False, - ) - if not neighbors: - neighbors = self._generate_neighbors( - combo, seen, max_neighbors=self.batch_size, improve_quality=True, - ) - return neighbors - - def _generate_neighbors( - self, - combo: Dict[str, ModelCandidate], - seen: Set[str], - max_neighbors: int, - improve_quality: bool, - ) -> List[Dict[str, ModelCandidate]]: - """Generate up to *max_neighbors* unseen neighbors that differ by one node.""" - neighbors: List[Dict[str, ModelCandidate]] = [] - node_names = list(combo.keys()) - random.shuffle(node_names) - - for node in node_names: - current = combo[node] - if improve_quality: - neighbor = get_higher_quality_neighbor(current, self._models[node]) - else: - neighbor = get_faster_neighbor(current, self._models[node]) - if neighbor is None: - continue - - new_combo = dict(combo) - new_combo[node] = neighbor - if self._combo_name(new_combo) in seen: - continue - - neighbors.append(new_combo) - if len(neighbors) >= max_neighbors: - break - - return neighbors - - def _pick_best_neighbor( - self, - eval_results: List[Tuple], - neighbors: List[Dict[str, ModelCandidate]], - seen: Set[str], - current_value: float, - current_latency: float, - tol: float, - ) -> Optional[Dict[str, ModelCandidate]]: - """Select the best neighbor from eval results, or None if none improves. - - Ranks by primary value (combined objective when ``lambda_*`` are set, - else accuracy), with latency as the tiebreaker. 
- """ - best_neighbor: Optional[Dict[str, ModelCandidate]] = None - best_n_val = float("-inf") - best_n_lat = float("inf") - - for neighbor, eval_result in zip(neighbors, eval_results): - n_name, n_acc, n_lat, _, _, n_dp_results, _ = eval_result - seen.add(n_name) - n_val = self._primary_value(n_acc, n_dp_results) - - if n_val > best_n_val + tol: - best_neighbor, best_n_val, best_n_lat = neighbor, n_val, n_lat - elif abs(n_val - best_n_val) <= tol and n_lat < best_n_lat: - best_neighbor, best_n_val, best_n_lat = neighbor, n_val, n_lat - - if best_neighbor is None or ( - best_n_val < current_value - tol - or ( - abs(best_n_val - current_value) <= tol - and best_n_lat >= current_latency - ) - ): - return None - return best_neighbor - - def _hc_finalize( - self, - all_results: List[ModelResult], - global_best_combo: Optional[Dict[str, ModelCandidate]], - global_best_value: float, - ) -> SelectionResults: - """Finalize combined objectives, mark the best result, return results.""" - self._finalize_combined_objectives(all_results) - if global_best_combo is None: - print("\nNo combinations succeeded\n") - return SelectionResults(results=all_results) - - # Prefer the combined-objective-aware _find_best when lambdas are set; - # otherwise honor the within-search global best to preserve the - # original tie-breaking semantics. - if self._has_combined_objective: - best_info = self._find_best(all_results) - best_name = best_info[0] if best_info else self._combo_name(global_best_combo) - else: - best_name = self._combo_name(global_best_combo) - - tol = 1e-9 - for result in all_results: - if result.model_name != best_name: - continue - if self._has_combined_objective: - result.is_best = True - break - # Accuracy-mode: match by name AND the tracked best value. 
- if abs(result.accuracy - global_best_value) < tol: - result.is_best = True - break - return SelectionResults(results=all_results) - - # ------------------------------------------------------------------ - # Single restart (sequential) - # ------------------------------------------------------------------ - - def _hill_climb_once_sequential( - self, seen: Set[str], - ) -> Optional[Tuple[Dict[str, ModelCandidate], float, float, List[ModelResult]]]: - combo = self._random_combination(seen) - if combo is None: - return None - - results: List[ModelResult] = [] - best_combo = dict(combo) - best_value = float("-inf") - best_latency = float("inf") - tol = 1e-9 - no_improve_count = 0 - - for iteration in range(self.max_iterations): - combo_name = self._combo_name(combo) - seen.add(combo_name) - - ( - _, - accuracy, - latency, - input_tokens, - output_tokens, - dp_results, - cached, - ) = self._evaluate_cached(combo) - - result = self._make_result( - model_name=combo_name, - accuracy=accuracy, - latency_seconds=latency, - input_tokens=input_tokens, - output_tokens=output_tokens, - attribute="combination", - is_best=False, - datapoint_results=dp_results, - ) - suffix = " (cached)" if cached else "" - print(f" Iter {iteration + 1}: {result}{suffix}") - results.append(result) - - current_value = self._primary_value(accuracy, dp_results) - should_update = ( - best_value == float("-inf") - or current_value > best_value + tol - or (abs(current_value - best_value) <= tol and latency < best_latency) - ) - if should_update: - best_value, best_latency, best_combo = current_value, latency, dict(combo) - no_improve_count = 0 - else: - no_improve_count += 1 - - if no_improve_count >= self.patience: - print( - f" No improvement for {self.patience} iterations. " - f"Converged at iteration {iteration + 1}." - ) - break - - neighbors = self._get_neighbors(combo, seen, accuracy) - if not neighbors: - print(f" No improving moves at iteration {iteration + 1}. 
Stopping.") - break - - eval_results = [self._evaluate_cached(n) for n in neighbors] - best_neighbor = self._pick_best_neighbor( - eval_results, neighbors, seen, current_value, latency, tol - ) - if best_neighbor is None: - print( - f" No neighbor in batch of {len(neighbors)} improves at " - f"iteration {iteration + 1}. Stopping." - ) - break - - combo = dict(best_neighbor) - - return best_combo, best_value, best_latency, results - - # ------------------------------------------------------------------ - # Single restart (async) - # ------------------------------------------------------------------ - - async def _hill_climb_once_async( - self, seen: Set[str], max_concurrent: int - ) -> Optional[Tuple[Dict[str, ModelCandidate], float, float, List[ModelResult]]]: - combo = self._random_combination(seen) - if combo is None: - return None - - results: List[ModelResult] = [] - best_combo = dict(combo) - best_value = float("-inf") - best_latency = float("inf") - tol = 1e-9 - no_improve_count = 0 - - for iteration in range(self.max_iterations): - combo_name = self._combo_name(combo) - seen.add(combo_name) - - ( - _, - accuracy, - latency, - input_tokens, - output_tokens, - dp_results, - cached, - ) = await self._evaluate_cached_async(combo, max_concurrent=max_concurrent) - - result = self._make_result( - model_name=combo_name, - accuracy=accuracy, - latency_seconds=latency, - input_tokens=input_tokens, - output_tokens=output_tokens, - attribute="combination", - is_best=False, - datapoint_results=dp_results, - ) - suffix = " (cached)" if cached else "" - print(f" Iter {iteration + 1}: {result}{suffix}") - results.append(result) - - current_value = self._primary_value(accuracy, dp_results) - should_update = ( - best_value == float("-inf") - or current_value > best_value + tol - or (abs(current_value - best_value) <= tol and latency < best_latency) - ) - if should_update: - best_value, best_latency, best_combo = current_value, latency, dict(combo) - no_improve_count = 0 - 
else: - no_improve_count += 1 - - if no_improve_count >= self.patience: - print( - f" No improvement for {self.patience} iterations. " - f"Converged at iteration {iteration + 1}." - ) - break - - neighbors = self._get_neighbors(combo, seen, accuracy) - if not neighbors: - print(f" No improving moves at iteration {iteration + 1}. Stopping.") - break - - batch_size = len(self.dataset) - n_combo_nb, dp_concurrent_nb = self._compute_concurrency( - max_concurrent, batch_size - ) - neighbor_sem = asyncio.Semaphore(n_combo_nb) - - async def _eval_neighbor_throttled( - n: Dict[str, ModelCandidate], - ) -> Tuple[str, float, float, Dict[str, int], Dict[str, int], List, bool]: - async with neighbor_sem: - return await self._evaluate_cached_async( - n, max_concurrent=dp_concurrent_nb - ) - - eval_results = await asyncio.gather( - *(_eval_neighbor_throttled(n) for n in neighbors) - ) - best_neighbor = self._pick_best_neighbor( - eval_results, neighbors, seen, current_value, latency, tol - ) - if best_neighbor is None: - print( - f" No neighbor in batch of {len(neighbors)} improves at " - f"iteration {iteration + 1}. Stopping." 
- ) - break - - combo = dict(best_neighbor) - - return best_combo, best_value, best_latency, results - - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - - def _run_selection( - self, parallel: bool = False, max_concurrent: int = 20, - ) -> SelectionResults: - if parallel: - return asyncio.run(self._run_selection_async(max_concurrent)) - return self._run_selection_sequential() - - def _run_selection_sequential(self) -> SelectionResults: - all_results: List[ModelResult] = [] - global_best_combo: Optional[Dict[str, ModelCandidate]] = None - global_best_value = float("-inf") - global_best_latency = float("inf") - tol = 1e-9 - - print(f"\n{'=' * 60}") - print( - f"Hill climbing (sequential): {self.num_restarts} restart(s), " - f"max {self.max_iterations} iterations each, patience {self.patience}" - ) - print(f"{'=' * 60}\n") - - seen: Set[str] = set() - for restart in range(self.num_restarts): - print(f"--- Restart {restart + 1}/{self.num_restarts} ---") - result = self._hill_climb_once_sequential(seen) - if result is None: - print(" All combinations exhausted. 
Stopping.\n") - break - best_combo, best_val, best_lat, run_results = result - all_results.extend(run_results) - - if ( - global_best_combo is None - or best_val > global_best_value + tol - or ( - abs(best_val - global_best_value) <= tol - and best_lat < global_best_latency - ) - ): - global_best_value = best_val - global_best_latency = best_lat - global_best_combo = best_combo - - return self._hc_finalize(all_results, global_best_combo, global_best_value) - - async def _run_selection_async(self, max_concurrent: int = 20,) -> SelectionResults: - all_results: List[ModelResult] = [] - global_best_combo: Optional[Dict[str, ModelCandidate]] = None - global_best_value = float("-inf") - global_best_latency = float("inf") - tol = 1e-9 - - print(f"\n{'=' * 60}") - print( - f"Hill climbing (parallel): {self.num_restarts} restart(s), " - f"max {self.max_iterations} iterations each, patience {self.patience}" - ) - print(f"{'=' * 60}\n") - - seen: Set[str] = set() - for restart in range(self.num_restarts): - print(f"--- Restart {restart + 1}/{self.num_restarts} ---") - result = await self._hill_climb_once_async( - seen, max_concurrent=max_concurrent - ) - if result is None: - print(" All combinations exhausted. Stopping.\n") - break - best_combo, best_val, best_lat, run_results = result - all_results.extend(run_results) - - if ( - global_best_combo is None - or best_val > global_best_value + tol - or ( - abs(best_val - global_best_value) <= tol - and best_lat < global_best_latency - ) - ): - global_best_value = best_val - global_best_latency = best_lat - global_best_combo = best_combo - - return self._hc_finalize(all_results, global_best_combo, global_best_value) diff --git a/src/agentopt/model_selection/lm_proposal.py b/src/agentopt/model_selection/lm_proposal.py deleted file mode 100644 index addf44d..0000000 --- a/src/agentopt/model_selection/lm_proposal.py +++ /dev/null @@ -1,314 +0,0 @@ -""" -LLM-proposal model selector. 
- -This selector asks a proposer LLM to suggest the single best model combination -for a multi-node agent, using node descriptions, model prices, and a dataset -preview to inform its recommendation. The proposed combination is returned -directly without evaluation. -""" - -from __future__ import annotations - -import json -import logging -from typing import Any, Callable, Dict, List, Optional, Tuple - -from pydantic import BaseModel, Field, ValidationError - -from ..base_models import Dataset, EvalFn, ModelCandidate -from ..model_price import get_model_price -from .base import BaseModelSelector, SelectionResults - -logger = logging.getLogger(__name__) - - -class ProposalResponse(BaseModel): - """Expected JSON response from the proposer LLM.""" - - combination: Dict[str, str] = Field( - description="Mapping of node name to selected model name.", - ) - reasoning: str = Field( - default="", description="Brief explanation of why this combination was chosen.", - ) - - -class LMProposalModelSelector(BaseModelSelector): - """Model selector where an LLM proposes the single best combination.""" - - def __init__( - self, - agent: Any = None, - models: Dict[str, List[ModelCandidate]] = None, - eval_fn: EvalFn = None, - dataset: Dataset = None, - proposer_model: str = "gpt-4.1", - proposer_client: Any = None, - objective: str = "maximize accuracy and then minimize latency and cost", - dataset_preview_size: int = 10, - model_prices: Optional[Dict[str, Dict[str, float]]] = None, - node_descriptions: Optional[Dict[str, str]] = None, - lambda_cost: float = 0.0, - lambda_latency: float = 0.0, - ) -> None: - super().__init__( - agent=agent, - models=models, - eval_fn=eval_fn, - dataset=dataset, - model_prices=model_prices, - node_descriptions=node_descriptions, - lambda_cost=lambda_cost, - lambda_latency=lambda_latency, - ) - if dataset_preview_size < 1: - raise ValueError("dataset_preview_size must be >= 1.") - - self.proposer_model = proposer_model - self.objective = objective - 
self.dataset_preview_size = dataset_preview_size - - # Build label→index lookup per node for parsing LLM responses. - self._label_to_index: Dict[str, Dict[str, int]] = {} - for node in self._node_names: - self._label_to_index[node] = { - self._candidate_label(c): idx - for idx, c in enumerate(self._models[node]) - } - - if proposer_client is None: - try: - from openai import OpenAI - except ImportError as e: - raise ImportError( - "LMProposalModelSelector requires `openai` unless proposer_client is supplied. " - "Install with: pip install openai" - ) from e - proposer_client = OpenAI() - self.proposer_client = proposer_client - - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - - def _run_selection( - self, parallel: bool = False, max_concurrent: int = 20, - ) -> SelectionResults: - if parallel: - logger.warning( - "LMProposalModelSelector received parallel=True, but only a single " - "combination is evaluated; proceeding with sequential evaluation." 
- ) - combo_idx = self._ask_proposer() - if combo_idx is None: - combo_idx = tuple(0 for _ in self._node_names) - - combo = self._index_combo_to_combo(combo_idx) - combo_name = self._combo_name(combo) - - print(f"\n{'='*60}") - print(f"LM proposal: evaluating proposed combination") - print(f"{'='*60}\n") - print(f" [1/1] Evaluating: {combo_name}") - - try: - scores, latencies, dp_ids = self._evaluate_combo( - combo, self.dataset, label=combo_name - ) - result = self._build_combo_result( - combo_name, scores, latencies, dp_ids, is_best=True, - ) - print(f" {result}") - except Exception as e: - print(f" [{combo_name}] failed: {e}") - result = self._make_result( - model_name=combo_name, - accuracy=0.0, - latency_seconds=0.0, - input_tokens={}, - output_tokens={}, - attribute="combination", - is_best=True, - ) - - results = [result] - self._finalize_combined_objectives(results) - return SelectionResults(results=results) - - # ------------------------------------------------------------------ - # Prompt construction - # ------------------------------------------------------------------ - - @staticmethod - def _safe_json(value: Any) -> Any: - try: - json.dumps(value) - return value - except TypeError: - return str(value) - - def _dataset_preview(self) -> List[Dict[str, Any]]: - preview: List[Dict[str, Any]] = [] - for input_data, expected in list(self.dataset)[: self.dataset_preview_size]: - preview.append( - {"input": self._safe_json(input_data), "expected": str(expected)} - ) - return preview - - def _index_combo_to_combo( - self, idx_combo: Tuple[int, ...], - ) -> Dict[str, ModelCandidate]: - return { - node: self._models[node][idx] - for node, idx in zip(self._node_names, idx_combo) - } - - def _build_prompt(self, preview: List[Dict[str, Any]]) -> str: - # -- Build nodes info ------------------------------------------------ - nodes_info = [] - for node in self._node_names: - node_entry: Dict[str, Any] = {"node_name": node} - if self.node_descriptions and node in 
self.node_descriptions: - node_entry["description"] = self.node_descriptions[node] - - candidates = [] - for c in self._models[node]: - label = self._candidate_label(c) - candidate_entry: Dict[str, Any] = {"name": label} - price = get_model_price(label, custom_prices=self._custom_prices) - if price is not None: - candidate_entry["input_price_per_mtok"] = price[0] - candidate_entry["output_price_per_mtok"] = price[1] - candidates.append(candidate_entry) - node_entry["candidates"] = candidates - nodes_info.append(node_entry) - - # -- Build response example ------------------------------------------ - example = { - "combination": { - node: self._candidate_label(self._models[node][0]) - for node in self._node_names - }, - "reasoning": "Your explanation here.", - } - - # -- Assemble prompt ------------------------------------------------- - sections = [ - # Role & Task - "# Task\n" - "You are an expert AI model selector. You will be given a multi-agent " - "workflow where each node can use one of several candidate LLMs. " - "Your job is to select the best combination of models for the nodes.\n", - # Objective - "# The objective to target when selecting the model combination:\n" - f"{self.objective}\n", - # Agent Pipeline - "# Agent Pipeline\n" - "The agent has the following nodes and each can be assigned one of its candidate models.\n" - f"```json\n{json.dumps(nodes_info, indent=2, ensure_ascii=True)}\n```\n", - # Dataset Preview - "# Dataset Preview\n" - "Below are sample inputs and their expected outputs. 
Use these to understand " - "the task complexity and choose models accordingly.\n" - f"```json\n{json.dumps(preview, indent=2, ensure_ascii=True)}\n```\n", - # Response Format - "# Response Format\n" - "Respond with a JSON object like this example:\n" - f"```json\n{json.dumps(example, ensure_ascii=True)}\n```\n", - # Constraints - "# Constraints\n" - "- Each key in `combination` must be a node name from the pipeline above.\n" - "- Each value must be a candidate model name from that node's candidates list.\n" - "- All nodes must be included.\n" - "- Return exactly one combination.\n", - ] - - prompt = "\n".join(sections) - - return prompt - - # ------------------------------------------------------------------ - # Parsing & proposer - # ------------------------------------------------------------------ - - def _parse_proposed_combination(self, text: str,) -> Optional[Tuple[int, ...]]: - if not text.strip(): - return None - try: - payload = json.loads(text) - except json.JSONDecodeError: - logger.warning( - "LMProposalModelSelector: proposer returned non-JSON output." - ) - return None - - try: - response = ProposalResponse.model_validate(payload) - except ValidationError as e: - logger.warning("LMProposalModelSelector: invalid response structure: %s", e) - return None - - if set(response.combination.keys()) != set(self._node_names): - logger.warning( - "LMProposalModelSelector: response nodes don't match pipeline nodes." 
- ) - return None - - indices: List[int] = [] - for node in self._node_names: - model_name = response.combination[node] - lookup = self._label_to_index.get(node, {}) - if model_name not in lookup: - logger.warning( - "LMProposalModelSelector: unknown model '%s' for node '%s'.", - model_name, - node, - ) - return None - indices.append(lookup[model_name]) - - return tuple(indices) - - def _ask_proposer(self, max_retries: int = 3) -> Optional[Tuple[int, ...]]: - preview = self._dataset_preview() - prompt = self._build_prompt(preview) - messages = [ - { - "role": "system", - "content": ( - "You are an expert model-selection assistant. " - "Analyze the agent pipeline, candidate models, and dataset, " - "then return a single JSON object with your recommended " - "model combination." - ), - }, - {"role": "user", "content": prompt}, - ] - - for attempt in range(1, max_retries + 1): - try: - response = self.proposer_client.chat.completions.create( - model=self.proposer_model, - temperature=0.0, - response_format={"type": "json_object"}, - messages=messages, - ) - raw = response.choices[0].message.content or "" - proposed = self._parse_proposed_combination(raw) - if proposed is not None: - return proposed - logger.warning( - "LM proposer attempt %d/%d: invalid response, retrying...", - attempt, - max_retries, - ) - except Exception as e: - logger.warning( - "LM proposer attempt %d/%d failed: %s", attempt, max_retries, e, - ) - - logger.warning( - "LM proposer exhausted all %d retries; falling back to defaults.", - max_retries, - ) - return None diff --git a/src/agentopt/model_selection/matrix_ucb.py b/src/agentopt/model_selection/matrix_ucb.py index b6f45b8..5955560 100644 --- a/src/agentopt/model_selection/matrix_ucb.py +++ b/src/agentopt/model_selection/matrix_ucb.py @@ -39,7 +39,7 @@ def _resolve_observation_budget_fraction( observation_budget_fraction: float, sample_fraction: Optional[float], ) -> float: - """Match RandomSearch/Bayesian: ``sample_fraction`` overrides 
``observation_budget_fraction``.""" + """Match Bayesian: ``sample_fraction`` overrides ``observation_budget_fraction``.""" if sample_fraction is not None: s = float(sample_fraction) if not 0 < s <= 1: @@ -220,7 +220,7 @@ class MatrixUCBModelSelector(BaseModelSelector): ``select_best(..., max_concurrent=...)`` matters (``parallel`` is ignored). ``observation_budget_fraction`` or, equivalently, ``sample_fraction`` (same meaning - as in :class:`RandomSearchModelSelector` / Bayesian: fraction of the search budget — + as in Bayesian optimization: fraction of the search budget — here, **fraction of matrix cells** to observe) caps evaluations. ``1.0`` fills the full grid; ``0.1`` stops after about 10% of cells. If both are passed, ``sample_fraction`` wins. diff --git a/src/agentopt/model_selection/matrix_ucb_factorization.py b/src/agentopt/model_selection/matrix_ucb_factorization.py deleted file mode 100644 index 54b91a2..0000000 --- a/src/agentopt/model_selection/matrix_ucb_factorization.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2025 Jin Peng Zhou, Christian K. Belardi, Ruihan Wu -# SPDX-License-Identifier: MIT -# Adapted from https://github.com/kilian-group/banditeval (banditeval/factorization.py) - -import torch -from einops import rearrange - - -class Factorization(torch.nn.Module): - r"""Low-rank factorization ensemble :math:`X \approx UV^\top` with ALS. - - * :math:`X` has shape (**combinations** × **datapoints**), matching banditeval’s - “methods × examples” layout. 
-    """
-
-    def __init__(
-        self,
-        n_combos: int,
-        n_datapoints: int,
-        rank: int,
-        ensemble_size: int,
-        regularizer_weight: float = 0.00,
-        drop_probability: float = 0.05,
-    ) -> None:
-        super().__init__()
-        self.register_buffer("U", torch.randn(ensemble_size, n_combos, rank))
-        self.register_buffer("V", torch.randn(ensemble_size, n_datapoints, rank))
-        self.register_buffer("L", regularizer_weight * torch.eye(rank))
-
-        self.n_combos = n_combos
-        self.n_datapoints = n_datapoints
-        self.rank = rank
-        self.ensemble_size = ensemble_size
-        self.regularizer_weight = regularizer_weight
-        self.drop_probability = drop_probability
-
-    def forward(self) -> torch.Tensor:
-        return torch.bmm(self.U, self.V.transpose(1, 2))
-
-    def _als_step(
-        self, data_matrix: torch.Tensor, fixed_matrix: torch.Tensor
-    ) -> torch.Tensor:
-        non_zero_mask = (~torch.isnan(data_matrix)).float()
-        y = fixed_matrix.unsqueeze(2)
-        y_t = y.transpose(1, 2)
-        A = (non_zero_mask.unsqueeze(2) * torch.bmm(y, y_t)).sum(0) + self.L
-        b = (torch.nan_to_num(data_matrix * non_zero_mask) * y.squeeze(2)).sum(0)
-        return torch.linalg.solve(A, b)
-
-    def fit(self, X: torch.Tensor, iterations: int = 10) -> None:
-        # X: (combinations, datapoints); einops axes e, m, n = ensemble, combo, datapoint
-        X = X.unsqueeze(0).repeat(self.ensemble_size, 1, 1)
-        if self.drop_probability > 0:
-            mask = torch.rand_like(X) < self.drop_probability
-            X[mask] = torch.nan
-        X_u = rearrange(X, "e combo dp -> (e combo) dp 1")
-        X_v = rearrange(X, "e combo dp -> (e dp) combo 1")
-        vmap_als_step = torch.vmap(self._als_step, in_dims=(0, 0))
-        for _ in range(iterations):
-            self.V.data = (
-                vmap_als_step(X_v, self.U.repeat(self.n_datapoints, 1, 1))
-            ).reshape(self.ensemble_size, self.n_datapoints, self.rank)
-            self.U.data = (
-                vmap_als_step(X_u, self.V.repeat(self.n_combos, 1, 1))
-            ).reshape(self.ensemble_size, self.n_combos, self.rank)
-
-    def reset(self) -> None:
-        torch.nn.init.normal_(self.U)
-        torch.nn.init.normal_(self.V)
diff --git a/src/agentopt/model_selection/random_search.py b/src/agentopt/model_selection/random_search.py
deleted file mode 100644
index 59b22ba..0000000
--- a/src/agentopt/model_selection/random_search.py
+++ /dev/null
@@ -1,197 +0,0 @@
-"""
-Random-search model selection: evaluates a random subset of the Cartesian
-product of candidate models across all nodes.
-"""
-
-import asyncio
-import logging
-import math
-import random
-from typing import Any, Callable, Dict, List, Optional, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from .base import BaseModelSelector, ModelResult, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class RandomSearchModelSelector(BaseModelSelector):
-    """
-    Selects the best model combination from a random subset of candidates.
-    """
-
-    def __init__(
-        self,
-        agent: Any = None,
-        models: Dict[str, List[ModelCandidate]] = None,
-        eval_fn: EvalFn = None,
-        dataset: Dataset = None,
-        sample_fraction: float = 0.25,
-        seed: Optional[int] = None,
-        model_prices: Optional[Dict[str, Dict[str, float]]] = None,
-        tracker=None,
-        lambda_cost: float = 0.0,
-        lambda_latency: float = 0.0,
-    ) -> None:
-        super().__init__(
-            agent=agent,
-            models=models,
-            eval_fn=eval_fn,
-            dataset=dataset,
-            model_prices=model_prices,
-            tracker=tracker,
-            lambda_cost=lambda_cost,
-            lambda_latency=lambda_latency,
-        )
-        if not 0 < sample_fraction <= 1:
-            raise ValueError("sample_fraction must be in the range (0, 1].")
-        self.sample_fraction = sample_fraction
-        self.seed = seed
-
-    def _run_selection(
-        self, parallel: bool = False, max_concurrent: int = 20,
-    ) -> SelectionResults:
-        if parallel:
-            return asyncio.run(self._select_async(max_concurrent))
-        return self._select_sequential()
-
-    def _get_sampled_combinations(
-        self,
-    ) -> Tuple[List[Dict[str, ModelCandidate]], List[Dict[str, ModelCandidate]]]:
-        """Return (all_combos, sampled_combos)."""
-        all_combos = self._all_combos()
-        total = len(all_combos)
-        sample_size = max(1, math.ceil(total * self.sample_fraction))
-        sample_size = min(sample_size, total)
-
-        if sample_size == total:
-            return all_combos, all_combos
-
-        rng = random.Random(self.seed)
-        sampled_indices = sorted(rng.sample(range(total), sample_size))
-        sampled = [all_combos[i] for i in sampled_indices]
-        return all_combos, sampled
-
-    def _select_sequential(self) -> SelectionResults:
-        all_combos, sampled = self._get_sampled_combinations()
-
-        all_results: List[ModelResult] = []
-
-        print(f"\n{'='*60}")
-        print(
-            f"Random search (sequential): "
-            f"{len(sampled)}/{len(all_combos)} combinations "
-            f"({self.sample_fraction:.1%} sample)"
-        )
-        print(f"{'='*60}\n")
-
-        for idx, combo in enumerate(sampled, 1):
-            combo_name = self._combo_name(combo)
-            print(f" [{idx}/{len(sampled)}] Evaluating: {combo_name}")
-
-            try:
-                scores, latencies, dp_ids = self._evaluate_combo(
-                    combo, self.dataset, label=combo_name
-                )
-                result = self._build_combo_result(
-                    combo_name, scores, latencies, dp_ids,
-                )
-                print(f" {result}")
-                all_results.append(result)
-
-            except Exception as e:
-                print(f" [{combo_name}] failed: {e}")
-                all_results.append(
-                    self._make_result(
-                        model_name=combo_name,
-                        accuracy=0.0,
-                        latency_seconds=0.0,
-                        input_tokens={},
-                        output_tokens={},
-                        attribute="combination",
-                        is_best=False,
-                    )
-                )
-
-        self._finalize_combined_objectives(all_results)
-        best_info = self._find_best(all_results)
-        if best_info is not None:
-            best_name, _ = best_info
-            for result in all_results:
-                if result.model_name == best_name:
-                    result.is_best = True
-                    break
-        else:
-            print("\n No sampled combinations succeeded")
-
-        results = SelectionResults(results=all_results)
-        return results
-
-    async def _select_async(self, max_concurrent: int = 20) -> SelectionResults:
-        all_combos, sampled = self._get_sampled_combinations()
-
-        batch_size = len(self.dataset)
-        n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size)
-        combo_sem = asyncio.Semaphore(n_combo)
-
-        print(f"\n{'='*60}")
-        print(
-            f"Random search (async): "
-            f"{len(sampled)}/{len(all_combos)} combinations "
-            f"({self.sample_fraction:.1%} sample), "
-            f"max {max_concurrent} total concurrent"
-        )
-        print(f"{'='*60}\n")
-
-        async def _eval_combo(
-            combo: Dict[str, ModelCandidate],
-        ) -> Tuple[str, ModelResult]:
-            async with combo_sem:
-                combo_name = self._combo_name(combo)
-                print(f" Evaluating: {combo_name}")
-                scores, latencies, dp_ids = await self._evaluate_combo_async(
-                    combo, self.dataset, label=combo_name, max_concurrent=dp_concurrent
-                )
-                result = self._build_combo_result(
-                    combo_name, scores, latencies, dp_ids,
-                )
-                print(f" {result}")
-                return combo_name, result
-
-        combo_results = await asyncio.gather(
-            *[_eval_combo(c) for c in sampled], return_exceptions=True,
-        )
-
-        all_results: List[ModelResult] = []
-        for i, res in enumerate(combo_results):
-            if isinstance(res, Exception):
-                combo_name = self._combo_name(sampled[i])
-                print(f" [{combo_name}] failed: {res}")
-                all_results.append(
-                    self._make_result(
-                        model_name=combo_name,
-                        accuracy=0.0,
-                        latency_seconds=0.0,
-                        input_tokens={},
-                        output_tokens={},
-                        attribute="combination",
-                        is_best=False,
-                    )
-                )
-            else:
-                _, result = res
-                all_results.append(result)
-
-        self._finalize_combined_objectives(all_results)
-        best_info = self._find_best(all_results)
-        if best_info is not None:
-            best_name, _ = best_info
-            for r in all_results:
-                if r.model_name == best_name:
-                    r.is_best = True
-                    break
-        else:
-            print("\n No sampled combinations succeeded")
-
-        results = SelectionResults(results=all_results)
-        return results
diff --git a/src/agentopt/model_selection/threshold_successive_elimination.py b/src/agentopt/model_selection/threshold_successive_elimination.py
deleted file mode 100644
index 5c60d48..0000000
--- a/src/agentopt/model_selection/threshold_successive_elimination.py
+++ /dev/null
@@ -1,416 +0,0 @@
-"""
-Threshold-bandit successive elimination model selector.
-
-Classifies combinations as above/below a user-provided threshold.
-"""
-
-import asyncio
-import logging
-import math
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from .base import BaseModelSelector, ModelResult, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class ThresholdBanditSEModelSelector(BaseModelSelector):
-    """Select models via threshold-based successive elimination."""
-
-    def __init__(
-        self,
-        agent: Any = None,
-        models: Dict[str, List[ModelCandidate]] = None,
-        eval_fn: EvalFn = None,
-        dataset: Dataset = None,
-        threshold: float = 0.75,
-        n_initial: Optional[int] = None,
-        confidence: float = 1.0,
-        model_prices: Optional[Dict[str, Dict[str, float]]] = None,
-        tracker=None,
-        lambda_cost: float = 0.0,
-        lambda_latency: float = 0.0,
-    ) -> None:
-        super().__init__(
-            agent=agent,
-            models=models,
-            eval_fn=eval_fn,
-            dataset=dataset,
-            model_prices=model_prices,
-            tracker=tracker,
-            lambda_cost=lambda_cost,
-            lambda_latency=lambda_latency,
-        )
-        n = len(self.dataset)
-        if n_initial is None:
-            self.n_initial = max(1, n // 10)
-        else:
-            self.n_initial = n_initial
-        self.confidence = confidence
-        self.threshold = threshold
-
-    def _run_selection(
-        self, parallel: bool = False, max_concurrent: int = 20,
-    ) -> SelectionResults:
-        if parallel:
-            return asyncio.run(self._select_async(max_concurrent))
-        return self._select_sequential()
-
-    def _arm_objectives(
-        self,
-        idx: int,
-        combo_scores: Dict[int, List[float]],
-        combo_latencies: Dict[int, List[float]],
-        combo_costs: Dict[int, List[Optional[float]]],
-    ) -> List[float]:
-        return self._compute_objectives(
-            combo_scores[idx], combo_latencies[idx], combo_costs[idx]
-        )
-
-    def _select_sequential(self) -> SelectionResults:
-        all_combos = self._all_combos()
-        dataset_list = list(self.dataset)
-        n_total = len(dataset_list)
-
-        combo_scores: Dict[int, List[float]] = {i: [] for i in range(len(all_combos))}
-        combo_latencies: Dict[int, List[float]] = {
-            i: [] for i in range(len(all_combos))
-        }
-        combo_costs: Dict[int, List[Optional[float]]] = {
-            i: [] for i in range(len(all_combos))
-        }
-        combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(len(all_combos))}
-        active: Set[int] = set(range(len(all_combos)))
-
-        print(f"\n{'='*60}")
-        print(
-            f"Threshold successive elimination (sequential): {len(all_combos)} "
-            f"combinations, {n_total} samples, threshold={self.threshold}"
-        )
-        print(f"{'='*60}")
-
-        offset = 0
-        init_batch_size = min(self.n_initial, n_total)
-        assert init_batch_size > 0
-        init_batch = dataset_list[offset : offset + init_batch_size]
-        print(
-            f"\nInitial round [samples {offset}-{offset + init_batch_size}, "
-            f"{len(active)} active]:"
-        )
-        for idx in sorted(active):
-            combo = all_combos[idx]
-            combo_name = self._combo_name(combo)
-            scores, latencies, dp_ids = self._evaluate_combo(
-                combo, init_batch, label=combo_name, dp_offset=offset
-            )
-            costs = self._observe_combo(scores, latencies, dp_ids)
-            combo_scores[idx].extend(scores)
-            combo_latencies[idx].extend(latencies)
-            combo_costs[idx].extend(costs)
-            combo_dp_ids[idx].extend(dp_ids)
-            objs = self._arm_objectives(
-                idx, combo_scores, combo_latencies, combo_costs
-            )
-            mu, lcb, ucb = self._confidence_bounds(objs)
-            print(
-                f" {combo_name}: mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
-                f"(n={len(objs)})"
-            )
-        offset += init_batch_size
-
-        round_num = 1
-
-        while active and offset < n_total:
-            batch = [dataset_list[offset]]
-            offset += 1
-
-            print(
-                f"\nRound {round_num} [sample {offset}/{n_total}, "
-                f"{len(active)} active]:"
-            )
-
-            for idx in sorted(active):
-                combo = all_combos[idx]
-                combo_name = self._combo_name(combo)
-                scores, latencies, dp_ids = self._evaluate_combo(
-                    combo, batch, label=combo_name, dp_offset=offset - 1
-                )
-                costs = self._observe_combo(scores, latencies, dp_ids)
-                combo_scores[idx].extend(scores)
-                combo_latencies[idx].extend(latencies)
-                combo_costs[idx].extend(costs)
-                combo_dp_ids[idx].extend(dp_ids)
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                mu, lcb, ucb = self._confidence_bounds(objs)
-                print(
-                    f" {combo_name}: mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
-                    f"(n={len(objs)})"
-                )
-
-            newly_eliminated: Set[int] = set()
-            for idx in active:
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                _, lcb, ucb = self._confidence_bounds(objs)
-                # Classified wrt threshold, so this arm is no longer ambiguous.
-                if ucb < self.threshold or lcb > self.threshold:
-                    newly_eliminated.add(idx)
-
-            if newly_eliminated:
-                for idx in sorted(newly_eliminated):
-                    combo_name = self._combo_name(all_combos[idx])
-                    objs = self._arm_objectives(
-                        idx, combo_scores, combo_latencies, combo_costs
-                    )
-                    _, lcb, ucb = self._confidence_bounds(objs)
-                    side = "below" if ucb < self.threshold else "above"
-                    print(
-                        f" Eliminated: {combo_name} "
-                        f"(classified {side} threshold, "
-                        f"CI=[{lcb:.3f}, {ucb:.3f}])"
-                    )
-                active -= newly_eliminated
-                print(f" Ambiguous survivors: {len(active)} / {len(all_combos)}")
-            else:
-                print(
-                    f" No eliminations. Ambiguous survivors: "
-                    f"{len(active)} / {len(all_combos)}"
-                )
-
-            if not active:
-                break
-            round_num += 1
-
-        return self._build_results(
-            all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
-        )
-
-    async def _select_async(self, max_concurrent: int = 20) -> SelectionResults:
-        all_combos = self._all_combos()
-        dataset_list = list(self.dataset)
-        n_total = len(dataset_list)
-
-        combo_scores: Dict[int, List[float]] = {i: [] for i in range(len(all_combos))}
-        combo_latencies: Dict[int, List[float]] = {
-            i: [] for i in range(len(all_combos))
-        }
-        combo_costs: Dict[int, List[Optional[float]]] = {
-            i: [] for i in range(len(all_combos))
-        }
-        combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(len(all_combos))}
-        active: Set[int] = set(range(len(all_combos)))
-
-        print(f"\n{'='*60}")
-        print(
-            f"Threshold successive elimination (async): {len(all_combos)} "
-            f"combinations, {n_total} samples, threshold={self.threshold}, "
-            f"max {max_concurrent} total concurrent"
-        )
-        print(f"{'='*60}")
-
-        offset = 0
-        init_batch_size = min(self.n_initial, n_total)
-        assert init_batch_size > 0
-        init_batch = dataset_list[offset : offset + init_batch_size]
-        n_combo_init, dp_concurrent_init = self._compute_concurrency(
-            max_concurrent, init_batch_size
-        )
-        init_combo_sem = asyncio.Semaphore(n_combo_init)
-        print(
-            f"\nInitial round [samples {offset}-{offset + init_batch_size}, "
-            f"{len(active)} active]:"
-        )
-
-        async def _eval_initial(
-            idx: int,
-        ) -> Tuple[int, List[float], List[float], List[str]]:
-            async with init_combo_sem:
-                combo = all_combos[idx]
-                combo_name = self._combo_name(combo)
-                scores, latencies, dp_ids = await self._evaluate_combo_async(
-                    combo,
-                    init_batch,
-                    label=combo_name,
-                    max_concurrent=dp_concurrent_init,
-                    dp_offset=offset,
-                )
-                return idx, scores, latencies, dp_ids
-
-        init_results = await asyncio.gather(
-            *[_eval_initial(idx) for idx in sorted(active)], return_exceptions=True,
-        )
-
-        for res in init_results:
-            if isinstance(res, Exception):
-                logger.warning("Initial batch evaluation error: %s", res)
-                continue
-            idx, scores, latencies, dp_ids = res
-            costs = self._observe_combo(scores, latencies, dp_ids)
-            combo_scores[idx].extend(scores)
-            combo_latencies[idx].extend(latencies)
-            combo_costs[idx].extend(costs)
-            combo_dp_ids[idx].extend(dp_ids)
-            objs = self._arm_objectives(
-                idx, combo_scores, combo_latencies, combo_costs
-            )
-            mu, lcb, ucb = self._confidence_bounds(objs)
-            print(
-                f" {self._combo_name(all_combos[idx])}: "
-                f"mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
-                f"(n={len(objs)})"
-            )
-        offset += init_batch_size
-
-        round_num = 1
-        # Per-round batch_size is always 1, so compute once
-        n_combo_round, dp_concurrent_round = self._compute_concurrency(
-            max_concurrent, 1
-        )
-        round_combo_sem = asyncio.Semaphore(n_combo_round)
-
-        while active and offset < n_total:
-            batch = [dataset_list[offset]]
-            offset += 1
-
-            print(
-                f"\nRound {round_num} [sample {offset}/{n_total}, "
-                f"{len(active)} active]:"
-            )
-
-            async def _eval_batch(
-                idx: int,
-            ) -> Tuple[int, List[float], List[float], List[str]]:
-                async with round_combo_sem:
-                    combo = all_combos[idx]
-                    combo_name = self._combo_name(combo)
-                    scores, latencies, dp_ids = await self._evaluate_combo_async(
-                        combo,
-                        batch,
-                        label=combo_name,
-                        max_concurrent=dp_concurrent_round,
-                        dp_offset=offset - 1,
-                    )
-                    return idx, scores, latencies, dp_ids
-
-            round_results = await asyncio.gather(
-                *[_eval_batch(idx) for idx in sorted(active)], return_exceptions=True,
-            )
-
-            for res in round_results:
-                if isinstance(res, Exception):
-                    logger.warning("Batch evaluation error: %s", res)
-                    continue
-                idx, scores, latencies, dp_ids = res
-                costs = self._observe_combo(scores, latencies, dp_ids)
-                combo_scores[idx].extend(scores)
-                combo_latencies[idx].extend(latencies)
-                combo_costs[idx].extend(costs)
-                combo_dp_ids[idx].extend(dp_ids)
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                mu, lcb, ucb = self._confidence_bounds(objs)
-                print(
-                    f" {self._combo_name(all_combos[idx])}: "
-                    f"mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
-                    f"(n={len(objs)})"
-                )
-
-            newly_eliminated: Set[int] = set()
-            for idx in active:
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                _, lcb, ucb = self._confidence_bounds(objs)
-                if ucb < self.threshold or lcb > self.threshold:
-                    newly_eliminated.add(idx)
-
-            if newly_eliminated:
-                for idx in sorted(newly_eliminated):
-                    combo_name = self._combo_name(all_combos[idx])
-                    objs = self._arm_objectives(
-                        idx, combo_scores, combo_latencies, combo_costs
-                    )
-                    _, lcb, ucb = self._confidence_bounds(objs)
-                    side = "below" if ucb < self.threshold else "above"
-                    print(
-                        f" Eliminated: {combo_name} "
-                        f"(classified {side} threshold, "
-                        f"CI=[{lcb:.3f}, {ucb:.3f}])"
-                    )
-                active -= newly_eliminated
-                print(f" Ambiguous survivors: {len(active)} / {len(all_combos)}")
-            else:
-                print(
-                    f" No eliminations. Ambiguous survivors: "
-                    f"{len(active)} / {len(all_combos)}"
-                )
-
-            if not active:
-                break
-            round_num += 1
-
-        return self._build_results(
-            all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
-        )
-
-    def _confidence_bounds(self, values: List[float]) -> Tuple[float, float, float]:
-        n = len(values)
-        if n == 0:
-            return 0.0, float("-inf"), float("inf")
-        mu, std = self._compute_stats(values)
-        se = std / math.sqrt(n)
-        radius = self.confidence * se
-        return mu, mu - radius, mu + radius
-
-    def _build_results(
-        self,
-        all_combos: List[Dict[str, ModelCandidate]],
-        combo_scores: Dict[int, List[float]],
-        combo_latencies: Dict[int, List[float]],
-        combo_costs: Dict[int, List[Optional[float]]],
-        combo_dp_ids: Dict[int, List[str]],
-    ) -> SelectionResults:
-        all_results: List[ModelResult] = []
-        for idx, combo in enumerate(all_combos):
-            combo_name = self._combo_name(combo)
-            scores = combo_scores[idx]
-            if scores:
-                all_results.append(
-                    self._build_combo_result(
-                        combo_name,
-                        scores,
-                        combo_latencies[idx],
-                        combo_dp_ids[idx],
-                        costs=combo_costs[idx],
-                    )
-                )
-            else:
-                all_results.append(
-                    self._make_result(
-                        model_name=combo_name,
-                        accuracy=0.0,
-                        latency_seconds=0.0,
-                        input_tokens={},
-                        output_tokens={},
-                        attribute="combination",
-                        is_best=False,
-                    )
-                )
-
-        self._finalize_combined_objectives(all_results)
-        best_info = self._find_best(all_results)
-        if best_info is not None:
-            best_name, _ = best_info
-            for result in all_results:
-                if result.model_name == best_name:
-                    result.is_best = True
-                    break
-        else:
-            print("\n No combinations succeeded.")
-
-        return SelectionResults(results=all_results)

From bb593342802879efc440b9abbe457af02c27c6f1 Mon Sep 17 00:00:00 2001
From: Qian Xie <90579251+JaneQianXie@users.noreply.github.com>
Date: Mon, 22 Jun 2026 22:08:34 +0800
Subject: [PATCH 2/2] remove underperformed old algorithms

---
 .codex/skills/agentopt/SKILL.md        |  7 +------
 README.md                              | 11 ++---------
 docs/api/results.md                    |  2 +-
 docs/benchmark-results/index.md        | 20 --------------------
 docs/blog/posts/technical-deep-dive.md | 23 +----------------------
 docs/concepts/parallelism.md           |  7 ++-----
 docs/getting-started/quickstart.md     |  2 +-
 docs/index.md                          |  8 ++------
 examples/selection/README.md           |  2 +-
 src/agentopt/model_topology.py         |  4 ++--
 10 files changed, 13 insertions(+), 73 deletions(-)

diff --git a/.codex/skills/agentopt/SKILL.md b/.codex/skills/agentopt/SKILL.md
index 3445aa7..ddf3267 100644
--- a/.codex/skills/agentopt/SKILL.md
+++ b/.codex/skills/agentopt/SKILL.md
@@ -218,16 +218,11 @@ Candidates may be model-name strings or framework-specific LLM instances, as lon
 | `auto` | **Default.** Aliases `arm_elimination`. Start here. | — |
 | `arm_elimination` | Best-arm identification with statistical elimination. | `n_initial`, `growth_factor`, `confidence` |
 | `brute_force` | Small combo space; exhaustive baseline. | — |
-| `random` | Cheap exploratory scan over a large space. | `sample_fraction`, `seed` |
-| `hill_climbing` | Local search; combos have smooth structure. | `max_iterations`, `num_restarts`, `patience`, `seed`, `batch_size` |
-| `epsilon_lucb` | Find any combo within ε of the best. | `epsilon`, `n_initial`, `confidence` |
-| `threshold` | Classify combos above/below a quality bar. | `threshold`, `n_initial`, `confidence` |
 | `matrix_ucb` | UCB over a node × model matrix; structured spaces. | see `docs/api/selectors.md` |
 | `matrix_ucb_lrf` | Low-rank-factorization variant of matrix UCB. | see `docs/api/selectors.md` |
-| `lm_proposal` | Ask a proposer LLM to pick a combo, then evaluate it. | `proposer_model`, `proposer_client`, `objective`, `dataset_preview_size`, `node_descriptions` |
 | `bayesian` | Surrogate-model-guided search over medium/large spaces. | `batch_size`, `sample_fraction` |
 
-Other `ModelSelector` kwargs: `model_prices` (custom token pricing), `tracker` (e.g. `LLMTracker(cache_dir=...)` for disk-cache reuse), `node_descriptions` (used by `lm_proposal`), `lambda_cost` / `lambda_latency` (optional; default `0.0` — scalar combined objective `score - λ_cost·norm(cost) - λ_latency·norm(latency)`; see `docs/api/selectors.md#combined-objective-optional-costlatency-weights`).
+Other `ModelSelector` kwargs: `model_prices` (custom token pricing), `tracker` (e.g. `LLMTracker(cache_dir=...)` for disk-cache reuse), `lambda_cost` / `lambda_latency` (optional; default `0.0` — scalar combined objective `score - λ_cost·norm(cost) - λ_latency·norm(latency)`; see `docs/api/selectors.md#combined-objective-optional-costlatency-weights`).
 
 ### 5. Concurrency
 
diff --git a/README.md b/README.md
index 86c3522..e2f0925 100644
--- a/README.md
+++ b/README.md
@@ -288,25 +288,18 @@ Working examples for the frameworks and CLI agents named above. Examples are org
 
 AgentOpt includes a rich set of selection algorithms. Advanced users may get significant speedups by choosing the right method for their use case. See the [documentation](https://agentoptimizer.github.io/agentopt/) and [advanced_algorithms.py](examples/selection/local/advanced_algorithms.py) for details.
 
-If you do not need the strict best model combination and want **lower search cost**, `epsilon_lucb` is often a good choice: it stops once an **ε-optimal** arm is found (tune `epsilon` to trade off how close to optimal you need to be versus how many runs you spend).
-
 | `method=` | Best for | How it works |
 |-----------|----------|-------------|
 | `"auto"` (default) | General use | Automatically finds the best combination (wired to `arm_elimination` — strong best-arm identification with lower search cost than `brute_force`) |
 | `"brute_force"` | Small search spaces | Evaluates all combinations |
-| `"random"` | Quick exploration | Samples a random fraction |
-| `"hill_climbing"` | Topology-aware search | Greedy search using model quality/speed rankings |
 | `"arm_elimination"` | Best-arm identification | Bandit; eliminates statistically dominated combinations |
-| `"epsilon_lucb"` | Extra search cost savings when ε-optimal is enough | Bandit; stops when an epsilon-optimal best arm is identified |
-| `"threshold"` | Thresholding objectives | Bandit; determines whether each combination is above/below a user-defined `threshold` on the performance metric (e.g., mean accuracy) |
-| `"lm_proposal"` | LLM-guided search | Uses a proposer LLM to shortlist promising combinations |
+| `"matrix_ucb"` / `"matrix_ucb_lrf"` | Large combo × datapoint grids | UCB exploration over the evaluation matrix |
 | `"bayesian"` | Expensive evaluations | GP-based Bayesian optimization over categorical model choices; uses correlation between combinations (requires `pip install "agentopt-py[bayesian]"`) |
 
 ```python
 selector = ModelSelector(
     agent=MyAgent, models=models, eval_fn=eval_fn, dataset=dataset,
-    method="epsilon_lucb",
-    epsilon=0.01
+    method="auto",
 )
 results = selector.select_best(parallel=True)
 ```
diff --git a/docs/api/results.md b/docs/api/results.md
index 184baa5..2b17db9 100644
--- a/docs/api/results.md
+++ b/docs/api/results.md
@@ -44,7 +44,7 @@ One per evaluated combination.
 | `latency_seconds` | `float` | Mean latency per datapoint. |
 | `input_tokens` | `Dict[str, int]` | Input tokens by model. |
 | `output_tokens` | `Dict[str, int]` | Output tokens by model. |
-| `attribute` | `str` | Metric track the result was scored under (algorithms like `threshold` produce multiple). |
+| `attribute` | `str` | Metric track the result was scored under. |
 | `is_best` | `bool` | Whether this is the top-ranked combination. |
 | `datapoint_results` | `List[DatapointResult]` | Per-datapoint breakdown. |
 
diff --git a/docs/benchmark-results/index.md b/docs/benchmark-results/index.md
index e18a3d8..3172433 100644
--- a/docs/benchmark-results/index.md
+++ b/docs/benchmark-results/index.md
@@ -52,13 +52,8 @@ All models accessed via AWS Bedrock Application Inference Profiles (on-demand pr
 | Selector | Find Rate | Mean Accuracy | Evaluations | Cost | Savings |
 |:---------|:----------|:--------------|:------------|:-----|:--------|
 | Brute Force | 100% | 74.75% | 1,782 | $4.71 | -- |
-| LM Proposal | 100% | 74.75% | 198 | $2.47 | 48% |
-| Hill Climbing | 90% | 74.55% | 1,501 | $4.03 | 14% |
 | Arm Elimination | 94% | 74.10% | 666 | $3.57 | **24%** |
-| Epsilon LUCB | 72% | 73.14% | 380 | $2.51 | 47% |
 | Bayesian Opt | 56% | 72.43% | 990 | $2.59 | 45% |
-| Random Search | 36% | 68.57% | 594 | $1.73 | 63% |
-| Threshold SE | 16% | 57.48% | 252 | $1.80 | 62% |
 
 ### Thinking Effort Ablation
 
@@ -103,13 +98,8 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
 | Selector | Find Rate | Mean Accuracy | Evaluations | Cost | Savings |
 |:---------|:----------|:--------------|:------------|:-----|:--------|
 | Brute Force | 100% | 70.00% | 1,800 | $84.80 | -- |
-| Hill Climbing | 100% | 70.00% | 1,664 | $72.12 | 15% |
-| Epsilon LUCB | 28% | 69.90% | 399 | $40.03 | 53% |
 | Arm Elimination | 88% | 69.37% | 912 | $74.39 | **12%** |
 | Bayesian Opt | 44% | 69.27% | 1,000 | $50.64 | 40% |
-| Random Search | 36% | 67.13% | 600 | $31.39 | 63% |
-| Threshold SE | 10% | 58.19% | 186 | $18.82 | 78% |
-| LM Proposal | 0% | 44.03% | 200 | $3.39 | 96% |
 
 ---
 
@@ -253,11 +243,6 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
 | Brute Force | 100% | 74.27% | 16,168 | $51.90 | -- |
 | Bayesian Opt | 8% | 73.33% | 3,996 | $12.29 | 76% |
 | Arm Elimination | 86% | 73.19% | 4,283 | $16.92 | **67%** |
-| Hill Climbing | 52% | 73.13% | 4,635 | $19.39 | 63% |
-| Random Search | 30% | 72.25% | 4,192 | $13.37 | 74% |
-| Epsilon LUCB | 10% | 69.71% | 478 | $1.75 | 97% |
-| Threshold SE | 4% | 65.42% | 1,642 | $6.45 | 88% |
-| LM Proposal | 0% | 34.13% | 200 | $1.84 | 96% |
 
 ---
 
@@ -397,9 +382,4 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
 |:---------|:----------|:--------------|:------------|:-----|:--------|
 | Brute Force | 100% | 98.84% | 14,961 | $123.87 | -- |
 | Arm Elimination | 86% | 98.83% | 3,356 | $51.86 | **58%** |
-| Hill Climbing | 80% | 98.76% | 3,926 | $54.22 | 56% |
-| Random Search | 28% | 98.17% | 3,880 | $31.77 | 74% |
-| Epsilon LUCB | 4% | 96.99% | 447 | $6.10 | 95% |
-| LM Proposal | 0% | 95.82% | 158 | $5.61 | 95% |
 | Bayesian Opt | 4% | 95.41% | 3,666 | $35.56 | 71% |
-| Threshold SE | 0% | 74.52% | 1,355 | $6.90 | 94% |
diff --git a/docs/blog/posts/technical-deep-dive.md b/docs/blog/posts/technical-deep-dive.md
index 3d581b1..7776e27 100644
--- a/docs/blog/posts/technical-deep-dive.md
+++ b/docs/blog/posts/technical-deep-dive.md
@@ -109,24 +109,10 @@ Arm Elimination works in rounds:
 
 Bad combos get eliminated early and cheaply. Good combos earn more search budget. The search cost is far less than brute force.
 
-### Epsilon-LUCB
-
-When you just need to find *the single best* combo, epsilon-LUCB (Lower/Upper Confidence Bound) is extremely sample-efficient. Each round, it compares the current leader's lower confidence bound against the best challenger's upper bound. When the gap closes below a threshold epsilon, you've found your winner with statistical confidence.
-
-### Threshold Successive Elimination
-
-When you have a minimum acceptable accuracy in mind (e.g., "I need at least 70%"), Threshold SE takes a different approach. Instead of finding the single best combo, it classifies each combo as above or below your threshold. Each round, it evaluates all surviving combos on one more datapoint and checks their confidence intervals. Once a combo's interval no longer straddles the threshold (entirely above or entirely below), it's classified and removed from the active set. This is useful when you care about filtering rather than ranking.
-
 ### Bayesian Optimization
 
 For expensive evaluations, Bayesian Optimization builds a Gaussian Process surrogate model that predicts accuracy as a function of the model combination. It uses Expected Improvement to pick the most informative next evaluation, spending budget where uncertainty is highest and potential is greatest.
 
-### Hill Climbing
-
-Hill Climbing takes a different approach: greedy local search. Start with a random model combination, then swap one model at a time, keeping each swap only if it improves accuracy. Use random restarts to avoid getting stuck in local optima.
-
-The catch: Hill Climbing requires **topology information**. It needs a notion of which models are "neighbors" of each other, typically an ordering by capability or cost. This lets it search intelligently (try the next-best model, not a random one), but it also means you're injecting assumptions about model quality that may not hold. As the HotpotQA results show, model capability rankings don't always predict combo performance. When the topology is misleading, Hill Climbing can get stuck exploring the wrong region of the search space.
-
 ### How Much Do These Save?
 
 Across our four benchmarks, Arm Elimination consistently achieves near-optimal accuracy while using up to 67% less budget than brute force:
@@ -169,20 +155,13 @@ We validated AgentOpt across four diverse benchmarks using 9 models on Amazon Be