Durable Execution with Temporal
Note
Durable execution support is in beta and the public interface is subject to change based on user feedback. We expect it to be stable by the release of Pydantic AI v1 at the end of August. Questions and feedback are welcome in GitHub issues and the #pydantic-ai
Slack channel.
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for streaming and MCP, with the added benefit of fault tolerance.
Temporal is a popular durable execution platform that's natively supported by Pydantic AI. The integration only uses Pydantic AI's public interface, so it can also serve as a reference for how to integrate with another durable execution systems.
Durable Execution
In Temporal's durable execution implementation, a program that crashes or encounters an exception while interacting with a model or API will retry until it can successfully complete.
Temporal relies primarily on a replay mechanism to recover from failures. As the program makes progress, Temporal saves key inputs and decisions, allowing a re-started program to pick up right where it left off.
The key to making this work is to separate the application's repeatable (deterministic) and non-repeatable (non-deterministic) parts:
- Deterministic pieces, termed workflows, execute the same way when re-run with the same inputs.
- Non-deterministic pieces, termed activities, can run arbitrary code, performing I/O and any other operations.
Workflow code can run for extended periods and, if interrupted, resume exactly where it left off. Critically, workflow code generally cannot include any kind of I/O, over the network, disk, etc. Activity code faces no restrictions on I/O or external interactions, but if an activity fails part-way through it is restarted from the beginning.
Note
If you are familiar with celery, it may be helpful to think of Temporal activities as similar to celery tasks, but where you wait for the task to complete and obtain its result before proceeding to the next step in the workflow. However, Temporal workflows and activities offer a great deal more flexibility and functionality than celery tasks.
See the Temporal documentation for more information
In the case of Pydantic AI agents, integration with Temporal means that model requests, tool calls that may require I/O, and MCP server communication all need to be offloaded to Temporal activities due to their I/O requirements, while the logic that coordinates them (i.e. the agent run) lives in the workflow. Code that handles a scheduled job or web request can then execute the workflow, which will in turn execute the activities as needed.
The diagram below shows the overall architecture of an agentic application in Temporal. The Temporal Server is responsible for tracking program execution and making sure the associated state is preserved reliably (i.e., stored to an internal database, and possibly replicated across cloud regions). Temporal Server manages data in encrypted form, so all data processing occurs on the Worker, which runs the workflow and activities.
+---------------------+
| Temporal Server | (Stores workflow state,
+---------------------+ schedules activities,
^ persists progress)
|
Save state, | Schedule Tasks,
progress, | load state on resume
timeouts |
|
+------------------------------------------------------+
| Worker |
| +----------------------------------------------+ |
| | Workflow Code | |
| | (Agent Run Loop) | |
| +----------------------------------------------+ |
| | | | |
| v v v |
| +-----------+ +------------+ +-------------+ |
| | Activity | | Activity | | Activity | |
| | (Tool) | | (MCP Tool) | | (Model API) | |
| +-----------+ +------------+ +-------------+ |
| | | | |
+------------------------------------------------------+
| | |
v v v
[External APIs, services, databases, etc.]
See the Temporal documentation for more information.
Durable Agent
Any agent can be wrapped in a TemporalAgent
to get a durable agent that can be used inside a deterministic Temporal workflow, by automatically offloading all work that requires I/O (namely model requests, tool calls, and MCP server communication) to non-deterministic activities.
At the time of wrapping, the agent's model and toolsets (including function tools registered on the agent and MCP servers) are frozen, activities are dynamically created for each, and the original model and toolsets are wrapped to call on the worker to execute the corresponding activities instead of directly performing the actions inside the workflow. The original agent can still be used as normal outside the Temporal workflow, but any changes to its model or toolsets after wrapping will not be reflected in the durable agent.
Here is a simple but complete example of wrapping an agent for durable execution, creating a Temporal workflow with durable execution logic, connecting to a Temporal server, and running the workflow from non-durable code. All it requires is a Temporal server to be running locally:
brew install temporal
temporal server start-dev
import uuid
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from pydantic_ai import Agent
from pydantic_ai.durable_exec.temporal import AgentPlugin, PydanticAIPlugin, TemporalAgent
agent = Agent(
'gpt-5',
instructions="You're an expert in geography.",
name='geography', # (10)!
)
temporal_agent = TemporalAgent(agent) # (1)!
@workflow.defn
class GeographyWorkflow: # (2)!
@workflow.run
async def run(self, prompt: str) -> str:
result = await temporal_agent.run(prompt) # (3)!
return result.output
async def main():
client = await Client.connect( # (4)!
'localhost:7233', # (5)!
plugins=[PydanticAIPlugin()], # (6)!
)
async with Worker( # (7)!
client,
task_queue='geography',
workflows=[GeographyWorkflow],
plugins=[AgentPlugin(temporal_agent)], # (8)!
):
output = await client.execute_workflow( # (9)!
GeographyWorkflow.run,
args=['What is the capital of Mexico?'],
id=f'geography-{uuid.uuid4()}',
task_queue='geography',
)
print(output)
#> Mexico City (Ciudad de México, CDMX)
- The original
Agent
cannot be used inside a deterministic Temporal workflow, but theTemporalAgent
can. - As explained above, the workflow represents a deterministic piece of code that can use non-deterministic activities for operations that require I/O.
TemporalAgent.run()
works just likeAgent.run()
, but it will automatically offload model requests, tool calls, and MCP server communication to Temporal activities.- We connect to the Temporal server which keeps track of workflow and activity execution.
- This assumes the Temporal server is running locally.
- The
PydanticAIPlugin
tells Temporal to use Pydantic for serialization and deserialization, and to treatUserError
exceptions as non-retryable. - We start the worker that will listen on the specified task queue and run workflows and activities. In a real world application, this might be run in a separate service.
- The
AgentPlugin
registers theTemporalAgent
's activities with the worker. - We call on the server to execute the workflow on a worker that's listening on the specified task queue.
- The agent's
name
is used to uniquely identify its activities.
(This example is complete, it can be run "as is" — you'll need to add asyncio.run(main())
to run main
)
In a real world application, the agent, workflow, and worker are typically defined separately from the code that calls for a workflow to be executed.
Because Temporal workflows need to be defined at the top level of the file and the TemporalAgent
instance is needed inside the workflow and when starting the worker (to register the activities), it needs to be defined at the top level of the file as well.
For more information on how to use Temporal in Python applications, see their Python SDK guide.
Temporal Integration Considerations
There are a few considerations specific to agents and toolsets when using Temporal for durable execution. These are important to understand to ensure that your agents and toolsets work correctly with Temporal's workflow and activity model.
Agent and Toolset Requirements
To ensure that Temporal knows what code to run when an activity fails or is interrupted and then restarted, even if your code is changed in between, each activity needs to have a name that's stable and unique.
When TemporalAgent
dynamically creates activities for the wrapped agent's model requests and toolsets (specifically those that implement their own tool listing and calling, i.e. FunctionToolset
and MCPServer
), their names are derived from the agent's name
and the toolsets' id
s. These fields are normally optional, but are required to be set when using Temporal. They should not be changed once the durable agent has been deployed to production as this would break active workflows.
Other than that, any agent and toolset will just work!
Agent Run Context and Dependencies
As workflows and activities run in separate processes, any values passed between them need to be serializable. As these payloads are stored in the workflow execution event history, Temporal limits their size to 2MB.
To account for these limitations, tool functions and the event stream handler running inside activities receive a limited version of the agent's RunContext
, and it's your responsibility to make sure that the dependencies object provided to TemporalAgent.run()
can be serialized using Pydantic.
Specifically, only the deps
, retries
, tool_call_id
, tool_name
, retry
, and run_step
fields are available by default, and trying to access model
, usage
, prompt
, messages
, or tracer
will raise an error.
If you need one or more of these attributes to be available inside activities, you can create a TemporalRunContext
subclass with custom serialize_run_context
and deserialize_run_context
class methods and pass it to TemporalAgent
as run_context_type
.
Streaming
Because Temporal activities cannot stream output directly to the activity call site, Agent.run_stream()
and Agent.iter()
are not supported.
Instead, you can implement streaming by setting an event_stream_handler
on the Agent
or TemporalAgent
instance and using TemporalAgent.run()
inside the workflow.
The event stream handler function will receive the agent run context and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the streaming docs.
As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care:
- To get data from the workflow call site or workflow to the event stream handler, you can use a dependencies object.
- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it.
Activity Configuration
Temporal activity configuration, like timeouts and retry policies, can be customized by passing temporalio.workflow.ActivityConfig
objects to the TemporalAgent
constructor:
activity_config
: The base Temporal activity config to use for all activities. If no config is provided, astart_to_close_timeout
of 60 seconds is used.model_activity_config
: The Temporal activity config to use for model request activities. This is merged with the base activity config.toolset_activity_config
: The Temporal activity config to use for get-tools and call-tool activities for specific toolsets identified by ID. This is merged with the base activity config.-
tool_activity_config
: The Temporal activity config to use for specific tool call activities identified by toolset ID and tool name. This is merged with the base and toolset-specific activity configs.If a tool does not use I/O, you can specify
False
to disable using an activity. Note that the tool is required to be defined as anasync
function as non-async tools are run in threads which are non-deterministic and thus not supported outside of activities.
Activity Retries
On top of the automatic retries for request failures that Temporal will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time may cause the request to be retried more often than expected, with improper Retry-After
handling.
When using Temporal, it's recommended to not use HTTP Request Retries and to turn off your provider API client's own retry logic, for example by setting max_retries=0
on a custom OpenAIProvider
API client.
You can customize Temporal's retry policy using activity configuration.
Observability with Logfire
Temporal generates telemetry events and metrics for each workflow and activity execution, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to Pydantic Logfire to get a complete picture of what's happening in your application.
To use Logfire with Temporal, you need to pass a LogfirePlugin
object to Temporal's Client.connect()
:
from pydantic_ai.durable_exec.temporal import PydanticAIPlugin, LogfirePlugin
from temporalio.client import Client
async def main():
client = await Client.connect(
'localhost:7233',
plugins=[PydanticAIPlugin(), LogfirePlugin()],
)
By default, the LogfirePlugin
will instrument Temporal (including metrics) and Pydantic AI and send all data to Logfire. To customize Logfire configuration and instrumentation, you can pass a logfire_setup
function to the LogfirePlugin
constructor and return a custom Logfire
instance (i.e. the result of logfire.configure()
). To disable sending Temporal metrics to Logfire, you can pass metrics=False
to the LogfirePlugin
constructor.
Known Issues
Pandas
When logfire.info
is used inside an activity and the pandas
package is among your project's dependencies, you may encounter the following error which seems to be the result of an import race condition:
AttributeError: partially initialized module 'pandas' has no attribute '_pandas_parser_CAPI' (most likely due to a circular import)
To fix this, you can use the temporalio.workflow.unsafe.imports_passed_through()
context manager to proactively import the package and not have it be reloaded in the workflow sandbox:
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pandas