pydantic_graph.beta.join
Join operations and reducers for graph execution.
This module provides the core components for joining parallel execution paths in a graph, including various reducer types that aggregate data from multiple sources into a single output.
JoinState
dataclass
The state of a join during graph execution, associated with a particular fork run.
Source code in pydantic_graph/pydantic_graph/beta/join.py
ReducerContext
dataclass
Bases: Generic[StateT, DepsT]
Context information passed to reducer functions during graph execution.
The reducer context provides access to the current graph state and dependencies.
Type Parameters
- StateT: The type of the graph state
- DepsT: The type of the dependencies
state
property
state: StateT
The state of the graph run.
deps
property
deps: DepsT
The deps for the graph run.
cancel_sibling_tasks
cancel_sibling_tasks()
Cancel all sibling tasks created from the same fork.
You can call this if you want your join to have early-stopping behavior.
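The early-stopping pattern that cancel_sibling_tasks enables can be illustrated with plain asyncio. This is a standalone sketch, not pydantic_graph code: `worker` and `first_result` are hypothetical names, and nothing here is meant to show how the library implements cancellation internally.

```python
import asyncio


async def worker(name: str, delay: float) -> str:
    # Stand-in for one parallel branch created by a fork.
    await asyncio.sleep(delay)
    return name


async def first_result() -> tuple[str, int]:
    tasks = [
        asyncio.create_task(worker("fast", 0.01)),
        asyncio.create_task(worker("slow", 5.0)),
    ]
    # Wait until any one branch finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Early stop: cancel the branches that are still running.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    cancelled = sum(1 for task in pending if task.cancelled())
    winner = next(iter(done)).result()
    return winner, cancelled


winner, cancelled = asyncio.run(first_result())
```

The slow branch never runs to completion, which is the behavior a join with early stopping is after.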
ReducerFunction
module-attribute
ReducerFunction = TypeAliasType(
"ReducerFunction",
ContextReducerFunction[StateT, DepsT, InputT, OutputT]
| PlainReducerFunction[InputT, OutputT],
type_params=(StateT, DepsT, InputT, OutputT),
)
A function used for reducing inputs to a join node.
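The two accepted shapes can be sketched without the library. This is a standalone illustration under stated assumptions: `FakeContext`, `apply_reducer`, and the parameter-count dispatch are hypothetical and only exist for this example. The reference itself only shows that a plain reducer takes `(current, inputs)` and a context reducer takes `(ctx, current, inputs)`.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeContext:
    """Hypothetical stand-in for ReducerContext; it only carries state and deps."""

    state: Any
    deps: Any


def plain_count(current: int, inputs: Any) -> int:
    # Plain shape: (current, inputs) -> new current.
    return current + 1


def scaled_sum(ctx: FakeContext, current: int, inputs: int) -> int:
    # Context shape: (ctx, current, inputs) -> new current.
    return current + inputs * ctx.deps["scale"]


def apply_reducer(
    reducer: Callable[..., Any], ctx: FakeContext, current: Any, inputs: Any
) -> Any:
    # Assumption for this sketch: tell the two shapes apart by parameter count.
    if len(inspect.signature(reducer).parameters) == 3:
        return reducer(ctx, current, inputs)
    return reducer(current, inputs)


ctx = FakeContext(state=None, deps={"scale": 10})
count = 0
total = 0
for item in [1, 2, 3]:
    count = apply_reducer(plain_count, ctx, count, item)
    total = apply_reducer(scaled_sum, ctx, total, item)
```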
reduce_null
reduce_null(current: None, inputs: Any) -> None
A reducer that discards all input data and returns None.
reduce_list_append
A reducer that appends to a list.
reduce_list_extend
A reducer that extends a list.
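The difference between appending and extending is easiest to see side by side. The sketch below does not import pydantic_graph: `append_sketch` and `extend_sketch` are hypothetical stand-ins, and their `(current, inputs)` signatures are assumed from the shape of reduce_null and reduce_sum, since this reference does not list signatures for the list reducers. Whether the library mutates the list in place is likewise not stated here.

```python
from functools import reduce
from typing import TypeVar

T = TypeVar("T")


def append_sketch(current: list[T], inputs: T) -> list[T]:
    # Each incoming value becomes one element of the accumulated list.
    current.append(inputs)
    return current


def extend_sketch(current: list[T], inputs: list[T]) -> list[T]:
    # Each incoming value is itself a list whose items are spliced in.
    current.extend(inputs)
    return current


branch_outputs = [[1, 2], [3], [4, 5]]
appended = reduce(append_sketch, branch_outputs, [])
extended = reduce(extend_sketch, branch_outputs, [])
```

With the same three branch outputs, appending yields a list of lists while extending yields one flat list.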
reduce_dict_update
A reducer that updates a dict.
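A dict-updating reducer can be sketched the same way. `update_sketch` is a hypothetical stand-in with an assumed `(current, inputs)` signature; it relies only on the standard `dict.update` behavior, where a later value replaces an earlier one for the same key.

```python
from typing import Any


def update_sketch(current: dict[str, Any], inputs: dict[str, Any]) -> dict[str, Any]:
    # Merge the incoming mapping into the accumulator; later keys win.
    current.update(inputs)
    return current


merged: dict[str, Any] = {}
for branch in [{"a": 1}, {"b": 2}, {"a": 3}]:
    merged = update_sketch(merged, branch)
```

Because later values overwrite earlier ones, the result for overlapping keys depends on the order in which branch outputs reach the reducer.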
SupportsSum
Bases: Protocol
A protocol for a type that supports adding to itself.
reduce_sum
reduce_sum(current: NumericT, inputs: NumericT) -> NumericT
A reducer that sums numbers.
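A summing reducer works for any type that can be added to itself, not only built-in numbers. The sketch below is a standalone illustration: `AddableSketch`, `N`, and `sum_sketch` are hypothetical names, and the protocol body is an assumption about what "supports adding to itself" means rather than a copy of SupportsSum.

```python
from fractions import Fraction
from typing import Any, Protocol, TypeVar


class AddableSketch(Protocol):
    """Hypothetical stand-in for SupportsSum: anything with a usable __add__."""

    def __add__(self, other: Any, /) -> Any: ...


N = TypeVar("N", bound=AddableSketch)


def sum_sketch(current: N, inputs: N) -> N:
    # Fold one more value into the running total.
    return current + inputs


int_total = 0
for value in [1, 2, 3]:
    int_total = sum_sketch(int_total, value)

frac_total = Fraction(0)
for frac in [Fraction(1, 2), Fraction(1, 3)]:
    frac_total = sum_sketch(frac_total, frac)
```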
ReduceFirstValue
dataclass
Bases: Generic[T]
A reducer that returns the first value it encounters, and cancels all other tasks.
__call__
__call__(
ctx: ReducerContext[object, object],
current: T,
inputs: T,
) -> T
The reducer function.
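The described behavior (keep the first value, cancel the rest) can be sketched with a fake context that records cancellation requests. Everything here is hypothetical: `RecordingContext` and `FirstValueSketch` are stand-ins written for this example, and the method body is an assumption based on the one-line description, not the library source.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class RecordingContext:
    """Hypothetical stand-in for ReducerContext that counts cancel requests."""

    cancel_calls: int = 0

    def cancel_sibling_tasks(self) -> None:
        self.cancel_calls += 1


@dataclass
class FirstValueSketch(Generic[T]):
    def __call__(self, ctx: RecordingContext, current: T, inputs: T) -> T:
        # Ask for the remaining sibling branches to stop, then keep this value.
        ctx.cancel_sibling_tasks()
        return inputs


ctx = RecordingContext()
reducer = FirstValueSketch[str]()
result = reducer(ctx, "", "first answer")
```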
Join
dataclass
Bases: Generic[StateT, DepsT, InputT, OutputT]
A join operation that synchronizes and aggregates parallel execution paths.
A join defines how to combine outputs from multiple parallel execution paths
using a ReducerFunction. It specifies which fork
it joins (if any) and manages the initialization of reducers.
Type Parameters
- StateT: The type of the graph state
- DepsT: The type of the dependencies
- InputT: The type of input data to join
- OutputT: The type of the final joined output
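The idea of pairing a reducer with fresh initial state for each run can be sketched as a small fold. This is not the Join API: `MiniJoin`, `initial_factory`, and `combine` are hypothetical names chosen for this example, and the real class's fields are not listed in this reference.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, TypeVar

InputT = TypeVar("InputT")
OutputT = TypeVar("OutputT")


@dataclass
class MiniJoin(Generic[InputT, OutputT]):
    """Hypothetical miniature join: a reducer plus a factory for its start value."""

    reducer: Callable[[OutputT, InputT], OutputT]
    initial_factory: Callable[[], OutputT]

    def combine(self, branch_outputs: Iterable[InputT]) -> OutputT:
        # Build a fresh accumulator for every run so runs do not share state.
        current = self.initial_factory()
        for item in branch_outputs:
            current = self.reducer(current, item)
        return current


word_lengths = MiniJoin[str, int](
    reducer=lambda total, word: total + len(word),
    initial_factory=int,
)
first_run = word_lengths.combine(["fork", "join"])
second_run = word_lengths.combine(["graph"])
```

Using a factory rather than a shared initial value keeps the second run from seeing the first run's total.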
as_node
as_node(inputs: None = None) -> JoinNode[StateT, DepsT]
as_node(inputs: InputT) -> JoinNode[StateT, DepsT]
as_node(
inputs: InputT | None = None,
) -> JoinNode[StateT, DepsT]
Create a join node with bound inputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | InputT \| None | The input data to bind to this join, or None | None |
Returns:
| Type | Description |
|---|---|
| JoinNode[StateT, DepsT] | A |
JoinNode
dataclass
Bases: BaseNode[StateT, DepsT, Any]
A base node that represents a join item with bound inputs.
JoinNode bridges between the v1 and v2 graph execution systems by wrapping
a Join with bound inputs in a BaseNode interface.
It is not meant to be run directly but rather used to indicate transitions
to v2-style steps.
run
async
run(
ctx: GraphRunContext[StateT, DepsT],
) -> BaseNode[StateT, DepsT, Any] | End[Any]
Attempt to run the join node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | GraphRunContext[StateT, DepsT] | The graph execution context | required |
Returns:
| Type | Description |
|---|---|
| BaseNode[StateT, DepsT, Any] \| End[Any] | The result of step execution |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | Always raised, as JoinNode is not meant to be run directly |
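A node that exists only to signal a transition, and refuses to run, can be sketched as follows. `MarkerNodeSketch` and its `join_id` and `inputs` fields are hypothetical; the only behavior taken from this reference is that `run` always raises NotImplementedError.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class MarkerNodeSketch:
    """Hypothetical marker node: carries bound inputs but is never executed."""

    join_id: str
    inputs: Any

    async def run(self, ctx: Any) -> Any:
        # The node only marks where execution should go next.
        raise NotImplementedError("marker nodes are not meant to be run directly")


node = MarkerNodeSketch(join_id="collect", inputs=[1, 2])
try:
    asyncio.run(node.run(ctx=None))
    raised = False
except NotImplementedError:
    raised = True
```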