pydantic_graph

Graph dataclass

Bases: Generic[StateT, DepsT, RunEndT]

Definition of a graph.

In pydantic-graph, a graph is a collection of nodes that can be run in sequence. The nodes define their outgoing edges, i.e. which nodes may be run next, and thereby the structure of the graph.

Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never 42 at the end.

never_42.py
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class MyState:
    number: int

@dataclass
class Increment(BaseNode[MyState]):
    async def run(self, ctx: GraphRunContext) -> Check42:
        ctx.state.number += 1
        return Check42()

@dataclass
class Check42(BaseNode[MyState, None, int]):
    async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
        if ctx.state.number == 42:
            return Increment()
        else:
            return End(ctx.state.number)

never_42_graph = Graph(nodes=(Increment, Check42))
(This example is complete and can be run "as is".)

See run for an example of running the graph, and mermaid_code for an example of generating a mermaid diagram from the graph.
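
The statement that nodes define their own outgoing edges can be illustrated with a small standalone sketch. It assumes the edges are read from the return annotation of each node's `run` method, as the example above suggests; the classes and the helpers `outgoing_edges` and `missing_nodes` are hypothetical stand-ins written with the standard library only, not the library's implementation.

```python
import types
from typing import Union, get_args, get_origin, get_type_hints

# Union[...] and the `X | Y` syntax can report different origins depending on the Python version.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


class End:
    """Hypothetical stand-in for an end-of-run marker; not the library class."""


class Increment:
    def run(self) -> "Check42":
        return Check42()


class Check42:
    def run(self) -> Union["Increment", End]:
        return End()


def outgoing_edges(node_cls: type) -> set:
    """Names of the node classes that `run` is annotated to return, excluding End."""
    returns = get_type_hints(node_cls.run)["return"]
    # A union lists several possible successors; anything else is a single successor.
    candidates = get_args(returns) if get_origin(returns) in _UNION_ORIGINS else (returns,)
    return {c.__name__ for c in candidates if c is not End}


def missing_nodes(nodes: list) -> dict:
    """Map each referenced-but-unregistered node name to the nodes that reference it."""
    known = {n.__name__ for n in nodes}
    bad = {}
    for node in nodes:
        for edge in sorted(outgoing_edges(node)):
            if edge not in known:
                bad.setdefault(edge, []).append(node.__name__)
    return bad


print(missing_nodes([Increment, Check42]))  # {}
print(missing_nodes([Increment]))  # {'Check42': ['Increment']}
```

In this sketch, leaving `Check42` out of the node list is reported as a dangling edge, which is the kind of mistake the `_validate_edges` method in the source below rejects at construction time.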

Source code in pydantic_graph/pydantic_graph/graph.py
@dataclass(init=False)
class Graph(Generic[StateT, DepsT, RunEndT]):
    """Definition of a graph.

    In `pydantic-graph`, a graph is a collection of nodes that can be run in sequence. The nodes define
    their outgoing edges, i.e. which nodes may be run next, and thereby the structure of the graph.

    Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never
    42 at the end.

    ```py {title="never_42.py" noqa="I001" py="3.10"}
    from __future__ import annotations

    from dataclasses import dataclass

    from pydantic_graph import BaseNode, End, Graph, GraphRunContext

    @dataclass
    class MyState:
        number: int

    @dataclass
    class Increment(BaseNode[MyState]):
        async def run(self, ctx: GraphRunContext) -> Check42:
            ctx.state.number += 1
            return Check42()

    @dataclass
    class Check42(BaseNode[MyState, None, int]):
        async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
            if ctx.state.number == 42:
                return Increment()
            else:
                return End(ctx.state.number)

    never_42_graph = Graph(nodes=(Increment, Check42))
    ```
    _(This example is complete, it can be run "as is")_

    See [`run`][pydantic_graph.graph.Graph.run] for an example of running the graph, and
    [`mermaid_code`][pydantic_graph.graph.Graph.mermaid_code] for an example of generating a mermaid diagram
    from the graph.
    """

    name: str | None
    node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]]
    snapshot_state: Callable[[StateT], StateT]
    _state_type: type[StateT] | _utils.Unset = field(repr=False)
    _run_end_type: type[RunEndT] | _utils.Unset = field(repr=False)

    def __init__(
        self,
        *,
        nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
        name: str | None = None,
        state_type: type[StateT] | _utils.Unset = _utils.UNSET,
        run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
        snapshot_state: Callable[[StateT], StateT] = deep_copy_state,
    ):
        """Create a graph from a sequence of nodes.

        Args:
            nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same
                state type.
            name: Optional name for the graph, if not provided the name will be inferred from the calling frame
                on the first call to a graph method.
            state_type: The type of the state for the graph, this can generally be inferred from `nodes`.
            run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`.
            snapshot_state: A function to snapshot the state of the graph, this is used in
                [`NodeStep`][pydantic_graph.state.NodeStep] and [`EndStep`][pydantic_graph.state.EndStep] to record
                the state before each step.
        """
        self.name = name
        self._state_type = state_type
        self._run_end_type = run_end_type
        self.snapshot_state = snapshot_state

        parent_namespace = _utils.get_parent_namespace(inspect.currentframe())
        self.node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]] = {}
        for node in nodes:
            self._register_node(node, parent_namespace)

        self._validate_edges()

    async def run(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        infer_name: bool = True,
    ) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]:
        """Run the graph from a starting node until it ends.

        Args:
            start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
                you need to provide the starting node.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The result type from ending the run and the history of the run.

        Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="run_never_42.py" noqa="I001" py="3.10"}
        from never_42 import Increment, MyState, never_42_graph

        async def main():
            state = MyState(1)
            _, history = await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=2)
            print(len(history))
            #> 3

            state = MyState(41)
            _, history = await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=43)
            print(len(history))
            #> 5
        ```
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        history: list[HistoryStep[StateT, RunEndT]] = []
        with _logfire.span(
            '{graph_name} run {start=}',
            graph_name=self.name or 'graph',
            start=start_node,
        ) as run_span:
            while True:
                next_node = await self.next(start_node, history, state=state, deps=deps, infer_name=False)
                if isinstance(next_node, End):
                    history.append(EndStep(result=next_node))
                    run_span.set_attribute('history', history)
                    return next_node.data, history
                elif isinstance(next_node, BaseNode):
                    start_node = next_node
                else:
                    if TYPE_CHECKING:
                        typing_extensions.assert_never(next_node)
                    else:
                        raise exceptions.GraphRuntimeError(
                            f'Invalid node return type: `{type(next_node).__name__}`. Expected `BaseNode` or `End`.'
                        )

    def run_sync(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        infer_name: bool = True,
    ) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]:
        """Run the graph synchronously.

        This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`.
        You therefore can't use this method inside async code or if there's an active event loop.

        Args:
            start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
                you need to provide the starting node.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The result type from ending the run and the history of the run.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        return asyncio.get_event_loop().run_until_complete(
            self.run(start_node, state=state, deps=deps, infer_name=False)
        )

    async def next(
        self,
        node: BaseNode[StateT, DepsT, RunEndT],
        history: list[HistoryStep[StateT, RunEndT]],
        *,
        state: StateT = None,
        deps: DepsT = None,
        infer_name: bool = True,
    ) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]:
        """Run a node in the graph and return the next node to run.

        Args:
            node: The node to run.
            history: The history of the graph run so far. NOTE: this will be mutated to add the new step.
            state: The current state of the graph.
            deps: The dependencies of the graph.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        node_id = node.get_id()
        if node_id not in self.node_defs:
            raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

        ctx = GraphRunContext(state, deps)
        with _logfire.span('run node {node_id}', node_id=node_id, node=node):
            start_ts = _utils.now_utc()
            start = perf_counter()
            next_node = await node.run(ctx)
            duration = perf_counter() - start

        history.append(
            NodeStep(state=state, node=node, start_ts=start_ts, duration=duration, snapshot_state=self.snapshot_state)
        )
        return next_node

    def dump_history(self, history: list[HistoryStep[StateT, RunEndT]], *, indent: int | None = None) -> bytes:
        """Dump the history of a graph run as JSON.

        Args:
            history: The history of the graph run.
            indent: The number of spaces to indent the JSON.

        Returns:
            The JSON representation of the history.
        """
        return self.history_type_adapter.dump_json(history, indent=indent)

    def load_history(self, json_bytes: str | bytes | bytearray) -> list[HistoryStep[StateT, RunEndT]]:
        """Load the history of a graph run from JSON.

        Args:
            json_bytes: The JSON representation of the history.

        Returns:
            The history of the graph run.
        """
        return self.history_type_adapter.validate_json(json_bytes)

    @cached_property
    def history_type_adapter(self) -> pydantic.TypeAdapter[list[HistoryStep[StateT, RunEndT]]]:
        nodes = [node_def.node for node_def in self.node_defs.values()]
        state_t = self._get_state_type()
        end_t = self._get_run_end_type()
        token = nodes_schema_var.set(nodes)
        try:
            ta = pydantic.TypeAdapter(list[Annotated[HistoryStep[state_t, end_t], pydantic.Discriminator('kind')]])
        finally:
            nodes_schema_var.reset(token)
        return ta

    def mermaid_code(
        self,
        *,
        start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        title: str | None | typing_extensions.Literal[False] = None,
        edge_labels: bool = True,
        notes: bool = True,
        highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
        infer_name: bool = True,
    ) -> str:
        """Generate a diagram representing the graph as [mermaid](https://mermaid.js.org/) diagram.

        This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

        Args:
            start_node: The node or nodes which can start the graph.
            title: The title of the diagram, use `False` to not include a title.
            edge_labels: Whether to include edge labels.
            notes: Whether to include notes on each node.
            highlighted_nodes: Optional node or nodes to highlight.
            highlight_css: The CSS to use for highlighting nodes.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The mermaid code for the graph, which can then be rendered as a diagram.

        Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="never_42.py" py="3.10"}
        from never_42 import Increment, never_42_graph

        print(never_42_graph.mermaid_code(start_node=Increment))
        '''
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        '''
        ```

        The rendered diagram will look like this:

        ```mermaid
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        ```
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if title is None and self.name:
            title = self.name
        return mermaid.generate_code(
            self,
            start_node=start_node,
            highlighted_nodes=highlighted_nodes,
            highlight_css=highlight_css,
            title=title or None,
            edge_labels=edge_labels,
            notes=notes,
        )

    def mermaid_image(
        self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> bytes:
        """Generate a diagram representing the graph as an image.

        The format and diagram can be customized using `kwargs`,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.request_image`.

        Returns:
            The image bytes.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if 'title' not in kwargs and self.name:
            kwargs['title'] = self.name
        return mermaid.request_image(self, **kwargs)

    def mermaid_save(
        self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> None:
        """Generate a diagram representing the graph and save it as an image.

        The format and diagram can be customized using `kwargs`,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            path: The path to save the image to.
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.save_image`.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if 'title' not in kwargs and self.name:
            kwargs['title'] = self.name
        mermaid.save_image(path, self, **kwargs)

    def _get_state_type(self) -> type[StateT]:
        if _utils.is_set(self._state_type):
            return self._state_type

        for node_def in self.node_defs.values():
            for base in typing_extensions.get_original_bases(node_def.node):
                if typing_extensions.get_origin(base) is BaseNode:
                    args = typing_extensions.get_args(base)
                    if args:
                        return args[0]
                    # break the inner (bases) loop
                    break
        # state defaults to None, so use that if we can't infer it
        return type(None)  # pyright: ignore[reportReturnType]

    def _get_run_end_type(self) -> type[RunEndT]:
        if _utils.is_set(self._run_end_type):
            return self._run_end_type

        for node_def in self.node_defs.values():
            for base in typing_extensions.get_original_bases(node_def.node):
                if typing_extensions.get_origin(base) is BaseNode:
                    args = typing_extensions.get_args(base)
                    if len(args) == 3:
                        t = args[2]
                        if not _utils.is_never(t):
                            return t
                    # break the inner (bases) loop
                    break
        raise exceptions.GraphSetupError('Could not infer run end type from nodes, please set `run_end_type`.')

    def _register_node(
        self, node: type[BaseNode[StateT, DepsT, RunEndT]], parent_namespace: dict[str, Any] | None
    ) -> None:
        node_id = node.get_id()
        if existing_node := self.node_defs.get(node_id):
            raise exceptions.GraphSetupError(
                f'Node ID `{node_id}` is not unique — found on {existing_node.node} and {node}'
            )
        else:
            self.node_defs[node_id] = node.get_node_def(parent_namespace)

    def _validate_edges(self):
        known_node_ids = self.node_defs.keys()
        bad_edges: dict[str, list[str]] = {}

        for node_id, node_def in self.node_defs.items():
            for edge in node_def.next_node_edges.keys():
                if edge not in known_node_ids:
                    bad_edges.setdefault(edge, []).append(f'`{node_id}`')

        if bad_edges:
            bad_edges_list = [f'`{k}` is referenced by {_utils.comma_and(v)}' for k, v in bad_edges.items()]
            if len(bad_edges_list) == 1:
                raise exceptions.GraphSetupError(f'{bad_edges_list[0]} but not included in the graph.')
            else:
                b = '\n'.join(f' {be}' for be in bad_edges_list)
                raise exceptions.GraphSetupError(
                    f'Nodes are referenced in the graph but not included in the graph:\n{b}'
                )

    def _infer_name(self, function_frame: types.FrameType | None) -> None:
        """Infer the graph name from the call frame.

        Usage should be `self._infer_name(inspect.currentframe())`.

        Copied from `Agent`.
        """
        assert self.name is None, 'Name already set'
        if function_frame is not None and (parent_frame := function_frame.f_back):  # pragma: no branch
            for name, item in parent_frame.f_locals.items():
                if item is self:
                    self.name = name
                    return
            if parent_frame.f_locals != parent_frame.f_globals:
                # if we couldn't find the agent in locals and globals are a different dict, try globals
                for name, item in parent_frame.f_globals.items():
                    if item is self:
                        self.name = name
                        return

__init__

__init__(
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | Unset = UNSET,
    run_end_type: type[RunEndT] | Unset = UNSET,
    snapshot_state: Callable[
        [StateT], StateT
    ] = deep_copy_state
)

Create a graph from a sequence of nodes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `nodes` | `Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]` | The nodes which make up the graph. Nodes need to be unique and all be generic in the same state type. | required |
| `name` | `str \| None` | Optional name for the graph. If not provided, the name will be inferred from the calling frame on the first call to a graph method. | `None` |
| `state_type` | `type[StateT] \| Unset` | The type of the state for the graph. This can generally be inferred from `nodes`. | `UNSET` |
| `run_end_type` | `type[RunEndT] \| Unset` | The type of the result of running the graph. This can generally be inferred from `nodes`. | `UNSET` |
| `snapshot_state` | `Callable[[StateT], StateT]` | A function to snapshot the state of the graph. This is used in `NodeStep` and `EndStep` to record the state before each step. | `deep_copy_state` |

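
To make "inferred from the calling frame" concrete, here is a minimal standalone sketch of the technique that `_infer_name` uses in the source above: look through the caller's local and global variables for the one that refers to the object itself. The `NamedThing` class and its `describe` method are hypothetical and use only the standard library.

```python
import inspect
from typing import Optional


class NamedThing:
    """Hypothetical object that learns its own variable name on first use."""

    def __init__(self, name: Optional[str] = None) -> None:
        self.name = name

    def describe(self) -> str:
        if self.name is None:
            # currentframe() is describe's own frame; its f_back is the caller's frame.
            self._infer_name(inspect.currentframe())
        return f"name={self.name}"

    def _infer_name(self, function_frame) -> None:
        if function_frame is None or function_frame.f_back is None:
            return
        caller = function_frame.f_back
        # Search the caller's locals first, then its globals.
        for namespace in (caller.f_locals, caller.f_globals):
            for name, item in namespace.items():
                if item is self:
                    self.name = name
                    return


def demo() -> str:
    my_graph = NamedThing()
    return my_graph.describe()


print(demo())  # name=my_graph
```

The inferred name depends on where the first method call happens, which is why passing `name` explicitly is the more predictable choice when the name matters.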
Source code in pydantic_graph/pydantic_graph/graph.py
def __init__(
    self,
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | _utils.Unset = _utils.UNSET,
    run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
    snapshot_state: Callable[[StateT], StateT] = deep_copy_state,
):
    """Create a graph from a sequence of nodes.

    Args:
        nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same
            state type.
        name: Optional name for the graph, if not provided the name will be inferred from the calling frame
            on the first call to a graph method.
        state_type: The type of the state for the graph, this can generally be inferred from `nodes`.
        run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`.
        snapshot_state: A function to snapshot the state of the graph, this is used in
            [`NodeStep`][pydantic_graph.state.NodeStep] and [`EndStep`][pydantic_graph.state.EndStep] to record
            the state before each step.
    """
    self.name = name
    self._state_type = state_type
    self._run_end_type = run_end_type
    self.snapshot_state = snapshot_state

    parent_namespace = _utils.get_parent_namespace(inspect.currentframe())
    self.node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]] = {}
    for node in nodes:
        self._register_node(node, parent_namespace)

    self._validate_edges()

run async

run(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True
) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]

Run the graph from a starting node until it ends.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `start_node` | `BaseNode[StateT, DepsT, RunEndT]` | The first node to run. The graph definition doesn't define an entry point, so you need to provide the starting node. | required |
| `state` | `StateT` | The initial state of the graph. | `None` |
| `deps` | `DepsT` | The dependencies of the graph. | `None` |
| `infer_name` | `bool` | Whether to infer the graph name from the calling frame. | `True` |

Returns:

| Type | Description |
|---|---|
| `tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]` | The result of the run and the history of the run. |

Here's an example of running the graph from above:

run_never_42.py
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    _, history = await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=2)
    print(len(history))
    #> 3

    state = MyState(41)
    _, history = await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=43)
    print(len(history))
    #> 5
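
The history lengths in the example follow from the driver loop: one entry per node that runs, plus a final entry for the end of the run. The loop can be sketched without the library as follows. The `State`, `End`, `Increment`, `Check42` and `run_graph` names below are simplified, hypothetical stand-ins (nodes receive the state directly rather than a context object), so this shows the control flow only, not the library's implementation.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class State:
    number: int


@dataclass
class End:
    data: int


class Increment:
    async def run(self, state: State):
        state.number += 1
        return Check42()


class Check42:
    async def run(self, state: State):
        if state.number == 42:
            return Increment()
        return End(state.number)


async def run_graph(start_node, state: State):
    """Run nodes one after another until a node returns End."""
    history = []
    node = start_node
    while True:
        next_node = await node.run(state)
        history.append(type(node).__name__)  # one entry per executed node
        if isinstance(next_node, End):
            history.append("End")  # plus one entry for the end of the run
            return next_node.data, history
        node = next_node


print(asyncio.run(run_graph(Increment(), State(1))))
# (2, ['Increment', 'Check42', 'End'])
```

Starting from 41, the same loop visits `Increment` and `Check42` twice each before ending, which gives five history entries.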
Source code in pydantic_graph/pydantic_graph/graph.py
async def run(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True,
) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]:
    """Run the graph from a starting node until it ends.

    Args:
        start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
            you need to provide the starting node.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The result type from ending the run and the history of the run.

    Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="run_never_42.py" noqa="I001" py="3.10"}
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(1)
        _, history = await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=2)
        print(len(history))
        #> 3

        state = MyState(41)
        _, history = await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=43)
        print(len(history))
        #> 5
    ```
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    history: list[HistoryStep[StateT, RunEndT]] = []
    with _logfire.span(
        '{graph_name} run {start=}',
        graph_name=self.name or 'graph',
        start=start_node,
    ) as run_span:
        while True:
            next_node = await self.next(start_node, history, state=state, deps=deps, infer_name=False)
            if isinstance(next_node, End):
                history.append(EndStep(result=next_node))
                run_span.set_attribute('history', history)
                return next_node.data, history
            elif isinstance(next_node, BaseNode):
                start_node = next_node
            else:
                if TYPE_CHECKING:
                    typing_extensions.assert_never(next_node)
                else:
                    raise exceptions.GraphRuntimeError(
                        f'Invalid node return type: `{type(next_node).__name__}`. Expected `BaseNode` or `End`.'
                    )

run_sync

run_sync(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True
) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]

Run the graph synchronously.

This is a convenience method that wraps self.run with loop.run_until_complete(...). You therefore can't use this method inside async code or if there's an active event loop.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `start_node` | `BaseNode[StateT, DepsT, RunEndT]` | The first node to run. The graph definition doesn't define an entry point, so you need to provide the starting node. | required |
| `state` | `StateT` | The initial state of the graph. | `None` |
| `deps` | `DepsT` | The dependencies of the graph. | `None` |
| `infer_name` | `bool` | Whether to infer the graph name from the calling frame. | `True` |

Returns:

| Type | Description |
|---|---|
| `tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]` | The result of the run and the history of the run. |
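
The restriction on calling this method from async code comes from `run_until_complete` itself, which refuses to start while another event loop is running in the same thread. The standalone sketch below shows that behaviour with hypothetical `work`, `work_sync` and `caller` functions. It creates a fresh loop with `asyncio.new_event_loop()` rather than calling `asyncio.get_event_loop()` as the source does, which is an assumption made to keep the sketch free of deprecation warnings on recent Python versions.

```python
import asyncio


async def work() -> int:
    return 7


def work_sync() -> int:
    """Synchronous wrapper around `work`, in the style of a run_sync helper."""
    coro = work()
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        # The coroutine never started; close it to avoid a "never awaited" warning.
        coro.close()
        raise
    finally:
        loop.close()


async def caller() -> str:
    """Calling the synchronous wrapper while a loop is running fails."""
    try:
        work_sync()
    except RuntimeError:
        return "blocked"
    return "ran"


print(work_sync())  # 7
print(asyncio.run(caller()))  # blocked
```

Inside async code, awaiting `run` directly avoids the problem.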

Source code in pydantic_graph/pydantic_graph/graph.py
def run_sync(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True,
) -> tuple[RunEndT, list[HistoryStep[StateT, RunEndT]]]:
    """Run the graph synchronously.

    This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    Args:
        start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
            you need to provide the starting node.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The result type from ending the run and the history of the run.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    return asyncio.get_event_loop().run_until_complete(
        self.run(start_node, state=state, deps=deps, infer_name=False)
    )

next async

next(
    node: BaseNode[StateT, DepsT, RunEndT],
    history: list[HistoryStep[StateT, RunEndT]],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True
) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]

Run a node in the graph and return the next node to run.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node` | `BaseNode[StateT, DepsT, RunEndT]` | The node to run. | required |
| `history` | `list[HistoryStep[StateT, RunEndT]]` | The history of the graph run so far. NOTE: this list is mutated to add the new step. | required |
| `state` | `StateT` | The current state of the graph. | `None` |
| `deps` | `DepsT` | The dependencies of the graph. | `None` |
| `infer_name` | `bool` | Whether to infer the graph name from the calling frame. | `True` |

Returns:

| Type | Description |
|---|---|
| `BaseNode[StateT, DepsT, Any] \| End[RunEndT]` | The next node to run, or `End` if the graph has finished. |
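
Each call appends a step to `history`, and the step is built with the graph's `snapshot_state` function. The sketch below illustrates why a snapshot is useful when nodes mutate the state in place. It assumes `copy.deepcopy` as the snapshot function (the default's name, `deep_copy_state`, suggests a deep copy, but its body is not shown on this page), and `State` and `record_run` are hypothetical.

```python
import copy
from dataclasses import dataclass


@dataclass
class State:
    number: int


def record_run(snapshot) -> list:
    """Mutate one state object three times, recording it after each step."""
    state = State(0)
    history = []
    for _ in range(3):
        state.number += 1  # in-place mutation, as a node would do
        history.append(snapshot(state))
    return [recorded.number for recorded in history]


# Storing the same object each time: every entry shows the final value.
print(record_run(lambda s: s))  # [3, 3, 3]

# Storing a deep copy each time: every entry keeps the value at that step.
print(record_run(copy.deepcopy))  # [1, 2, 3]
```

A custom `snapshot_state` only needs to return an object that later mutations of the live state cannot change.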

Source code in pydantic_graph/pydantic_graph/graph.py
async def next(
    self,
    node: BaseNode[StateT, DepsT, RunEndT],
    history: list[HistoryStep[StateT, RunEndT]],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True,
) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]:
    """Run a node in the graph and return the next node to run.

    Args:
        node: The node to run.
        history: The history of the graph run so far. NOTE: this will be mutated to add the new step.
        state: The current state of the graph.
        deps: The dependencies of the graph.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    node_id = node.get_id()
    if node_id not in self.node_defs:
        raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

    ctx = GraphRunContext(state, deps)
    with _logfire.span('run node {node_id}', node_id=node_id, node=node):
        start_ts = _utils.now_utc()
        start = perf_counter()
        next_node = await node.run(ctx)
        duration = perf_counter() - start

    history.append(
        NodeStep(state=state, node=node, start_ts=start_ts, duration=duration, snapshot_state=self.snapshot_state)
    )
    return next_node

dump_history

dump_history(
    history: list[HistoryStep[StateT, RunEndT]],
    *,
    indent: int | None = None
) -> bytes

Dump the history of a graph run as JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| history | `list[HistoryStep[StateT, RunEndT]]` | The history of the graph run. | required |
| indent | `int \| None` | The number of spaces to indent the JSON. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | The JSON representation of the history. |

Source code in pydantic_graph/pydantic_graph/graph.py
def dump_history(self, history: list[HistoryStep[StateT, RunEndT]], *, indent: int | None = None) -> bytes:
    """Dump the history of a graph run as JSON.

    Args:
        history: The history of the graph run.
        indent: The number of spaces to indent the JSON.

    Returns:
        The JSON representation of the history.
    """
    return self.history_type_adapter.dump_json(history, indent=indent)

load_history

load_history(
    json_bytes: str | bytes | bytearray,
) -> list[HistoryStep[StateT, RunEndT]]

Load the history of a graph run from JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| json_bytes | `str \| bytes \| bytearray` | The JSON representation of the history. | required |

Returns:

| Type | Description |
| --- | --- |
| `list[HistoryStep[StateT, RunEndT]]` | The history of the graph run. |
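
To illustrate the shape of the `dump_history` / `load_history` round trip (bytes out, `str | bytes | bytearray` in, optional `indent`), here is a stdlib-only sketch. It is an assumption-laden stand-in: the real methods go through `self.history_type_adapter`, while this sketch uses the `json` module, and the step dictionaries and their field names are invented for the example.

```python
import json

# Hypothetical history records; the library stores HistoryStep objects instead.
history = [
    {"kind": "node", "node_id": "Increment", "state": {"number": 42}},
    {"kind": "node", "node_id": "Check42", "state": {"number": 42}},
    {"kind": "end", "result": 43},
]


def dump_history(steps, *, indent=None) -> bytes:
    """Serialize the steps to JSON bytes, optionally pretty-printed."""
    return json.dumps(steps, indent=indent).encode("utf-8")


def load_history(json_bytes):
    """Parse JSON given as str, bytes, or bytearray back into a list of steps."""
    return json.loads(json_bytes)


compact = dump_history(history)
pretty = dump_history(history, indent=2)

# Indentation changes the bytes but not the data that is loaded back.
restored = load_history(pretty)
print(restored == history, len(pretty) > len(compact))
```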

Source code in pydantic_graph/pydantic_graph/graph.py
def load_history(self, json_bytes: str | bytes | bytearray) -> list[HistoryStep[StateT, RunEndT]]:
    """Load the history of a graph run from JSON.

    Args:
        json_bytes: The JSON representation of the history.

    Returns:
        The history of the graph run.
    """
    return self.history_type_adapter.validate_json(json_bytes)

mermaid_code

mermaid_code(
    *,
    start_node: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    title: str | None | Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    highlight_css: str = DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True
) -> str

Generate a diagram representing the graph as a mermaid diagram.

This method calls pydantic_graph.mermaid.generate_code.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start_node | `Sequence[NodeIdent] \| NodeIdent \| None` | The node or nodes which can start the graph. | `None` |
| title | `str \| None \| Literal[False]` | The title of the diagram; use False to omit the title. | `None` |
| edge_labels | `bool` | Whether to include edge labels. | `True` |
| notes | `bool` | Whether to include notes on each node. | `True` |
| highlighted_nodes | `Sequence[NodeIdent] \| NodeIdent \| None` | Optional node or nodes to highlight. | `None` |
| highlight_css | `str` | The CSS to use for highlighting nodes. | `DEFAULT_HIGHLIGHT_CSS` |
| infer_name | `bool` | Whether to infer the graph name from the calling frame. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The mermaid code for the graph, which can then be rendered as a diagram. |

Here's an example of generating a diagram for the graph from above:

never_42.py
from never_42 import Increment, never_42_graph

print(never_42_graph.mermaid_code(start_node=Increment))
'''
---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
'''

The rendered diagram will look like this:

---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
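
The `title` parameter has three distinct behaviours: `None` falls back to the graph's name, a string is used as given, and `False` suppresses the title. The sketch below mirrors the title handling in the method's source and pairs it with a simplified renderer. `resolve_title` and `state_diagram` are hypothetical helpers written for this example, and the renderer is not `pydantic_graph.mermaid.generate_code`; it only reproduces the output format shown above from an explicit edge list.

```python
from __future__ import annotations

from typing import Literal


def resolve_title(title: str | None | Literal[False], graph_name: str | None) -> str | None:
    """None falls back to the graph name; False (or an empty value) yields no title."""
    if title is None and graph_name:
        title = graph_name
    return title or None


def state_diagram(edges: list[tuple[str, str]], title: str | None) -> str:
    """Render an edge list as mermaid stateDiagram-v2 code (simplified sketch)."""
    lines: list[str] = []
    if title:
        lines += ["---", f"title: {title}", "---"]
    lines.append("stateDiagram-v2")
    lines += [f"  {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


edges = [
    ("[*]", "Increment"),
    ("Increment", "Check42"),
    ("Check42", "Increment"),
    ("Check42", "[*]"),
]

# title=None picks up the graph name; title=False drops the header entirely.
with_title = state_diagram(edges, resolve_title(None, "never_42_graph"))
without_title = state_diagram(edges, resolve_title(False, "never_42_graph"))
print(with_title)
print(without_title)
```
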
Source code in pydantic_graph/pydantic_graph/graph.py
def mermaid_code(
    self,
    *,
    start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    title: str | None | typing_extensions.Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
) -> str:
    """Generate a diagram representing the graph as a [mermaid](https://mermaid.js.org/) diagram.

    This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

    Args:
        start_node: The node or nodes which can start the graph.
        title: The title of the diagram; use `False` to omit the title.
        edge_labels: Whether to include edge labels.
        notes: Whether to include notes on each node.
        highlighted_nodes: Optional node or nodes to highlight.
        highlight_css: The CSS to use for highlighting nodes.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The mermaid code for the graph, which can then be rendered as a diagram.

    Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="never_42.py" py="3.10"}
    from never_42 import Increment, never_42_graph

    print(never_42_graph.mermaid_code(start_node=Increment))
    '''
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    '''
    ```

    The rendered diagram will look like this:

    ```mermaid
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    ```
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if title is None and self.name:
        title = self.name
    return mermaid.generate_code(
        self,
        start_node=start_node,
        highlighted_nodes=highlighted_nodes,
        highlight_css=highlight_css,
        title=title or None,
        edge_labels=edge_labels,
        notes=notes,
    )

mermaid_image

mermaid_image(
    infer_name: bool = True, **kwargs: Unpack[MermaidConfig]
) -> bytes

Generate a diagram representing the graph as an image.

The format and diagram can be customized using kwargs; see pydantic_graph.mermaid.MermaidConfig.

Uses external service

This method makes a request to mermaid.ink to render the image. mermaid.ink is a free service not affiliated with Pydantic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| infer_name | `bool` | Whether to infer the graph name from the calling frame. | `True` |
| **kwargs | `Unpack[MermaidConfig]` | Additional arguments to pass to mermaid.request_image. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | The image bytes. |

Source code in pydantic_graph/pydantic_graph/graph.py
def mermaid_image(
    self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> bytes:
    """Generate a diagram representing the graph as an image.

    The format and diagram can be customized using `kwargs`;
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image. `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.request_image`.

    Returns:
        The image bytes.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if 'title' not in kwargs and self.name:
        kwargs['title'] = self.name
    return mermaid.request_image(self, **kwargs)

mermaid_save

mermaid_save(
    path: Path | str,
    /,
    *,
    infer_name: bool = True,
    **kwargs: Unpack[MermaidConfig],
) -> None

Generate a diagram representing the graph and save it as an image.

The format and diagram can be customized using kwargs; see pydantic_graph.mermaid.MermaidConfig.

Uses external service

This method makes a request to mermaid.ink to render the image. mermaid.ink is a free service not affiliated with Pydantic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | `Path \| str` | The path to save the image to. | required |
| infer_name | `bool` | Whether to infer the graph name from the calling frame. | `True` |
| **kwargs | `Unpack[MermaidConfig]` | Additional arguments to pass to mermaid.save_image. | `{}` |

Source code in pydantic_graph/pydantic_graph/graph.py
def mermaid_save(
    self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> None:
    """Generate a diagram representing the graph and save it as an image.

    The format and diagram can be customized using `kwargs`;
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image. `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        path: The path to save the image to.
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.save_image`.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if 'title' not in kwargs and self.name:
        kwargs['title'] = self.name
    mermaid.save_image(path, self, **kwargs)