Steps
Steps are the fundamental units of work in a graph. They're async functions that receive a StepContext and return a value.
Creating Steps
Steps are created using the @g.step decorator on the GraphBuilder:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class MyState:
counter: int = 0
g = GraphBuilder(state_type=MyState, output_type=int)
@g.step
async def increment(ctx: StepContext[MyState, None, None]) -> int:
ctx.state.counter += 1
return ctx.state.counter
g.add(
g.edge_from(g.start_node).to(increment),
g.edge_from(increment).to(g.end_node),
)
graph = g.build()
async def main():
state = MyState()
result = await graph.run(state=state)
print(result)
#> 1
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Step Context
Every step function receives a StepContext as its first parameter. The context provides access to:
ctx.state- The mutable graph state (type:StateT)ctx.deps- Injected dependencies (type:DepsT)ctx.inputs- Input data for this step (type:InputT)
Accessing State
State is shared across all steps in a graph and can be freely mutated:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class AppState:
messages: list[str]
async def main():
g = GraphBuilder(state_type=AppState, output_type=list[str])
@g.step
async def add_hello(ctx: StepContext[AppState, None, None]) -> None:
ctx.state.messages.append('Hello')
@g.step
async def add_world(ctx: StepContext[AppState, None, None]) -> None:
ctx.state.messages.append('World')
@g.step
async def get_messages(ctx: StepContext[AppState, None, None]) -> list[str]:
return ctx.state.messages
g.add(
g.edge_from(g.start_node).to(add_hello),
g.edge_from(add_hello).to(add_world),
g.edge_from(add_world).to(get_messages),
g.edge_from(get_messages).to(g.end_node),
)
graph = g.build()
state = AppState(messages=[])
result = await graph.run(state=state)
print(result)
#> ['Hello', 'World']
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Working with Inputs
Steps can receive and transform input data:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class SimpleState:
pass
async def main():
g = GraphBuilder(
state_type=SimpleState,
input_type=int,
output_type=str,
)
@g.step
async def double_it(ctx: StepContext[SimpleState, None, int]) -> int:
"""Double the input value."""
return ctx.inputs * 2
@g.step
async def stringify(ctx: StepContext[SimpleState, None, int]) -> str:
"""Convert to a formatted string."""
return f'Result: {ctx.inputs}'
g.add(
g.edge_from(g.start_node).to(double_it),
g.edge_from(double_it).to(stringify),
g.edge_from(stringify).to(g.end_node),
)
graph = g.build()
result = await graph.run(state=SimpleState(), inputs=21)
print(result)
#> Result: 42
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Dependency Injection
Steps can access injected dependencies through ctx.deps:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class AppState:
pass
@dataclass
class AppDeps:
"""Dependencies injected into the graph."""
multiplier: int
async def main():
g = GraphBuilder(
state_type=AppState,
deps_type=AppDeps,
input_type=int,
output_type=int,
)
@g.step
async def multiply(ctx: StepContext[AppState, AppDeps, int]) -> int:
"""Multiply input by the injected multiplier."""
return ctx.inputs * ctx.deps.multiplier
g.add(
g.edge_from(g.start_node).to(multiply),
g.edge_from(multiply).to(g.end_node),
)
graph = g.build()
deps = AppDeps(multiplier=10)
result = await graph.run(state=AppState(), deps=deps, inputs=5)
print(result)
#> 50
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Customizing Steps
Custom Node IDs
By default, step node IDs are inferred from the function name. You can override this:
from pydantic_graph.beta import StepContext
from basic_step import MyState, g
@g.step(node_id='my_custom_id')
async def my_step(ctx: StepContext[MyState, None, None]) -> int:
return 42
# The node ID is now 'my_custom_id' instead of 'my_step'
Human-Readable Labels
Labels provide documentation for diagram generation:
from pydantic_graph.beta import StepContext
from basic_step import MyState, g
@g.step(label='Increment the counter')
async def increment(ctx: StepContext[MyState, None, None]) -> int:
ctx.state.counter += 1
return ctx.state.counter
# Access the label programmatically
print(increment.label)
#> Increment the counter
Sequential Steps
Multiple steps can be chained sequentially:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class MathState:
operations: list[str]
async def main():
g = GraphBuilder(
state_type=MathState,
input_type=int,
output_type=int,
)
@g.step
async def add_five(ctx: StepContext[MathState, None, int]) -> int:
ctx.state.operations.append('add 5')
return ctx.inputs + 5
@g.step
async def multiply_by_two(ctx: StepContext[MathState, None, int]) -> int:
ctx.state.operations.append('multiply by 2')
return ctx.inputs * 2
@g.step
async def subtract_three(ctx: StepContext[MathState, None, int]) -> int:
ctx.state.operations.append('subtract 3')
return ctx.inputs - 3
# Connect steps sequentially
g.add(
g.edge_from(g.start_node).to(add_five),
g.edge_from(add_five).to(multiply_by_two),
g.edge_from(multiply_by_two).to(subtract_three),
g.edge_from(subtract_three).to(g.end_node),
)
graph = g.build()
state = MathState(operations=[])
result = await graph.run(state=state, inputs=10)
print(f'Result: {result}')
#> Result: 27
print(f'Operations: {state.operations}')
#> Operations: ['add 5', 'multiply by 2', 'subtract 3']
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
The computation is: (10 + 5) * 2 - 3 = 27
Streaming Steps
In addition to regular steps that return a single value, you can create streaming steps that yield multiple values over time using the @g.stream decorator:
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append
@dataclass
class SimpleState:
pass
g = GraphBuilder(state_type=SimpleState, output_type=list[int])
@g.stream
async def generate_stream(ctx: StepContext[SimpleState, None, None]):
"""Stream numbers from 1 to 5."""
for i in range(1, 6):
yield i
@g.step
async def square(ctx: StepContext[SimpleState, None, int]) -> int:
return ctx.inputs * ctx.inputs
collect = g.join(reduce_list_append, initial_factory=list[int])
g.add(
g.edge_from(g.start_node).to(generate_stream),
# The stream output is an AsyncIterable, so we can map over it
g.edge_from(generate_stream).map().to(square),
g.edge_from(square).to(collect),
g.edge_from(collect).to(g.end_node),
)
graph = g.build()
async def main():
result = await graph.run(state=SimpleState())
print(sorted(result))
#> [1, 4, 9, 16, 25]
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
How Streaming Steps Work
Streaming steps return an AsyncIterable that yields values over time. When you use .map() on a streaming step's output, the graph processes each yielded value as it becomes available, creating parallel tasks dynamically. This is particularly useful for:
- Processing data from APIs that stream responses
- Handling real-time data feeds
- Progressive processing of large datasets
- Any scenario where you want to start processing results before all data is available
Like regular steps, streaming steps can also have custom node IDs and labels:
from pydantic_graph.beta import StepContext
from streaming_step import SimpleState, g
@g.stream(node_id='my_stream', label='Generate numbers progressively')
async def labeled_stream(ctx: StepContext[SimpleState, None, None]):
for i in range(10):
yield i
Edge Building Convenience Methods
The builder provides helper methods for common edge patterns:
Simple Edges with add_edge()
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class SimpleState:
pass
async def main():
g = GraphBuilder(state_type=SimpleState, output_type=int)
@g.step
async def step_a(ctx: StepContext[SimpleState, None, None]) -> int:
return 10
@g.step
async def step_b(ctx: StepContext[SimpleState, None, int]) -> int:
return ctx.inputs + 5
# Using add_edge() for simple connections
g.add_edge(g.start_node, step_a)
g.add_edge(step_a, step_b, label='from a to b')
g.add_edge(step_b, g.end_node)
graph = g.build()
result = await graph.run(state=SimpleState())
print(result)
#> 15
(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)
Type Safety
The beta graph API provides strong type checking through generics. Type parameters on StepContext ensure:
- State access is properly typed
- Dependencies are correctly typed
- Input/output types match across edges
from dataclasses import dataclass
from pydantic_graph.beta import GraphBuilder, StepContext
@dataclass
class MyState:
pass
g = GraphBuilder(state_type=MyState, output_type=str)
# Type checker will catch mismatches
@g.step
async def expects_int(ctx: StepContext[MyState, None, int]) -> str:
return str(ctx.inputs)
@g.step
async def returns_str(ctx: StepContext[MyState, None, None]) -> str:
return 'hello'
# This would be a type error - expects_int needs int input, but returns_str outputs str
# g.add(g.edge_from(returns_str).to(expects_int)) # Type error!
Next Steps
- Learn about parallel execution with broadcasting and mapping
- Understand join nodes for aggregating parallel results
- Explore conditional branching with decision nodes