@@ -53,6 +53,17 @@ Any output classified DOOM or FRAGILE also gets a "bottom line" callout at the top

The script does **not** invent thresholds for outputs the user did not declare. To get a verdict on an output, declare a threshold on it in the Monte Carlo settings file.
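The settings-file schema is not shown in this diff, so the following is a minimal sketch only: assuming thresholds are keyed by output name and carry the `operator` and `value` fields the script reads, a declaration might look like this (the output name, operator string, and value are hypothetical).

```python
# Hypothetical sketch, not the confirmed settings schema.
# "operator" must be one of the keys of THRESHOLD_OPS in run_monte_carlo.py.
monte_carlo_settings = {
    "thresholds": {
        "annual_net_margin": {   # an output_name declared in the calculation plan
            "operator": ">=",    # comparison applied to each simulated run
            "value": 0.0,        # the cut line the output is compared against
        }
    }
}
```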

## Audience and tone

The primary consumer of `insights.md` is **downstream AI** (another agent, a planning loop, a follow-on extractor), not a human reader skimming for 7 seconds. A project manager may read it secondarily, but the writing optimises for token-density of useful signal over engagement hooks.

What that means concretely:

- No reader-engagement prefixes ("If you read nothing else, read this", "Stop and pay attention", "Important:"). The structural markers (`## Bad news first`, `### Likely deal-breakers`, the verdict labels) already carry that weight; restating them in prose burns tokens.
- No filler sentences whose only job is to motivate the next sentence. Lead with the substantive claim.
- Keep substantive explanations (what a verdict label means, what a column shows, what makes an item belong in a section). Those are signal, not filler.
- Don't apologise for or hedge the bad news. State it.

## Writing rules — apply to the script's output AND to anything you say back to the user about the insights

These are not stylistic preferences. They are how this skill is meant to communicate.
@@ -61,7 +72,7 @@

2. **No sugar-coating.** A 5% pass probability is "almost certainly fails", not "shows some challenges". A negative base-scenario number is "the plan is in trouble at its own central assumptions", not "may warrant further attention". Use the strongest accurate language; if the script's wording softens a result, fix the script.

3. **No sycophancy.** Never start a paragraph with "Great plan, but..." or "The team has done strong work; one concern is...". The reader has the plan in front of them. They do not need praise from the report.
3. **No sycophancy.** Never start a paragraph with "Great plan, but..." or "The team has done strong work; one concern is...". The downstream consumer has the plan available. It does not need praise from the report.

4. **No hedging phrases.** Banned in both the script's emitted text and in conversational reporting:
- `the honest read is`, `frankly`, `to be fair`, `in fairness`, `candidly`, `let's be real`, `look, the truth is`
experiments/napkin_math/run_monte_carlo.py (223 additions, 0 deletions)
@@ -248,6 +248,82 @@ def safe_pearson(x: np.ndarray, y: np.ndarray) -> float | None:
return r


def is_min_aggregate(entry: dict) -> bool:
"""Identify aggregates whose magnitude is not a usual surplus."""
hint = (entry or {}).get("formula_hint") or ""
return "min(" in hint


def collect_min_aggregates(params: dict) -> dict[str, list[str]]:
"""Map output_name -> ordered list of dependency output_names for min() aggregates."""
out: dict[str, list[str]] = {}
for src in ("recommended_first_calculations", "derived_questions"):
for entry in params.get(src, []):
if is_min_aggregate(entry) and entry.get("output_name"):
out[entry["output_name"]] = list(entry.get("depends_on") or [])
return out


def threshold_passes(arr: np.ndarray, op: str, value: float) -> np.ndarray:
"""Boolean mask of runs where the threshold passes, NaN runs counted as fails."""
finite_mask = np.isfinite(arr)
pass_mask = np.zeros_like(arr, dtype=bool)
pass_mask[finite_mask] = THRESHOLD_OPS[op](arr[finite_mask], value)
return pass_mask


def quartile_pass_rates(input_arr: np.ndarray, threshold_pass: np.ndarray) -> dict | None:
"""For one input × one threshold, return P(pass | input in bottom/top quartile)."""
if np.ptp(input_arr) == 0 or input_arr.size < 100:
return None
q1 = np.percentile(input_arr, 25)
q3 = np.percentile(input_arr, 75)
bottom_mask = input_arr <= q1
top_mask = input_arr >= q3
if bottom_mask.sum() < 20 or top_mask.sum() < 20:
return None
p_low = float(threshold_pass[bottom_mask].mean())
p_high = float(threshold_pass[top_mask].mean())
return {"p_pass_low_quartile": p_low,
"p_pass_high_quartile": p_high,
"delta_pp": round((p_high - p_low) * 100, 2)}


def required_input_percentile(input_arr: np.ndarray, threshold_pass: np.ndarray,
target_prob: float = 0.80) -> dict | None:
"""For a failing threshold, find which percentile of the input is required
for conditional pass-probability to reach the target. Returns the direction
(the input has to stay below / above that percentile)."""
if np.ptp(input_arr) == 0 or input_arr.size < 100:
return None
# Try both directions. The actionable one is the one that gives the looser bound.
best = None
for direction in ("above", "below"):
for pct in (5, 10, 15, 25, 33, 50, 67, 75, 85, 90, 95):
if direction == "above":
cutoff = float(np.percentile(input_arr, pct))
mask = input_arr >= cutoff
else:
cutoff = float(np.percentile(input_arr, 100 - pct))
mask = input_arr <= cutoff
if mask.sum() < 20:
continue
cond_p = float(threshold_pass[mask].mean())
if cond_p >= target_prob:
# Looser bound = larger admitted fraction (1 - pct/100 for above; (100-pct)/100 for below)
admitted = mask.mean()
if best is None or admitted > best["admitted_fraction"]:
best = {
"direction": direction,
"input_percentile_cutoff": pct,
"cutoff_value": cutoff,
"conditional_pass_prob": round(cond_p, 4),
"admitted_fraction": round(float(admitted), 4),
}
break # take loosest pct in this direction
return best
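# Example reading (hypothetical numbers): a result of
#   {"direction": "above", "input_percentile_cutoff": 75, "cutoff_value": 1.8e6,
#    "conditional_pass_prob": 0.82, "admitted_fraction": 0.25}
# means the threshold only reaches the 80% target in runs where this input
# lands in its top quartile (at or above its 75th-percentile value).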


def run(params_path: Path, bounds_path: Path, calc_path: Path,
settings_path: Path | None, output_path: Path) -> dict:
params = load_json(params_path)
@@ -281,6 +357,8 @@ def run(params_path: Path, bounds_path: Path, calc_path: Path,
name: list(inspect.signature(fn).parameters) for name, _, fn in plan
}
fn_lookup: dict[str, Any] = {name: fn for name, _, fn in plan}
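# Track, for each min()-style aggregate, which dependency produced the minimum
# (the binding gate) in every run; filled in during the simulation loop below.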
min_aggregates = collect_min_aggregates(params)
binding_dep_arrays: dict[str, list[str | None]] = {name: [None] * n_runs for name in min_aggregates}

for vid, spec in input_specs.items():
if "fixed" in spec:
@@ -313,6 +391,11 @@ def run(params_path: Path, bounds_path: Path, calc_path: Path,
continue
output_arrays[name][i] = val
pool[name] = float(val)
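# If this output is a min() aggregate, record which dependency equals its
# value in this run, i.e. which gate is currently binding.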
if name in min_aggregates:
for dep in min_aggregates[name]:
if dep in pool and pool[dep] == val:
binding_dep_arrays[name][i] = dep
break

for name, counts in exception_counts.items():
for exc_name, count in counts.items():
@@ -394,6 +477,141 @@ def run(params_path: Path, bounds_path: Path, calc_path: Path,
"top_inputs": [{"id": vid, "correlation": round(r, 4)} for vid, r in candidates[:5]]
}

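# Binding-gate analysis: for each min() aggregate with a declared threshold, count how
# often each dependency was the binding one in the runs where the aggregate failed.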
binding_gate_analysis: dict[str, dict] = {}
for agg_name, deps in min_aggregates.items():
if agg_name not in thresholds_section:
continue
t = thresholds_section[agg_name]
agg_arr = output_arrays[agg_name]
fail_mask = ~threshold_passes(agg_arr, t["operator"], t["value"]) & np.isfinite(agg_arr)
if fail_mask.sum() == 0:
continue
counts: dict[str, int] = {dep: 0 for dep in deps}
for i in range(n_runs):
if not fail_mask[i]:
continue
d = binding_dep_arrays[agg_name][i]
if d is not None:
counts[d] = counts.get(d, 0) + 1
total = sum(counts.values())
if total == 0:
continue
binding_gate_analysis[agg_name] = {
"fail_count": int(fail_mask.sum()),
"binding_when_aggregate_fails": [
{"dependency": d, "frequency": round(c / total, 4)}
for d, c in sorted(counts.items(), key=lambda x: -x[1]) if c > 0
],
}

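# Per-threshold quartile analysis: compare pass-rates when a (transitively used) bounded input
# sits in its bottom vs top quartile; for thresholds whose pass-probability is below 0.80, also
# search for the input percentile cutoff that lifts the conditional pass-probability to the target.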
quartile_analysis: dict[str, list[dict]] = {}
required_input_thresholds: dict[str, list[dict]] = {}
for output_id, t in thresholds_section.items():
if t.get("probability") is None:
continue
pass_arr = threshold_passes(output_arrays[output_id], t["operator"], t["value"])
all_used = set(fn_signatures.get(output_id, []))
changed = True
while changed:
changed = False
for other_name, other_args in fn_signatures.items():
if other_name in all_used and not set(other_args).issubset(all_used):
all_used.update(other_args)
changed = True
rows = []
req_rows = []
for vid in input_arrays:
if vid not in all_used or vid not in input_specs or "bound" not in input_specs[vid]:
continue
q = quartile_pass_rates(input_arrays[vid], pass_arr)
if q is None:
continue
rows.append({"id": vid, **q})
if t["probability"] is not None and t["probability"] < 0.80:
req = required_input_percentile(input_arrays[vid], pass_arr)
if req is not None:
req_rows.append({"id": vid, **req})
if rows:
rows.sort(key=lambda x: -abs(x["delta_pp"]))
quartile_analysis[output_id] = rows[:5]
if req_rows:
req_rows.sort(key=lambda x: -x["admitted_fraction"])
required_input_thresholds[output_id] = req_rows[:5]

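# Rank the user's declared missing values by worst-case leverage across thresholds:
# quartile pass-rate swing * probability of failing the threshold * relative bound width.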
missing_value_priority: list[dict] = []
missing_ids = [mv["id"] for mv in params.get("missing_values_to_estimate", [])]
for vid in missing_ids:
if vid not in input_arrays:
continue
bound = bounds.get(vid, {})
low, high, base = bound.get("low", 0), bound.get("high", 0), bound.get("base", 0)
denom = max(abs(base), 1e-9) if base else max(abs(high - low), 1e-9)
bound_width_ratio = abs(high - low) / denom
score = 0.0
worst_threshold: str | None = None
for output_id, t in thresholds_section.items():
p = t.get("probability")
if p is None:
continue
rows = quartile_analysis.get(output_id, [])
entry = next((r for r in rows if r["id"] == vid), None)
if entry is None:
continue
impact = abs(entry["delta_pp"]) * (1.0 - p) * bound_width_ratio
if impact > score:
score = impact
worst_threshold = output_id
if worst_threshold:
missing_value_priority.append({
"id": vid,
"score": round(score, 4),
"worst_gate": worst_threshold,
"bound_width_ratio": round(bound_width_ratio, 4),
"source": bound.get("source", "assumption"),
})
missing_value_priority.sort(key=lambda x: -x["score"])

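# Model-confidence grade per output: driven by how many of its (transitively used) bounded
# inputs are anchored in data and how wide their bounds are relative to the base value.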
model_confidence: dict[str, dict] = {}
for output_id in outputs_section:
used: set[str] = set(fn_signatures.get(output_id, []))
changed = True
while changed:
changed = False
for other_name, other_args in fn_signatures.items():
if other_name in used and not set(other_args).issubset(used):
used.update(other_args)
changed = True
bound_inputs = [vid for vid in used if vid in input_specs and "bound" in input_specs[vid]]
if not bound_inputs:
model_confidence[output_id] = {"grade": "HIGH",
"reasons": ["all inputs are fixed values"]}
continue
data_n = sum(1 for vid in bound_inputs if bounds.get(vid, {}).get("source") == "data")
assumption_n = len(bound_inputs) - data_n
data_fraction = data_n / len(bound_inputs)
widths = []
for vid in bound_inputs:
b = bounds.get(vid, {})
base = b.get("base", 0)
denom = max(abs(base), 1e-9) if base else max(abs(b.get("high", 0) - b.get("low", 0)), 1e-9)
widths.append(abs(b.get("high", 0) - b.get("low", 0)) / denom)
avg_width = sum(widths) / len(widths)
reasons: list[str] = []
if data_fraction >= 0.70 and avg_width < 0.5:
grade = "HIGH"
elif data_fraction < 0.30 or avg_width > 1.5:
grade = "LOW"
else:
grade = "MEDIUM"
reasons.append(f"{data_n}/{len(bound_inputs)} input bounds anchored in data; {assumption_n}/{len(bound_inputs)} are assumptions")
reasons.append(f"average bound-width-to-base ratio = {avg_width:.2f}")
model_confidence[output_id] = {
"grade": grade,
"data_source_fraction": round(data_fraction, 4),
"average_bound_width_ratio": round(avg_width, 4),
"reasons": reasons,
}

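# The five new analysis sections are emitted alongside the existing outputs/thresholds/sensitivity keys.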
result = {
"valid": True,
"plan_summary": {
@@ -408,6 +626,11 @@ def run(params_path: Path, bounds_path: Path, calc_path: Path,
"outputs": outputs_section,
"thresholds": thresholds_section,
"sensitivity": sensitivity_section,
"quartile_analysis": quartile_analysis,
"binding_gate_analysis": binding_gate_analysis,
"required_input_thresholds": required_input_thresholds,
"missing_value_priority": missing_value_priority,
"model_confidence": model_confidence,
"warnings": [
{"stage": "monte_carlo", "run": None, "calculation": None,
"message": msg, "severity": "WARN"}