
Parallel Execution

The beta graph API provides two powerful mechanisms for parallel execution: broadcasting and spreading (via the .map() operation).

Overview

  • Broadcasting - Send the same data to multiple parallel paths
  • Spreading - Fan out items from an iterable to parallel paths

Both create "forks" in the execution graph that can later be synchronized with join nodes.

Broadcasting

Broadcasting sends identical data to multiple destinations simultaneously:

basic_broadcast.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @builder.step
    async def source(ctx: StepContext[SimpleState, None, None]) -> int:
        return 10

    @builder.step
    async def add_one(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs + 1

    @builder.step
    async def add_two(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs + 2

    @builder.step
    async def add_three(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs + 3

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    # Broadcast: the single value produced by `source` is delivered to all
    # three adder steps, which then run in parallel.
    builder.add(
        builder.edge_from(builder.start_node).to(source),
        builder.edge_from(source).to(add_one, add_two, add_three),
        builder.edge_from(add_one, add_two, add_three).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> [11, 12, 13]

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

All three steps receive the same input value (10) and execute in parallel.

Spreading

Spreading fans out elements from an iterable, processing each element in parallel:

basic_map.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @builder.step
    async def generate_list(ctx: StepContext[SimpleState, None, None]) -> list[int]:
        return [1, 2, 3, 4, 5]

    @builder.step
    async def square(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs * ctx.inputs

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    # .map() fans each element of the list out into its own parallel task
    builder.add(
        builder.edge_from(builder.start_node).to(generate_list),
        builder.edge_from(generate_list).map().to(square),
        builder.edge_from(square).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> [1, 4, 9, 16, 25]

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Spreading AsyncIterables

The .map() operation also works with AsyncIterable values. When mapping over an async iterable, the graph creates parallel tasks dynamically as values are yielded. This is particularly useful for streaming data or processing data that's being generated on-the-fly:

async_iterable_map.py
import asyncio
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @builder.stream
    async def stream_numbers(ctx: StepContext[SimpleState, None, None]):
        """Yield numbers one at a time, simulating a real-time data feed."""
        for i in range(1, 4):
            await asyncio.sleep(0.05)  # simulated latency between items
            yield i

    @builder.step
    async def triple(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs * 3

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    builder.add(
        builder.edge_from(builder.start_node).to(stream_numbers),
        # Mapping an async iterable: a parallel task is spawned for each
        # value as it is yielded, not after the stream completes
        builder.edge_from(stream_numbers).map().to(triple),
        builder.edge_from(triple).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> [3, 6, 9]

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

This allows for progressive processing where downstream steps can start working on early results while later results are still being generated.

Using add_mapping_edge()

The convenience method add_mapping_edge() provides a simpler syntax:

mapping_convenience.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[str])

    @builder.step
    async def generate_numbers(ctx: StepContext[SimpleState, None, None]) -> list[int]:
        return [10, 20, 30]

    @builder.step
    async def stringify(ctx: StepContext[SimpleState, None, int]) -> str:
        return f'Value: {ctx.inputs}'

    collector = builder.join(reduce_list_append, initial_factory=list[str])

    builder.add(builder.edge_from(builder.start_node).to(generate_numbers))
    # Shorthand for edge_from(generate_numbers).map().to(stringify)
    builder.add_mapping_edge(generate_numbers, stringify)
    builder.add(
        builder.edge_from(stringify).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> ['Value: 10', 'Value: 20', 'Value: 30']

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Empty Iterables

When mapping an empty iterable, you can specify a downstream_join_id to ensure the join still executes:

empty_map.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @builder.step
    async def generate_empty(ctx: StepContext[SimpleState, None, None]) -> list[int]:
        return []

    @builder.step
    async def double(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs * 2

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    builder.add(builder.edge_from(builder.start_node).to(generate_empty))
    # downstream_join_id makes the join execute even when the mapped
    # iterable is empty and no tasks are ever spawned
    builder.add_mapping_edge(generate_empty, double, downstream_join_id=collector.id)
    builder.add(
        builder.edge_from(double).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(result)
    #> []

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Nested Parallel Operations

You can nest broadcasts and maps for complex parallel patterns:

Spread then Broadcast

map_then_broadcast.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[int])

    @builder.step
    async def generate_list(ctx: StepContext[SimpleState, None, None]) -> list[int]:
        return [10, 20]

    @builder.step
    async def add_one(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs + 1

    @builder.step
    async def add_two(ctx: StepContext[SimpleState, None, int]) -> int:
        return ctx.inputs + 2

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    builder.add(
        builder.edge_from(builder.start_node).to(generate_list),
        # Fan out each list element, then broadcast it to both adders
        builder.edge_from(generate_list).map().to(add_one, add_two),
        builder.edge_from(add_one, add_two).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> [11, 12, 21, 22]

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

The result contains:

  • From 10: 10+1=11 and 10+2=12
  • From 20: 20+1=21 and 20+2=22

Multiple Sequential Spreads

sequential_maps.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[str])

    @builder.step
    async def generate_pairs(ctx: StepContext[SimpleState, None, None]) -> list[tuple[int, int]]:
        return [(1, 2), (3, 4)]

    @builder.step
    async def unpack_pair(ctx: StepContext[SimpleState, None, tuple[int, int]]) -> list[int]:
        return [ctx.inputs[0], ctx.inputs[1]]

    @builder.step
    async def stringify(ctx: StepContext[SimpleState, None, int]) -> str:
        return f'num:{ctx.inputs}'

    collector = builder.join(reduce_list_append, initial_factory=list[str])

    builder.add(
        builder.edge_from(builder.start_node).to(generate_pairs),
        # One parallel task per tuple...
        builder.edge_from(generate_pairs).map().to(unpack_pair),
        # ...then one more per number inside each tuple
        builder.edge_from(unpack_pair).map().to(stringify),
        builder.edge_from(stringify).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> ['num:1', 'num:2', 'num:3', 'num:4']

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Edge Labels

Add labels to parallel edges for better documentation:

labeled_parallel.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[str])

    @builder.step
    async def generate(ctx: StepContext[SimpleState, None, None]) -> list[int]:
        return [1, 2, 3]

    @builder.step
    async def process(ctx: StepContext[SimpleState, None, int]) -> str:
        return f'item-{ctx.inputs}'

    collector = builder.join(reduce_list_append, initial_factory=list[str])

    builder.add(builder.edge_from(builder.start_node).to(generate))
    # Attach labels to the edge segments before and after the map operation
    builder.add_mapping_edge(
        generate,
        process,
        pre_map_label='before map',
        post_map_label='after map',
    )
    builder.add(
        builder.edge_from(process).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> ['item-1', 'item-2', 'item-3']

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

State Sharing in Parallel Execution

All parallel tasks share the same graph state. Be careful with mutations:

parallel_state.py
from dataclasses import dataclass, field

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class CounterState:
    """Mutable state object shared by every parallel task in the graph run."""

    # Accumulates each input value seen by the mapped step, across branches.
    values: list[int] = field(default_factory=list)


async def main():
    builder = GraphBuilder(state_type=CounterState, output_type=list[int])

    @builder.step
    async def generate(ctx: StepContext[CounterState, None, None]) -> list[int]:
        return [1, 2, 3]

    @builder.step
    async def track_and_square(ctx: StepContext[CounterState, None, int]) -> int:
        # Every parallel task appends to the one shared state object
        ctx.state.values.append(ctx.inputs)
        return ctx.inputs * ctx.inputs

    collector = builder.join(reduce_list_append, initial_factory=list[int])

    builder.add(
        builder.edge_from(builder.start_node).to(generate),
        builder.edge_from(generate).map().to(track_and_square),
        builder.edge_from(track_and_square).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    state = CounterState()
    result = await graph.run(state=state)

    print(f'Squared: {sorted(result)}')
    #> Squared: [1, 4, 9]
    print(f'Tracked: {sorted(state.values)}')
    #> Tracked: [1, 2, 3]

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Edge Transformations

You can transform data inline as it flows along edges using the .transform() method:

edge_transform.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=str)

    @builder.step
    async def generate_number(ctx: StepContext[SimpleState, None, None]) -> int:
        return 42

    @builder.step
    async def format_output(ctx: StepContext[SimpleState, None, str]) -> str:
        return f'The answer is: {ctx.inputs}'

    builder.add(
        builder.edge_from(builder.start_node).to(generate_number),
        # Double the number and convert it to a string as it travels the edge
        builder.edge_from(generate_number).transform(lambda ctx: str(ctx.inputs * 2)).to(format_output),
        builder.edge_from(format_output).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(result)
    #> The answer is: 84

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

The transform function receives a StepContext with the current inputs and has access to state and dependencies. This is useful for:

  • Converting data types between incompatible steps
  • Extracting specific fields from complex objects
  • Applying simple computations without creating a full step
  • Adapting data formats during routing

Transforms can be chained and combined with other edge operations like .map() and .label():

chained_transforms.py
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class SimpleState:
    """Placeholder state: this example shares no data between steps."""

    pass


async def main():
    builder = GraphBuilder(state_type=SimpleState, output_type=list[str])

    @builder.step
    async def generate_data(ctx: StepContext[SimpleState, None, None]) -> list[dict[str, int]]:
        return [{'value': 10}, {'value': 20}, {'value': 30}]

    @builder.step
    async def process_number(ctx: StepContext[SimpleState, None, int]) -> str:
        return f'Processed: {ctx.inputs}'

    collector = builder.join(reduce_list_append, initial_factory=list[str])

    builder.add(
        builder.edge_from(builder.start_node).to(generate_data),
        # Pull the ints out of the dicts, then fan them out one per task
        builder.edge_from(generate_data)
        .transform(lambda ctx: [item['value'] for item in ctx.inputs])
        .label('Extract values')
        .map()
        .to(process_number),
        builder.edge_from(process_number).to(collector),
        builder.edge_from(collector).to(builder.end_node),
    )

    graph = builder.build()
    result = await graph.run(state=SimpleState())
    print(sorted(result))
    #> ['Processed: 10', 'Processed: 20', 'Processed: 30']

(This example is complete, it can be run "as is" — you'll need to add import asyncio; asyncio.run(main()) to run main)

Next Steps