diff --git a/Makefile b/Makefile index b51d485..460b18e 100644 --- a/Makefile +++ b/Makefile @@ -64,10 +64,15 @@ fmt-check: fi @echo "==> gofmt -s OK" -# Typos in source (same family of checks as Go Report Card "misspell"; no extra install — uses go run) +# Typos in tracked source only (skips node_modules, .next, and other gitignored paths). Paths are piped NUL-delimited straight into xargs because a shell variable cannot hold NUL bytes. spell: @echo "==> misspell" - go run github.com/client9/misspell/cmd/misspell@latest -error . + @tracked=$$(git ls-files -- '*.go' '*.md' '*.yaml' '*.yml'); \ + if [ -z "$$tracked" ]; then \ + echo "No files to spell-check"; \ + else \ + git ls-files -z -- '*.go' '*.md' '*.yaml' '*.yml' | xargs -0 go run github.com/client9/misspell/cmd/misspell@latest -error; \ + fi # Run linters (gofmt -s, misspell, go vet + golangci-lint). # Use golangci-lint v2 when go.mod is 1.26+ — v1.x binaries error on newer language targets. diff --git a/README.md b/README.md index 94efd3f..911771b 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ - [Development](#development) - [Code Coverage](#code-coverage) - [Setup and run examples](#setup-and-run-examples) +- [Benchmarks](#benchmarks) - [Production Readiness Checklist](#production-readiness-checklist) - [Disclaimer](#disclaimer) @@ -1244,6 +1245,12 @@ Or run with a custom config path: `go run ./cmd -config /path/to/config.yaml`. See **[cmd/README.md](cmd/README.md)** for CLI details and env vars. +## Benchmarks + +Config-driven benchmark suite to measure agent performance in your environment. + +See [benchmarks/README.md](benchmarks/README.md). + ## Production Readiness Checklist - **Run and approval limits** — Use `WithTimeout` and/or a context deadline on `Run` / `Stream`; use `WithApprovalTimeout` when tools require approval (activity retry counts inside workflows are fixed in the SDK, not user-tunable). 
diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000..5504697 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,324 @@ +# Massive Concurrency & Scale Benchmark + +This directory contains a standalone performance utility for the Go Agent SDK. It runs real `pkg/agent` execution loops under configurable load—mock LLM and tools by default—so you can measure latency, memory, CPU, token counts, and success rate without external API keys. + +Use it to stress-test orchestration behavior (multi-turn runs, tool batches, sub-agents, local vs Temporal runtime) before pointing the same harness at real LLMs and tools. + +--- + +## The Core Purpose + +Most agent benchmarks focus on token throughput alone. Production workloads also depend on how well the **orchestration layer** scales: many concurrent runs, multi-turn tool loops, sub-agent delegation, durable Temporal workflows, and stable memory use over hundreds of executions. + +This benchmark exercises the SDK’s actual agent engine (`agent.NewAgent`, `Run`) with: + +- Configurable run count and concurrency +- Mock or (later) real LLM + tool backends +- Optional sub-agent trees +- Local in-process runtime or Temporal with optional external workers +- Structured metrics and reports for comparison across config changes + +--- + +## How It Works + +Each **run** calls `agent.Run()` once on a shared root agent instance. The mock LLM follows a fixed multi-turn script: + +1. **Turn 1** — returns tool calls for all registered tools (benchmark tools and sub-agent tools when configured). +2. **Turn 2** — returns a final text response after tool results are applied. + +Mock components apply configurable latency and jitter so results reflect realistic timing, not instant stubs: + +| Component | Behavior | +| :--- | :--- | +| **Mock LLM** | `Generate` with base latency + jitter; reports fixed token usage per call (`mock_tokens`, split into input/output). 
| +| **Mock tools** | `benchmark_tool_1` … `benchmark_tool_N` with base latency + jitter. | +| **Sub-agents** | Built as real SDK sub-agents (`subagent-1`, `subagent-1.1`, …); each runs the same mock script inside its own agent loop. | +| **Tool execution mode** | `sequential` or `parallel` maps to `agent.WithAgentToolExecutionMode`. | + +**Concurrency:** with `concurrent: false`, a single root agent is reused for every run. With `concurrent: true`, the benchmark builds a pool of `concurrent_count` agent trees and executes runs in batches of up to `concurrent_count` goroutines (one pooled root agent per goroutine); each batch starts only after the previous batch has finished. + +**Metrics collected per benchmark session:** + +- Latency p50 / p95 / p99 / avg (wall-clock per `Run()`) +- Heap and total allocation delta +- Process CPU time +- Total input/output tokens (from mock LLM stats; includes sub-agent LLM calls) +- Success rate (`Run()` completed without error) +- `est_cost_usd` — placeholder `0` until pricing is configured + +Reports are written to `benchmarks/reports/` (JSON or text). SDK logs (optional) go to `benchmarks/logs/`. + +--- + +## Running the Benchmark + +Run from the **repository root** (`agent-sdk-go/`): + +### Quick run (default config) + +Uses `benchmarks/config.yaml` (100 sequential runs, local runtime, 3 tools, 2 sub-agents): + +```bash +go run ./benchmarks/ +``` + +### Custom config file + +```bash +go run ./benchmarks/ -config benchmarks/config.yaml +go run ./benchmarks/ -config /path/to/my-benchmark.yaml +``` + +### Command-line flags + +| Flag | Default | Description | +| :--- | :--- | :--- | +| `-config` | `benchmarks/config.yaml` | Path to YAML config (searches `benchmarks/config.yaml` or `./config.yaml` if unset). | + +All other settings are controlled via the YAML file. Edit `benchmarks/config.yaml` (or copy it) and re-run with `-config`. 
+ +### Example scenarios + +**Fast local smoke test** — reduce runs and latency in a copy of the config: + +```yaml +runtime: local +llm: + latency_ms: 5 + jitter_ms: 0 +tool: + latency_ms: 2 + jitter_ms: 0 +agent: + runs: 10 + concurrent: false + tools: + count: 2 + execution: parallel + subagents: + count: 0 + levels: 0 +``` + +```bash +go run ./benchmarks/ -config /tmp/fast-benchmark.yaml +``` + +**Concurrent batch runs:** + +```yaml +agent: + runs: 100 + concurrent: true + concurrent_count: 10 # 10 runs in parallel per batch +``` + +**Temporal runtime** — requires a running Temporal server (`localhost:7233` by default): + +```yaml +runtime: temporal +temporal: + host: localhost + port: 7233 + namespace: default + task_queue: agent-sdk-go + workers_count: 0 # embedded worker in agent process only +``` + +```bash +go run ./benchmarks/ -config benchmarks/config.yaml +``` + +**External root workers** (`workers_count: 1+`) — benchmark spawns separate worker processes and enables `EnableRemoteWorkers()` on the root agent. Embedded local workers still run for the root agent and all sub-agents (sub-agents always use embedded workers on their own task queues). + +```yaml +runtime: temporal +temporal: + workers_count: 2 +``` + +Workers are started automatically and stopped when the benchmark finishes. You can also run a worker manually: + +```bash +go run ./benchmarks/worker -config benchmarks/config.yaml -worker-id 1 +``` + +**Debug logging** — SDK logs to timestamped files under `benchmarks/logs/`: + +```yaml +logger: + enabled: true + dir: benchmarks/logs + level: debug # debug | info | warn | error +``` + +Log files: `agent_<timestamp>.log`, `worker_1_<timestamp>.log`, … (timestamp format `YYYY-MM-DD_HH-MM-SS`). + +--- + +## Configuration reference + +All paths in config (`dir` fields) are relative to the **repository root** unless absolute. + +### `runtime` + +| Value | Description | +| :--- | :--- | +| `local` | In-process SDK runtime (default). No Temporal server required. | +| `temporal` | Durable execution via Temporal. 
Server must be running before the benchmark. | + +### `temporal` + +| Field | Description | +| :--- | :--- | +| `host` | Temporal server host (default `localhost`). | +| `port` | gRPC port (default `7233`). | +| `namespace` | Temporal namespace (default `default`). | +| `task_queue` | Root agent task queue (default `agent-sdk-go`). Sub-agents use `{task_queue}-subagent-*` suffixes. | +| `workers_count` | `0` = embedded worker only. `1+` = spawn that many external root worker processes (Temporal only). Ignored when `runtime: local`. | + +### `llm` + +| Field | Description | +| :--- | :--- | +| `latency_ms` | Base delay per mock LLM `Generate` call. | +| `jitter_ms` | Random extra delay `[0, jitter_ms]` added on top of base latency. | +| `mock_tokens` | Total tokens reported per LLM call (split ~60% input / ~40% output). Defaults to `500` when unset or not positive. | + +### `tool` + +| Field | Description | +| :--- | :--- | +| `latency_ms` | Base delay per mock tool execution. | +| `jitter_ms` | Random extra delay `[0, jitter_ms]` on tool execution. | + +### `agent` + +| Field | Description | +| :--- | :--- | +| `runs` | Number of `Run()` calls on the root agent. | +| `concurrent` | `false` = runs one after another; `true` = batched parallel runs. | +| `concurrent_count` | Max parallel runs per batch when `concurrent: true`. | +| `tools.count` | Number of mock tools (`benchmark_tool_1` … `benchmark_tool_N`). | +| `tools.execution` | `sequential` or `parallel` — SDK tool batch execution mode. | +| `subagents.count` | Sub-agents per level (0 to disable). | +| `subagents.levels` | Max sub-agent nesting depth (0–5). | + +### `logger` + +| Field | Description | +| :--- | :--- | +| `enabled` | `true` writes JSON SDK logs to files; `false` discards SDK logs. | +| `dir` | Log directory (default `benchmarks/logs`). | +| `level` | `debug`, `info`, `warn`, or `error` (default `info`). | + +### `output` + +| Field | Description | +| :--- | :--- | +| `console` | Print report to stdout when `true`. 
| +| `file` | Write timestamped report file when `true`. | +| `dir` | Report directory (default `benchmarks/reports`). | +| `format` | `json` or `text` (default `json`). | + +### Sample output + +#### Text (`output.format: text`) + +``` +=== Benchmark Report === +Runtime : local +Concurrent : false +Total runs : 100 +Tools : 3 (sequential) +Sub-agents : 2 (levels 1) +--- +Latency p50 (ms) : 245.00 +Latency p95 (ms) : 312.00 +Latency p99 (ms) : 389.00 +Latency avg (ms) : 250.00 +Heap alloc (B) : 12345678 +Total alloc (B) : 98765432 +CPU time (ms) : 1500.00 +Input tokens : 50000 +Output tokens : 33333 +Est. cost (USD) : 0.0000 # pricing placeholder +Success rate (%) : 100.00 +``` + +#### JSON (`output.format: json`) + +Written to `benchmarks/reports/benchmark_<timestamp>.json` (text reports use `benchmark_<timestamp>.txt`; timestamp format `YYYY-MM-DD_HH-MM-SS`): + +```json +{ + "runtime": "local", + "generated_at": "2026-06-06T03:23:33Z", + "config": { + "Runtime": "local", + "Temporal": { + "Host": "localhost", + "Port": 7233, + "Namespace": "default", + "TaskQueue": "agent-sdk-go", + "WorkersCount": 0 + }, + "LLM": { + "LatencyMs": 200, + "JitterMs": 50, + "MockTokens": 500 + }, + "Tool": { + "LatencyMs": 50, + "JitterMs": 10 + }, + "Agent": { + "Runs": 100, + "Concurrent": false, + "ConcurrentCount": 10, + "Tools": { + "Count": 3, + "Execution": "sequential" + }, + "Subagents": { + "Count": 2, + "Levels": 1 + } + }, + "Logger": { + "Enabled": false, + "Dir": "benchmarks/logs", + "Level": "info" + }, + "Output": { + "Console": true, + "File": true, + "Dir": "benchmarks/reports", + "Format": "json" + } + }, + "metrics": { + "p50_ms": 245, + "p95_ms": 312, + "p99_ms": 389, + "avg_ms": 250, + "heap_alloc_bytes": 12345678, + "total_alloc_bytes": 98765432, + "cpu_time_ms": 1500, + "total_input_tokens": 50000, + "total_output_tokens": 33333, + "est_cost_usd": 0, + "total_runs": 100, + "success_rate": 100 + } +} +``` + +--- + +## Note + +LLM and tool calls are **mocked by default** with configurable latency and fixed token counts to keep results reproducible and free of API cost. 
Latency percentiles, memory, and CPU reflect real SDK orchestration overhead under that simulation. + +When you swap in **real LLMs and tools**, metrics will change: latency follows network and model speed, token counts come from provider usage, and cost requires your own pricing model (the benchmark leaves `est_cost_usd` at `0` until configured). The harness structure—runs, concurrency, reporting, Temporal workers—stays the same; only the LLM client and tool registry need to be replaced in `benchmarks/setup/`. diff --git a/benchmarks/agent_builder.go b/benchmarks/agent_builder.go new file mode 100644 index 0000000..1103e6e --- /dev/null +++ b/benchmarks/agent_builder.go @@ -0,0 +1,57 @@ +package main + +import ( + "fmt" + "time" + + "github.com/agenticenv/agent-sdk-go/benchmarks/setup" + "github.com/agenticenv/agent-sdk-go/pkg/agent" + "github.com/agenticenv/agent-sdk-go/pkg/logger" +) + +type AgentBundle struct { + Root *agent.Agent + All []*agent.Agent +} + +func buildAgentBundle(cfg *setup.Config, llm *setup.MockLLMClient, lgr logger.Logger, tree *setup.AgentTree) (*AgentBundle, error) { + enableRemote := cfg.ExternalWorkersEnabled() + opts := setup.RootOptions(cfg, llm, lgr, setup.RootAgentName, tree.RootPrompt, tree.SubAgents, cfg.Temporal.TaskQueue, enableRemote) + + root, err := agent.NewAgent(opts...) + if err != nil { + return nil, err + } + if cfg.UseTemporal() { + time.Sleep(300 * time.Millisecond) + } + + all := append([]*agent.Agent{root}, tree.Created...) 
+ return &AgentBundle{Root: root, All: all}, nil +} + +func buildAgentPool(cfg *setup.Config, llm *setup.MockLLMClient, lgr logger.Logger, size int) ([]*AgentBundle, error) { + if size <= 0 { + size = 1 + } + bundles := make([]*AgentBundle, 0, size) + for i := 0; i < size; i++ { + tree, err := setup.BuildAgentTree(cfg, llm, lgr) + if err != nil { + for _, b := range bundles { + setup.CloseAgents(b.All) + } + return nil, fmt.Errorf("agent tree index %d: %w", i, err) + } + bundle, err := buildAgentBundle(cfg, llm, lgr, tree) + if err != nil { + setup.CloseAgents(tree.Created) + for _, b := range bundles { + setup.CloseAgents(b.All) + } + return nil, fmt.Errorf("agent pool index %d: %w", i, err) + } + bundles = append(bundles, bundle) + } + return bundles, nil +} diff --git a/benchmarks/config.yaml b/benchmarks/config.yaml new file mode 100644 index 0000000..ad5781c --- /dev/null +++ b/benchmarks/config.yaml @@ -0,0 +1,41 @@ +runtime: local # local or temporal + +temporal: + host: localhost + port: 7233 + namespace: default + task_queue: agent-sdk-go + workers_count: 0 # 0 = embedded worker only; 1+ = spawn external root workers + +llm: + latency_ms: 200 + jitter_ms: 50 # random +/- added to latency + mock_tokens: 500 + +tool: + latency_ms: 50 + jitter_ms: 10 + +agent: + runs: 100 + concurrent: false + concurrent_count: 10 # only if concurrent: true + + tools: + count: 3 + execution: parallel # sequential or parallel + + subagents: + count: 2 + levels: 1 # 1 or 2 + +logger: + enabled: false # true writes SDK logs under benchmarks/logs + dir: benchmarks/logs + level: info # debug | info | warn | error + +output: + console: true + file: true + dir: benchmarks/reports # output directory for reports + format: text # json or text \ No newline at end of file diff --git a/benchmarks/main.go b/benchmarks/main.go new file mode 100644 index 0000000..24917a8 --- /dev/null +++ b/benchmarks/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + 
"os" + "path/filepath" + + "github.com/agenticenv/agent-sdk-go/benchmarks/setup" +) + +type BenchmarkMetrics struct { + P50Ms float64 `json:"p50_ms"` + P95Ms float64 `json:"p95_ms"` + P99Ms float64 `json:"p99_ms"` + AvgMs float64 `json:"avg_ms"` + + HeapAllocBytes uint64 `json:"heap_alloc_bytes"` + TotalAllocBytes uint64 `json:"total_alloc_bytes"` + + CPUTimeMs float64 `json:"cpu_time_ms"` + + TotalInputTokens int `json:"total_input_tokens"` + TotalOutputTokens int `json:"total_output_tokens"` + EstCostUSD float64 `json:"est_cost_usd"` + + TotalRuns int `json:"total_runs"` + SuccessRate float64 `json:"success_rate"` +} + +func main() { + configPath := flag.String("config", "", "path to benchmark config.yaml (default: benchmarks/config.yaml)") + flag.Parse() + + cfg, err := setup.LoadConfig(*configPath) + if err != nil { + log.Fatalf("load config: %v", err) + } + + repoRoot, err := setup.FindRepoRoot(".") + if err != nil { + log.Fatalf("find repo root: %v", err) + } + + resolvedConfig := *configPath + if resolvedConfig == "" { + resolvedConfig = setup.DefaultConfigPath() + } + absConfig, err := filepath.Abs(resolvedConfig) + if err != nil { + log.Fatalf("resolve config path: %v", err) + } + + lgr, closeLogger, err := setup.SetupAgentLogger(cfg, repoRoot) + if err != nil { + log.Fatalf("setup logger: %v", err) + } + defer closeLogger() + + stats := setup.NewLLMStats() + runRng := setup.RunRNG() + llm := setup.NewMockLLMClient(cfg.LLM, stats, runRng) + + fmt.Println("================================================================") + fmt.Printf("Starting agent-sdk-go benchmark (%s runtime)\n", cfg.Runtime) + fmt.Printf("Runs: %d Concurrent: %t Tools: %d Sub-agents: %d (levels %d)\n", + cfg.Agent.Runs, cfg.Agent.Concurrent, cfg.Agent.Tools.Count, cfg.Agent.Subagents.Count, cfg.Agent.Subagents.Levels) + if cfg.UseTemporal() { + fmt.Printf("External workers : %d\n", cfg.Temporal.WorkersCount) + } + if cfg.Logger.Enabled { + fmt.Printf("Logger : enabled (%s)\n", 
cfg.LogDir(repoRoot)) + } + fmt.Println("================================================================") + + ctx := context.Background() + + var workers *externalWorkerManager + if cfg.ExternalWorkersEnabled() { + workers, err = startExternalWorkers(ctx, absConfig, repoRoot, cfg.Temporal.WorkersCount) + if err != nil { + log.Fatalf("start external workers: %v", err) + } + defer func() { + if stopErr := workers.stop(); stopErr != nil { + log.Printf("stop external workers: %v", stopErr) + } + }() + } + + metrics, err := runBenchmark(ctx, cfg, llm, lgr, runRng) + if err != nil { + log.Fatalf("benchmark failed: %v", err) + } + + if err := writeReport(cfg, metrics, repoRoot); err != nil { + log.Fatalf("write report: %v", err) + } + + if metrics.SuccessRate < 100 { + os.Exit(1) + } +} diff --git a/benchmarks/metrics.go b/benchmarks/metrics.go new file mode 100644 index 0000000..4ccf3f6 --- /dev/null +++ b/benchmarks/metrics.go @@ -0,0 +1,75 @@ +package main + +import ( + "math" + "runtime" + "sort" +) + +func aggregateMetrics(outcomes []runOutcome, memBefore, memAfter runtime.MemStats, cpuMs float64, inputTokens, outputTokens int) *BenchmarkMetrics { + latencies := make([]float64, 0, len(outcomes)) + successes := 0 + for _, o := range outcomes { + latencies = append(latencies, o.latencyMs) + if o.success { + successes++ + } + } + sort.Float64s(latencies) + + totalRuns := len(outcomes) + successRate := 0.0 + if totalRuns > 0 { + successRate = float64(successes) / float64(totalRuns) * 100 + } + + return &BenchmarkMetrics{ + P50Ms: percentile(latencies, 50), + P95Ms: percentile(latencies, 95), + P99Ms: percentile(latencies, 99), + AvgMs: average(latencies), + HeapAllocBytes: deltaUint64(memAfter.Alloc, memBefore.Alloc), + TotalAllocBytes: deltaUint64(memAfter.TotalAlloc, memBefore.TotalAlloc), + CPUTimeMs: cpuMs, + TotalInputTokens: inputTokens, + TotalOutputTokens: outputTokens, + EstCostUSD: 0, // pricing to be defined later + TotalRuns: totalRuns, + SuccessRate: 
successRate, + } +} + +func percentile(sorted []float64, p float64) float64 { + if len(sorted) == 0 { + return 0 + } + if len(sorted) == 1 { + return sorted[0] + } + rank := (p / 100) * float64(len(sorted)-1) + lower := int(math.Floor(rank)) + upper := int(math.Ceil(rank)) + if lower == upper { + return sorted[lower] + } + weight := rank - float64(lower) + return sorted[lower]*(1-weight) + sorted[upper]*weight +} + +func average(values []float64) float64 { + if len(values) == 0 { + return 0 + } + sum := 0.0 + for _, v := range values { + sum += v + } + return sum / float64(len(values)) +} + +func deltaUint64(after, before uint64) uint64 { + if after >= before { + return after - before + } + return 0 // counter shrank (e.g. heap in use after a GC): report no growth rather than the absolute value +} diff --git a/benchmarks/report.go b/benchmarks/report.go new file mode 100644 index 0000000..027ac37 --- /dev/null +++ b/benchmarks/report.go @@ -0,0 +1,93 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/agenticenv/agent-sdk-go/benchmarks/setup" +) + +func writeReport(cfg *setup.Config, metrics *BenchmarkMetrics, repoRoot string) error { + if cfg.Output.Console { + printReport(cfg, metrics) + } + if !cfg.Output.File { + return nil + } + + dir := cfg.OutputDir(repoRoot) + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("create output dir: %w", err) + } + + ext := "json" + if strings.EqualFold(cfg.Output.Format, "text") { + ext = "txt" + } + filename := fmt.Sprintf("benchmark_%s.%s", time.Now().Format("2006-01-02_15-04-05"), ext) + path := filepath.Join(dir, filename) + + content, err := formatReport(cfg, metrics) + if err != nil { + return err + } + if err := os.WriteFile(path, content, 0o644); err != nil { + return fmt.Errorf("write report: %w", err) + } + return nil +} + +func printReport(cfg *setup.Config, metrics *BenchmarkMetrics) { + content, err := formatReport(cfg, metrics) + if err != nil { + fmt.Printf("failed to format report: %v\n", err) + return + } + 
fmt.Print(string(content)) +} + +func formatReport(cfg *setup.Config, metrics *BenchmarkMetrics) ([]byte, error) { + if strings.EqualFold(cfg.Output.Format, "text") { + return []byte(formatTextReport(cfg, metrics)), nil + } + payload := map[string]any{ + "runtime": cfg.Runtime, + "config": cfg, + "metrics": metrics, + "generated_at": time.Now().UTC().Format(time.RFC3339), + } + return json.MarshalIndent(payload, "", " ") +} + +func formatTextReport(cfg *setup.Config, metrics *BenchmarkMetrics) string { + var b strings.Builder + fmt.Fprintf(&b, "=== Benchmark Report ===\n") + fmt.Fprintf(&b, "Runtime : %s\n", cfg.Runtime) + fmt.Fprintf(&b, "Concurrent : %t\n", cfg.Agent.Concurrent) + if cfg.Agent.Concurrent { + fmt.Fprintf(&b, "Concurrent count : %d\n", cfg.Agent.ConcurrentCount) + } + if cfg.UseTemporal() { + fmt.Fprintf(&b, "External workers : %d\n", cfg.Temporal.WorkersCount) + } + fmt.Fprintf(&b, "Total runs : %d\n", metrics.TotalRuns) + fmt.Fprintf(&b, "Tools : %d (%s)\n", cfg.Agent.Tools.Count, cfg.Agent.Tools.Execution) + fmt.Fprintf(&b, "Sub-agents : %d (levels %d)\n", cfg.Agent.Subagents.Count, cfg.Agent.Subagents.Levels) + fmt.Fprintf(&b, "---\n") + fmt.Fprintf(&b, "Latency p50 (ms) : %.2f\n", metrics.P50Ms) + fmt.Fprintf(&b, "Latency p95 (ms) : %.2f\n", metrics.P95Ms) + fmt.Fprintf(&b, "Latency p99 (ms) : %.2f\n", metrics.P99Ms) + fmt.Fprintf(&b, "Latency avg (ms) : %.2f\n", metrics.AvgMs) + fmt.Fprintf(&b, "Heap alloc (B) : %d\n", metrics.HeapAllocBytes) + fmt.Fprintf(&b, "Total alloc (B) : %d\n", metrics.TotalAllocBytes) + fmt.Fprintf(&b, "CPU time (ms) : %.2f\n", metrics.CPUTimeMs) + fmt.Fprintf(&b, "Input tokens : %d\n", metrics.TotalInputTokens) + fmt.Fprintf(&b, "Output tokens : %d\n", metrics.TotalOutputTokens) + fmt.Fprintf(&b, "Est. 
cost (USD) : %.4f # pricing placeholder\n", metrics.EstCostUSD) + fmt.Fprintf(&b, "Success rate (%%) : %.2f\n", metrics.SuccessRate) + return b.String() +} diff --git a/benchmarks/runner.go b/benchmarks/runner.go new file mode 100644 index 0000000..ad24d86 --- /dev/null +++ b/benchmarks/runner.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "math/rand" + "runtime" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/agenticenv/agent-sdk-go/benchmarks/setup" + "github.com/agenticenv/agent-sdk-go/pkg/agent" + "github.com/agenticenv/agent-sdk-go/pkg/logger" +) + +type runOutcome struct { + latencyMs float64 + success bool +} + +func runBenchmark(ctx context.Context, cfg *setup.Config, llm *setup.MockLLMClient, lgr logger.Logger, runRng *rand.Rand) (*BenchmarkMetrics, error) { + poolSize := 1 + if cfg.Agent.Concurrent { + poolSize = cfg.Agent.ConcurrentCount + } + + bundles, err := buildAgentPool(cfg, llm, lgr, poolSize) + if err != nil { + return nil, err + } + defer func() { + for _, b := range bundles { + setup.CloseAgents(b.All) + } + }() + + var memBefore runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&memBefore) + + cpuBefore, err := processCPUTimeMs() + if err != nil { + return nil, err + } + + outcomes := make([]runOutcome, 0, cfg.Agent.Runs) + var outcomesMu sync.Mutex + + remaining := cfg.Agent.Runs + for remaining > 0 { + batchSize := 1 + if cfg.Agent.Concurrent { + batchSize = cfg.Agent.ConcurrentCount + if batchSize > remaining { + batchSize = remaining + } + } + + var wg sync.WaitGroup + var batchErr atomic.Value + for i := 0; i < batchSize; i++ { + wg.Add(1) + agentIdx := i % len(bundles) + go func(bundle *AgentBundle) { + defer wg.Done() + outcome := executeRun(ctx, bundle.Root, runRng) + outcomesMu.Lock() + outcomes = append(outcomes, outcome) + outcomesMu.Unlock() + }(bundles[agentIdx]) + } + wg.Wait() + if errVal := batchErr.Load(); errVal != nil { + return nil, errVal.(error) + } + remaining -= batchSize + } + + var 
memAfter runtime.MemStats + runtime.ReadMemStats(&memAfter) + + cpuAfter, err := processCPUTimeMs() + if err != nil { + return nil, err + } + + inputTokens, outputTokens := llm.Stats().Snapshot() + return aggregateMetrics(outcomes, memBefore, memAfter, cpuAfter-cpuBefore, inputTokens, outputTokens), nil +} + +func executeRun(ctx context.Context, a *agent.Agent, rng *rand.Rand) runOutcome { + start := time.Now() + _, err := a.Run(ctx, setup.RandomUserPrompt(rng), "") + return runOutcome{ + latencyMs: float64(time.Since(start).Milliseconds()), + success: err == nil, + } +} + +func processCPUTimeMs() (float64, error) { + var usage syscall.Rusage + if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil { + return 0, err + } + user := float64(usage.Utime.Sec)*1000 + float64(usage.Utime.Usec)/1000 + sys := float64(usage.Stime.Sec)*1000 + float64(usage.Stime.Usec)/1000 + return user + sys, nil +} diff --git a/benchmarks/setup/config.go b/benchmarks/setup/config.go new file mode 100644 index 0000000..e2a30fc --- /dev/null +++ b/benchmarks/setup/config.go @@ -0,0 +1,197 @@ +package setup + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/spf13/viper" +) + +const BenchmarkTreeSeed int64 = 42 + +type Config struct { + Runtime string `mapstructure:"runtime"` + Temporal TemporalConfig `mapstructure:"temporal"` + LLM LLMConfig `mapstructure:"llm"` + Tool ToolConfig `mapstructure:"tool"` + Agent AgentConfig `mapstructure:"agent"` + Logger LoggerConfig `mapstructure:"logger"` + Output OutputConfig `mapstructure:"output"` +} + +type TemporalConfig struct { + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + Namespace string `mapstructure:"namespace"` + TaskQueue string `mapstructure:"task_queue"` + WorkersCount int `mapstructure:"workers_count"` +} + +type LLMConfig struct { + LatencyMs int `mapstructure:"latency_ms"` + JitterMs int `mapstructure:"jitter_ms"` + MockTokens int `mapstructure:"mock_tokens"` +} + +type ToolConfig 
struct { + LatencyMs int `mapstructure:"latency_ms"` + JitterMs int `mapstructure:"jitter_ms"` +} + +type AgentConfig struct { + Runs int `mapstructure:"runs"` + Concurrent bool `mapstructure:"concurrent"` + ConcurrentCount int `mapstructure:"concurrent_count"` + Tools AgentToolsConfig `mapstructure:"tools"` + Subagents SubagentsConfig `mapstructure:"subagents"` +} + +type AgentToolsConfig struct { + Count int `mapstructure:"count"` + Execution string `mapstructure:"execution"` +} + +type SubagentsConfig struct { + Count int `mapstructure:"count"` + Levels int `mapstructure:"levels"` +} + +type LoggerConfig struct { + Enabled bool `mapstructure:"enabled"` + Dir string `mapstructure:"dir"` + Level string `mapstructure:"level"` +} + +type OutputConfig struct { + Console bool `mapstructure:"console"` + File bool `mapstructure:"file"` + Dir string `mapstructure:"dir"` + Format string `mapstructure:"format"` +} + +func (c *Config) UseTemporal() bool { + return c != nil && strings.EqualFold(strings.TrimSpace(c.Runtime), "temporal") +} + +func (c *Config) ExternalWorkersEnabled() bool { + return c.UseTemporal() && c.Temporal.WorkersCount > 0 +} + +func LoadConfig(path string) (*Config, error) { + if path == "" { + path = defaultConfigPath() + } + v := viper.New() + v.SetConfigFile(path) + v.SetConfigType("yaml") + if err := v.ReadInConfig(); err != nil { + return nil, fmt.Errorf("read config %q: %w", path, err) + } + var cfg Config + if err := v.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("parse config: %w", err) + } + if err := cfg.validate(); err != nil { + return nil, err + } + return &cfg, nil +} + +func (c *Config) validate() error { + if c.Agent.Runs <= 0 { + return fmt.Errorf("agent.runs must be > 0") + } + if c.Agent.Concurrent && c.Agent.ConcurrentCount <= 0 { + return fmt.Errorf("agent.concurrent_count must be > 0 when concurrent is true") + } + if c.Agent.Tools.Count <= 0 { + return fmt.Errorf("agent.tools.count must be > 0") + } + if 
c.Agent.Subagents.Levels < 0 { + return fmt.Errorf("agent.subagents.levels must be >= 0") + } + if c.Agent.Subagents.Levels > 5 { + return fmt.Errorf("agent.subagents.levels must be <= 5") + } + if c.Agent.Subagents.Count < 0 { + return fmt.Errorf("agent.subagents.count must be >= 0") + } + if c.Temporal.WorkersCount < 0 { + return fmt.Errorf("temporal.workers_count must be >= 0") + } + if c.LLM.MockTokens <= 0 { + c.LLM.MockTokens = 500 + } + if c.Logger.Dir == "" { + c.Logger.Dir = "benchmarks/logs" + } + if strings.TrimSpace(c.Logger.Level) == "" { + c.Logger.Level = "info" + } + if c.Output.Dir == "" { + c.Output.Dir = "benchmarks/reports" + } + if c.Output.Format == "" { + c.Output.Format = "json" + } + if c.Temporal.TaskQueue == "" { + c.Temporal.TaskQueue = "agent-sdk-go" + } + if c.Temporal.Port == 0 { + c.Temporal.Port = 7233 + } + if c.Temporal.Host == "" { + c.Temporal.Host = "localhost" + } + if c.Temporal.Namespace == "" { + c.Temporal.Namespace = "default" + } + return nil +} + +func defaultConfigPath() string { + for _, candidate := range []string{"benchmarks/config.yaml", "config.yaml"} { + if _, err := os.Stat(candidate); err == nil { + return candidate + } + } + return "benchmarks/config.yaml" +} + +// DefaultConfigPath returns the default benchmark config file path. 
+func DefaultConfigPath() string { return defaultConfigPath() }
+
+func FindRepoRoot(from string) (string, error) {
+	dir, err := filepath.Abs(from)
+	if err != nil {
+		return "", err
+	}
+	for {
+		if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil {
+			return dir, nil
+		}
+		parent := filepath.Dir(dir)
+		if parent == dir {
+			return "", fmt.Errorf("go.mod not found from %s", from)
+		}
+		dir = parent
+	}
+}
+
+func (c *Config) OutputDir(repoRoot string) string {
+	return resolveRepoPath(repoRoot, c.Output.Dir)
+}
+
+func (c *Config) LogDir(repoRoot string) string {
+	return resolveRepoPath(repoRoot, c.Logger.Dir)
+}
+
+func resolveRepoPath(repoRoot, dir string) string {
+	dir = strings.TrimSpace(dir)
+	if filepath.IsAbs(dir) {
+		return dir
+	}
+	return filepath.Join(repoRoot, dir)
+}
diff --git a/benchmarks/setup/logger.go b/benchmarks/setup/logger.go
new file mode 100644
index 0000000..c7be1d9
--- /dev/null
+++ b/benchmarks/setup/logger.go
@@ -0,0 +1,60 @@
+package setup
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/agenticenv/agent-sdk-go/pkg/logger"
+)
+
+// SetupAgentLogger returns the logger for the benchmark agent together with a
+// cleanup func the caller must run when done. Log files are prefixed "agent".
+func SetupAgentLogger(cfg *Config, repoRoot string) (logger.Logger, func(), error) {
+	return setupFileLogger(cfg, repoRoot, "agent")
+}
+
+// SetupWorkerLogger is SetupAgentLogger for a worker process; log files are
+// prefixed "worker_<workerID>".
+func SetupWorkerLogger(cfg *Config, repoRoot string, workerID int) (logger.Logger, func(), error) {
+	return setupFileLogger(cfg, repoRoot, fmt.Sprintf("worker_%d", workerID))
+}
+
+// setupFileLogger returns a no-op logger and cleanup when cfg is nil or logging
+// is disabled. Otherwise it creates cfg.LogDir(repoRoot), opens
+// "<prefix>_<timestamp>.log" there in append mode, and returns a logger writing
+// to that file plus a cleanup func that closes it.
+func setupFileLogger(cfg *Config, repoRoot, prefix string) (logger.Logger, func(), error) {
+	if cfg == nil || !cfg.Logger.Enabled {
+		return logger.NoopLogger(), func() {}, nil
+	}
+
+	dir := cfg.LogDir(repoRoot)
+	if err := os.MkdirAll(dir, 0o755); err != nil {
+		return nil, nil, fmt.Errorf("create log dir: %w", err)
+	}
+
+	filename := fmt.Sprintf("%s_%s.log", prefix, time.Now().Format("2006-01-02_15-04-05"))
+	path := filepath.Join(dir, filename)
+	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+	if err != nil {
+		return nil, nil, fmt.Errorf("open log file: %w", err)
+	}
+
+	lgr := logger.NewWriterLogger(f, cfg.Logger.Level, "json", true)
+	cleanup := func() { _ = f.Close() }
+	return lgr, cleanup, nil
+}
+
+// TreeRNG returns a new generator seeded with BenchmarkTreeSeed, so every
+// instance produces the same sequence.
+func TreeRNG() *rand.Rand {
+	return rand.New(rand.NewSource(BenchmarkTreeSeed))
+}
+
+// RunRNG returns a new generator seeded from the current time.
+func RunRNG() *rand.Rand {
+	return rand.New(rand.NewSource(time.Now().UnixNano()))
+}
diff --git a/benchmarks/setup/mock_llm.go b/benchmarks/setup/mock_llm.go
new file mode 100644
index 0000000..da6390d
--- /dev/null
+++ b/benchmarks/setup/mock_llm.go
@@ -0,0 +1,221 @@
+package setup
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/agenticenv/agent-sdk-go/internal/runtime"
+	"github.com/agenticenv/agent-sdk-go/pkg/interfaces"
+)
+
+// MockLLMModel is the model name reported by MockLLMClient.GetModel.
+const MockLLMModel = "benchmark-mock"
+
+// jitterMu serializes draws from the *rand.Rand handed to sleepWithJitter.
+// A generator built with rand.New(rand.NewSource(...)) is not safe for
+// concurrent use, and one generator is shared by every Generate / Execute call
+// made through the same mock client or tool registry.
+var jitterMu sync.Mutex
+
+// LLMStats accumulates the token counts reported by the mock LLM.
+// The zero value is ready to use; mu guards both totals.
+type LLMStats struct {
+	mu                sync.Mutex
+	TotalInputTokens  int
+	TotalOutputTokens int
+}
+
+// NewLLMStats returns an empty LLMStats.
+func NewLLMStats() *LLMStats { return &LLMStats{} }
+
+// add records one call's token usage. It is a no-op on a nil receiver.
+func (s *LLMStats) add(input, output int) {
+	if s == nil {
+		return
+	}
+	s.mu.Lock()
+	s.TotalInputTokens += input
+	s.TotalOutputTokens += output
+	s.mu.Unlock()
+}
+
+// Snapshot returns the totals recorded so far, or zeros for a nil receiver.
+func (s *LLMStats) Snapshot() (input, output int) {
+	if s == nil {
+		return 0, 0
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.TotalInputTokens, s.TotalOutputTokens
+}
+
+// MockLLMClient is a scripted LLM client that needs no external API: it
+// sleeps for the configured latency and then answers from the request alone.
+type MockLLMClient struct {
+	cfg   LLMConfig
+	stats *LLMStats
+	rng   *rand.Rand
+}
+
+// NewMockLLMClient builds a mock client. A nil rng is replaced by a
+// time-seeded generator; stats may be nil, in which case usage is not recorded.
+func NewMockLLMClient(cfg LLMConfig, stats *LLMStats, rng *rand.Rand) *MockLLMClient {
+	if rng == nil {
+		rng = rand.New(rand.NewSource(time.Now().UnixNano()))
+	}
+	return &MockLLMClient{cfg: cfg, stats: stats, rng: rng}
+}
+
+// Stats returns the stats collector passed to NewMockLLMClient.
+func (m *MockLLMClient) Stats() *LLMStats { return m.stats }
+
+// Generate waits for the configured latency plus jitter, records token usage,
+// and then follows a two-turn script: if the request already carries a
+// tool-result message it returns the final text answer, otherwise it returns
+// one tool call per tool listed in the request. A nil request is answered with
+// no tool calls instead of panicking. The only error is ctx's, returned when
+// the context ends during the simulated latency.
+func (m *MockLLMClient) Generate(ctx context.Context, request *interfaces.LLMRequest) (*interfaces.LLMResponse, error) {
+	if err := sleepWithJitter(ctx, m.cfg.LatencyMs, m.cfg.JitterMs, m.rng); err != nil {
+		return nil, err
+	}
+
+	promptTokens, completionTokens := splitMockTokens(m.cfg.MockTokens)
+	m.stats.add(promptTokens, completionTokens)
+	usage := &interfaces.LLMUsage{
+		PromptTokens:     int64(promptTokens),
+		CompletionTokens: int64(completionTokens),
+		TotalTokens:      int64(promptTokens + completionTokens),
+	}
+
+	if hasToolResultMessages(request) {
+		return &interfaces.LLMResponse{Content: "benchmark complete", Usage: usage}, nil
+	}
+
+	var toolCalls []*interfaces.ToolCall
+	if request != nil {
+		toolCalls = make([]*interfaces.ToolCall, 0, len(request.Tools))
+		for i, spec := range request.Tools {
+			toolCalls = append(toolCalls, &interfaces.ToolCall{
+				ToolCallID: fmt.Sprintf("tc-%d", i+1),
+				ToolName:   spec.Name,
+				Args:       mockToolArgs(spec.Name),
+			})
+		}
+	}
+
+	return &interfaces.LLMResponse{
+		Content:   "executing tools",
+		ToolCalls: toolCalls,
+		Usage:     usage,
+	}, nil
+}
+
+// GenerateStream runs Generate and wraps the complete response in a
+// single-chunk stream.
+func (m *MockLLMClient) GenerateStream(ctx context.Context, request *interfaces.LLMRequest) (interfaces.LLMStream, error) {
+	resp, err := m.Generate(ctx, request)
+	if err != nil {
+		return nil, err
+	}
+	return &mockLLMStream{resp: resp}, nil
+}
+
+// GetModel returns MockLLMModel.
+func (m *MockLLMClient) GetModel() string { return MockLLMModel }
+
+// GetProvider reports interfaces.LLMProviderOpenAI.
+func (m *MockLLMClient) GetProvider() interfaces.LLMProvider {
+	return interfaces.LLMProviderOpenAI
+}
+
+// IsStreamSupported always reports false.
+func (m *MockLLMClient) IsStreamSupported() bool { return false }
+
+// mockLLMStream is a one-shot stream over an already complete response.
+type mockLLMStream struct {
+	resp *interfaces.LLMResponse
+	done bool
+	err  error
+}
+
+// Next reports true exactly once, on its first call.
+func (s *mockLLMStream) Next() bool {
+	if s.done {
+		return false
+	}
+	s.done = true
+	return true
+}
+
+// Current returns the whole response as one chunk, or nil without a response.
+func (s *mockLLMStream) Current() *interfaces.LLMStreamChunk {
+	if s.resp == nil {
+		return nil
+	}
+	return &interfaces.LLMStreamChunk{ContentDelta: s.resp.Content, ToolCalls: s.resp.ToolCalls}
+}
+
+// Err returns the stream error; nothing in this file ever sets it.
+func (s *mockLLMStream) Err() error { return s.err }
+
+// GetResult returns the full response backing the stream.
+func (s *mockLLMStream) GetResult() *interfaces.LLMResponse { return s.resp }
+
+// hasToolResultMessages reports whether any message in request has the tool
+// role. A nil request has none.
+func hasToolResultMessages(request *interfaces.LLMRequest) bool {
+	if request == nil {
+		return false
+	}
+	for _, msg := range request.Messages {
+		if msg.Role == interfaces.MessageRoleTool {
+			return true
+		}
+	}
+	return false
+}
+
+// mockToolArgs returns the arguments the mock LLM passes to toolName: a
+// sub-agent query for names prefixed "subagent_", a plain "input" otherwise.
+func mockToolArgs(toolName string) map[string]any {
+	if strings.HasPrefix(toolName, "subagent_") {
+		return map[string]any{runtime.SubAgentToolParamQuery: "benchmark subtask"}
+	}
+	return map[string]any{"input": "benchmark"}
+}
+
+// splitMockTokens divides total into prompt (3/5, rounded down) and completion
+// (the remainder). Non-positive totals yield 0, 0.
+func splitMockTokens(total int) (prompt, completion int) {
+	if total <= 0 {
+		return 0, 0
+	}
+	prompt = total * 3 / 5
+	completion = total - prompt
+	return prompt, completion
+}
+
+// sleepWithJitter blocks for baseMs milliseconds plus, when jitterMs > 0 and
+// rng is non-nil, a random 0..jitterMs extra milliseconds. It returns ctx.Err()
+// if ctx ends first, nil otherwise.
+func sleepWithJitter(ctx context.Context, baseMs, jitterMs int, rng *rand.Rand) error {
+	delay := time.Duration(baseMs) * time.Millisecond
+	if jitterMs > 0 && rng != nil {
+		// Hold the lock only for the draw, never for the sleep itself.
+		jitterMu.Lock()
+		extra := rng.Intn(jitterMs + 1)
+		jitterMu.Unlock()
+		delay += time.Duration(extra) * time.Millisecond
+	}
+	select {
+	case <-time.After(delay):
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/benchmarks/setup/mock_tool.go b/benchmarks/setup/mock_tool.go
new file mode 100644
index 0000000..6dde63c
--- /dev/null
+++ b/benchmarks/setup/mock_tool.go
@@ -0,0 +1,73 @@
+package setup
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+
+	"github.com/agenticenv/agent-sdk-go/pkg/interfaces"
+	"github.com/agenticenv/agent-sdk-go/pkg/tools"
+)
+
+// BenchmarkToolPrefix is prepended to the index to form each mock tool's name.
+const BenchmarkToolPrefix = "benchmark_tool_"
+
+// MockBenchmarkTool is a tool that only sleeps for the configured latency and
+// then echoes its input.
+type MockBenchmarkTool struct {
+	name string
+	cfg  ToolConfig
+	rng  *rand.Rand
+}
+
+// NewMockBenchmarkTool returns the tool named BenchmarkToolPrefix + index.
+// rng drives the latency jitter and may be nil, which disables jitter.
+func NewMockBenchmarkTool(index int, cfg ToolConfig, rng *rand.Rand) *MockBenchmarkTool {
+	return &MockBenchmarkTool{
+		name: fmt.Sprintf("%s%d", BenchmarkToolPrefix, index),
+		cfg:  cfg,
+		rng:  rng,
+	}
+}
+
+// Name returns the tool's registered name.
+func (t *MockBenchmarkTool) Name() string { return t.name }
+
+// DisplayName returns the same string as Name.
+func (t *MockBenchmarkTool) DisplayName() string { return t.name }
+
+// Description returns a fixed human-readable description.
+func (t *MockBenchmarkTool) Description() string {
+	return "Benchmark mock tool for load testing."
+}
+
+// Parameters declares a single string parameter, "input".
+func (t *MockBenchmarkTool) Parameters() interfaces.JSONSchema {
+	return tools.Params(map[string]interfaces.JSONSchema{
+		"input": tools.ParamString("Input payload for the benchmark tool."),
+	}, "input")
+}
+
+// Execute waits for the configured latency plus jitter and returns a map with
+// the tool name, the "input" argument ("benchmark" when missing, empty, or not
+// a string) and status "ok". It returns ctx's error if ctx ends during the wait.
+func (t *MockBenchmarkTool) Execute(ctx context.Context, args map[string]any) (any, error) {
+	if err := sleepWithJitter(ctx, t.cfg.LatencyMs, t.cfg.JitterMs, t.rng); err != nil {
+		return nil, err
+	}
+	input, _ := args["input"].(string)
+	if input == "" {
+		input = "benchmark"
+	}
+	return map[string]any{"tool": t.name, "input": input, "status": "ok"}, nil
+}
+
+// RegisterBenchmarkTools returns a new registry holding count mock tools,
+// indexed from 1, that all share cfg and rng.
+func RegisterBenchmarkTools(count int, cfg ToolConfig, rng *rand.Rand) *tools.Registry {
+	reg := tools.NewRegistry()
+	for i := 1; i <= count; i++ {
+		reg.Register(NewMockBenchmarkTool(i, cfg, rng))
+	}
+	return reg
+}
diff --git a/benchmarks/setup/opts.go b/benchmarks/setup/opts.go
new file mode 100644
index 0000000..98fe940
--- /dev/null
+++ b/benchmarks/setup/opts.go
@@ -0,0 +1,79 @@
+package setup
+
+import (
+	"strings"
+
+	"github.com/agenticenv/agent-sdk-go/pkg/agent"
+	"github.com/agenticenv/agent-sdk-go/pkg/interfaces"
+	"github.com/agenticenv/agent-sdk-go/pkg/logger"
+)
+
+// RootOptions returns agent options shared by the benchmark root agent and root worker.
+// enableRemoteWorkers is agent-only (EnableRemoteWorkers).
+func RootOptions(
+	cfg *Config,
+	llm interfaces.LLMClient,
+	lgr logger.Logger,
+	name, systemPrompt string,
+	subAgents []*agent.Agent,
+	taskQueue string,
+	enableRemoteWorkers bool,
+) []agent.Option {
+	if lgr == nil {
+		lgr = logger.NoopLogger()
+	}
+
+	opts := []agent.Option{
+		agent.WithName(name),
+		agent.WithDescription("Benchmark agent for SDK load testing."),
+		agent.WithSystemPrompt(systemPrompt),
+		agent.WithLLMClient(llm),
+		agent.WithToolRegistry(RegisterBenchmarkTools(cfg.Agent.Tools.Count, cfg.Tool, TreeRNG())),
+		agent.WithToolApprovalPolicy(agent.AutoToolApprovalPolicy()),
+		agent.WithAgentToolExecutionMode(mapToolExecutionMode(cfg.Agent.Tools.Execution)),
+		agent.WithLogger(lgr),
+	}
+	if len(subAgents) > 0 {
+		opts = append(opts, agent.WithSubAgents(subAgents...))
+	}
+	if cfg.Agent.Subagents.Levels > 0 {
+		opts = append(opts, agent.WithMaxSubAgentDepth(cfg.Agent.Subagents.Levels))
+	}
+	if cfg.UseTemporal() {
+		opts = append(opts, agent.WithTemporalConfig(&agent.TemporalConfig{
+			Host:      cfg.Temporal.Host,
+			Port:      cfg.Temporal.Port,
+			Namespace: cfg.Temporal.Namespace,
+			TaskQueue: taskQueue,
+		}))
+	}
+	if enableRemoteWorkers {
+		opts = append(opts, agent.EnableRemoteWorkers())
+	}
+	return opts
+}
+
+func mapToolExecutionMode(raw string) agent.AgentToolExecutionMode {
+	switch strings.ToLower(strings.TrimSpace(raw)) {
+	case "sequential":
+		return agent.AgentToolExecutionModeSequential
+	default:
+		return agent.AgentToolExecutionModeParallel
+	}
+}
+
+func TaskQueueFor(cfg *Config, suffix string) string {
+	base := strings.TrimSpace(cfg.Temporal.TaskQueue)
+	if suffix == "" {
+		return base
+	}
+	return base + "-" + suffix
+}
+
+func CloseAgents(agents []*agent.Agent) {
+	for _, a := range agents {
+		if a != nil {
+			a.Close()
+		}
+	}
+}
diff --git a/benchmarks/setup/prompts.go b/benchmarks/setup/prompts.go
new file mode 100644
index 0000000..e100202
--- /dev/null
+++ b/benchmarks/setup/prompts.go
@@ -0,0 +1,37 @@
+package setup
+
+import (
+	"fmt"
+	"math/rand"
+)
+
+// RootAgentName is the name given to the benchmark's root agent.
+const RootAgentName = "benchmark-agent"
+
+// RandomUserPrompt builds a user prompt from a random verb, a random noun and a
+// random run number below 1,000,000. A nil rng falls back to RunRNG(), matching
+// the nil handling in RootSystemPrompt, instead of panicking.
+func RandomUserPrompt(rng *rand.Rand) string {
+	if rng == nil {
+		rng = RunRNG()
+	}
+	verbs := []string{"Analyze", "Review", "Summarize", "Evaluate", "Process"}
+	nouns := []string{"system state", "metrics", "workflow", "request batch", "task queue"}
+	return fmt.Sprintf("%s the %s for benchmark run %d.", verbs[rng.Intn(len(verbs))], nouns[rng.Intn(len(nouns))], rng.Intn(1_000_000))
+}
+
+// systemPrompt builds a system prompt around a topic drawn from rng, which
+// must be non-nil.
+func systemPrompt(rng *rand.Rand) string {
+	topics := []string{"analysis", "planning", "summarization", "debugging", "research"}
+	return fmt.Sprintf("You are a benchmark assistant focused on %s. Respond concisely.", topics[rng.Intn(len(topics))])
+}
+
+// RootSystemPrompt returns the root agent's system prompt. A nil treeRng is
+// replaced by TreeRNG().
+func RootSystemPrompt(treeRng *rand.Rand) string {
+	if treeRng == nil {
+		treeRng = TreeRNG()
+	}
+	return systemPrompt(treeRng)
+}
diff --git a/benchmarks/setup/tree.go b/benchmarks/setup/tree.go
new file mode 100644
index 0000000..15a355f
--- /dev/null
+++ b/benchmarks/setup/tree.go
@@ -0,0 +1,117 @@
+package setup
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"time"
+
+	"github.com/agenticenv/agent-sdk-go/pkg/agent"
+	"github.com/agenticenv/agent-sdk-go/pkg/interfaces"
+	"github.com/agenticenv/agent-sdk-go/pkg/logger"
+)
+
+// AgentTree is the result of BuildAgentTree: the root agent's system prompt,
+// the root's direct sub-agents, and every sub-agent created at any level
+// (the list to hand to CloseAgents).
+type AgentTree struct {
+	RootPrompt string
+	SubAgents  []*agent.Agent
+	Created    []*agent.Agent
+}
+
+// BuildAgentTree creates the sub-agent hierarchy described by
+// cfg.Agent.Subagents (Count children per node, Levels deep). Prompts are drawn
+// from a fresh TreeRNG(), so they come out the same on every call. The root
+// agent itself is not created here.
+func BuildAgentTree(cfg *Config, llm interfaces.LLMClient, lgr logger.Logger) (*AgentTree, error) {
+	treeRng := TreeRNG()
+	rootPrompt := RootSystemPrompt(treeRng)
+
+	subAgents, created, err := buildSubAgentTree(cfg, llm, lgr, treeRng, "", 1, cfg.Agent.Subagents.Levels)
+	if err != nil {
+		CloseAgents(created)
+		return nil, err
+	}
+
+	return &AgentTree{
+		RootPrompt: rootPrompt,
+		SubAgents:  subAgents,
+		Created:    created,
+	}, nil
+}
+
+// buildSubAgentTree recursively creates cfg.Agent.Subagents.Count agents under
+// parentPath and, beneath each, the next level down until depth exceeds
+// maxLevels. It returns the agents of this level, every agent created at this
+// level or below, and the first error. On error the agents created so far by
+// this call are closed and nil slices are returned.
+func buildSubAgentTree(
+	cfg *Config,
+	llm interfaces.LLMClient,
+	lgr logger.Logger,
+	treeRng *rand.Rand,
+	parentPath string,
+	depth, maxLevels int,
+) ([]*agent.Agent, []*agent.Agent, error) {
+	// <= 0 rather than == 0: a negative count would make the make() below panic.
+	if depth > maxLevels || cfg.Agent.Subagents.Count <= 0 {
+		return nil, nil, nil
+	}
+
+	subAgents := make([]*agent.Agent, 0, cfg.Agent.Subagents.Count)
+	created := make([]*agent.Agent, 0)
+
+	for i := 1; i <= cfg.Agent.Subagents.Count; i++ {
+		nameSuffix := fmt.Sprintf("%d", i)
+		if parentPath != "" {
+			nameSuffix = parentPath + "." + nameSuffix
+		}
+		displayName := "subagent-" + nameSuffix
+		queueSuffix := strings.ReplaceAll(displayName, ".", "-")
+
+		// Children are built before their parent so they can be passed to it.
+		children, childCreated, err := buildSubAgentTree(cfg, llm, lgr, treeRng, nameSuffix, depth+1, maxLevels)
+		if err != nil {
+			CloseAgents(created)
+			return nil, nil, err
+		}
+		created = append(created, childCreated...)
+
+		sub, err := newSubAgent(cfg, llm, lgr, treeRng, displayName, systemPrompt(treeRng), children, TaskQueueFor(cfg, queueSuffix))
+		if err != nil {
+			CloseAgents(created)
+			return nil, nil, err
+		}
+		subAgents = append(subAgents, sub)
+		created = append(created, sub)
+	}
+
+	return subAgents, created, nil
+}
+
+// newSubAgent creates one sub-agent from the shared RootOptions with remote
+// workers disabled. The rand parameter is unused and kept only so the call
+// site does not change.
+func newSubAgent(
+	cfg *Config,
+	llm interfaces.LLMClient,
+	lgr logger.Logger,
+	_ *rand.Rand,
+	name, prompt string,
+	subAgents []*agent.Agent,
+	taskQueue string,
+) (*agent.Agent, error) {
+	opts := RootOptions(cfg, llm, lgr, name, prompt, subAgents, taskQueue, false)
+	a, err := agent.NewAgent(opts...)
+	if err != nil {
+		return nil, fmt.Errorf("create sub-agent %q: %w", name, err)
+	}
+	if cfg.UseTemporal() {
+		// NOTE(review): fixed pause after each Temporal-backed agent is created;
+		// presumably gives its worker time to start — confirm, and prefer a
+		// readiness signal if the SDK offers one.
+		time.Sleep(300 * time.Millisecond)
+	}
+	return a, nil
+}
diff --git a/benchmarks/worker/main.go b/benchmarks/worker/main.go
new file mode 100644
index 0000000..1e2f0b7
--- /dev/null
+++ b/benchmarks/worker/main.go
@@ -0,0 +1,89 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/agenticenv/agent-sdk-go/benchmarks/setup"
+	"github.com/agenticenv/agent-sdk-go/pkg/agent"
+)
+
+// main parses the flags and runs the worker. All work happens in run so that
+// its deferred cleanup (log file, sub-agents, signal handler) executes before
+// log.Fatal exits the process; os.Exit does not run deferred calls.
+func main() {
+	configPath := flag.String("config", "", "path to benchmark config.yaml")
+	workerID := flag.Int("worker-id", 1, "worker instance id for logging")
+	flag.Parse()
+
+	if err := run(*configPath, *workerID); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// run loads the benchmark config, builds the agent tree, and runs an agent
+// worker on cfg.Temporal.TaskQueue until the worker stops on its own or the
+// process receives SIGINT/SIGTERM. A start error that arrives after a signal
+// is treated as a normal shutdown.
+func run(configPath string, workerID int) error {
+	cfg, err := setup.LoadConfig(configPath)
+	if err != nil {
+		return fmt.Errorf("load config: %w", err)
+	}
+	if !cfg.UseTemporal() {
+		return errors.New("benchmark worker requires runtime: temporal in config")
+	}
+
+	repoRoot, err := setup.FindRepoRoot(".")
+	if err != nil {
+		return fmt.Errorf("find repo root: %w", err)
+	}
+
+	lgr, closeLogger, err := setup.SetupWorkerLogger(cfg, repoRoot, workerID)
+	if err != nil {
+		return fmt.Errorf("setup logger: %w", err)
+	}
+	defer closeLogger()
+
+	llm := setup.NewMockLLMClient(cfg.LLM, setup.NewLLMStats(), setup.TreeRNG())
+	tree, err := setup.BuildAgentTree(cfg, llm, lgr)
+	if err != nil {
+		return fmt.Errorf("build agent tree: %w", err)
+	}
+	defer setup.CloseAgents(tree.Created)
+
+	opts := setup.RootOptions(cfg, llm, lgr, setup.RootAgentName, tree.RootPrompt, tree.SubAgents, cfg.Temporal.TaskQueue, false)
+	w, err := agent.NewAgentWorker(opts...)
+	if err != nil {
+		return fmt.Errorf("create agent worker: %w", err)
+	}
+
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+	defer stop()
+
+	fmt.Printf("benchmark worker %d starting on task queue %q\n", workerID, cfg.Temporal.TaskQueue)
+
+	done := make(chan error, 1)
+	go func() {
+		done <- w.Start(ctx)
+	}()
+
+	select {
+	case err := <-done:
+		if err != nil && ctx.Err() == nil {
+			return fmt.Errorf("worker stopped: %w", err)
+		}
+	case <-ctx.Done():
+		w.Stop()
+		<-done
+	}
+
+	fmt.Printf("benchmark worker %d stopped\n", workerID)
+	return nil
+}
diff --git a/benchmarks/worker_manager.go b/benchmarks/worker_manager.go
new file mode 100644
index 0000000..37b677a
--- /dev/null
+++ b/benchmarks/worker_manager.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"syscall"
+	"time"
+)
+
+// externalWorkerManager tracks the worker processes started by
+// startExternalWorkers so they can be shut down together.
+type externalWorkerManager struct {
+	cmds []*exec.Cmd
+}
+
+// startExternalWorkers launches count `go run ./benchmarks/worker` processes
+// from repoRoot, each with the absolute config path and a 1-based worker id,
+// then pauses two seconds before returning. count <= 0 yields an empty manager.
+// If any process fails to start, the ones already started are stopped and the
+// error is returned.
+func startExternalWorkers(ctx context.Context, cfgPath, repoRoot string, count int) (*externalWorkerManager, error) {
+	if count <= 0 {
+		return &externalWorkerManager{}, nil
+	}
+
+	absConfig, err := filepath.Abs(cfgPath)
+	if err != nil {
+		return nil, fmt.Errorf("resolve config path: %w", err)
+	}
+
+	mgr := &externalWorkerManager{cmds: make([]*exec.Cmd, 0, count)}
+	for i := 1; i <= count; i++ {
+		cmd := exec.CommandContext(ctx, "go", "run", "./benchmarks/worker",
+			"-config", absConfig,
+			"-worker-id", fmt.Sprintf("%d", i),
+		)
+		cmd.Dir = repoRoot
+		cmd.Env = os.Environ()
+		// Own process group: `go run` and the binary it launches can then be
+		// signalled together through the negative pid in stop.
+		cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+		if err := cmd.Start(); err != nil {
+			_ = mgr.stop()
+			return nil, fmt.Errorf("start worker %d: %w", i, err)
+		}
+		mgr.cmds = append(mgr.cmds, cmd)
+	}
+
+	time.Sleep(2 * time.Second)
+	return mgr, nil
+}
+
+// stop sends SIGTERM to every worker's process group and waits up to ten
+// seconds in total for the processes to exit. Groups still running at the
+// deadline are sent SIGKILL and then reaped. It returns the first non-nil
+// Wait error, if any; a nil receiver is a no-op.
+func (m *externalWorkerManager) stop() error {
+	if m == nil {
+		return nil
+	}
+	var firstErr error
+	for _, cmd := range m.cmds {
+		if cmd == nil || cmd.Process == nil {
+			continue
+		}
+		_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM)
+	}
+	deadline := time.Now().Add(10 * time.Second)
+	for _, cmd := range m.cmds {
+		if cmd == nil || cmd.Process == nil {
+			continue
+		}
+		done := make(chan error, 1)
+		go func(c *exec.Cmd) { done <- c.Wait() }(cmd)
+
+		var err error
+		select {
+		case err = <-done:
+		case <-time.After(time.Until(deadline)):
+			// Kill the whole group, not just `go run`: Process.Kill would leave
+			// the compiled worker running. Then wait so the process is reaped
+			// and the goroutine above finishes.
+			_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+			err = <-done
+		}
+		if err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return firstErr
+}