Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/wayflowcore/requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
sphinx==8.1.3
sphinx-autodoc-typehints==3.0.1
sphinx-substitution-extensions==2022.2.16
sphinx-tabs==3.4.5
sphinx-copybutton==0.5.2
Expand Down
15 changes: 15 additions & 0 deletions docs/wayflowcore/source/core/api/conversation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ Base class for conversations. Can manipulate a conversation object, and can be s
.. _conversation:
.. autoclass:: wayflowcore.conversation.Conversation

Checkpointing
-------------

.. _conversationcheckpoint:
.. autoclass:: wayflowcore.checkpointing.checkpointer.ConversationCheckpoint

.. _checkpointinginterval:
.. autoclass:: wayflowcore.checkpointing.checkpointer.CheckpointingInterval

.. _storageconfig:
.. autoclass:: wayflowcore.checkpointing.checkpointer.StorageConfig

.. _checkpointer:
.. autoclass:: wayflowcore.checkpointing.checkpointer.Checkpointer

Execution Plan
--------------

Expand Down
11 changes: 10 additions & 1 deletion docs/wayflowcore/source/core/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,23 @@ New features
For more information read the :doc:`API Reference on LLM models <api/llmmodels>` and the guide on
:doc:`how to use LLMs from different providers <howtoguides/llm_from_different_providers>`.


* **Logprob support in `LlmGenerationConfig` and `PromptExecutionStep`**

Add per-token log-probabilities support with the ``top_logprobs`` generation config parameter and support returning
per-token log-probabilities in the ``PromptExecutionStep``.
For more information please read the guide on :ref:`How to request per-token log-probabilities <request_logprobs>`

* **First-class conversation checkpointing**

Added shared conversation checkpointing for Agents, Flows, Swarms, ManagerWorkers, and A2A agents through
``ConversationCheckpoint``, ``Checkpointer``, ``InMemoryCheckpointer``, ``PostgresCheckpointer``, and
``OracleDatabaseCheckpointer``. Conversations can now resume from ``conversation_id``, load specific checkpoints for
time-travel debugging, and choose checkpoint save frequency with ``CheckpointingInterval``.

The OpenAI Responses server path now uses this shared checkpointing subsystem as well, so persisted
``previous_response_id`` and ``conversation`` behavior is handled through the same checkpoint model.

For more information, see :doc:`how to checkpoint and resume conversations <howtoguides/howto_checkpointing>`.

Improvements
^^^^^^^^^^^^
Expand Down
69 changes: 69 additions & 0 deletions docs/wayflowcore/source/core/code_examples/howto_checkpointing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright © 2025, 2026 Oracle and/or its affiliates.
#
# This software is under the Apache License 2.0
# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

# isort:skip_file
# fmt: off
# mypy: ignore-errors
# docs-title: Code Example - How to Checkpoint and Resume Conversations

# .. start-##_Configure_your_LLM
from wayflowcore.models import VllmModel

llm = VllmModel(
model_id="LLAMA_MODEL_ID",
host_port="LLAMA_API_URL",
)
# .. end-##_Configure_your_LLM

llm: VllmModel # docs-skiprow
(llm,) = _update_globals(["llm_small"]) # docs-skiprow # type: ignore

# .. start-##_Start_a_checkpointed_conversation
from wayflowcore import Agent
from wayflowcore.checkpointing import InMemoryCheckpointer

agent = Agent(llm=llm, agent_id="support-agent")
checkpointer = InMemoryCheckpointer()

conversation = agent.start_conversation(
conversation_id="support-thread-1",
checkpointer=checkpointer,
)

status = conversation.execute()
# .. end-##_Start_a_checkpointed_conversation

# .. start-##_Resume_the_latest_checkpoint
restored_conversation = agent.start_conversation(
conversation_id="support-thread-1",
checkpointer=checkpointer,
)

restored_conversation.append_user_message("Continue from where you left off.")
status = restored_conversation.execute()
# .. end-##_Resume_the_latest_checkpoint

# .. start-##_Load_a_specific_checkpoint
checkpoints = checkpointer.list_checkpoints("support-thread-1")

previous_checkpoint = checkpoints[-2]
rewound_conversation = agent.start_conversation(
conversation_id="support-thread-1",
checkpoint_id=previous_checkpoint.checkpoint_id,
checkpointer=checkpointer,
)

rewound_conversation.append_user_message("Try a different path from here.")
status = rewound_conversation.execute()
# .. end-##_Load_a_specific_checkpoint

# .. start-##_Control_checkpoint_frequency
from wayflowcore.checkpointing import CheckpointingInterval, InMemoryCheckpointer

checkpointer = InMemoryCheckpointer(
checkpointing_interval=CheckpointingInterval.ALL_INTERNAL_TURNS,
)
# .. end-##_Control_checkpoint_frequency
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

