diff --git a/.codex/skills/agentopt/SKILL.md b/.codex/skills/agentopt/SKILL.md
index 3445aa7..ddf3267 100644
--- a/.codex/skills/agentopt/SKILL.md
+++ b/.codex/skills/agentopt/SKILL.md
@@ -218,16 +218,11 @@ Candidates may be model-name strings or framework-specific LLM instances, as lon
| `auto` | **Default.** Aliases `arm_elimination`. Start here. | — |
| `arm_elimination` | Best-arm identification with statistical elimination. | `n_initial`, `growth_factor`, `confidence` |
| `brute_force` | Small combo space; exhaustive baseline. | — |
-| `random` | Cheap exploratory scan over a large space. | `sample_fraction`, `seed` |
-| `hill_climbing` | Local search; combos have smooth structure. | `max_iterations`, `num_restarts`, `patience`, `seed`, `batch_size` |
-| `epsilon_lucb` | Find any combo within ε of the best. | `epsilon`, `n_initial`, `confidence` |
-| `threshold` | Classify combos above/below a quality bar. | `threshold`, `n_initial`, `confidence` |
| `matrix_ucb` | UCB over a node × model matrix; structured spaces. | see `docs/api/selectors.md` |
| `matrix_ucb_lrf` | Low-rank-factorization variant of matrix UCB. | see `docs/api/selectors.md` |
-| `lm_proposal` | Ask a proposer LLM to pick a combo, then evaluate it. | `proposer_model`, `proposer_client`, `objective`, `dataset_preview_size`, `node_descriptions` |
| `bayesian` | Surrogate-model-guided search over medium/large spaces. | `batch_size`, `sample_fraction` |
-Other `ModelSelector` kwargs: `model_prices` (custom token pricing), `tracker` (e.g. `LLMTracker(cache_dir=...)` for disk-cache reuse), `node_descriptions` (used by `lm_proposal`), `lambda_cost` / `lambda_latency` (optional; default `0.0` — scalar combined objective `score - λ_cost·norm(cost) - λ_latency·norm(latency)`; see `docs/api/selectors.md#combined-objective-optional-costlatency-weights`).
+Other `ModelSelector` kwargs: `model_prices` (custom token pricing), `tracker` (e.g. `LLMTracker(cache_dir=...)` for disk-cache reuse), `lambda_cost` / `lambda_latency` (optional; default `0.0` — scalar combined objective `score - λ_cost·norm(cost) - λ_latency·norm(latency)`; see `docs/api/selectors.md#combined-objective-optional-costlatency-weights`).
### 5. Concurrency
diff --git a/README.md b/README.md
index 86c3522..e2f0925 100644
--- a/README.md
+++ b/README.md
@@ -288,25 +288,18 @@ Working examples for the frameworks and CLI agents named above. Examples are org
AgentOpt includes a rich set of selection algorithms. Advanced users may get significant speedups by choosing the right method for their use case. See the [documentation](https://agentoptimizer.github.io/agentopt/) and [advanced_algorithms.py](examples/selection/local/advanced_algorithms.py) for details.
-If you do not need the strict best model combination and want **lower search cost**, `epsilon_lucb` is often a good choice: it stops once an **ε-optimal** arm is found (tune `epsilon` to trade off how close to optimal you need to be versus how many runs you spend).
-
| `method=` | Best for | How it works |
|-----------|----------|-------------|
| `"auto"` (default) | General use | Automatically finds the best combination (wired to `arm_elimination` — strong best-arm identification with lower search cost than `brute_force`) |
| `"brute_force"` | Small search spaces | Evaluates all combinations |
-| `"random"` | Quick exploration | Samples a random fraction |
-| `"hill_climbing"` | Topology-aware search | Greedy search using model quality/speed rankings |
| `"arm_elimination"` | Best-arm identification | Bandit; eliminates statistically dominated combinations |
-| `"epsilon_lucb"` | Extra search cost savings when ε-optimal is enough | Bandit; stops when an epsilon-optimal best arm is identified |
-| `"threshold"` | Thresholding objectives | Bandit; determines whether each combination is above/below a user-defined `threshold` on the performance metric (e.g., mean accuracy) |
-| `"lm_proposal"` | LLM-guided search | Uses a proposer LLM to shortlist promising combinations |
+| `"matrix_ucb"` / `"matrix_ucb_lrf"` | Large combo × datapoint grids | UCB exploration over the combo × datapoint evaluation matrix; `matrix_ucb_lrf` adds low-rank factorization |
| `"bayesian"` | Expensive evaluations | GP-based Bayesian optimization over categorical model choices; uses correlation between combinations (requires `pip install "agentopt-py[bayesian]"`) |
```python
selector = ModelSelector(
agent=MyAgent, models=models, eval_fn=eval_fn, dataset=dataset,
- method="epsilon_lucb",
- epsilon=0.01
+ method="auto",
)
results = selector.select_best(parallel=True)
```
diff --git a/docs/api/results.md b/docs/api/results.md
index 184baa5..2b17db9 100644
--- a/docs/api/results.md
+++ b/docs/api/results.md
@@ -44,7 +44,7 @@ One per evaluated combination.
| `latency_seconds` | `float` | Mean latency per datapoint. |
| `input_tokens` | `Dict[str, int]` | Input tokens by model. |
| `output_tokens` | `Dict[str, int]` | Output tokens by model. |
-| `attribute` | `str` | Metric track the result was scored under (algorithms like `threshold` produce multiple). |
+| `attribute` | `str` | Metric track the result was scored under. |
| `is_best` | `bool` | Whether this is the top-ranked combination. |
| `datapoint_results` | `List[DatapointResult]` | Per-datapoint breakdown. |
diff --git a/docs/api/selectors.md b/docs/api/selectors.md
index d990203..09b007a 100644
--- a/docs/api/selectors.md
+++ b/docs/api/selectors.md
@@ -27,7 +27,6 @@ results.print_summary()
| `model_prices` | `Dict`, optional | Custom pricing overrides: `{"model": {"input_price": x, "output_price": y}}` in $/MTok. Required for cost terms when `lambda_cost > 0`. |
| `lambda_cost` | `float`, optional | Weight on **normalized** per-sample cost in the combined objective. Default `0.0` (disabled). See [Combined objective](#combined-objective-optional-costlatency-weights) below. |
| `lambda_latency` | `float`, optional | Weight on **normalized** per-sample latency in the combined objective. Default `0.0` (disabled). |
-| `node_descriptions` | `Dict[str, str]`, optional | Human-readable descriptions per node — surfaced in `LMProposalModelSelector`. |
| `tracker` | `LLMTracker`, optional | Bring your own. Defaults to a fresh `LLMTracker()` started in the constructor. Pass one in to share a cache across runs, route via a daemon (`AGENTOPT_GATEWAY_URL`), or post-process records after `select_best()` returns. |
The selector calls `tracker.start()` in the constructor and `tracker.stop()` when `select_best()` returns or raises. Record queries on the tracker remain valid after `stop()`, so post-run analysis works:
@@ -93,16 +92,12 @@ results.print_summary() # ranks by combined_objective when lambdas are set
| Methods | During search | Final `is_best` |
|:---|:---|:---|
| `matrix_ucb`, `matrix_ucb_lrf` | UCB rewards use per-cell combined objective | `_find_best` on `combined_objective` |
-| `arm_elimination`, `epsilon_lucb`, `threshold` | Elimination / LUCB stats on combined per-sample objectives | same |
-| `hill_climbing`, `bayesian` | Move / surrogate target uses combined objective | same |
-| `brute_force`, `random` | Does not steer *which* combos to try | same |
-| `lm_proposal` | Proposer uses `objective=` **text**, not these lambdas | `combined_objective` on the one evaluated combo only |
+| `arm_elimination` | Elimination stats on combined per-sample objectives | same |
+| `bayesian` | Surrogate target uses combined objective | same |
+| `brute_force` | Does not steer *which* combos to try | same |
After `select_best()`, a final pass recomputes every result’s `combined_objective` against the **full-run** normalizer so rankings are comparable.
-!!! note "`lm_proposal` vs lambdas"
- `LMProposalModelSelector(objective="...")` is a natural-language hint to the **proposer LLM**. It is separate from `lambda_cost` / `lambda_latency`, which only affect the scalar reward used for ranking and bandit methods.
-
## `select_best()`
```python
@@ -123,13 +118,8 @@ Returns a [`SelectionResults`](results.md). `parallel=True` requires `agent.run`
|:---|:---|:---|
| `"auto"` (default) | Arm elimination | Strong best-arm identification at lower search cost than brute force. Same impl as `"arm_elimination"`. |
| `"brute_force"` | Evaluate every combo on the full dataset | Small search space; ground-truth comparison. |
-| `"random"` | Random search | Cheap baseline. |
-| `"hill_climbing"` | Greedy per-node | Large combinatorial spaces with weak coupling between nodes. |
| `"arm_elimination"` | Successive elimination | Best-arm identification with PAC-style guarantees. |
-| `"epsilon_lucb"` | LUCB with tolerance | Stop once a combo is within ε of the best. |
| `"matrix_ucb"` / `"matrix_ucb_lrf"` | UCB exploiting cross-combo structure | Large model x datapoint matrices; `lrf` adds low-rank factorization. |
-| `"threshold"` | Threshold bandit successive elimination | "Find all combos above accuracy θ" rather than the single best. |
-| `"lm_proposal"` | LM-guided | Uses `node_descriptions` to propose combinations. |
| `"bayesian"` | Bayesian optimization | Optional extra: `pip install "agentopt-py[bayesian]"`. |
---
@@ -141,26 +131,11 @@ Returns a [`SelectionResults`](results.md). `parallel=True` requires `agent.run`
members: false
show_bases: false
-::: agentopt.model_selection.random_search.RandomSearchModelSelector
- options:
- members: false
- show_bases: false
-
-::: agentopt.model_selection.hill_climbing.HillClimbingModelSelector
- options:
- members: false
- show_bases: false
-
::: agentopt.model_selection.arm_elimination.ArmEliminationModelSelector
options:
members: false
show_bases: false
-::: agentopt.model_selection.epsilon_lucb.EpsilonLUCBModelSelector
- options:
- members: false
- show_bases: false
-
::: agentopt.model_selection.matrix_ucb.MatrixUCBModelSelector
options:
members: false
@@ -171,16 +146,6 @@ Returns a [`SelectionResults`](results.md). `parallel=True` requires `agent.run`
members: false
show_bases: false
-::: agentopt.model_selection.threshold_successive_elimination.ThresholdBanditSEModelSelector
- options:
- members: false
- show_bases: false
-
-::: agentopt.model_selection.lm_proposal.LMProposalModelSelector
- options:
- members: false
- show_bases: false
-
::: agentopt.model_selection.bayesian_optimization.BayesianOptimizationModelSelector
options:
members: false
diff --git a/docs/benchmark-results/index.md b/docs/benchmark-results/index.md
index e18a3d8..3172433 100644
--- a/docs/benchmark-results/index.md
+++ b/docs/benchmark-results/index.md
@@ -52,13 +52,8 @@ All models accessed via AWS Bedrock Application Inference Profiles (on-demand pr
| Selector | Find Rate | Mean Accuracy | Evaluations | Cost | Savings |
|:---------|:----------|:--------------|:------------|:-----|:--------|
| Brute Force | 100% | 74.75% | 1,782 | $4.71 | -- |
-| LM Proposal | 100% | 74.75% | 198 | $2.47 | 48% |
-| Hill Climbing | 90% | 74.55% | 1,501 | $4.03 | 14% |
| Arm Elimination | 94% | 74.10% | 666 | $3.57 | **24%** |
-| Epsilon LUCB | 72% | 73.14% | 380 | $2.51 | 47% |
| Bayesian Opt | 56% | 72.43% | 990 | $2.59 | 45% |
-| Random Search | 36% | 68.57% | 594 | $1.73 | 63% |
-| Threshold SE | 16% | 57.48% | 252 | $1.80 | 62% |
### Thinking Effort Ablation
@@ -103,13 +98,8 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
| Selector | Find Rate | Mean Accuracy | Evaluations | Cost | Savings |
|:---------|:----------|:--------------|:------------|:-----|:--------|
| Brute Force | 100% | 70.00% | 1,800 | $84.80 | -- |
-| Hill Climbing | 100% | 70.00% | 1,664 | $72.12 | 15% |
-| Epsilon LUCB | 28% | 69.90% | 399 | $40.03 | 53% |
| Arm Elimination | 88% | 69.37% | 912 | $74.39 | **12%** |
| Bayesian Opt | 44% | 69.27% | 1,000 | $50.64 | 40% |
-| Random Search | 36% | 67.13% | 600 | $31.39 | 63% |
-| Threshold SE | 10% | 58.19% | 186 | $18.82 | 78% |
-| LM Proposal | 0% | 44.03% | 200 | $3.39 | 96% |
---
@@ -253,11 +243,6 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
| Brute Force | 100% | 74.27% | 16,168 | $51.90 | -- |
| Bayesian Opt | 8% | 73.33% | 3,996 | $12.29 | 76% |
| Arm Elimination | 86% | 73.19% | 4,283 | $16.92 | **67%** |
-| Hill Climbing | 52% | 73.13% | 4,635 | $19.39 | 63% |
-| Random Search | 30% | 72.25% | 4,192 | $13.37 | 74% |
-| Epsilon LUCB | 10% | 69.71% | 478 | $1.75 | 97% |
-| Threshold SE | 4% | 65.42% | 1,642 | $6.45 | 88% |
-| LM Proposal | 0% | 34.13% | 200 | $1.84 | 96% |
---
@@ -397,9 +382,4 @@ Impact of thinking/reasoning budget on GPQA accuracy for Opus (adaptive effort)
|:---------|:----------|:--------------|:------------|:-----|:--------|
| Brute Force | 100% | 98.84% | 14,961 | $123.87 | -- |
| Arm Elimination | 86% | 98.83% | 3,356 | $51.86 | **58%** |
-| Hill Climbing | 80% | 98.76% | 3,926 | $54.22 | 56% |
-| Random Search | 28% | 98.17% | 3,880 | $31.77 | 74% |
-| Epsilon LUCB | 4% | 96.99% | 447 | $6.10 | 95% |
-| LM Proposal | 0% | 95.82% | 158 | $5.61 | 95% |
| Bayesian Opt | 4% | 95.41% | 3,666 | $35.56 | 71% |
-| Threshold SE | 0% | 74.52% | 1,355 | $6.90 | 94% |
diff --git a/docs/blog/posts/technical-deep-dive.md b/docs/blog/posts/technical-deep-dive.md
index 3d581b1..7776e27 100644
--- a/docs/blog/posts/technical-deep-dive.md
+++ b/docs/blog/posts/technical-deep-dive.md
@@ -109,24 +109,10 @@ Arm Elimination works in rounds:
Bad combos get eliminated early and cheaply. Good combos earn more search budget. The search cost is far less than brute force.
-### Epsilon-LUCB
-
-When you just need to find *the single best* combo, epsilon-LUCB (Lower/Upper Confidence Bound) is extremely sample-efficient. Each round, it compares the current leader's lower confidence bound against the best challenger's upper bound. When the gap closes below a threshold epsilon, you've found your winner with statistical confidence.
-
-### Threshold Successive Elimination
-
-When you have a minimum acceptable accuracy in mind (e.g., "I need at least 70%"), Threshold SE takes a different approach. Instead of finding the single best combo, it classifies each combo as above or below your threshold. Each round, it evaluates all surviving combos on one more datapoint and checks their confidence intervals. Once a combo's interval no longer straddles the threshold (entirely above or entirely below), it's classified and removed from the active set. This is useful when you care about filtering rather than ranking.
-
### Bayesian Optimization
For expensive evaluations, Bayesian Optimization builds a Gaussian Process surrogate model that predicts accuracy as a function of the model combination. It uses Expected Improvement to pick the most informative next evaluation, spending budget where uncertainty is highest and potential is greatest.
-### Hill Climbing
-
-Hill Climbing takes a different approach: greedy local search. Start with a random model combination, then swap one model at a time, keeping each swap only if it improves accuracy. Use random restarts to avoid getting stuck in local optima.
-
-The catch: Hill Climbing requires **topology information**. It needs a notion of which models are "neighbors" of each other, typically an ordering by capability or cost. This lets it search intelligently (try the next-best model, not a random one), but it also means you're injecting assumptions about model quality that may not hold. As the HotpotQA results show, model capability rankings don't always predict combo performance. When the topology is misleading, Hill Climbing can get stuck exploring the wrong region of the search space.
-
### How Much Do These Save?
Across our four benchmarks, Arm Elimination consistently achieves near-optimal accuracy while using up to 67% less budget than brute force:
@@ -169,20 +155,13 @@ We validated AgentOpt across four diverse benchmarks using 9 models on Amazon Be
| Brute Force | 74.75% / 0% | 70.00% / 0% | 74.27% / 0% | 98.84% / 0% |
| Arm Elimination | 74.10% / 24% | 69.37% / 12% | 73.19% / 67% | 98.83% / 58% |
-| Hill Climbing | 74.55% / 14% | 70.00% / 15% | 73.13% / 63% | 98.76% / 56% |
| Bayesian Opt | 72.43% / 45% | 69.27% / 40% | 73.33% / 76% | 95.41% / 71% |
-| Random Search | 68.57% / 63% | 67.13% / 63% | 72.25% / 74% | 98.17% / 74% |
-| Epsilon-LUCB | 73.14% / 47% | 69.90% / 53% | 69.71% / 97% | 96.99% / 95% |
-| Threshold SE | 57.83% / 62% | 58.19% / 78% | 65.42% / 88% | 74.52% / 94% |
-| LM Proposal | 74.75% / 48% | 44.03% / 96% | 34.13% / 97% | 95.82% / 96% |
*Format: obtained accuracy / search cost savings vs brute force. Averaged over 50 seeds. Green = within 5% of brute force accuracy AND >50% savings on that metric.*
-Arm Elimination and Hill Climbing achieve comparable mean accuracy (within 1 percentage point of brute force across all four benchmarks), with Arm Elimination offering modestly higher cost savings on average (40% vs 37%). No single selector dominates all benchmarks. Hill Climbing excels when top models are tightly clustered (BFCL), while Arm Elimination performs best when there is clear separation between the best combo and the rest (HotpotQA, MathQA). However, Hill Climbing requires a hand-crafted topology ranking of the models upfront — you need prior knowledge about model quality and speed ordering for it to search effectively. Arm Elimination is fully assumption-free: it uses only the observed evaluation data to eliminate dominated combos, making it more practical when you don't have reliable priors about model capabilities.
-
-LM Proposal (asking GPT-4.1 to predict the best combo) matches brute force on GPQA (where the answer is intuitive) but collapses to 34% on HotpotQA and 44% on BFCL. It cannot predict that Ministral outperforms Opus as a planner.
+Across our four benchmarks, Arm Elimination stays within roughly 1 percentage point of brute-force accuracy while cutting search cost by 12% to 67%. No single selector dominates every benchmark: Arm Elimination performs best when there is clear separation between the best combo and the rest (HotpotQA, MathQA), while Bayesian Optimization can reach higher savings on large search spaces (up to 76%) at the cost of lower find rates.
## Get Started
diff --git a/docs/concepts/algorithms.md b/docs/concepts/algorithms.md
index 40a3f6e..58858fa 100644
--- a/docs/concepts/algorithms.md
+++ b/docs/concepts/algorithms.md
@@ -1,18 +1,14 @@
# Selection Algorithms
-AgentOpt provides 8 selection algorithms. Choose based on your search space size and evaluation budget.
+AgentOpt provides 5 selection algorithms. Choose based on your search space size and evaluation budget.
## At a Glance
| Algorithm | Strategy | Evaluations | Best For |
|:----------|:---------|:------------|:---------|
| [Brute Force](#brute-force) | Exhaustive | All | Small spaces (< 50 combos) |
-| [Random Search](#random-search) | Sampling | Configurable fraction | Quick baselines |
-| [Hill Climbing](#hill-climbing) | Greedy + restarts | Guided neighbors | Medium spaces |
| [Arm Elimination](#arm-elimination) | Progressive pruning | Adaptive | Statistical early stopping |
-| [Epsilon LUCB](#epsilon-lucb) | ε-optimal LUCB | Adaptive | Cost savings when ε-optimal is enough |
-| [Threshold SE](#threshold-successive-elimination) | Threshold classification | Adaptive | Filtering above/below a performance target |
-| [LM Proposal](#lm-proposal) | LLM-guided | Shortlist | Leveraging model knowledge |
+| [Matrix UCB](#matrix-ucb) | UCB over combo × datapoint grid | Budgeted | Large spaces with selective datapoint sampling |
| [Bayesian Optimization](#bayesian-optimization) | GP surrogate | Sequential | Expensive evaluations |
!!! tip "Common interface"
@@ -55,62 +51,6 @@ selector = BruteForceModelSelector(
---
-## Random Search
-
-Samples a random fraction of all combinations.
-
-```python
-from agentopt import RandomSearchModelSelector
-
-selector = RandomSearchModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- sample_fraction=0.25, # evaluate 25% of combinations
- seed=42,
-)
-```
-
-| Parameter | Default | Description |
-|:----------|:--------|:------------|
-| `sample_fraction` | `0.25` | Fraction of combinations to evaluate |
-| `seed` | `None` | Random seed for reproducibility |
-
-!!! success "When to use"
- Quick exploration to establish a baseline before committing to a thorough search.
-
----
-
-## Hill Climbing
-
-Greedy local search with random restarts. Defines "neighbors" using model quality and speed rankings, so each step is an informed single-model swap.
-
-```python
-from agentopt import HillClimbingModelSelector
-
-selector = HillClimbingModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- max_iterations=20,
- num_restarts=3,
- patience=3,
-)
-```
-
-| Parameter | Default | Description |
-|:----------|:--------|:------------|
-| `max_iterations` | `20` | Max steps per restart |
-| `num_restarts` | `3` | Number of random restarts |
-| `patience` | `3` | Steps without improvement before restart |
-
-!!! success "When to use"
- Medium-sized spaces where you want to exploit model topology — cheaper models are neighbors of expensive ones.
-
----
-
## Arm Elimination
Progressively eliminates statistically dominated combinations. Starts with a small batch of datapoints, then grows the batch while eliminating underperformers.
@@ -135,92 +75,37 @@ selector = ArmEliminationModelSelector(
| `confidence` | `1.0` | Elimination confidence threshold |
!!! success "When to use"
- When bad combinations should be eliminated early to save budget. Particularly effective when there are clearly weak options.
-
----
-
-## Epsilon LUCB
-
-Identifies an ε-optimal best arm using Lower and Upper Confidence Bounds. Each round, it compares the current leader's lower confidence bound against the best challenger's upper bound. When the gap closes below epsilon, the algorithm stops with statistical confidence that the selected arm is within epsilon of optimal.
-
-```python
-from agentopt import EpsilonLUCBModelSelector
-
-selector = EpsilonLUCBModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- epsilon=0.01,
- confidence=1.0,
-)
-```
-
-| Parameter | Default | Description |
-|:----------|:--------|:------------|
-| `epsilon` | `0.01` | Acceptable gap from the true best |
-| `n_initial` | `1` | Initial datapoints per combination |
-| `confidence` | `1.0` | Confidence level for bound computation |
-
-!!! success "When to use"
- When finding the *exact* best combo isn't necessary and you can tolerate a small accuracy gap (epsilon) in exchange for significant search cost savings. Particularly effective when many combos are close in performance.
+ When bad combinations should be eliminated early to save budget. Particularly effective when some options are clearly weak. This is the default: `method="auto"` is an alias for `arm_elimination`.
---
-## Threshold Successive Elimination
+## Matrix UCB
-Instead of finding the single best combination, Threshold SE classifies each combination as above or below a user-defined performance threshold. Each round, it evaluates all surviving combos on one more datapoint and checks their confidence intervals. Once a combo's interval no longer straddles the threshold (entirely above or entirely below), it's classified and removed from the active set.
+Upper-confidence-bound (UCB) exploration over the combination × datapoint matrix. Instead of evaluating every combo on every datapoint, it adaptively picks which cells to observe next.
```python
-from agentopt import ThresholdBanditSEModelSelector
+from agentopt import MatrixUCBModelSelector
-selector = ThresholdBanditSEModelSelector(
+selector = MatrixUCBModelSelector(
agent=MyAgent,
models=models,
eval_fn=eval_fn,
dataset=dataset,
- threshold=0.75,
- confidence=1.0,
+ a=1.0,
+ sample_fraction=0.25,
)
```
| Parameter | Default | Description |
|:----------|:--------|:------------|
-| `threshold` | `0.75` | Performance threshold to classify against |
-| `confidence` | `1.0` | Confidence level for bound computation |
-
-!!! success "When to use"
- When you have a minimum acceptable accuracy in mind (e.g., "I need at least 75%") and want to quickly identify which combinations meet it. Useful for filtering rather than ranking.
-
----
-
-## LM Proposal
-
-Uses a proposer LLM to shortlist promising combinations before evaluation. The proposer sees the candidate models and a dataset preview, then suggests which combinations to try.
-
-```python
-from agentopt import LMProposalModelSelector
-
-selector = LMProposalModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- proposer_model="gpt-4.1",
- objective="maximize accuracy and then minimize latency and cost",
- dataset_preview_size=10,
-)
-```
+| `a` | `1.0` | UCB exploration coefficient |
+| `sample_fraction` | `None` | Fraction of the combo × datapoint grid to observe (alias for `observation_budget_fraction`) |
+| `seed` | `None` | Random seed for reproducibility |
-| Parameter | Default | Description |
-|:----------|:--------|:------------|
-| `proposer_model` | `"gpt-4.1"` | Model used for proposal generation |
-| `proposer_client` | `None` | Custom OpenAI-compatible client; auto-creates `OpenAI()` if omitted |
-| `objective` | `"maximize accuracy and then minimize latency and cost"` | Natural-language objective passed to the proposer |
-| `dataset_preview_size` | `10` | Number of dataset examples shown to the proposer |
+A low-rank factorization variant is available via `MatrixUCBLRFModelSelector` (`method="matrix_ucb_lrf"`). It adds parameters like `rank`, `ensemble_size`, and `warmup_fraction` for structured uncertainty over the matrix.
!!! success "When to use"
- When you want to leverage an LLM's knowledge about model capabilities to skip obviously bad combinations.
+ Large search spaces where you want to sample both combinations and datapoints intelligently rather than running the full grid.
---
diff --git a/docs/concepts/parallelism.md b/docs/concepts/parallelism.md
index 7c42807..75996f9 100644
--- a/docs/concepts/parallelism.md
+++ b/docs/concepts/parallelism.md
@@ -47,7 +47,7 @@ n_combo = max_concurrent // dp_concurrent
| 10 | 10 | 1 | 10 | All slots to a single combo's datapoints |
!!! info "Bandit algorithms"
- Bandit-style selectors (Arm Elimination, Threshold SE, Epsilon-LUCB) often evaluate one datapoint at a time per round (`batch_size=1`). In this case, all `max_concurrent` slots go to running combos in parallel — which is exactly what you want for round-by-round elimination.
+ Bandit-style selectors such as Arm Elimination often evaluate one datapoint at a time per round (`batch_size=1`). In this case, all `max_concurrent` slots go to running combos in parallel, which is exactly what you want for round-by-round elimination.
## Per-Algorithm Behavior
@@ -56,11 +56,8 @@ Each selector recomputes concurrency limits at the appropriate granularity:
| Selector | When recomputed | Notes |
|:---------|:----------------|:------|
| **Brute Force** | Once | Full dataset, fixed batch size |
-| **Random Search** | Once | Same as brute force on sampled combos |
-| **Hill Climbing** | Per iteration | Recomputed for each neighbor batch |
| **Arm Elimination** | Per round | Batch size grows each round |
-| **Threshold SE** | Init + per round | Init batch, then batch_size=1 |
-| **Epsilon-LUCB** | Init + per round | Same pattern as Threshold SE |
+| **Matrix UCB** | Per UCB step | Caps cells per step via `max_concurrent` |
| **Bayesian Optimization** | Per BO batch | Recomputed for each acquisition batch |
## Usage
diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md
index 725d0d1..7869fad 100644
--- a/docs/getting-started/quickstart.md
+++ b/docs/getting-started/quickstart.md
@@ -121,7 +121,7 @@ results.print_summary()
The `models` dict maps each step name (matching the keys your `__init__` expects) to a **list of candidates**. AgentOpt picks one from each list, constructs `MyAgent({"planner": pick1, "solver": pick2})`, and evaluates it across your dataset.
-With 3 candidates per step and 2 steps, that's 9 combinations. Smart algorithms like `HillClimbingModelSelector` or `BayesianOptimizationModelSelector` can find the best combination without evaluating all of them — and they also select which datapoints to run on, stopping early when the winner is clear.
+With 3 candidates per step and 2 steps, that's 9 combinations. Smart algorithms like `ArmEliminationModelSelector` or `BayesianOptimizationModelSelector` can find the best combination without evaluating all of them — and they also select which datapoints to run on, stopping early when the winner is clear.
!!! tip "Optional: trade accuracy for cost or latency"
Add `lambda_cost=0.3` and/or `lambda_latency=0.2` to the selector constructor to rank combinations by a scalar combined objective instead of accuracy alone. Omit them (default `0.0`) for accuracy-first selection. Details: [Combined objective](../api/selectors.md#combined-objective-optional-costlatency-weights).
diff --git a/docs/index.md b/docs/index.md
index b5f2623..075fc79 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -114,7 +114,7 @@ results.print_summary()
---
- 8 algorithms from brute force to Bayesian optimization. Search spaces with thousands of combinations without evaluating them all.
+ 5 algorithms from brute force to Bayesian optimization. Search spaces with thousands of combinations without evaluating them all.
- :material-radar:{ .lg .middle } **Automatic Tracking**
@@ -162,12 +162,8 @@ AgentOpt patches `httpx` at the transport level — the same HTTP library used b
| Algorithm | Strategy | Best For |
|:----------|:---------|:---------|
| **Brute Force** | Evaluate all combinations | Small spaces (< 50 combos) |
-| **Random Search** | Random sampling | Quick baselines |
-| **Hill Climbing** | Greedy + restarts | Medium spaces with model topology |
| **Arm Elimination** | Progressive pruning | Statistical early stopping |
-| **Epsilon LUCB** | ε-optimal best arm | Extra search cost savings when ε-optimal is enough |
-| **Threshold SE** | Threshold classification | Filtering combos above/below a performance target |
-| **LM Proposal** | LLM-guided shortlist | Leveraging model knowledge |
+| **Matrix UCB** | UCB over combo × datapoint grid | Large spaces with selective sampling |
| **Bayesian Optimization** | Gaussian Process | Expensive evaluations |
[:octicons-arrow-right-24: Compare algorithms in detail](concepts/algorithms.md)
diff --git a/examples/selection/README.md b/examples/selection/README.md
index c295ca8..45c228c 100644
--- a/examples/selection/README.md
+++ b/examples/selection/README.md
@@ -10,7 +10,7 @@
| File | What it shows |
|---|---|
| [`custom_agent.py`](local/custom_agent.py) | Plain Python + OpenAI SDK — the canonical starter. |
-| [`advanced_algorithms.py`](local/advanced_algorithms.py) | Every `method=` available on `ModelSelector` (auto, random, arm_elimination, bayesian, …). |
+| [`advanced_algorithms.py`](local/advanced_algorithms.py) | Every `method=` available on `ModelSelector` (auto, arm_elimination, matrix_ucb, bayesian, …). |
| [`openai_sdk.py`](local/openai_sdk.py) | OpenAI Agents SDK. |
| [`langchain.py`](local/langchain.py), [`langgraph.py`](local/langgraph.py) | LangChain / LangGraph. |
| [`llamaindex.py`](local/llamaindex.py) | LlamaIndex. |
diff --git a/examples/selection/local/advanced_algorithms.py b/examples/selection/local/advanced_algorithms.py
index 0f9540b..f090428 100644
--- a/examples/selection/local/advanced_algorithms.py
+++ b/examples/selection/local/advanced_algorithms.py
@@ -15,7 +15,7 @@
4. For matrix UCB-LRF: pip install "agentopt-py[ucb_lrf]"
The matrix UCB demos use ``sample_fraction=0.1`` (~10% of the combination × datapoint
-grid), like ``random`` / ``bayesian``. Matrix UCB-LRF also accepts ``warmup_fraction``
+grid), like ``bayesian``. Matrix UCB-LRF also accepts ``warmup_fraction``
(alias for ``warmup_percentage``). Use ``1.0`` for a full grid; tune ``max_concurrent`` for step size.
"""
@@ -103,32 +103,6 @@ def run_auto():
return selector.select_best(parallel=True)
-def run_random():
- """method="random" — evaluate a random subset of combinations."""
- selector = ModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- method="random",
- sample_fraction=0.25, # evaluate 25% of all combinations
- )
- return selector.select_best(parallel=True)
-
-
-def run_hill_climbing():
- """method="hill_climbing" — greedy search using model quality/speed rankings."""
- selector = ModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- method="hill_climbing",
- batch_size=4, # number of neighbors to evaluate per step
- )
- return selector.select_best(parallel=True)
-
-
def run_arm_elimination():
"""method="arm_elimination" — eliminates statistically dominated combinations early."""
selector = ModelSelector(
@@ -141,44 +115,6 @@ def run_arm_elimination():
return selector.select_best(parallel=True)
-def run_epsilon_lucb():
- """method="epsilon_lucb" — stops when the best arm is identified within epsilon."""
- selector = ModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- method="epsilon_lucb",
- epsilon=0.01, # acceptable gap from the true best
- )
- return selector.select_best(parallel=True)
-
-
-def run_threshold():
- """method="threshold" — classify combinations as above/below a quality threshold."""
- selector = ModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- method="threshold",
- threshold=0.75, # minimum acceptable accuracy
- )
- return selector.select_best(parallel=True)
-
-
-def run_lm_proposal():
- """method="lm_proposal" — use a proposer LLM to shortlist promising combinations."""
- selector = ModelSelector(
- agent=MyAgent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- method="lm_proposal",
- )
- return selector.select_best(parallel=True)
-
-
def run_bayesian():
"""method="bayesian" — GP-based Bayesian optimization (requires agentopt[bayesian])."""
selector = ModelSelector(
@@ -233,12 +169,7 @@ def run_matrix_ucb_lrf():
METHODS = {
"auto": run_auto,
- "random": run_random,
- "hill_climbing": run_hill_climbing,
"arm_elimination": run_arm_elimination,
- "epsilon_lucb": run_epsilon_lucb,
- "threshold": run_threshold,
- "lm_proposal": run_lm_proposal,
"bayesian": run_bayesian,
"matrix_ucb": run_matrix_ucb,
"matrix_ucb_lrf": run_matrix_ucb_lrf,
@@ -253,12 +184,7 @@ def run_matrix_ucb_lrf():
epilog="""
Available methods:
auto Automatically finds the best combination (wired to arm_elimination; lower search cost than brute_force) (default)
- random Evaluate a random subset of combinations
- hill_climbing Greedy search using model quality/speed rankings
arm_elimination Eliminate statistically dominated combinations early
- epsilon_lucb Stop when best arm is identified within epsilon
- threshold Classify combinations above/below a quality threshold
- lm_proposal Use a proposer LLM to shortlist promising combinations
bayesian GP-based Bayesian optimization (requires agentopt[bayesian])
matrix_ucb UCB on the combination × datapoint matrix (demo: 10%% budget)
matrix_ucb_lrf Same with low-rank uncertainty (requires agentopt[ucb_lrf])
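Review note: the trimmed `METHODS` table in the example script can be pictured with the sketch below. It is a minimal, self-contained illustration and not the script's actual code. The stub runners, `build_parser`, `main`, the `--method` flag, and the use of `choices=` are all assumptions made for illustration; only the method names come from the entries visible in this hunk.

```python
import argparse
from typing import Callable, Dict, List, Optional


def _stub(name: str) -> Callable[[], str]:
    """Hypothetical stand-in for a run_<method>() demo function."""
    def run() -> str:
        return f"ran {name}"
    return run


# Method names visible in the hunk above after the removals.
METHODS: Dict[str, Callable[[], str]] = {
    name: _stub(name)
    for name in ("auto", "arm_elimination", "bayesian", "matrix_ucb", "matrix_ucb_lrf")
}


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run one selection method.")
    # Assumption: restricting choices to the table keys rejects removed names.
    parser.add_argument("--method", choices=sorted(METHODS), default="auto")
    return parser


def main(argv: Optional[List[str]] = None) -> str:
    args = build_parser().parse_args(argv)
    return METHODS[args.method]()
```

With a table like this, a removed name such as `random` fails at argument parsing rather than deep inside the selector.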
diff --git a/src/agentopt/__init__.py b/src/agentopt/__init__.py
index 0790714..8ce8162 100644
--- a/src/agentopt/__init__.py
+++ b/src/agentopt/__init__.py
@@ -80,16 +80,11 @@ def get_current_session_proxy() -> Optional[SessionProxy]:
ArmEliminationModelSelector,
BaseModelSelector,
BruteForceModelSelector,
- EpsilonLUCBModelSelector,
- HillClimbingModelSelector,
- LMProposalModelSelector,
DatapointResult,
MatrixUCBLRFModelSelector,
MatrixUCBModelSelector,
ModelResult,
- RandomSearchModelSelector,
SelectionResults,
- ThresholdBanditSEModelSelector,
)
# Bayesian is optional (requires torch/botorch)
@@ -101,14 +96,9 @@ def get_current_session_proxy() -> Optional[SessionProxy]:
_METHODS = {
"auto": ArmEliminationModelSelector,
"brute_force": BruteForceModelSelector,
- "random": RandomSearchModelSelector,
- "hill_climbing": HillClimbingModelSelector,
"arm_elimination": ArmEliminationModelSelector,
- "epsilon_lucb": EpsilonLUCBModelSelector,
"matrix_ucb": MatrixUCBModelSelector,
"matrix_ucb_lrf": MatrixUCBLRFModelSelector,
- "threshold": ThresholdBanditSEModelSelector,
- "lm_proposal": LMProposalModelSelector,
"bayesian": BayesianOptimizationModelSelector,
}
@@ -131,13 +121,11 @@ def ModelSelector(
the best combination (same implementation as ``"arm_elimination"`` —
strong best-arm identification with lower search cost than
``"brute_force"``). Other options: ``"brute_force"``,
- ``"random"``, ``"hill_climbing"``, ``"arm_elimination"``,
- ``"epsilon_lucb"``, ``"matrix_ucb"``, ``"matrix_ucb_lrf"``,
- ``"threshold"``,
- ``"lm_proposal"``, ``"bayesian"``.
+ ``"arm_elimination"``, ``"matrix_ucb"``, ``"matrix_ucb_lrf"``,
+ ``"bayesian"``.
**kwargs: Additional arguments passed to the selector
- (e.g. ``epsilon``, ``threshold``, ``sample_fraction``, ``warmup_fraction``
- for matrix UCB-LRF; ``lambda_cost``, ``lambda_latency`` for the optional
+ (e.g. ``sample_fraction``; ``warmup_fraction`` for matrix UCB-LRF;
+ ``lambda_cost``, ``lambda_latency`` for the optional
combined objective ``score - lambda_cost*norm_cost -
lambda_latency*norm_latency`` — both default to ``0.0`` / accuracy-only).
@@ -167,14 +155,9 @@ def ModelSelector(
"CallRecord",
# Selectors
"BruteForceModelSelector",
- "RandomSearchModelSelector",
- "HillClimbingModelSelector",
"ArmEliminationModelSelector",
- "EpsilonLUCBModelSelector",
"MatrixUCBModelSelector",
"MatrixUCBLRFModelSelector",
- "ThresholdBanditSEModelSelector",
- "LMProposalModelSelector",
"BayesianOptimizationModelSelector",
# Result types
"DatapointResult",
diff --git a/src/agentopt/model_selection/__init__.py b/src/agentopt/model_selection/__init__.py
index 56feae6..cec5306 100644
--- a/src/agentopt/model_selection/__init__.py
+++ b/src/agentopt/model_selection/__init__.py
@@ -3,11 +3,6 @@
from .arm_elimination import ArmEliminationModelSelector
from .base import BaseModelSelector, DatapointResult, ModelResult, SelectionResults
from .brute_force import BruteForceModelSelector
-from .epsilon_lucb import EpsilonLUCBModelSelector
-from .hill_climbing import HillClimbingModelSelector
-from .lm_proposal import LMProposalModelSelector
-from .random_search import RandomSearchModelSelector
-from .threshold_successive_elimination import ThresholdBanditSEModelSelector
from .matrix_ucb import MatrixUCBLRFModelSelector, MatrixUCBModelSelector
# Bayesian is optional (requires torch/botorch)
@@ -19,12 +14,7 @@
__all__ = [
"BaseModelSelector",
"BruteForceModelSelector",
- "RandomSearchModelSelector",
- "HillClimbingModelSelector",
"ArmEliminationModelSelector",
- "EpsilonLUCBModelSelector",
- "ThresholdBanditSEModelSelector",
- "LMProposalModelSelector",
"MatrixUCBModelSelector",
"MatrixUCBLRFModelSelector",
"BayesianOptimizationModelSelector",
diff --git a/src/agentopt/model_selection/base.py b/src/agentopt/model_selection/base.py
index 177487f..157e747 100644
--- a/src/agentopt/model_selection/base.py
+++ b/src/agentopt/model_selection/base.py
@@ -716,7 +716,6 @@ def __init__(
eval_fn: EvalFn = None,
dataset: Dataset = None,
model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- node_descriptions: Optional[Dict[str, str]] = None,
tracker: Optional[LLMTracker] = None,
lambda_cost: float = 0.0,
lambda_latency: float = 0.0,
@@ -739,9 +738,6 @@ def __init__(
model_prices: Optional custom pricing overrides. Maps model names
to dicts with ``'input_price'`` and ``'output_price'`` keys
($/MTok).
- node_descriptions: Optional dict mapping node names to human-readable
- descriptions of what each node does, e.g.
- ``{"planner": "Decomposes queries into sub-tasks"}``.
tracker: Optional :class:`LLMTracker` instance. If not provided,
one is created and started automatically.
lambda_cost: Weight on normalized per-sample cost in the combined
@@ -776,7 +772,6 @@ def __init__(
self._models = models
self._node_names = list(models.keys())
self.model_prices = model_prices
- self.node_descriptions = node_descriptions
self.lambda_cost = float(lambda_cost)
self.lambda_latency = float(lambda_latency)
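Review note: the `lambda_cost` / `lambda_latency` weights kept here feed the combined objective `score - lambda_cost*norm_cost - lambda_latency*norm_latency` described in the docstring. The sketch below shows one way that formula could be evaluated. The min-max normalization and the helper names `min_max_normalize` and `combined_objective` are assumptions for illustration; this diff does not show how the library normalizes cost or latency.

```python
from typing import List


def min_max_normalize(values: List[float]) -> List[float]:
    """Scale values to [0, 1]; assumption: a constant list maps to all zeros."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def combined_objective(
    scores: List[float],
    costs: List[float],
    latencies: List[float],
    lambda_cost: float = 0.0,
    lambda_latency: float = 0.0,
) -> List[float]:
    """Per-sample score minus weighted normalized cost and latency."""
    norm_cost = min_max_normalize(costs)
    norm_latency = min_max_normalize(latencies)
    return [
        s - lambda_cost * c - lambda_latency * l
        for s, c, l in zip(scores, norm_cost, norm_latency)
    ]
```

With both weights left at `0.0`, the objective reduces to the raw score, which matches the accuracy-only default stated in the docstring.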
diff --git a/src/agentopt/model_selection/epsilon_lucb.py b/src/agentopt/model_selection/epsilon_lucb.py
deleted file mode 100644
index 52cd05c..0000000
--- a/src/agentopt/model_selection/epsilon_lucb.py
+++ /dev/null
@@ -1,341 +0,0 @@
-"""
-Epsilon-optimal LUCB model selector.
-
-Identifies an epsilon-optimal best model combination using confidence bounds.
-"""
-
-import asyncio
-import logging
-import math
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from .base import BaseModelSelector, ModelResult, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class EpsilonLUCBModelSelector(BaseModelSelector):
- """Select models via epsilon-optimal LUCB."""
-
- def __init__(
- self,
- agent: Any = None,
- models: Dict[str, List[ModelCandidate]] = None,
- eval_fn: EvalFn = None,
- dataset: Dataset = None,
- epsilon: float = 0.01,
- n_initial: int = 1,
- confidence: float = 1.0,
- model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- tracker=None,
- lambda_cost: float = 0.0,
- lambda_latency: float = 0.0,
- ) -> None:
- super().__init__(
- agent=agent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- model_prices=model_prices,
- tracker=tracker,
- lambda_cost=lambda_cost,
- lambda_latency=lambda_latency,
- )
- self.epsilon = max(0.0, float(epsilon))
- self.n_initial = max(1, int(n_initial))
- self.confidence = confidence
-
- def _run_selection(
- self, parallel: bool = False, max_concurrent: int = 20,
- ) -> SelectionResults:
- if parallel:
- return asyncio.run(self._select_async(max_concurrent))
- return self._select_sequential()
-
- def _select_sequential(self) -> SelectionResults:
- all_combos = self._all_combos()
- dataset_list = list(self.dataset)
- n_total = len(dataset_list)
- n_arms = len(all_combos)
-
- combo_scores: Dict[int, List[float]] = {i: [] for i in range(n_arms)}
- combo_latencies: Dict[int, List[float]] = {i: [] for i in range(n_arms)}
- combo_costs: Dict[int, List[Optional[float]]] = {i: [] for i in range(n_arms)}
- combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(n_arms)}
- active: Set[int] = set(range(n_arms))
-
- print(f"\n{'='*60}")
- print(
- f"Epsilon-LUCB (sequential): {n_arms} combinations, "
- f"{n_total} samples, epsilon={self.epsilon}"
- )
- print(f"{'='*60}")
-
- offset = 0
- init_batch_size = min(self.n_initial, n_total)
- assert init_batch_size > 0
- init_batch = dataset_list[offset : offset + init_batch_size]
- for idx in range(n_arms):
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, init_batch, label=combo_name, dp_offset=offset
- )
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- offset += init_batch_size
-
- round_num = 1
- while active and offset < n_total:
- h_idx, l_idx, h_lcb, l_ucb = self._choose_lucb_pair(
- active, combo_scores, combo_latencies, combo_costs
- )
- gap = l_ucb - h_lcb
- if gap <= self.epsilon or l_idx is None:
- break
-
- batch = [dataset_list[offset]]
- offset += 1
- sample_idxs = [h_idx] if l_idx == h_idx else [h_idx, l_idx]
-
- print(
- f"\nRound {round_num} [sample {offset}/{n_total}] "
- f"h={self._combo_name(all_combos[h_idx])}, "
- f"l={self._combo_name(all_combos[l_idx])}, gap={gap:.4f}"
- )
-
- for idx in sample_idxs:
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, batch, label=combo_name, dp_offset=offset - 1
- )
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- objs = self._compute_objectives(
- combo_scores[idx], combo_latencies[idx], combo_costs[idx]
- )
- mu, _ = self._compute_stats(objs)
- print(f" {combo_name}: mu={mu:.3f} (n={len(objs)})")
-
- round_num += 1
-
- return self._build_results(
- all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
- )
-
- async def _select_async(self, max_concurrent: int = 20) -> SelectionResults:
- all_combos = self._all_combos()
- dataset_list = list(self.dataset)
- n_total = len(dataset_list)
- n_arms = len(all_combos)
-
- combo_scores: Dict[int, List[float]] = {i: [] for i in range(n_arms)}
- combo_latencies: Dict[int, List[float]] = {i: [] for i in range(n_arms)}
- combo_costs: Dict[int, List[Optional[float]]] = {i: [] for i in range(n_arms)}
- combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(n_arms)}
- active: Set[int] = set(range(n_arms))
-
- print(f"\n{'='*60}")
- print(
- f"Epsilon-LUCB (async): {n_arms} combinations, "
- f"{n_total} samples, epsilon={self.epsilon}, "
- f"max {max_concurrent} total concurrent"
- )
- print(f"{'='*60}")
-
- offset = 0
- init_batch_size = min(self.n_initial, n_total)
- assert init_batch_size > 0
- init_batch = dataset_list[offset : offset + init_batch_size]
- n_combo_init, dp_concurrent_init = self._compute_concurrency(
- max_concurrent, init_batch_size
- )
- init_combo_sem = asyncio.Semaphore(n_combo_init)
-
- async def _eval_initial(
- idx: int,
- ) -> Tuple[int, List[float], List[float], List[str]]:
- async with init_combo_sem:
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = await self._evaluate_combo_async(
- combo,
- init_batch,
- label=combo_name,
- max_concurrent=dp_concurrent_init,
- dp_offset=offset,
- )
- return idx, scores, latencies, dp_ids
-
- round_results = await asyncio.gather(
- *[_eval_initial(idx) for idx in range(n_arms)], return_exceptions=True,
- )
- for res in round_results:
- if isinstance(res, Exception):
- logger.warning("Initial LUCB batch evaluation error: %s", res)
- continue
- idx, scores, latencies, dp_ids = res
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- offset += init_batch_size
-
- round_num = 1
- # Per-round batch_size is always 1
- n_combo_round, dp_concurrent_round = self._compute_concurrency(
- max_concurrent, 1
- )
- round_combo_sem = asyncio.Semaphore(n_combo_round)
-
- while active and offset < n_total:
- h_idx, l_idx, h_lcb, l_ucb = self._choose_lucb_pair(
- active, combo_scores, combo_latencies, combo_costs
- )
- gap = l_ucb - h_lcb
- if gap <= self.epsilon or l_idx is None:
- break
-
- batch = [dataset_list[offset]]
- offset += 1
- sample_idxs = [h_idx] if l_idx == h_idx else [h_idx, l_idx]
-
- print(
- f"\nRound {round_num} [sample {offset}/{n_total}] "
- f"h={self._combo_name(all_combos[h_idx])}, "
- f"l={self._combo_name(all_combos[l_idx])}, gap={gap:.4f}"
- )
-
- async def _eval_pair(
- idx: int,
- ) -> Tuple[int, List[float], List[float], List[str]]:
- async with round_combo_sem:
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = await self._evaluate_combo_async(
- combo,
- batch,
- label=combo_name,
- max_concurrent=dp_concurrent_round,
- dp_offset=offset - 1,
- )
- return idx, scores, latencies, dp_ids
-
- round_results = await asyncio.gather(
- *[_eval_pair(idx) for idx in sample_idxs], return_exceptions=True,
- )
- for res in round_results:
- if isinstance(res, Exception):
- logger.warning("LUCB pair evaluation error: %s", res)
- continue
- idx, scores, latencies, dp_ids = res
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- objs = self._compute_objectives(
- combo_scores[idx], combo_latencies[idx], combo_costs[idx]
- )
- mu, _ = self._compute_stats(objs)
- print(
- f" {self._combo_name(all_combos[idx])}: "
- f"mu={mu:.3f} (n={len(objs)})"
- )
-
- round_num += 1
-
- return self._build_results(
- all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
- )
-
- def _choose_lucb_pair(
- self,
- active: Set[int],
- combo_scores: Dict[int, List[float]],
- combo_latencies: Dict[int, List[float]],
- combo_costs: Dict[int, List[Optional[float]]],
- ) -> Tuple[int, Optional[int], float, float]:
- stats: Dict[int, Tuple[float, float, float]] = {}
- for idx in active:
- objs = self._compute_objectives(
- combo_scores[idx], combo_latencies[idx], combo_costs[idx]
- )
- stats[idx] = self._confidence_bounds(objs)
-
- h_idx = max(active, key=lambda i: stats[i][0]) # highest empirical mean
- if len(active) == 1:
- _, h_lcb, _ = stats[h_idx]
- return h_idx, None, h_lcb, h_lcb
-
- competitors = [i for i in active if i != h_idx]
- l_idx = max(competitors, key=lambda i: stats[i][2]) # highest UCB
- h_lcb = stats[h_idx][1]
- l_ucb = stats[l_idx][2]
- return h_idx, l_idx, h_lcb, l_ucb
-
- def _confidence_bounds(self, values: List[float]) -> Tuple[float, float, float]:
- n = len(values)
- if n == 0:
- return 0.0, float("-inf"), float("inf")
- mu, std = self._compute_stats(values)
- se = std / math.sqrt(n)
- radius = self.confidence * se
- return mu, mu - radius, mu + radius
-
- def _build_results(
- self,
- all_combos: List[Dict[str, ModelCandidate]],
- combo_scores: Dict[int, List[float]],
- combo_latencies: Dict[int, List[float]],
- combo_costs: Dict[int, List[Optional[float]]],
- combo_dp_ids: Dict[int, List[str]],
- ) -> SelectionResults:
- all_results: List[ModelResult] = []
- for idx, combo in enumerate(all_combos):
- combo_name = self._combo_name(combo)
- scores = combo_scores[idx]
- if scores:
- all_results.append(
- self._build_combo_result(
- combo_name,
- scores,
- combo_latencies[idx],
- combo_dp_ids[idx],
- costs=combo_costs[idx],
- )
- )
- else:
- all_results.append(
- self._make_result(
- model_name=combo_name,
- accuracy=0.0,
- latency_seconds=0.0,
- input_tokens={},
- output_tokens={},
- attribute="combination",
- is_best=False,
- )
- )
-
- self._finalize_combined_objectives(all_results)
- best_info = self._find_best(all_results)
- if best_info is not None:
- best_name, _ = best_info
- for result in all_results:
- if result.model_name == best_name:
- result.is_best = True
- break
- else:
- print("\n No combinations succeeded.")
-
- return SelectionResults(results=all_results)
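Review note: for readers tracking what the deleted selector did, the core of `_choose_lucb_pair` and `_confidence_bounds` can be restated as the standalone sketch below. It follows the removed logic (leader = highest empirical mean, challenger = highest upper bound among the rest, stop when the gap falls to epsilon or below). The function names are hypothetical, and the use of the sample standard deviation is an assumption, since `_compute_stats` is not shown in this diff.

```python
import math
from statistics import mean, stdev
from typing import Dict, List, Optional, Tuple


def confidence_bounds(
    values: List[float], confidence: float = 1.0
) -> Tuple[float, float, float]:
    """Return (mean, lower, upper) with radius = confidence * standard error."""
    n = len(values)
    if n == 0:
        return 0.0, float("-inf"), float("inf")
    mu = mean(values)
    # Assumption: sample standard deviation; a single observation gives 0.
    std = stdev(values) if n > 1 else 0.0
    radius = confidence * std / math.sqrt(n)
    return mu, mu - radius, mu + radius


def choose_lucb_pair(
    samples: Dict[str, List[float]], confidence: float = 1.0
) -> Tuple[str, Optional[str], float]:
    """Return (leader, challenger, gap) where gap = UCB(challenger) - LCB(leader)."""
    stats = {arm: confidence_bounds(vals, confidence) for arm, vals in samples.items()}
    leader = max(stats, key=lambda arm: stats[arm][0])
    rivals = [arm for arm in stats if arm != leader]
    if not rivals:
        return leader, None, 0.0
    challenger = max(rivals, key=lambda arm: stats[arm][2])
    gap = stats[challenger][2] - stats[leader][1]
    return leader, challenger, gap


def should_stop(gap: float, epsilon: float) -> bool:
    """The search ends once no rival can beat the leader by more than epsilon."""
    return gap <= epsilon
```

In the removed implementation, each round that did not stop drew one more datapoint and evaluated only the leader and the challenger on it.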
diff --git a/src/agentopt/model_selection/hill_climbing.py b/src/agentopt/model_selection/hill_climbing.py
deleted file mode 100644
index b6b8ab9..0000000
--- a/src/agentopt/model_selection/hill_climbing.py
+++ /dev/null
@@ -1,559 +0,0 @@
-"""
-Hill-climbing model selector with random restarts.
-
-Uses the model topology (quality / speed rankings) to define
-neighbours so that each iteration makes an informed single-step move.
-"""
-
-import asyncio
-import random
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from ..model_price import compute_price
-from ..model_topology import get_faster_neighbor, get_higher_quality_neighbor
-from .base import BaseModelSelector, DatapointResult, ModelResult, SelectionResults
-
-
-class HillClimbingModelSelector(BaseModelSelector):
- """Select models via stochastic hill climbing with random restarts."""
-
- def __init__(
- self,
- agent: Any = None,
- models: Dict[str, List[ModelCandidate]] = None,
- eval_fn: EvalFn = None,
- dataset: Dataset = None,
- max_iterations: int = 20,
- num_restarts: int = 3,
- patience: int = 3,
- seed: Optional[int] = None,
- batch_size: int = 1,
- model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- tracker=None,
- lambda_cost: float = 0.0,
- lambda_latency: float = 0.0,
- ) -> None:
- super().__init__(
- agent=agent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- model_prices=model_prices,
- tracker=tracker,
- lambda_cost=lambda_cost,
- lambda_latency=lambda_latency,
- )
- self.max_iterations = max_iterations
- self.num_restarts = num_restarts
- self.patience = patience
- self.batch_size = max(1, int(batch_size))
-
- if seed is not None:
- random.seed(seed)
-
- # Pre-compute all combinations for random starts.
- self._all_combo_list = self._all_combos()
-
- # Cache: combo_name -> (accuracy, latency, input_tokens, output_tokens, datapoint_results).
- self._eval_cache: Dict[
- str,
- Tuple[float, float, Dict[str, int], Dict[str, int], List[DatapointResult]],
- ] = {}
-
- def _objective_from_dp(self, dp_results: List[DatapointResult]) -> Optional[float]:
- """Recompute the mean combined objective from cached datapoint results."""
- if not self._has_combined_objective or not dp_results:
- return None
- scores = [dp.score for dp in dp_results]
- lats = [dp.latency_seconds for dp in dp_results]
- costs = [
- compute_price(
- dp.input_tokens, dp.output_tokens, custom_prices=self._custom_prices,
- )
- for dp in dp_results
- ]
- return self._mean_objective(scores, lats, costs)
-
- def _primary_value(
- self, accuracy: float, dp_results: List[DatapointResult],
- ) -> float:
- """Ranking key for tiebreaks: combined objective if configured, else accuracy."""
- obj = self._objective_from_dp(dp_results)
- return obj if obj is not None else accuracy
-
- # ------------------------------------------------------------------
- # Shared helpers
- # ------------------------------------------------------------------
-
- def _random_combination(
- self, seen: Set[str]
- ) -> Optional[Dict[str, ModelCandidate]]:
- """Pick a random unseen combination, or ``None`` if all exhausted."""
- unseen = [c for c in self._all_combo_list if self._combo_name(c) not in seen]
- if unseen:
- return dict(random.choice(unseen))
- return None
-
- def _process_eval_result(
- self,
- combo_name: str,
- scores: List[float],
- latencies: List[float],
- dp_ids: List[str],
- ) -> Tuple[
- str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool
- ]:
- """Compute stats, absorb cost samples, cache, and return the eval tuple."""
- self._observe_combo(scores, latencies, dp_ids)
- input_tokens, output_tokens = self._fetch_tokens(combo_name)
- accuracy, _ = self._compute_stats(scores)
- latency = sum(latencies) / len(latencies) if latencies else 0.0
- dp_results = self._build_datapoint_results(scores, latencies, dp_ids)
- self._eval_cache[combo_name] = (
- accuracy,
- latency,
- input_tokens,
- output_tokens,
- dp_results,
- )
- return (
- combo_name,
- accuracy,
- latency,
- input_tokens,
- output_tokens,
- dp_results,
- False,
- )
-
- def _evaluate_cached(
- self, combo: Dict[str, ModelCandidate],
- ) -> Tuple[
- str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool
- ]:
- """Evaluate a combo synchronously, using an in-memory cache."""
- combo_name = self._combo_name(combo)
- if combo_name in self._eval_cache:
- acc, lat, in_tok, out_tok, dp_results = self._eval_cache[combo_name]
- return combo_name, acc, lat, in_tok, out_tok, dp_results, True
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, self.dataset, label=combo_name
- )
- return self._process_eval_result(combo_name, scores, latencies, dp_ids)
-
- async def _evaluate_cached_async(
- self, combo: Dict[str, ModelCandidate], max_concurrent: int
- ) -> Tuple[
- str, float, float, Dict[str, int], Dict[str, int], List[DatapointResult], bool
- ]:
- """Evaluate a combo asynchronously, using an in-memory cache."""
- combo_name = self._combo_name(combo)
- if combo_name in self._eval_cache:
- acc, lat, in_tok, out_tok, dp_results = self._eval_cache[combo_name]
- return combo_name, acc, lat, in_tok, out_tok, dp_results, True
- scores, latencies, dp_ids = await self._evaluate_combo_async(
- combo, self.dataset, label=combo_name, max_concurrent=max_concurrent
- )
- return self._process_eval_result(combo_name, scores, latencies, dp_ids)
-
- def _get_neighbors(
- self, combo: Dict[str, ModelCandidate], seen: Set[str], accuracy: float,
- ) -> List[Dict[str, ModelCandidate]]:
- """Generate neighbors with quality/speed fallback logic."""
- if accuracy < 1.0:
- neighbors = self._generate_neighbors(
- combo, seen, max_neighbors=self.batch_size, improve_quality=True,
- )
- if not neighbors:
- neighbors = self._generate_neighbors(
- combo, seen, max_neighbors=self.batch_size, improve_quality=False,
- )
- else:
- neighbors = self._generate_neighbors(
- combo, seen, max_neighbors=self.batch_size, improve_quality=False,
- )
- if not neighbors:
- neighbors = self._generate_neighbors(
- combo, seen, max_neighbors=self.batch_size, improve_quality=True,
- )
- return neighbors
-
- def _generate_neighbors(
- self,
- combo: Dict[str, ModelCandidate],
- seen: Set[str],
- max_neighbors: int,
- improve_quality: bool,
- ) -> List[Dict[str, ModelCandidate]]:
- """Generate up to *max_neighbors* unseen neighbors that differ by one node."""
- neighbors: List[Dict[str, ModelCandidate]] = []
- node_names = list(combo.keys())
- random.shuffle(node_names)
-
- for node in node_names:
- current = combo[node]
- if improve_quality:
- neighbor = get_higher_quality_neighbor(current, self._models[node])
- else:
- neighbor = get_faster_neighbor(current, self._models[node])
- if neighbor is None:
- continue
-
- new_combo = dict(combo)
- new_combo[node] = neighbor
- if self._combo_name(new_combo) in seen:
- continue
-
- neighbors.append(new_combo)
- if len(neighbors) >= max_neighbors:
- break
-
- return neighbors
-
- def _pick_best_neighbor(
- self,
- eval_results: List[Tuple],
- neighbors: List[Dict[str, ModelCandidate]],
- seen: Set[str],
- current_value: float,
- current_latency: float,
- tol: float,
- ) -> Optional[Dict[str, ModelCandidate]]:
- """Select the best neighbor from eval results, or None if none improves.
-
- Ranks by primary value (combined objective when ``lambda_*`` are set,
- else accuracy), with latency as the tiebreaker.
- """
- best_neighbor: Optional[Dict[str, ModelCandidate]] = None
- best_n_val = float("-inf")
- best_n_lat = float("inf")
-
- for neighbor, eval_result in zip(neighbors, eval_results):
- n_name, n_acc, n_lat, _, _, n_dp_results, _ = eval_result
- seen.add(n_name)
- n_val = self._primary_value(n_acc, n_dp_results)
-
- if n_val > best_n_val + tol:
- best_neighbor, best_n_val, best_n_lat = neighbor, n_val, n_lat
- elif abs(n_val - best_n_val) <= tol and n_lat < best_n_lat:
- best_neighbor, best_n_val, best_n_lat = neighbor, n_val, n_lat
-
- if best_neighbor is None or (
- best_n_val < current_value - tol
- or (
- abs(best_n_val - current_value) <= tol
- and best_n_lat >= current_latency
- )
- ):
- return None
- return best_neighbor
-
- def _hc_finalize(
- self,
- all_results: List[ModelResult],
- global_best_combo: Optional[Dict[str, ModelCandidate]],
- global_best_value: float,
- ) -> SelectionResults:
- """Finalize combined objectives, mark the best result, return results."""
- self._finalize_combined_objectives(all_results)
- if global_best_combo is None:
- print("\nNo combinations succeeded\n")
- return SelectionResults(results=all_results)
-
- # Prefer the combined-objective-aware _find_best when lambdas are set;
- # otherwise honor the within-search global best to preserve the
- # original tie-breaking semantics.
- if self._has_combined_objective:
- best_info = self._find_best(all_results)
- best_name = best_info[0] if best_info else self._combo_name(global_best_combo)
- else:
- best_name = self._combo_name(global_best_combo)
-
- tol = 1e-9
- for result in all_results:
- if result.model_name != best_name:
- continue
- if self._has_combined_objective:
- result.is_best = True
- break
- # Accuracy-mode: match by name AND the tracked best value.
- if abs(result.accuracy - global_best_value) < tol:
- result.is_best = True
- break
- return SelectionResults(results=all_results)
-
- # ------------------------------------------------------------------
- # Single restart (sequential)
- # ------------------------------------------------------------------
-
- def _hill_climb_once_sequential(
- self, seen: Set[str],
- ) -> Optional[Tuple[Dict[str, ModelCandidate], float, float, List[ModelResult]]]:
- combo = self._random_combination(seen)
- if combo is None:
- return None
-
- results: List[ModelResult] = []
- best_combo = dict(combo)
- best_value = float("-inf")
- best_latency = float("inf")
- tol = 1e-9
- no_improve_count = 0
-
- for iteration in range(self.max_iterations):
- combo_name = self._combo_name(combo)
- seen.add(combo_name)
-
- (
- _,
- accuracy,
- latency,
- input_tokens,
- output_tokens,
- dp_results,
- cached,
- ) = self._evaluate_cached(combo)
-
- result = self._make_result(
- model_name=combo_name,
- accuracy=accuracy,
- latency_seconds=latency,
- input_tokens=input_tokens,
- output_tokens=output_tokens,
- attribute="combination",
- is_best=False,
- datapoint_results=dp_results,
- )
- suffix = " (cached)" if cached else ""
- print(f" Iter {iteration + 1}: {result}{suffix}")
- results.append(result)
-
- current_value = self._primary_value(accuracy, dp_results)
- should_update = (
- best_value == float("-inf")
- or current_value > best_value + tol
- or (abs(current_value - best_value) <= tol and latency < best_latency)
- )
- if should_update:
- best_value, best_latency, best_combo = current_value, latency, dict(combo)
- no_improve_count = 0
- else:
- no_improve_count += 1
-
- if no_improve_count >= self.patience:
- print(
- f" No improvement for {self.patience} iterations. "
- f"Converged at iteration {iteration + 1}."
- )
- break
-
- neighbors = self._get_neighbors(combo, seen, accuracy)
- if not neighbors:
- print(f" No improving moves at iteration {iteration + 1}. Stopping.")
- break
-
- eval_results = [self._evaluate_cached(n) for n in neighbors]
- best_neighbor = self._pick_best_neighbor(
- eval_results, neighbors, seen, current_value, latency, tol
- )
- if best_neighbor is None:
- print(
- f" No neighbor in batch of {len(neighbors)} improves at "
- f"iteration {iteration + 1}. Stopping."
- )
- break
-
- combo = dict(best_neighbor)
-
- return best_combo, best_value, best_latency, results
-
- # ------------------------------------------------------------------
- # Single restart (async)
- # ------------------------------------------------------------------
-
- async def _hill_climb_once_async(
- self, seen: Set[str], max_concurrent: int
- ) -> Optional[Tuple[Dict[str, ModelCandidate], float, float, List[ModelResult]]]:
- combo = self._random_combination(seen)
- if combo is None:
- return None
-
- results: List[ModelResult] = []
- best_combo = dict(combo)
- best_value = float("-inf")
- best_latency = float("inf")
- tol = 1e-9
- no_improve_count = 0
-
- for iteration in range(self.max_iterations):
- combo_name = self._combo_name(combo)
- seen.add(combo_name)
-
- (
- _,
- accuracy,
- latency,
- input_tokens,
- output_tokens,
- dp_results,
- cached,
- ) = await self._evaluate_cached_async(combo, max_concurrent=max_concurrent)
-
- result = self._make_result(
- model_name=combo_name,
- accuracy=accuracy,
- latency_seconds=latency,
- input_tokens=input_tokens,
- output_tokens=output_tokens,
- attribute="combination",
- is_best=False,
- datapoint_results=dp_results,
- )
- suffix = " (cached)" if cached else ""
- print(f" Iter {iteration + 1}: {result}{suffix}")
- results.append(result)
-
- current_value = self._primary_value(accuracy, dp_results)
- should_update = (
- best_value == float("-inf")
- or current_value > best_value + tol
- or (abs(current_value - best_value) <= tol and latency < best_latency)
- )
- if should_update:
- best_value, best_latency, best_combo = current_value, latency, dict(combo)
- no_improve_count = 0
- else:
- no_improve_count += 1
-
- if no_improve_count >= self.patience:
- print(
- f" No improvement for {self.patience} iterations. "
- f"Converged at iteration {iteration + 1}."
- )
- break
-
- neighbors = self._get_neighbors(combo, seen, accuracy)
- if not neighbors:
- print(f" No improving moves at iteration {iteration + 1}. Stopping.")
- break
-
- batch_size = len(self.dataset)
- n_combo_nb, dp_concurrent_nb = self._compute_concurrency(
- max_concurrent, batch_size
- )
- neighbor_sem = asyncio.Semaphore(n_combo_nb)
-
- async def _eval_neighbor_throttled(
- n: Dict[str, ModelCandidate],
- ) -> Tuple[str, float, float, Dict[str, int], Dict[str, int], List, bool]:
- async with neighbor_sem:
- return await self._evaluate_cached_async(
- n, max_concurrent=dp_concurrent_nb
- )
-
- eval_results = await asyncio.gather(
- *(_eval_neighbor_throttled(n) for n in neighbors)
- )
- best_neighbor = self._pick_best_neighbor(
- eval_results, neighbors, seen, current_value, latency, tol
- )
- if best_neighbor is None:
- print(
- f" No neighbor in batch of {len(neighbors)} improves at "
- f"iteration {iteration + 1}. Stopping."
- )
- break
-
- combo = dict(best_neighbor)
-
- return best_combo, best_value, best_latency, results
-
- # ------------------------------------------------------------------
- # Public API
- # ------------------------------------------------------------------
-
- def _run_selection(
- self, parallel: bool = False, max_concurrent: int = 20,
- ) -> SelectionResults:
- if parallel:
- return asyncio.run(self._run_selection_async(max_concurrent))
- return self._run_selection_sequential()
-
- def _run_selection_sequential(self) -> SelectionResults:
- all_results: List[ModelResult] = []
- global_best_combo: Optional[Dict[str, ModelCandidate]] = None
- global_best_value = float("-inf")
- global_best_latency = float("inf")
- tol = 1e-9
-
- print(f"\n{'=' * 60}")
- print(
- f"Hill climbing (sequential): {self.num_restarts} restart(s), "
- f"max {self.max_iterations} iterations each, patience {self.patience}"
- )
- print(f"{'=' * 60}\n")
-
- seen: Set[str] = set()
- for restart in range(self.num_restarts):
- print(f"--- Restart {restart + 1}/{self.num_restarts} ---")
- result = self._hill_climb_once_sequential(seen)
- if result is None:
- print(" All combinations exhausted. Stopping.\n")
- break
- best_combo, best_val, best_lat, run_results = result
- all_results.extend(run_results)
-
- if (
- global_best_combo is None
- or best_val > global_best_value + tol
- or (
- abs(best_val - global_best_value) <= tol
- and best_lat < global_best_latency
- )
- ):
- global_best_value = best_val
- global_best_latency = best_lat
- global_best_combo = best_combo
-
- return self._hc_finalize(all_results, global_best_combo, global_best_value)
-
- async def _run_selection_async(self, max_concurrent: int = 20,) -> SelectionResults:
- all_results: List[ModelResult] = []
- global_best_combo: Optional[Dict[str, ModelCandidate]] = None
- global_best_value = float("-inf")
- global_best_latency = float("inf")
- tol = 1e-9
-
- print(f"\n{'=' * 60}")
- print(
- f"Hill climbing (parallel): {self.num_restarts} restart(s), "
- f"max {self.max_iterations} iterations each, patience {self.patience}"
- )
- print(f"{'=' * 60}\n")
-
- seen: Set[str] = set()
- for restart in range(self.num_restarts):
- print(f"--- Restart {restart + 1}/{self.num_restarts} ---")
- result = await self._hill_climb_once_async(
- seen, max_concurrent=max_concurrent
- )
- if result is None:
- print(" All combinations exhausted. Stopping.\n")
- break
- best_combo, best_val, best_lat, run_results = result
- all_results.extend(run_results)
-
- if (
- global_best_combo is None
- or best_val > global_best_value + tol
- or (
- abs(best_val - global_best_value) <= tol
- and best_lat < global_best_latency
- )
- ):
- global_best_value = best_val
- global_best_latency = best_lat
- global_best_combo = best_combo
-
- return self._hc_finalize(all_results, global_best_combo, global_best_value)
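Review note (not part of the patch): the removed hill-climbing selector uses the same acceptance rule in the per-restart loop and in both restart drivers. A higher primary value wins outright, and values within `tol = 1e-9` of each other are decided by lower latency. The sketch below isolates that rule; the names `is_better` and `pick_best` are hypothetical and do not exist in the repository.

```python
def is_better(value, latency, best_value, best_latency, tol=1e-9):
    """Return True if (value, latency) should replace the incumbent.

    Hypothetical helper mirroring the comparison in the removed code:
    a clearly higher value wins; values within `tol` fall back to latency.
    """
    if best_value == float("-inf"):  # no incumbent yet
        return True
    if value > best_value + tol:
        return True
    return abs(value - best_value) <= tol and latency < best_latency


def pick_best(candidates):
    """Fold (name, value, latency) tuples into the name of the best entry."""
    best = None
    best_value, best_latency = float("-inf"), float("inf")
    for name, value, latency in candidates:
        if is_better(value, latency, best_value, best_latency):
            best, best_value, best_latency = name, value, latency
    return best
```

Folding the restart results through one helper like this would have removed the duplicated condition in `_run_selection_sequential` and `_run_selection_async`.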
diff --git a/src/agentopt/model_selection/lm_proposal.py b/src/agentopt/model_selection/lm_proposal.py
deleted file mode 100644
index addf44d..0000000
--- a/src/agentopt/model_selection/lm_proposal.py
+++ /dev/null
@@ -1,314 +0,0 @@
-"""
-LLM-proposal model selector.
-
-This selector asks a proposer LLM to suggest the single best model combination
-for a multi-node agent, using node descriptions, model prices, and a dataset
-preview to inform its recommendation. The proposed combination is then
-evaluated once on the dataset and returned as the only result.
-"""
-
-from __future__ import annotations
-
-import json
-import logging
-from typing import Any, Callable, Dict, List, Optional, Tuple
-
-from pydantic import BaseModel, Field, ValidationError
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from ..model_price import get_model_price
-from .base import BaseModelSelector, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class ProposalResponse(BaseModel):
- """Expected JSON response from the proposer LLM."""
-
- combination: Dict[str, str] = Field(
- description="Mapping of node name to selected model name.",
- )
- reasoning: str = Field(
- default="", description="Brief explanation of why this combination was chosen.",
- )
-
-
-class LMProposalModelSelector(BaseModelSelector):
- """Model selector where an LLM proposes the single best combination."""
-
- def __init__(
- self,
- agent: Any = None,
- models: Dict[str, List[ModelCandidate]] = None,
- eval_fn: EvalFn = None,
- dataset: Dataset = None,
- proposer_model: str = "gpt-4.1",
- proposer_client: Any = None,
- objective: str = "maximize accuracy and then minimize latency and cost",
- dataset_preview_size: int = 10,
- model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- node_descriptions: Optional[Dict[str, str]] = None,
- lambda_cost: float = 0.0,
- lambda_latency: float = 0.0,
- ) -> None:
- super().__init__(
- agent=agent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- model_prices=model_prices,
- node_descriptions=node_descriptions,
- lambda_cost=lambda_cost,
- lambda_latency=lambda_latency,
- )
- if dataset_preview_size < 1:
- raise ValueError("dataset_preview_size must be >= 1.")
-
- self.proposer_model = proposer_model
- self.objective = objective
- self.dataset_preview_size = dataset_preview_size
-
- # Build label→index lookup per node for parsing LLM responses.
- self._label_to_index: Dict[str, Dict[str, int]] = {}
- for node in self._node_names:
- self._label_to_index[node] = {
- self._candidate_label(c): idx
- for idx, c in enumerate(self._models[node])
- }
-
- if proposer_client is None:
- try:
- from openai import OpenAI
- except ImportError as e:
- raise ImportError(
- "LMProposalModelSelector requires `openai` unless proposer_client is supplied. "
- "Install with: pip install openai"
- ) from e
- proposer_client = OpenAI()
- self.proposer_client = proposer_client
-
- # ------------------------------------------------------------------
- # Public API
- # ------------------------------------------------------------------
-
- def _run_selection(
- self, parallel: bool = False, max_concurrent: int = 20,
- ) -> SelectionResults:
- if parallel:
- logger.warning(
- "LMProposalModelSelector received parallel=True, but only a single "
- "combination is evaluated; proceeding with sequential evaluation."
- )
- combo_idx = self._ask_proposer()
- if combo_idx is None:
- combo_idx = tuple(0 for _ in self._node_names)
-
- combo = self._index_combo_to_combo(combo_idx)
- combo_name = self._combo_name(combo)
-
- print(f"\n{'='*60}")
- print("LM proposal: evaluating proposed combination")
- print(f"{'='*60}\n")
- print(f" [1/1] Evaluating: {combo_name}")
-
- try:
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, self.dataset, label=combo_name
- )
- result = self._build_combo_result(
- combo_name, scores, latencies, dp_ids, is_best=True,
- )
- print(f" {result}")
- except Exception as e:
- print(f" [{combo_name}] failed: {e}")
- result = self._make_result(
- model_name=combo_name,
- accuracy=0.0,
- latency_seconds=0.0,
- input_tokens={},
- output_tokens={},
- attribute="combination",
- is_best=True,
- )
-
- results = [result]
- self._finalize_combined_objectives(results)
- return SelectionResults(results=results)
-
- # ------------------------------------------------------------------
- # Prompt construction
- # ------------------------------------------------------------------
-
- @staticmethod
- def _safe_json(value: Any) -> Any:
- try:
- json.dumps(value)
- return value
- except TypeError:
- return str(value)
-
- def _dataset_preview(self) -> List[Dict[str, Any]]:
- preview: List[Dict[str, Any]] = []
- for input_data, expected in list(self.dataset)[: self.dataset_preview_size]:
- preview.append(
- {"input": self._safe_json(input_data), "expected": str(expected)}
- )
- return preview
-
- def _index_combo_to_combo(
- self, idx_combo: Tuple[int, ...],
- ) -> Dict[str, ModelCandidate]:
- return {
- node: self._models[node][idx]
- for node, idx in zip(self._node_names, idx_combo)
- }
-
- def _build_prompt(self, preview: List[Dict[str, Any]]) -> str:
- # -- Build nodes info ------------------------------------------------
- nodes_info = []
- for node in self._node_names:
- node_entry: Dict[str, Any] = {"node_name": node}
- if self.node_descriptions and node in self.node_descriptions:
- node_entry["description"] = self.node_descriptions[node]
-
- candidates = []
- for c in self._models[node]:
- label = self._candidate_label(c)
- candidate_entry: Dict[str, Any] = {"name": label}
- price = get_model_price(label, custom_prices=self._custom_prices)
- if price is not None:
- candidate_entry["input_price_per_mtok"] = price[0]
- candidate_entry["output_price_per_mtok"] = price[1]
- candidates.append(candidate_entry)
- node_entry["candidates"] = candidates
- nodes_info.append(node_entry)
-
- # -- Build response example ------------------------------------------
- example = {
- "combination": {
- node: self._candidate_label(self._models[node][0])
- for node in self._node_names
- },
- "reasoning": "Your explanation here.",
- }
-
- # -- Assemble prompt -------------------------------------------------
- sections = [
- # Role & Task
- "# Task\n"
- "You are an expert AI model selector. You will be given a multi-agent "
- "workflow where each node can use one of several candidate LLMs. "
- "Your job is to select the best combination of models for the nodes.\n",
- # Objective
- "# The objective to target when selecting the model combination:\n"
- f"{self.objective}\n",
- # Agent Pipeline
- "# Agent Pipeline\n"
- "The agent has the following nodes and each can be assigned one of its candidate models.\n"
- f"```json\n{json.dumps(nodes_info, indent=2, ensure_ascii=True)}\n```\n",
- # Dataset Preview
- "# Dataset Preview\n"
- "Below are sample inputs and their expected outputs. Use these to understand "
- "the task complexity and choose models accordingly.\n"
- f"```json\n{json.dumps(preview, indent=2, ensure_ascii=True)}\n```\n",
- # Response Format
- "# Response Format\n"
- "Respond with a JSON object like this example:\n"
- f"```json\n{json.dumps(example, ensure_ascii=True)}\n```\n",
- # Constraints
- "# Constraints\n"
- "- Each key in `combination` must be a node name from the pipeline above.\n"
- "- Each value must be a candidate model name from that node's candidates list.\n"
- "- All nodes must be included.\n"
- "- Return exactly one combination.\n",
- ]
-
- prompt = "\n".join(sections)
-
- return prompt
-
- # ------------------------------------------------------------------
- # Parsing & proposer
- # ------------------------------------------------------------------
-
- def _parse_proposed_combination(self, text: str,) -> Optional[Tuple[int, ...]]:
- if not text.strip():
- return None
- try:
- payload = json.loads(text)
- except json.JSONDecodeError:
- logger.warning(
- "LMProposalModelSelector: proposer returned non-JSON output."
- )
- return None
-
- try:
- response = ProposalResponse.model_validate(payload)
- except ValidationError as e:
- logger.warning("LMProposalModelSelector: invalid response structure: %s", e)
- return None
-
- if set(response.combination.keys()) != set(self._node_names):
- logger.warning(
- "LMProposalModelSelector: response nodes don't match pipeline nodes."
- )
- return None
-
- indices: List[int] = []
- for node in self._node_names:
- model_name = response.combination[node]
- lookup = self._label_to_index.get(node, {})
- if model_name not in lookup:
- logger.warning(
- "LMProposalModelSelector: unknown model '%s' for node '%s'.",
- model_name,
- node,
- )
- return None
- indices.append(lookup[model_name])
-
- return tuple(indices)
-
- def _ask_proposer(self, max_retries: int = 3) -> Optional[Tuple[int, ...]]:
- preview = self._dataset_preview()
- prompt = self._build_prompt(preview)
- messages = [
- {
- "role": "system",
- "content": (
- "You are an expert model-selection assistant. "
- "Analyze the agent pipeline, candidate models, and dataset, "
- "then return a single JSON object with your recommended "
- "model combination."
- ),
- },
- {"role": "user", "content": prompt},
- ]
-
- for attempt in range(1, max_retries + 1):
- try:
- response = self.proposer_client.chat.completions.create(
- model=self.proposer_model,
- temperature=0.0,
- response_format={"type": "json_object"},
- messages=messages,
- )
- raw = response.choices[0].message.content or ""
- proposed = self._parse_proposed_combination(raw)
- if proposed is not None:
- return proposed
- logger.warning(
- "LM proposer attempt %d/%d: invalid response, retrying...",
- attempt,
- max_retries,
- )
- except Exception as e:
- logger.warning(
- "LM proposer attempt %d/%d failed: %s", attempt, max_retries, e,
- )
-
- logger.warning(
- "LM proposer exhausted all %d retries; falling back to defaults.",
- max_retries,
- )
- return None
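Review note (not part of the patch): the removed `_parse_proposed_combination` accepts a proposer reply only if it is valid JSON, names exactly the pipeline's nodes, and picks a known candidate for each node. If every retry fails, the caller falls back to index 0 for each node. The sketch below shows the same checks using only the standard library. It is a simplification, since the original validates with pydantic, and `parse_combination` and the model labels are illustrative names.

```python
import json
from typing import Dict, List, Optional, Tuple


def parse_combination(
    text: str, candidates: Dict[str, List[str]]
) -> Optional[Tuple[int, ...]]:
    """Map a proposer reply to per-node candidate indices, or None if invalid.

    `candidates` maps node name -> list of model labels (illustrative shape).
    """
    if not text.strip():
        return None
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return None
    combination = payload.get("combination") if isinstance(payload, dict) else None
    if not isinstance(combination, dict):
        return None
    # Every node must be present, and no extra nodes are allowed.
    if set(combination) != set(candidates):
        return None
    indices = []
    for node, labels in candidates.items():
        label = combination[node]
        if label not in labels:
            return None  # unknown model for this node
        indices.append(labels.index(label))
    return tuple(indices)
```

A caller can then apply the same fallback as the removed code, for example `parse_combination(raw, cands) or tuple(0 for _ in cands)`.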
diff --git a/src/agentopt/model_selection/matrix_ucb.py b/src/agentopt/model_selection/matrix_ucb.py
index b6f45b8..5955560 100644
--- a/src/agentopt/model_selection/matrix_ucb.py
+++ b/src/agentopt/model_selection/matrix_ucb.py
@@ -39,7 +39,7 @@
def _resolve_observation_budget_fraction(
observation_budget_fraction: float, sample_fraction: Optional[float],
) -> float:
- """Match RandomSearch/Bayesian: ``sample_fraction`` overrides ``observation_budget_fraction``."""
+ """Match Bayesian: ``sample_fraction`` overrides ``observation_budget_fraction``."""
if sample_fraction is not None:
s = float(sample_fraction)
if not 0 < s <= 1:
@@ -220,7 +220,7 @@ class MatrixUCBModelSelector(BaseModelSelector):
``select_best(..., max_concurrent=...)`` matters (``parallel`` is ignored).
``observation_budget_fraction`` or, equivalently, ``sample_fraction`` (same meaning
- as in :class:`RandomSearchModelSelector` / Bayesian: fraction of the search budget —
+ as in Bayesian optimization: fraction of the search budget —
here, **fraction of matrix cells** to observe) caps evaluations. ``1.0`` fills the
full grid; ``0.1`` stops after about 10% of cells. If both are passed, ``sample_fraction``
wins.
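Review note (not part of the patch): the docstring above says `sample_fraction` overrides `observation_budget_fraction` and that the fraction caps how many matrix cells are observed. The sketch below illustrates that precedence. The error behavior for out-of-range values and the rounding in `cell_budget` are assumptions, because the hunks shown here cut off before that logic, and both function names are hypothetical.

```python
import math
from typing import Optional


def resolve_budget_fraction(
    observation_budget_fraction: float, sample_fraction: Optional[float] = None
) -> float:
    """Pick the effective fraction; sample_fraction wins when it is given."""
    chosen = observation_budget_fraction if sample_fraction is None else sample_fraction
    value = float(chosen)
    if not 0 < value <= 1:
        # Assumption: out-of-range values are rejected with ValueError.
        raise ValueError("budget fraction must be in the range (0, 1].")
    return value


def cell_budget(n_rows: int, n_cols: int, fraction: float) -> int:
    """Number of matrix cells to observe under the given fraction.

    Assumption: round up and always observe at least one cell.
    """
    total = n_rows * n_cols
    return min(total, max(1, math.ceil(total * fraction)))
```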
diff --git a/src/agentopt/model_selection/matrix_ucb_factorization.py b/src/agentopt/model_selection/matrix_ucb_factorization.py
deleted file mode 100644
index 54b91a2..0000000
--- a/src/agentopt/model_selection/matrix_ucb_factorization.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright (c) 2025 Jin Peng Zhou, Christian K. Belardi, Ruihan Wu
-# SPDX-License-Identifier: MIT
-# Adapted from https://github.com/kilian-group/banditeval (banditeval/factorization.py)
-
-import torch
-from einops import rearrange
-
-
-class Factorization(torch.nn.Module):
- r"""Low-rank factorization ensemble :math:`X \approx UV^\top` with ALS.
-
- * :math:`X` has shape (**combinations** × **datapoints**), matching banditeval’s
- “methods × examples” layout.
- """
-
- def __init__(
- self,
- n_combos: int,
- n_datapoints: int,
- rank: int,
- ensemble_size: int,
- regularizer_weight: float = 0.00,
- drop_probability: float = 0.05,
- ) -> None:
- super().__init__()
- self.register_buffer("U", torch.randn(ensemble_size, n_combos, rank))
- self.register_buffer("V", torch.randn(ensemble_size, n_datapoints, rank))
- self.register_buffer("L", regularizer_weight * torch.eye(rank))
-
- self.n_combos = n_combos
- self.n_datapoints = n_datapoints
- self.rank = rank
- self.ensemble_size = ensemble_size
- self.regularizer_weight = regularizer_weight
- self.drop_probability = drop_probability
-
- def forward(self) -> torch.Tensor:
- return torch.bmm(self.U, self.V.transpose(1, 2))
-
- def _als_step(
- self, data_matrix: torch.Tensor, fixed_matrix: torch.Tensor
- ) -> torch.Tensor:
- non_zero_mask = (~torch.isnan(data_matrix)).float()
- y = fixed_matrix.unsqueeze(2)
- y_t = y.transpose(1, 2)
- A = (non_zero_mask.unsqueeze(2) * torch.bmm(y, y_t)).sum(0) + self.L
- b = (torch.nan_to_num(data_matrix * non_zero_mask) * y.squeeze(2)).sum(0)
- return torch.linalg.solve(A, b)
-
- def fit(self, X: torch.Tensor, iterations: int = 10) -> None:
- # X: (combinations, datapoints); einops axes e, m, n = ensemble, combo, datapoint
- X = X.unsqueeze(0).repeat(self.ensemble_size, 1, 1)
- if self.drop_probability > 0:
- mask = torch.rand_like(X) < self.drop_probability
- X[mask] = torch.nan
- X_u = rearrange(X, "e combo dp -> (e combo) dp 1")
- X_v = rearrange(X, "e combo dp -> (e dp) combo 1")
- vmap_als_step = torch.vmap(self._als_step, in_dims=(0, 0))
- for _ in range(iterations):
- self.V.data = (
- vmap_als_step(X_v, self.U.repeat(self.n_datapoints, 1, 1))
- ).reshape(self.ensemble_size, self.n_datapoints, self.rank)
- self.U.data = (
- vmap_als_step(X_u, self.V.repeat(self.n_combos, 1, 1))
- ).reshape(self.ensemble_size, self.n_combos, self.rank)
-
- def reset(self) -> None:
- torch.nn.init.normal_(self.U)
- torch.nn.init.normal_(self.V)
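Review note (not part of the patch): the removed `Factorization` module fits a low-rank model to a partially observed combinations × datapoints matrix by alternating least squares (ALS), skipping unobserved (NaN) cells. The sketch below shows the same idea in its simplest form: rank 1, pure Python, no ensemble and no random dropout. It is an illustration of the technique, not a port of the torch code, and `als_rank1` is a hypothetical name.

```python
from typing import List, Optional, Tuple

Matrix = List[List[Optional[float]]]  # None marks an unobserved cell


def als_rank1(
    X: Matrix, iterations: int = 100, reg: float = 0.0
) -> Tuple[List[float], List[float]]:
    """Fit X[i][j] ~ u[i] * v[j] on observed cells by alternating least squares."""
    n_rows, n_cols = len(X), len(X[0])
    u = [1.0] * n_rows
    v = [1.0] * n_cols
    for _ in range(iterations):
        # Fix u, solve each v[j] in closed form over its observed rows.
        for j in range(n_cols):
            rows = [i for i in range(n_rows) if X[i][j] is not None]
            den = sum(u[i] ** 2 for i in rows) + reg
            if den > 0:
                v[j] = sum(X[i][j] * u[i] for i in rows) / den
        # Fix v, solve each u[i] in closed form over its observed columns.
        for i in range(n_rows):
            cols = [j for j in range(n_cols) if X[i][j] is not None]
            den = sum(v[j] ** 2 for j in cols) + reg
            if den > 0:
                u[i] = sum(X[i][j] * v[j] for j in cols) / den
    return u, v
```

On a matrix that is exactly rank 1 with one hidden cell, such as `[[1, 2], [2, 4], [3, None]]`, the product `u[2] * v[1]` converges toward the missing value 6, which is the property the bandit selector relies on to predict scores for cells it has not evaluated.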
diff --git a/src/agentopt/model_selection/random_search.py b/src/agentopt/model_selection/random_search.py
deleted file mode 100644
index 59b22ba..0000000
--- a/src/agentopt/model_selection/random_search.py
+++ /dev/null
@@ -1,197 +0,0 @@
-"""
-Random-search model selection: evaluates a random subset of the Cartesian
-product of candidate models across all nodes.
-"""
-
-import asyncio
-import logging
-import math
-import random
-from typing import Any, Callable, Dict, List, Optional, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from .base import BaseModelSelector, ModelResult, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class RandomSearchModelSelector(BaseModelSelector):
- """
- Selects the best model combination from a random subset of candidates.
- """
-
- def __init__(
- self,
- agent: Any = None,
- models: Dict[str, List[ModelCandidate]] = None,
- eval_fn: EvalFn = None,
- dataset: Dataset = None,
- sample_fraction: float = 0.25,
- seed: Optional[int] = None,
- model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- tracker=None,
- lambda_cost: float = 0.0,
- lambda_latency: float = 0.0,
- ) -> None:
- super().__init__(
- agent=agent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- model_prices=model_prices,
- tracker=tracker,
- lambda_cost=lambda_cost,
- lambda_latency=lambda_latency,
- )
- if not 0 < sample_fraction <= 1:
- raise ValueError("sample_fraction must be in the range (0, 1].")
- self.sample_fraction = sample_fraction
- self.seed = seed
-
- def _run_selection(
- self, parallel: bool = False, max_concurrent: int = 20,
- ) -> SelectionResults:
- if parallel:
- return asyncio.run(self._select_async(max_concurrent))
- return self._select_sequential()
-
- def _get_sampled_combinations(
- self,
- ) -> Tuple[List[Dict[str, ModelCandidate]], List[Dict[str, ModelCandidate]]]:
- """Return (all_combos, sampled_combos)."""
- all_combos = self._all_combos()
- total = len(all_combos)
- sample_size = max(1, math.ceil(total * self.sample_fraction))
- sample_size = min(sample_size, total)
-
- if sample_size == total:
- return all_combos, all_combos
-
- rng = random.Random(self.seed)
- sampled_indices = sorted(rng.sample(range(total), sample_size))
- sampled = [all_combos[i] for i in sampled_indices]
- return all_combos, sampled
-
- def _select_sequential(self) -> SelectionResults:
- all_combos, sampled = self._get_sampled_combinations()
-
- all_results: List[ModelResult] = []
-
- print(f"\n{'='*60}")
- print(
- f"Random search (sequential): "
- f"{len(sampled)}/{len(all_combos)} combinations "
- f"({self.sample_fraction:.1%} sample)"
- )
- print(f"{'='*60}\n")
-
- for idx, combo in enumerate(sampled, 1):
- combo_name = self._combo_name(combo)
- print(f" [{idx}/{len(sampled)}] Evaluating: {combo_name}")
-
- try:
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, self.dataset, label=combo_name
- )
- result = self._build_combo_result(
- combo_name, scores, latencies, dp_ids,
- )
- print(f" {result}")
- all_results.append(result)
-
- except Exception as e:
- print(f" [{combo_name}] failed: {e}")
- all_results.append(
- self._make_result(
- model_name=combo_name,
- accuracy=0.0,
- latency_seconds=0.0,
- input_tokens={},
- output_tokens={},
- attribute="combination",
- is_best=False,
- )
- )
-
- self._finalize_combined_objectives(all_results)
- best_info = self._find_best(all_results)
- if best_info is not None:
- best_name, _ = best_info
- for result in all_results:
- if result.model_name == best_name:
- result.is_best = True
- break
- else:
- print("\n No sampled combinations succeeded")
-
- results = SelectionResults(results=all_results)
- return results
-
- async def _select_async(self, max_concurrent: int = 20) -> SelectionResults:
- all_combos, sampled = self._get_sampled_combinations()
-
- batch_size = len(self.dataset)
- n_combo, dp_concurrent = self._compute_concurrency(max_concurrent, batch_size)
- combo_sem = asyncio.Semaphore(n_combo)
-
- print(f"\n{'='*60}")
- print(
- f"Random search (async): "
- f"{len(sampled)}/{len(all_combos)} combinations "
- f"({self.sample_fraction:.1%} sample), "
- f"max {max_concurrent} total concurrent"
- )
- print(f"{'='*60}\n")
-
- async def _eval_combo(
- combo: Dict[str, ModelCandidate],
- ) -> Tuple[str, ModelResult]:
- async with combo_sem:
- combo_name = self._combo_name(combo)
- print(f" Evaluating: {combo_name}")
- scores, latencies, dp_ids = await self._evaluate_combo_async(
- combo, self.dataset, label=combo_name, max_concurrent=dp_concurrent
- )
- result = self._build_combo_result(
- combo_name, scores, latencies, dp_ids,
- )
- print(f" {result}")
- return combo_name, result
-
- combo_results = await asyncio.gather(
- *[_eval_combo(c) for c in sampled], return_exceptions=True,
- )
-
- all_results: List[ModelResult] = []
- for i, res in enumerate(combo_results):
- if isinstance(res, Exception):
- combo_name = self._combo_name(sampled[i])
- print(f" [{combo_name}] failed: {res}")
- all_results.append(
- self._make_result(
- model_name=combo_name,
- accuracy=0.0,
- latency_seconds=0.0,
- input_tokens={},
- output_tokens={},
- attribute="combination",
- is_best=False,
- )
- )
- else:
- _, result = res
- all_results.append(result)
-
- self._finalize_combined_objectives(all_results)
- best_info = self._find_best(all_results)
- if best_info is not None:
- best_name, _ = best_info
- for r in all_results:
- if r.model_name == best_name:
- r.is_best = True
- break
- else:
- print("\n No sampled combinations succeeded")
-
- results = SelectionResults(results=all_results)
- return results
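Review note (not part of the patch): the removed `_get_sampled_combinations` draws `ceil(total * sample_fraction)` combinations, clamped to at least 1 and at most `total`, using a seeded `random.Random` so runs are reproducible. The sketch below reproduces that sampling outside the selector class. It assumes `_all_combos()` enumerates the Cartesian product of per-node candidates, as the module docstring describes, and `sample_combinations` is a hypothetical name.

```python
import itertools
import math
import random
from typing import Dict, List, Optional


def sample_combinations(
    models: Dict[str, List[str]],
    sample_fraction: float = 0.25,
    seed: Optional[int] = None,
) -> List[Dict[str, str]]:
    """Return a reproducible random subset of all node -> model assignments."""
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in the range (0, 1].")
    nodes = list(models)
    # Assumption: the full space is the Cartesian product over nodes.
    all_combos = [
        dict(zip(nodes, choice))
        for choice in itertools.product(*(models[n] for n in nodes))
    ]
    total = len(all_combos)
    sample_size = min(total, max(1, math.ceil(total * sample_fraction)))
    if sample_size == total:
        return all_combos
    rng = random.Random(seed)
    # Sorting the indices keeps the evaluation order stable, as in the original.
    picked = sorted(rng.sample(range(total), sample_size))
    return [all_combos[i] for i in picked]
```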
diff --git a/src/agentopt/model_selection/threshold_successive_elimination.py b/src/agentopt/model_selection/threshold_successive_elimination.py
deleted file mode 100644
index 5c60d48..0000000
--- a/src/agentopt/model_selection/threshold_successive_elimination.py
+++ /dev/null
@@ -1,416 +0,0 @@
-"""
-Threshold-bandit successive elimination model selector.
-
-Classifies combinations as above/below a user-provided threshold.
-"""
-
-import asyncio
-import logging
-import math
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple
-
-from ..base_models import Dataset, EvalFn, ModelCandidate
-from .base import BaseModelSelector, ModelResult, SelectionResults
-
-logger = logging.getLogger(__name__)
-
-
-class ThresholdBanditSEModelSelector(BaseModelSelector):
- """Select models via threshold-based successive elimination."""
-
- def __init__(
- self,
- agent: Any = None,
- models: Dict[str, List[ModelCandidate]] = None,
- eval_fn: EvalFn = None,
- dataset: Dataset = None,
- threshold: float = 0.75,
- n_initial: Optional[int] = None,
- confidence: float = 1.0,
- model_prices: Optional[Dict[str, Dict[str, float]]] = None,
- tracker=None,
- lambda_cost: float = 0.0,
- lambda_latency: float = 0.0,
- ) -> None:
- super().__init__(
- agent=agent,
- models=models,
- eval_fn=eval_fn,
- dataset=dataset,
- model_prices=model_prices,
- tracker=tracker,
- lambda_cost=lambda_cost,
- lambda_latency=lambda_latency,
- )
- n = len(self.dataset)
- if n_initial is None:
- self.n_initial = max(1, n // 10)
- else:
- self.n_initial = n_initial
- self.confidence = confidence
- self.threshold = threshold
-
- def _run_selection(
- self, parallel: bool = False, max_concurrent: int = 20,
- ) -> SelectionResults:
- if parallel:
- return asyncio.run(self._select_async(max_concurrent))
- return self._select_sequential()
-
- def _arm_objectives(
- self,
- idx: int,
- combo_scores: Dict[int, List[float]],
- combo_latencies: Dict[int, List[float]],
- combo_costs: Dict[int, List[Optional[float]]],
- ) -> List[float]:
- return self._compute_objectives(
- combo_scores[idx], combo_latencies[idx], combo_costs[idx]
- )
-
- def _select_sequential(self) -> SelectionResults:
- all_combos = self._all_combos()
- dataset_list = list(self.dataset)
- n_total = len(dataset_list)
-
- combo_scores: Dict[int, List[float]] = {i: [] for i in range(len(all_combos))}
- combo_latencies: Dict[int, List[float]] = {
- i: [] for i in range(len(all_combos))
- }
- combo_costs: Dict[int, List[Optional[float]]] = {
- i: [] for i in range(len(all_combos))
- }
- combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(len(all_combos))}
- active: Set[int] = set(range(len(all_combos)))
-
- print(f"\n{'='*60}")
- print(
- f"Threshold successive elimination (sequential): {len(all_combos)} "
- f"combinations, {n_total} samples, threshold={self.threshold}"
- )
- print(f"{'='*60}")
-
- offset = 0
- init_batch_size = min(self.n_initial, n_total)
- assert init_batch_size > 0
- init_batch = dataset_list[offset : offset + init_batch_size]
- print(
- f"\nInitial round [samples {offset}-{offset + init_batch_size}, "
- f"{len(active)} active]:"
- )
- for idx in sorted(active):
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, init_batch, label=combo_name, dp_offset=offset
- )
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- objs = self._arm_objectives(
- idx, combo_scores, combo_latencies, combo_costs
- )
- mu, lcb, ucb = self._confidence_bounds(objs)
- print(
- f" {combo_name}: mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
- f"(n={len(objs)})"
- )
- offset += init_batch_size
-
- round_num = 1
-
- while active and offset < n_total:
- batch = [dataset_list[offset]]
- offset += 1
-
- print(
- f"\nRound {round_num} [sample {offset}/{n_total}, "
- f"{len(active)} active]:"
- )
-
- for idx in sorted(active):
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = self._evaluate_combo(
- combo, batch, label=combo_name, dp_offset=offset - 1
- )
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- objs = self._arm_objectives(
- idx, combo_scores, combo_latencies, combo_costs
- )
- mu, lcb, ucb = self._confidence_bounds(objs)
- print(
- f" {combo_name}: mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
- f"(n={len(objs)})"
- )
-
- newly_eliminated: Set[int] = set()
- for idx in active:
- objs = self._arm_objectives(
- idx, combo_scores, combo_latencies, combo_costs
- )
- _, lcb, ucb = self._confidence_bounds(objs)
- # Classified wrt threshold, so this arm is no longer ambiguous.
- if ucb < self.threshold or lcb > self.threshold:
- newly_eliminated.add(idx)
-
- if newly_eliminated:
- for idx in sorted(newly_eliminated):
- combo_name = self._combo_name(all_combos[idx])
- objs = self._arm_objectives(
- idx, combo_scores, combo_latencies, combo_costs
- )
- _, lcb, ucb = self._confidence_bounds(objs)
- side = "below" if ucb < self.threshold else "above"
- print(
- f" Eliminated: {combo_name} "
- f"(classified {side} threshold, "
- f"CI=[{lcb:.3f}, {ucb:.3f}])"
- )
- active -= newly_eliminated
- print(f" Ambiguous survivors: {len(active)} / {len(all_combos)}")
- else:
- print(
- f" No eliminations. Ambiguous survivors: "
- f"{len(active)} / {len(all_combos)}"
- )
-
- if not active:
- break
- round_num += 1
-
- return self._build_results(
- all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
- )
-
- async def _select_async(self, max_concurrent: int = 20) -> SelectionResults:
- all_combos = self._all_combos()
- dataset_list = list(self.dataset)
- n_total = len(dataset_list)
-
- combo_scores: Dict[int, List[float]] = {i: [] for i in range(len(all_combos))}
- combo_latencies: Dict[int, List[float]] = {
- i: [] for i in range(len(all_combos))
- }
- combo_costs: Dict[int, List[Optional[float]]] = {
- i: [] for i in range(len(all_combos))
- }
- combo_dp_ids: Dict[int, List[str]] = {i: [] for i in range(len(all_combos))}
- active: Set[int] = set(range(len(all_combos)))
-
- print(f"\n{'='*60}")
- print(
- f"Threshold successive elimination (async): {len(all_combos)} "
- f"combinations, {n_total} samples, threshold={self.threshold}, "
- f"max {max_concurrent} total concurrent"
- )
- print(f"{'='*60}")
-
- offset = 0
- init_batch_size = min(self.n_initial, n_total)
- assert init_batch_size > 0
- init_batch = dataset_list[offset : offset + init_batch_size]
- n_combo_init, dp_concurrent_init = self._compute_concurrency(
- max_concurrent, init_batch_size
- )
- init_combo_sem = asyncio.Semaphore(n_combo_init)
- print(
- f"\nInitial round [samples {offset}-{offset + init_batch_size}, "
- f"{len(active)} active]:"
- )
-
- async def _eval_initial(
- idx: int,
- ) -> Tuple[int, List[float], List[float], List[str]]:
- async with init_combo_sem:
- combo = all_combos[idx]
- combo_name = self._combo_name(combo)
- scores, latencies, dp_ids = await self._evaluate_combo_async(
- combo,
- init_batch,
- label=combo_name,
- max_concurrent=dp_concurrent_init,
- dp_offset=offset,
- )
- return idx, scores, latencies, dp_ids
-
- init_results = await asyncio.gather(
- *[_eval_initial(idx) for idx in sorted(active)], return_exceptions=True,
- )
-
- for res in init_results:
- if isinstance(res, Exception):
- logger.warning("Initial batch evaluation error: %s", res)
- continue
- idx, scores, latencies, dp_ids = res
- costs = self._observe_combo(scores, latencies, dp_ids)
- combo_scores[idx].extend(scores)
- combo_latencies[idx].extend(latencies)
- combo_costs[idx].extend(costs)
- combo_dp_ids[idx].extend(dp_ids)
- objs = self._arm_objectives(
- idx, combo_scores, combo_latencies, combo_costs
- )
- mu, lcb, ucb = self._confidence_bounds(objs)
- print(
- f" {self._combo_name(all_combos[idx])}: "
- f"mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
- f"(n={len(objs)})"
- )
- offset += init_batch_size
-
-        round_num = 1
-        # Per-round batch_size is always 1, so compute once
-        n_combo_round, dp_concurrent_round = self._compute_concurrency(
-            max_concurrent, 1
-        )
-        round_combo_sem = asyncio.Semaphore(n_combo_round)
-
-        while active and offset < n_total:
-            batch = [dataset_list[offset]]
-            offset += 1
-
-            print(
-                f"\nRound {round_num} [sample {offset}/{n_total}, "
-                f"{len(active)} active]:"
-            )
-
-            async def _eval_batch(
-                idx: int,
-            ) -> Tuple[int, List[float], List[float], List[str]]:
-                async with round_combo_sem:
-                    combo = all_combos[idx]
-                    combo_name = self._combo_name(combo)
-                    scores, latencies, dp_ids = await self._evaluate_combo_async(
-                        combo,
-                        batch,
-                        label=combo_name,
-                        max_concurrent=dp_concurrent_round,
-                        dp_offset=offset - 1,
-                    )
-                    return idx, scores, latencies, dp_ids
-
-            round_results = await asyncio.gather(
-                *[_eval_batch(idx) for idx in sorted(active)], return_exceptions=True,
-            )
-
-            for res in round_results:
-                if isinstance(res, Exception):
-                    logger.warning("Batch evaluation error: %s", res)
-                    continue
-                idx, scores, latencies, dp_ids = res
-                costs = self._observe_combo(scores, latencies, dp_ids)
-                combo_scores[idx].extend(scores)
-                combo_latencies[idx].extend(latencies)
-                combo_costs[idx].extend(costs)
-                combo_dp_ids[idx].extend(dp_ids)
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                mu, lcb, ucb = self._confidence_bounds(objs)
-                print(
-                    f" {self._combo_name(all_combos[idx])}: "
-                    f"mu={mu:.3f}, [{lcb:.3f}, {ucb:.3f}] "
-                    f"(n={len(objs)})"
-                )
-
-            newly_eliminated: Set[int] = set()
-            for idx in active:
-                objs = self._arm_objectives(
-                    idx, combo_scores, combo_latencies, combo_costs
-                )
-                _, lcb, ucb = self._confidence_bounds(objs)
-                if ucb < self.threshold or lcb > self.threshold:
-                    newly_eliminated.add(idx)
-
-            if newly_eliminated:
-                for idx in sorted(newly_eliminated):
-                    combo_name = self._combo_name(all_combos[idx])
-                    objs = self._arm_objectives(
-                        idx, combo_scores, combo_latencies, combo_costs
-                    )
-                    _, lcb, ucb = self._confidence_bounds(objs)
-                    side = "below" if ucb < self.threshold else "above"
-                    print(
-                        f" Eliminated: {combo_name} "
-                        f"(classified {side} threshold, "
-                        f"CI=[{lcb:.3f}, {ucb:.3f}])"
-                    )
-                active -= newly_eliminated
-                print(f" Ambiguous survivors: {len(active)} / {len(all_combos)}")
-            else:
-                print(
-                    f" No eliminations. Ambiguous survivors: "
-                    f"{len(active)} / {len(all_combos)}"
-                )
-
-            if not active:
-                break
-            round_num += 1
-
-        return self._build_results(
-            all_combos, combo_scores, combo_latencies, combo_costs, combo_dp_ids
-        )
-
-    def _confidence_bounds(self, values: List[float]) -> Tuple[float, float, float]:
-        n = len(values)
-        if n == 0:
-            return 0.0, float("-inf"), float("inf")
-        mu, std = self._compute_stats(values)
-        se = std / math.sqrt(n)
-        radius = self.confidence * se
-        return mu, mu - radius, mu + radius
-
-    def _build_results(
-        self,
-        all_combos: List[Dict[str, ModelCandidate]],
-        combo_scores: Dict[int, List[float]],
-        combo_latencies: Dict[int, List[float]],
-        combo_costs: Dict[int, List[Optional[float]]],
-        combo_dp_ids: Dict[int, List[str]],
-    ) -> SelectionResults:
-        all_results: List[ModelResult] = []
-        for idx, combo in enumerate(all_combos):
-            combo_name = self._combo_name(combo)
-            scores = combo_scores[idx]
-            if scores:
-                all_results.append(
-                    self._build_combo_result(
-                        combo_name,
-                        scores,
-                        combo_latencies[idx],
-                        combo_dp_ids[idx],
-                        costs=combo_costs[idx],
-                    )
-                )
-            else:
-                all_results.append(
-                    self._make_result(
-                        model_name=combo_name,
-                        accuracy=0.0,
-                        latency_seconds=0.0,
-                        input_tokens={},
-                        output_tokens={},
-                        attribute="combination",
-                        is_best=False,
-                    )
-                )
-
-        self._finalize_combined_objectives(all_results)
-        best_info = self._find_best(all_results)
-        if best_info is not None:
-            best_name, _ = best_info
-            for result in all_results:
-                if result.model_name == best_name:
-                    result.is_best = True
-                    break
-        else:
-            print("\n No combinations succeeded.")
-
-        return SelectionResults(results=all_results)
diff --git a/src/agentopt/model_topology.py b/src/agentopt/model_topology.py
index 5dc4c6a..e8cc9ac 100644
--- a/src/agentopt/model_topology.py
+++ b/src/agentopt/model_topology.py
@@ -1,8 +1,8 @@
"""
Model topology: general-impression rankings of LLM models by quality and speed.
-These rankings encode a rough ordering (not benchmark-precise) so that search
-algorithms like hill climbing can define "neighbors" in model space.
+These rankings are rough orderings based on general impressions, not on
+benchmark-precise measurements.
"""
from typing import List, Optional