Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,8 @@ result, _ := a.Run(ctx, "Hello", nil)

Pass `agent.WithConversation(conv)` to persist message history for multi-turn context. Use `agent.WithConversationSize(n)` to limit how many messages are fetched for LLM context (default 20).

By default, messages from the current run are saved once when the run finishes. Use `agent.EnableConversationSaveOnIteration()` when external consumers (e.g. a UI polling Redis) need live updates **during** a multi-step run—after each tool round, not only at the end. This adds extra store writes. For Temporal remote workers, set it on **`AgentWorker`** (where `WithConversation` and persistence run); the agent caller process does not need it.

**Conversation ID:** When the agent is configured with a conversation, pass an `*agent.AgentRunOptions` with `ConversationOptions.ID` set to the same session ID on every call to `Run`, `RunAsync`, and `Stream`—so history is shared across turns.

Choose implementation by deployment:
Expand Down Expand Up @@ -1177,6 +1179,7 @@ A Temporal connection (`WithTemporalConfig` or `WithTemporalClient`) is **option
- **WithResponseFormat**: LLM response format. Omit for text-only. Use `&interfaces.ResponseFormat{Type, Name, Schema}` for JSON with schema. See [Response format](#response-format).
- **WithConversation**: Message history store. Use `inmem` for single process; `redis` for remote workers. Pass the conversation ID via `AgentRunOptions` to `Run`, `RunAsync`, and `Stream` to share history across turns. See [Conversation](#conversation-message-history).
- **WithConversationSize**: Max messages to fetch for LLM context (default 20). Only applies when `WithConversation` is set.
- **EnableConversationSaveOnIteration**: Persist conversation messages after each tool round instead of batching at run end. For live visibility (e.g. Redis UI) during long runs. Set on `AgentWorker` for Temporal remote workers.
- **EnableRemoteWorkers**: Pass `EnableRemoteWorkers()` when using `DisableLocalWorker` with approval or streaming (starts the event worker/workflow path).
- **WithSubAgents**: Attach specialist agents the main agent can delegate to. Each needs its own task queue and worker. See [Sub-agents](#sub-agents).
- **WithMaxSubAgentDepth**: Maximum delegation hops from this agent (default 2). See [Sub-agents](#sub-agents).
Expand Down
3 changes: 3 additions & 0 deletions examples/.env.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ LOG_LEVEL=error
# local = in-process (default). temporal = requires Temporal server (see temporal-setup.md).
AGENT_RUNTIME=local

# --- Redis (agent_with_conversation; task infra:redis:up) ---
REDIS_ADDR=localhost:6379

# --- Temporal (when AGENT_RUNTIME=temporal) ---
# TEMPORAL_TASKQUEUE is a base prefix; each example appends its suffix (e.g. agent-sdk-go-simple_agent).
TEMPORAL_HOST=localhost
Expand Down
7 changes: 5 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ These examples run with `AGENT_RUNTIME=local` (default) or `AGENT_RUNTIME=tempor
| Example | What it demonstrates | Infra (Task, from `examples/`) |
|---------|---------------------|--------------------------------|
| `simple_agent` | Minimal agent, no tools — system prompt, LLM client, single `Run()`; prints `AgentResponse.Usage` (token counts) when the provider reports them | — |
| `agent_with_conversation` | In-memory conversation with `WithConversation` — multi-turn context, same `conversationID` for `Run` | |
| `agent_with_conversation` | Redis conversation with `WithConversation` — multi-turn context, same `conversationID` for `Run` | `infra:redis:up` (or `infra:deps:up`) |
| `agent_with_tools/basic` | Built-in tools (echo, calculator, weather, wikipedia, search) with auto-approval | — |
| `agent_with_tools/approval` | Tools + `WithApprovalHandler` — user approves or rejects each tool run (`Run` only) | — |
| `agent_with_tools/authorizer` | Custom tool authorization via `interfaces.ToolAuthorizer` — denied calls surface as `tool_result` with `denied` status | — |
Expand Down Expand Up @@ -81,9 +81,10 @@ go run ./simple_agent "Hello, what can you do?"

### Agent with conversation (multi-turn)

Uses in-memory conversation. Run **interactive mode** (no args) for multi-turn in one process—history is shared across turns. With args, runs a single turn (useful for testing).
Uses Redis (`REDIS_ADDR`, default `localhost:6379`). Start Redis first: `task infra:redis:up`. Run **interactive mode** (no args) for multi-turn in one process—history is shared across turns. With args, runs a single turn (useful for testing).

```bash
task infra:redis:up
# Interactive: type prompts, get responses; history shared. Type 'exit' to end.
go run ./agent_with_conversation

Expand Down Expand Up @@ -273,6 +274,8 @@ Examples send conversation (user prompt, assistant response) to **stdout** and i
|---------|-------------|
| `AGENT_RUNTIME` | `local` (default) or `temporal` — selects the execution backend |
| `TEMPORAL_HOST`, `TEMPORAL_PORT`, `TEMPORAL_NAMESPACE`, `TEMPORAL_TASKQUEUE` | Temporal connection (used when `AGENT_RUNTIME=temporal`) |
| `REDIS_ADDR` | Redis address for `agent_with_conversation` (default: `localhost:6379`) |
| `CONVERSATION_ID` | Optional session id override for conversation examples |
| `LLM_PROVIDER` | `openai`, `anthropic`, or `gemini` (see `.env.defaults`) |
| `LLM_APIKEY` | API key |
| `LLM_MODEL` | e.g. `gpt-4o`, `claude-3-5-sonnet-20241022` |
Expand Down
19 changes: 18 additions & 1 deletion examples/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ tasks:
cmds:
- task infra:temporal:up
- task infra:lgtm:up
- task infra:redis:up

infra:down:
desc: Stop all infrastructure
deps: [prereq:check]
cmds:
- task infra:temporal:down
- task infra:lgtm:down
- task infra:redis:down

infra:status:
desc: Show status of example infra
Expand All @@ -61,6 +63,7 @@ tasks:

compose_svc temporal
compose_svc otel-lgtm
compose_svc redis
compose_svc weaviate
compose_svc pgvector
if curl -sf "$A2A_URL" >/dev/null 2>&1; then
Expand Down Expand Up @@ -109,13 +112,26 @@ tasks:
cmds:
- docker compose -f docker/docker-compose.yml down otel-lgtm

infra:redis:up:
desc: Start Redis (conversation memory for remote workers)
deps: [prereq:check]
cmds:
- docker compose -f docker/docker-compose.yml up -d --wait redis

infra:redis:down:
desc: Stop Redis
deps: [prereq:check]
cmds:
- docker compose -f docker/docker-compose.yml down redis

# ── Shared deps (local + temporal examples) ─────────────────

infra:deps:up:
desc: Start shared example deps (LGTM, Weaviate, pgvector, A2A server)
desc: Start shared example deps (LGTM, Redis, Weaviate, pgvector, A2A server)
deps: [prereq:check]
cmds:
- task: infra:lgtm:up
- task: infra:redis:up
- task: infra:weaviate:up
- task: infra:pgvector:up
- task: infra:a2a:up
Expand All @@ -127,6 +143,7 @@ tasks:
- task: infra:a2a:down
- task: infra:pgvector:down
- task: infra:weaviate:down
- task: infra:redis:down
- task: infra:lgtm:down

infra:weaviate:up:
Expand Down
23 changes: 19 additions & 4 deletions examples/agent_with_conversation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

config "github.com/agenticenv/agent-sdk-go/examples"
"github.com/agenticenv/agent-sdk-go/pkg/agent"
"github.com/agenticenv/agent-sdk-go/pkg/conversation/inmem"
"github.com/agenticenv/agent-sdk-go/pkg/conversation/redis"
"github.com/agenticenv/agent-sdk-go/pkg/tools"
"github.com/agenticenv/agent-sdk-go/pkg/tools/calculator"
"github.com/agenticenv/agent-sdk-go/pkg/tools/echo"
Expand All @@ -24,22 +24,30 @@ func main() {
log.Fatalf("failed to create LLM client: %v", err)
}

// In-memory conversation for multi-turn context (single process only).
conv := inmem.NewInMemoryConversation(inmem.WithMaxSize(100))
// Redis conversation (start with task infra:redis:up or docker compose redis service).
conv, err := redis.NewRedisConversation(
redis.WithAddr(redisAddrFromEnv()),
redis.WithMaxSize(100),
)
if err != nil {
log.Fatalf("failed to create Redis conversation: %v", err)
}
defer func() { _ = conv.Close() }()

reg := tools.NewRegistry()
reg.Register(echo.New())
reg.Register(calculator.New())

opts := []agent.Option{
agent.WithName("agent-with-conversation"),
agent.WithDescription("Agent with in-memory conversation and tools for multi-turn context"),
agent.WithDescription("Agent with Redis conversation and tools for multi-turn context"),
agent.WithSystemPrompt("You are a helpful assistant. Remember the conversation context. Use tools when helpful: echo for repeating, calculator for math."),
agent.WithLLMClient(llmClient),
agent.WithToolRegistry(reg),
agent.WithToolApprovalPolicy(agent.AutoToolApprovalPolicy()),
agent.WithConversation(conv),
agent.WithConversationSize(20),
agent.EnableConversationSaveOnIteration(),
agent.WithLogger(config.NewLoggerFromLogConfig(cfg)),
}
opts = append(opts, config.RuntimeOption(cfg)...)
Expand All @@ -66,6 +74,13 @@ func main() {
runInteractive(context.Background(), a, convID)
}

func redisAddrFromEnv() string {
if addr := os.Getenv("REDIS_ADDR"); addr != "" {
return addr
}
return "localhost:6379"
}

func runSingleTurn(ctx context.Context, a *agent.Agent, prompt, convID string) {
fmt.Println("user:", prompt)
opts := &agent.AgentRunOptions{
Expand Down
12 changes: 12 additions & 0 deletions examples/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ services:
- "4317:4317" # OTEL gRPC
- "4318:4318" # OTEL HTTP

redis:
container_name: redis
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 2s
timeout: 5s
retries: 30
start_period: 5s

weaviate:
container_name: weaviate
image: cr.weaviate.io/semitechnologies/weaviate:1.27.0
Expand Down
27 changes: 22 additions & 5 deletions internal/runtime/local/agent_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ func (rt *LocalRuntime) RunAgentLoop(ctx context.Context, input AgentLoopInput)
{Role: interfaces.MessageRoleUser, Content: input.UserPrompt},
}

// Prepend conversation history when a conversation ID is provided.
if input.ConversationID != "" {
// Prepend conversation history when conversation memory is configured for this run.
persistedMessageCount := 0
if rt.conversationMemoryEnabled(input) {
convMsgs, err := rt.FetchConversationMessages(ctx, log, input.ConversationID)
if err != nil {
log.Warn(ctx, "local: failed to load conversation history, continuing without it",
Expand All @@ -109,6 +110,7 @@ func (rt *LocalRuntime) RunAgentLoop(ctx context.Context, input AgentLoopInput)
slog.Any("error", err))
} else {
messages = append(convMsgs, messages...)
persistedMessageCount = len(convMsgs)
}
}

Expand Down Expand Up @@ -221,11 +223,22 @@ func (rt *LocalRuntime) RunAgentLoop(ctx context.Context, input AgentLoopInput)
}

messages = append(messages, toolResults...)

if rt.conversationMemoryEnabled(input) && rt.AgentExecution.Session.ConversationSaveOnIteration && len(messages) > persistedMessageCount {
if err := persistConversationMessages(ctx, rt, input.ConversationID, messages[persistedMessageCount:]); err != nil {
log.Warn(ctx, "local: persist conversation failed",
slog.String("scope", "loop"),
slog.String("conversationID", input.ConversationID),
slog.Any("error", err))
} else {
persistedMessageCount = len(messages)
}
}
}

// Persist all accumulated messages to conversation when a conversation ID is set.
if input.ConversationID != "" && rt.AgentExecution.Session.Conversation != nil {
if err := persistConversationMessages(ctx, rt, input.ConversationID, messages); err != nil {
// Persist unsaved messages: full run when ConversationSaveOnIteration is false; final assistant only when true.
if rt.conversationMemoryEnabled(input) && len(messages) > persistedMessageCount {
if err := persistConversationMessages(ctx, rt, input.ConversationID, messages[persistedMessageCount:]); err != nil {
log.Warn(ctx, "local: persist conversation failed",
slog.String("scope", "loop"),
slog.String("conversationID", input.ConversationID),
Expand All @@ -242,6 +255,10 @@ func (rt *LocalRuntime) RunAgentLoop(ctx context.Context, input AgentLoopInput)
return &AgentLoopResult{Content: lastContent, Usage: runUsage}, nil
}

func (rt *LocalRuntime) conversationMemoryEnabled(input AgentLoopInput) bool {
return input.ConversationID != "" && rt.AgentExecution.Session.Conversation != nil
}

// executeToolsParallel runs all tool calls concurrently and collects results in submission order.
// Errors from individual tools are returned as synthetic tool messages so the LLM can handle
// partial failures gracefully (same behaviour as the Temporal parallel branch).
Expand Down
5 changes: 3 additions & 2 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ type AgentTools struct {

// AgentSession is conversation storage and how many messages to include in LLM context.
type AgentSession struct {
Conversation interfaces.Conversation
ConversationSize int
Conversation interfaces.Conversation
ConversationSize int
ConversationSaveOnIteration bool
}

// AgentLimits caps iteration and wall-clock behavior for this run.
Expand Down
40 changes: 27 additions & 13 deletions internal/runtime/temporal/agent_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,18 @@ func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkfl

messages = append(messages, toolResults...)

if rt.conversationMemoryEnabled(input.ConversationID) && rt.AgentExecution.Session.ConversationSaveOnIteration && len(messages) > 0 {
if err := workflow.ExecuteActivity(convCtx, rt.AddConversationMessagesActivity, AddConversationMessagesInput{
ConversationID: input.ConversationID,
Messages: messages,
AgentFingerprint: input.AgentFingerprint,
}).Get(convCtx, nil); err != nil {
logger.Warn("workflow: persist conversation failed", "scope", "workflow", "conversationID", input.ConversationID, "messagesCount", len(messages), "error", err)
} else {
messages = []interfaces.Message{}
}
}

// History-driven ContinueAsNew (same iteration boundary as tool results). Skipped when the LLM
// returns no tools (final answer path breaks earlier in the loop).
info := workflow.GetInfo(ctx)
Expand All @@ -644,17 +656,15 @@ func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkfl
}
}

// Add all accumulated messages to conversation after execution completes (only when conversationID set)
if input.ConversationID != "" {
if len(messages) == 0 {
logger.Debug("workflow: no conversation messages to persist", "scope", "workflow", "conversationID", input.ConversationID)
} else {
if err := workflow.ExecuteActivity(convCtx, rt.AddConversationMessagesActivity, AddConversationMessagesInput{
ConversationID: input.ConversationID,
Messages: messages,
AgentFingerprint: input.AgentFingerprint,
}).Get(convCtx, nil); err != nil {
logger.Warn("workflow: persist conversation failed", "scope", "workflow", "conversationID", input.ConversationID, "messagesCount", len(messages), "error", err)
// Persist unsaved workflow messages. Flag off: full batch. Flag on: per-iteration saves cleared state; only the final assistant may remain.
if rt.conversationMemoryEnabled(input.ConversationID) && len(messages) > 0 {
if err := workflow.ExecuteActivity(convCtx, rt.AddConversationMessagesActivity, AddConversationMessagesInput{
ConversationID: input.ConversationID,
Messages: messages,
AgentFingerprint: input.AgentFingerprint,
}).Get(convCtx, nil); err != nil {
logger.Warn("workflow: persist conversation failed", "scope", "workflow", "conversationID", input.ConversationID, "messagesCount", len(messages), "error", err)
if !rt.AgentExecution.Session.ConversationSaveOnIteration {
return nil, err
}
}
Expand All @@ -667,6 +677,10 @@ func (rt *TemporalRuntime) AgentWorkflow(ctx workflow.Context, input AgentWorkfl
}, nil
}

func (rt *TemporalRuntime) conversationMemoryEnabled(conversationID string) bool {
return conversationID != "" && rt.AgentExecution.Session.Conversation != nil
}

// newAgentToolCallInput builds activity contexts for one tool-call branch.
// parallelSlot must be unique across concurrent tools (e.g. index string); use empty when calls run sequentially.
func (rt *TemporalRuntime) newAgentToolCallInput(
Expand Down Expand Up @@ -917,7 +931,7 @@ func (rt *TemporalRuntime) AgentLLMStreamActivity(ctx context.Context, input Age
agentName := strings.TrimSpace(input.AgentName)

messages := input.Messages
if input.ConversationID != "" {
if rt.conversationMemoryEnabled(input.ConversationID) {
convMessages, err := rt.FetchConversationMessages(ctx, actLog, input.ConversationID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -963,7 +977,7 @@ func (rt *TemporalRuntime) AgentLLMActivity(ctx context.Context, input AgentLLMI
agentName := strings.TrimSpace(input.AgentName)

messages := input.Messages
if input.ConversationID != "" {
if rt.conversationMemoryEnabled(input.ConversationID) {
convMessages, err := rt.FetchConversationMessages(ctx, actLog, input.ConversationID)
if err != nil {
return nil, err
Expand Down
11 changes: 8 additions & 3 deletions internal/runtime/temporal/agent_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ func TestAgentLLMActivity_ConversationNotConfigured(t *testing.T) {
defer ctrl.Finish()

mockLLM := mocks.NewMockLLMClient(ctrl)
mockLLM.EXPECT().GetModel().Return("test-model").AnyTimes()
mockLLM.EXPECT().GetProvider().Return(interfaces.LLMProviderOpenAI).AnyTimes()
mockLLM.EXPECT().Generate(gomock.Any(), gomock.Any()).Return(&interfaces.LLMResponse{Content: "ok"}, nil)

rt := &TemporalRuntime{
Runtime: base.Runtime{
Expand All @@ -367,12 +370,14 @@ func TestAgentLLMActivity_ConversationNotConfigured(t *testing.T) {

actEnv := newActivityTestEnv(t)
actEnv.RegisterActivity(rt.AgentLLMActivity)
_, err := actEnv.ExecuteActivity(rt.AgentLLMActivity, AgentLLMInput{
val, err := actEnv.ExecuteActivity(rt.AgentLLMActivity, AgentLLMInput{
ConversationID: "any",
Messages: []interfaces.Message{{Role: interfaces.MessageRoleUser, Content: "x"}},
})
require.Error(t, err)
require.Contains(t, err.Error(), "conversation is not configured")
require.NoError(t, err)
var result AgentLLMResult
require.NoError(t, val.Get(&result))
require.Equal(t, "ok", result.Content)
}

func TestAgentLLMStreamActivity_MockLLM_FallbackToGenerate(t *testing.T) {
Expand Down
Loading
Loading