def store_conversation(path: str, conversation: Conversation) -> str:
"""Store the given conversation and return the conversation id."""
conversation_id = conversation.conversation_id
conversation_id = conversation.id
serialized_conversation = serialize(conversation)

# Read existing data
Expand Down Expand Up @@ -87,7 +87,7 @@ def load_conversation(path: str, conversation_id: str) -> Conversation:
# .. start-##_Run_the_agent
# Start a conversation
conversation = assistant.start_conversation()
conversation_id = conversation.conversation_id
conversation_id = conversation.id
print(f"1. Started conversation with ID: {conversation_id}")

# Execute initial greeting
Expand Down Expand Up @@ -155,7 +155,7 @@ def load_conversation(path: str, conversation_id: str) -> Conversation:
# .. end-##_Creating_a_flow
# .. start-##_Run_the_flow
flow_conversation = simple_flow.start_conversation()
flow_id = flow_conversation.conversation_id
flow_id = flow_conversation.id
print(f"1. Started flow conversation with ID: {flow_id}")

# Execute until user input is needed
Expand Down Expand Up @@ -208,7 +208,7 @@ def run_persistent_agent(assistant: Agent, store_path: str, conversation_id: str
conversation = assistant.start_conversation()
else:
conversation = assistant.start_conversation()
print(f"Started new conversation {conversation.conversation_id}")
print(f"Started new conversation {conversation.id}")

# Main conversation loop
while True:
Expand Down
121 changes: 121 additions & 0 deletions docs/wayflowcore/source/core/howtoguides/howto_checkpointing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
.. _top-howtocheckpointing:

==========================================
How to Checkpoint and Resume Conversations
==========================================

.. admonition:: Prerequisites

This guide assumes familiarity with:

- :doc:`Agents <../tutorials/basic_agent>`
- :doc:`Flows <../tutorials/basic_flow>`
- :doc:`Serve Agents with WayFlow <howto_serve_agents>`

WayFlow can now checkpoint the runtime state of a conversation and restore it later by
conversation id. This is useful when you want to:

- resume after a crash or restart
- pause and continue a long-running workflow
- inspect prior checkpoints for debugging
- reload an earlier state and branch from it


Choose a checkpointer
=====================

WayFlow exposes a shared checkpointing subsystem in ``wayflowcore.checkpointing``.
You can use:

- ``InMemoryCheckpointer`` for tests and local experimentation
- ``PostgresCheckpointer`` for PostgreSQL-backed persistence
- ``OracleDatabaseCheckpointer`` for Oracle-backed persistence

All checkpointers share the same API for saving, loading, listing, and deleting checkpoints.


Start a checkpointed conversation
=================================

Attach a checkpointer when you start the conversation. ``conversation_id`` becomes the durable key
used to look up the conversation later. For persistent checkpointers such as PostgreSQL or Oracle
Database, construct the agent, flow, or other top-level component with a stable ``id`` or
component-specific id alias, such as ``agent_id`` for ``Agent``. The same component id must be used
after a process restart to restore the checkpoint safely.

.. literalinclude:: ../code_examples/howto_checkpointing.py
:language: python
:start-after: .. start-##_Start_a_checkpointed_conversation
:end-before: .. end-##_Start_a_checkpointed_conversation

Once checkpointing is enabled, WayFlow saves the root conversation automatically at the configured
checkpoint boundaries.


Resume the latest checkpoint
============================

To restore the latest saved state, call ``start_conversation()`` again with the same
``conversation_id``
and checkpointer.

.. literalinclude:: ../code_examples/howto_checkpointing.py
:language: python
:start-after: .. start-##_Resume_the_latest_checkpoint
:end-before: .. end-##_Resume_the_latest_checkpoint

If no checkpoint exists for that id, WayFlow creates a new conversation instead.


Load a specific checkpoint
==========================

You can inspect checkpoint history and reload an older checkpoint for replay or time-travel
debugging.

.. literalinclude:: ../code_examples/howto_checkpointing.py
:language: python
:start-after: .. start-##_Load_a_specific_checkpoint
:end-before: .. end-##_Load_a_specific_checkpoint

``list_checkpoints()`` returns ordered checkpoint metadata, including the checkpoint id,
creation timestamp, and save metadata recorded at the boundary.


Control checkpoint frequency
============================

Use ``CheckpointingInterval`` to decide how often WayFlow should persist state.

.. literalinclude:: ../code_examples/howto_checkpointing.py
:language: python
:start-after: .. start-##_Control_checkpoint_frequency
:end-before: .. end-##_Control_checkpoint_frequency

The available options are:

- ``CONVERSATION_TURNS``: save after the outermost ``conversation.execute()`` call returns
- ``LLM_TURNS``: also save at internal turn boundaries after turns that used an LLM
- ``ALL_INTERNAL_TURNS``: also save at every internal agent/flow turn boundary

Saving more frequently improves restart fidelity, but it also increases write volume.


Use checkpointing with the OpenAI Responses server
==================================================

The OpenAI Responses server path now uses the shared checkpointing subsystem behind
``ServerStorageConfig``. That means the existing OpenAI-compatible features such as
``previous_response_id``, ``conversation``, ``get_response()``, ``delete_response()``,
and ``store=False`` all run through the same shared checkpoint model.

If you are serving agents, keep using :doc:`Serve Agents with WayFlow <howto_serve_agents>` to
configure the storage backend. The server will use the matching shared checkpointer internally.


Next steps
==========

- :doc:`Serialize and Deserialize Conversations <howto_serialize_conversations>`
- :doc:`Serve Agents with WayFlow <howto_serve_agents>`
- :doc:`Build a Swarm of Agents <howto_swarm>`
1 change: 1 addition & 0 deletions docs/wayflowcore/source/core/howtoguides/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ These guides demonstrate how to configure the components of assistants built wit
:maxdepth: 1

Load and Execute an Agent Spec Configuration <howto_execute_agentspec_with_wayflowcore>
Checkpoint and Resume Conversations <howto_checkpointing>
Serialize and Deserialize Flows and Agents <howto_serdeser>
Serialize and Deserialize Conversations <howto_serialize_conversations>
Build a New WayFlow Component <howto_plugins>
Expand Down
1 change: 1 addition & 0 deletions wayflowcore/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# For docs
sphinx==8.1.3
sphinx-autodoc-typehints==3.0.1
sphinx-substitution-extensions==2022.2.16
sphinx-tabs==3.4.5
sphinx-copybutton==0.5.2
Expand Down
53 changes: 41 additions & 12 deletions wayflowcore/src/wayflowcore/a2a/a2aagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from wayflowcore.tools import Tool

if TYPE_CHECKING:
from wayflowcore.checkpointing import Checkpointer
from wayflowcore.executors._a2aagentconversation import A2AAgentConversation

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -248,43 +249,71 @@ def start_conversation(
self,
inputs: Optional[Dict[str, Any]] = None,
messages: Union[None, str, Message, List[Message], MessageList] = None,
conversation_id: Optional[str] = None,
*,
checkpointer: Optional["Checkpointer"] = None,
checkpoint_id: Optional[str] = None,
_root_conversation_id: Optional[str] = None,
_attach_checkpointer: bool = True,
) -> "A2AAgentConversation":
"""
Initiates a new conversation with the remote server agent.

Creates and returns a conversation instance tied to this agent, optionally initialized
with input data and a message history.
Start a conversation with the remote A2A agent.

Parameters
----------
inputs:
Optional dictionary of initial input data for the conversation. Defaults to an empty
dictionary if not provided.
Optional structured inputs stored on the conversation for interface compatibility.
The A2A runtime currently executes from messages rather than these inputs.
messages:
Optional initial message list for the conversation. Can be either a ``MessageList``
or a list of ``Message`` objects. Defaults to an empty ``MessageList`` if not provided.
Optional initial message history for the remote conversation.
conversation_id:
Optional identifier for this A2A conversation.
checkpointer:
Optional checkpoint backend used to restore and persist this conversation.
checkpoint_id:
Optional checkpoint identifier to restore. Requires ``checkpointer``.
_root_conversation_id:
Internal lineage identifier shared with nested or parent conversations.

Returns
-------
Conversation:
A new conversation object associated with this agent.
Conversation
A new or restored A2A agent conversation.
"""
from wayflowcore.executors._a2aagentconversation import A2AAgentConversation
from wayflowcore.executors._a2aagentexecutor import A2AAgentState

restored_conversation, conversation_runtime_id, conversation_root_id = (
self._prepare_conversation_start(
inputs=inputs,
messages=messages,
conversation_id=conversation_id,
checkpointer=checkpointer,
checkpoint_id=checkpoint_id,
_root_conversation_id=_root_conversation_id,
expected_conversation_type=A2AAgentConversation,
attach_checkpointer=_attach_checkpointer,
)
)
if restored_conversation is not None:
return restored_conversation

if not isinstance(messages, MessageList):
messages = MessageList.from_messages(messages=messages)

return A2AAgentConversation(
conversation = A2AAgentConversation(
component=self,
state=A2AAgentState(last_message_idx=-1),
inputs=inputs or {}, # Inputs are ignored in execution
message_list=messages,
status=None,
conversation_id=IdGenerator.get_or_generate_id(None),
id=conversation_runtime_id,
checkpointer=checkpointer,
name="a2a_conversation",
root_conversation_id=conversation_root_id,
__metadata_info__={},
)
return conversation

@property
def agent_id(self) -> str:
Expand Down
Loading