Beta Graph API
Beta API
This is the new beta graph API. It provides enhanced capabilities for parallel execution, conditional branching, and complex workflows.
The original graph API is still available (and compatible of interop with the new beta API) and is documented in the main graph documentation.
Overview
The beta graph API in pydantic-graph provides a powerful builder pattern for constructing parallel execution graphs with:
- Step nodes for executing async functions
- Decision nodes for conditional branching
- Spread operations for parallel processing of iterables
- Broadcast operations for sending the same data to multiple parallel paths
- Join nodes and Reducers for aggregating results from parallel execution
This API is designed for advanced workflows where you want declarative control over parallelism, routing, and data aggregation.
Installation
The beta graph API is included with pydantic-graph:
pip install pydantic-graph
Or as part of pydantic-ai:
pip install pydantic-ai
Quick Start
Here's a simple example to get you started:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class CounterState:
"""State for tracking a counter value."""
value: int = 0
async def main():
# Create a graph builder with state and output types
g = GraphBuilder(state_type=CounterState, output_type=int)
# Define steps using the decorator
@g.step
async def increment(ctx: StepContext[CounterState, None, None]) -> int:
"""Increment the counter and return its value."""
ctx.state.value += 1
return ctx.state.value
@g.step
async def double_it(ctx: StepContext[CounterState, None, int]) -> int:
"""Double the input value."""
return ctx.inputs * 2
# Add edges connecting the nodes
g.add(
g.edge_from(g.start_node).to(increment),
g.edge_from(increment).to(double_it),
g.edge_from(double_it).to(g.end_node),
)
# Build and run the graph
graph = g.build()
state = CounterState()
result = await graph.run(state=state)
print(f'Result: {result}')
#> Result: 2
print(f'Final state: {state.value}')
#> Final state: 1
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Key Concepts
GraphBuilder
The GraphBuilder is the main entry point for constructing graphs. It's generic over:
StateT- The type of mutable state shared across all nodesDepsT- The type of dependencies injected into nodesInputT- The type of initial input to the graphOutputT- The type of final output from the graph
Steps
Steps are async functions decorated with @g.step that define the actual work to be done in each node. They receive a StepContext with access to:
ctx.state- The mutable graph statectx.deps- Injected dependenciesctx.inputs- Input data for this step
Edges
Edges define the connections between nodes. The builder provides multiple ways to create edges:
g.add()- Add one or more edge pathsg.add_edge()- Add a simple edge between two nodesg.edge_from()- Start building a complex edge path
Start and End Nodes
Every graph has:
g.start_node- The entry point receiving initial inputsg.end_node- The exit point producing final outputs
A More Complex Example
Here's an example showcasing parallel execution with a map operation:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append
@dataclass
class ProcessingState:
"""State for tracking processing metrics."""
items_processed: int = 0
async def main():
g = GraphBuilder(
state_type=ProcessingState,
input_type=list[int],
output_type=list[int],
)
@g.step
async def square(ctx: StepContext[ProcessingState, None, int]) -> int:
"""Square a number and track that we processed it."""
ctx.state.items_processed += 1
return ctx.inputs * ctx.inputs
# Create a join to collect results
collect_results = g.join(reduce_list_append, initial_factory=list[int])
# Build the graph with map operation
g.add(
g.edge_from(g.start_node).map().to(square),
g.edge_from(square).to(collect_results),
g.edge_from(collect_results).to(g.end_node),
)
graph = g.build()
state = ProcessingState()
result = await graph.run(state=state, inputs=[1, 2, 3, 4, 5])
print(f'Results: {sorted(result)}')
#> Results: [1, 4, 9, 16, 25]
print(f'Items processed: {state.items_processed}')
#> Items processed: 5
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
In this example:
- The start node receives a list of integers
- The
.map()operation fans out each item to a separate parallel execution of thesquarestep - All results are collected back together using
reduce_list_append - The joined results flow to the end node
Next Steps
Explore the detailed documentation for each feature:
- Steps - Learn about step nodes and execution contexts
- Joins - Understand join nodes and reducer patterns
- Decisions - Implement conditional branching
- Parallel Execution - Master broadcasting and mapping
Advanced Execution Control
Beyond the basic graph.run() method, the beta API provides fine-grained control over graph execution.
Step-by-Step Execution
Use graph.iter() to execute the graph one step at a time:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class CounterState:
value: int = 0
async def main():
g = GraphBuilder(state_type=CounterState, output_type=int)
@g.step
async def increment(ctx: StepContext[CounterState, None, None]) -> int:
ctx.state.value += 1
return ctx.state.value
@g.step
async def double_it(ctx: StepContext[CounterState, None, int]) -> int:
return ctx.inputs * 2
g.add(
g.edge_from(g.start_node).to(increment),
g.edge_from(increment).to(double_it),
g.edge_from(double_it).to(g.end_node),
)
graph = g.build()
state = CounterState()
# Use iter() for step-by-step execution
async with graph.iter(state=state) as graph_run:
print(f'Initial state: {state.value}')
#> Initial state: 0
# Advance execution step by step
async for event in graph_run:
print(f'{state.value=} | {event=}')
#> state.value=0 | event=[GraphTask(node_id='increment', inputs=None)]
#> state.value=1 | event=[GraphTask(node_id='double_it', inputs=1)]
#> state.value=1 | event=[GraphTask(node_id='__end__', inputs=2)]
#> state.value=1 | event=EndMarker(_value=2)
if graph_run.output is not None:
print(f'Final output: {graph_run.output}')
#> Final output: 2
break
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
The GraphRun object provides:
- Async iteration: Iterate through execution events
next_taskproperty: Inspect upcoming tasksoutputproperty: Check if the graph has completed and get the final outputnext()method: Manually advance execution with optional value injection
Visualizing Graphs
Generate Mermaid diagrams of your graph structure using graph.render():
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class SimpleState:
pass
g = GraphBuilder(state_type=SimpleState, output_type=str)
@g.step
async def step_a(ctx: StepContext[SimpleState, None, None]) -> int:
return 10
@g.step
async def step_b(ctx: StepContext[SimpleState, None, int]) -> str:
return f'Result: {ctx.inputs}'
g.add(
g.edge_from(g.start_node).to(step_a),
g.edge_from(step_a).to(step_b),
g.edge_from(step_b).to(g.end_node),
)
graph = g.build()
# Generate a Mermaid diagram
mermaid_diagram = graph.render(title='My Graph', direction='LR')
print(mermaid_diagram)
"""
---
title: My Graph
---
stateDiagram-v2
direction LR
step_a
step_b
[*] --> step_a
step_a --> step_b
step_b --> [*]
"""
The rendered diagram can be displayed in documentation, notebooks, or any tool that supports Mermaid syntax.
Comparison with Original API
The original graph API (documented in the main graph page) uses a class-based approach with BaseNode subclasses. The beta API uses a builder pattern with decorated functions, which provides:
Advantages: - More concise syntax for simple workflows - Explicit control over parallelism with map/broadcast - Built-in reducers for common aggregation patterns - Easier to visualize complex data flows
Trade-offs: - Requires understanding of builder patterns - Less object-oriented, more functional style
Both APIs are fully supported and can even be integrated together when needed.