pydantic_ai.result

ResultData module-attribute

ResultData = TypeVar('ResultData', default=str)

Type variable for the result data of a run.
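For example (a minimal sketch; the model name is illustrative), ResultData is determined by the agent's result_type and defaults to str:

from pydantic_ai import Agent

str_agent = Agent('openai:gpt-4o')                    # ResultData defaults to str
int_agent = Agent('openai:gpt-4o', result_type=int)   # ResultData is int

result = int_agent.run_sync('What is 6 times 7?')
print(result.data)  # validated as an int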

ResultValidatorFunc module-attribute

A function that always takes ResultData and returns ResultData, and:

  • may or may not take RunContext as its first argument
  • may or may not be async

Usage: ResultValidatorFunc[AgentDeps, ResultData].
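As a minimal sketch (the model name is illustrative, and the @agent.result_validator decorator is assumed to be how such a function is registered), a result validator may be async and may take RunContext first:

from pydantic_ai import Agent, ModelRetry, RunContext

agent = Agent('openai:gpt-4o')

@agent.result_validator
async def non_empty(ctx: RunContext[None], result: str) -> str:
    # takes RunContext as its first argument and is async; both are optional
    if not result.strip():
        raise ModelRetry('Empty response, please try again.')
    return result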

RunResult dataclass

Bases: _BaseRunResult[ResultData]

Result of a non-streamed run.
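For example, a minimal sketch of a non-streamed run (the model name and prompt are illustrative):

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
result = agent.run_sync('What is the capital of France?')
print(result.data)                 # data from the final response (str by default)
print(result.usage())              # Usage for the whole run
print(len(result.all_messages()))  # full message history of the run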

Source code in pydantic_ai_slim/pydantic_ai/result.py
@dataclass
class RunResult(_BaseRunResult[ResultData]):
    """Result of a non-streamed run."""

    data: ResultData
    """Data from the final response in the run."""
    _result_tool_name: str | None
    _usage: Usage

    def usage(self) -> Usage:
        """Return the usage of the whole run."""
        return self._usage

    def all_messages(self, *, result_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
        """Return the history of _messages.

        Args:
            result_tool_return_content: The return content of the tool call to set in the last message.
                This provides a convenient way to modify the content of the result tool call if you want to continue
                the conversation and want to set the response to the result tool call. If `None`, the last message will
                not be modified.

        Returns:
            List of messages.
        """
        if result_tool_return_content is not None:
            return self._set_result_tool_return(result_tool_return_content)
        else:
            return self._all_messages

    def _set_result_tool_return(self, return_content: str) -> list[_messages.ModelMessage]:
        """Set return content for the result tool.

        Useful if you want to continue the conversation and want to set the response to the result tool call.
        """
        if not self._result_tool_name:
            raise ValueError('Cannot set result tool return content when the return type is `str`.')
        messages = deepcopy(self._all_messages)
        last_message = messages[-1]
        for part in last_message.parts:
            if isinstance(part, _messages.ToolReturnPart) and part.tool_name == self._result_tool_name:
                part.content = return_content
                return messages
        raise LookupError(f'No tool call found with tool name {self._result_tool_name!r}.')

all_messages_json

all_messages_json(
    *, result_tool_return_content: str | None = None
) -> bytes

Return all messages from all_messages as JSON bytes.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    bytes: JSON bytes representing the messages.
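As a sketch of round-tripping the JSON (assuming ModelMessagesTypeAdapter is imported from pydantic_ai.messages):

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter

agent = Agent('openai:gpt-4o')  # illustrative model name
result = agent.run_sync('Tell me a joke.')

as_json = result.all_messages_json()                        # bytes
messages = ModelMessagesTypeAdapter.validate_json(as_json)  # back to list[ModelMessage]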

Source code in pydantic_ai_slim/pydantic_ai/result.py
def all_messages_json(self, *, result_tool_return_content: str | None = None) -> bytes:
    """Return all messages from [`all_messages`][pydantic_ai.result._BaseRunResult.all_messages] as JSON bytes.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.all_messages(result_tool_return_content=result_tool_return_content)
    )

new_messages

new_messages(
    *, result_tool_return_content: str | None = None
) -> list[ModelMessage]

Return new messages associated with this run.

Messages from older runs are excluded.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    list[ModelMessage]: List of new messages.
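A common use, sketched below, is passing the new messages from one run as the message history for a follow-up run (assuming message_history is the keyword accepted by Agent.run_sync):

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')  # illustrative model name
first = agent.run_sync('Tell me a joke.')
follow_up = agent.run_sync(
    'Explain why that is funny.',
    message_history=first.new_messages(),  # only messages produced by the first run
)
print(follow_up.data)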

Source code in pydantic_ai_slim/pydantic_ai/result.py
def new_messages(self, *, result_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return new messages associated with this run.

    Messages from older runs are excluded.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of new messages.
    """
    return self.all_messages(result_tool_return_content=result_tool_return_content)[self._new_message_index :]

new_messages_json

new_messages_json(
    *, result_tool_return_content: str | None = None
) -> bytes

Return new messages from new_messages as JSON bytes.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    bytes: JSON bytes representing the new messages.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def new_messages_json(self, *, result_tool_return_content: str | None = None) -> bytes:
    """Return new messages from [`new_messages`][pydantic_ai.result._BaseRunResult.new_messages] as JSON bytes.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the new messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.new_messages(result_tool_return_content=result_tool_return_content)
    )

data instance-attribute

data: ResultData

Data from the final response in the run.

usage

usage() -> Usage

Return the usage of the whole run.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def usage(self) -> Usage:
    """Return the usage of the whole run."""
    return self._usage

all_messages

all_messages(
    *, result_tool_return_content: str | None = None
) -> list[ModelMessage]

Return the history of messages.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    list[ModelMessage]: List of messages.
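A sketch of overriding the result tool return before continuing a conversation with a structured result (the model, result_type and prompt are illustrative):

from pydantic import BaseModel
from pydantic_ai import Agent

class CityInfo(BaseModel):
    city: str
    country: str

agent = Agent('openai:gpt-4o', result_type=CityInfo)
result = agent.run_sync('Where were the 2012 Olympics held?')
history = result.all_messages(
    result_tool_return_content='Result recorded, please continue.',
)
# `history` now carries the overridden tool return and can be passed as
# message_history to a follow-up run.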

Source code in pydantic_ai_slim/pydantic_ai/result.py
def all_messages(self, *, result_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return the history of _messages.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of messages.
    """
    if result_tool_return_content is not None:
        return self._set_result_tool_return(result_tool_return_content)
    else:
        return self._all_messages

StreamedRunResult dataclass

Bases: _BaseRunResult[ResultData], Generic[AgentDeps, ResultData]

Result of a streamed run that returns structured data via a tool call.
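A StreamedRunResult is obtained from Agent.run_stream, which is used as an async context manager; a minimal sketch (model name and prompt are illustrative):

import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('Tell me a short story.') as result:
        async for text in result.stream():
            print(text)        # progressively longer validated output
        print(result.usage())  # full usage once the stream has finished

asyncio.run(main())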

Source code in pydantic_ai_slim/pydantic_ai/result.py
@dataclass
class StreamedRunResult(_BaseRunResult[ResultData], Generic[AgentDeps, ResultData]):
    """Result of a streamed run that returns structured data via a tool call."""

    _usage_limits: UsageLimits | None
    _stream_response: models.EitherStreamedResponse
    _result_schema: _result.ResultSchema[ResultData] | None
    _run_ctx: RunContext[AgentDeps]
    _result_validators: list[_result.ResultValidator[AgentDeps, ResultData]]
    _result_tool_name: str | None
    _on_complete: Callable[[], Awaitable[None]]
    is_complete: bool = field(default=False, init=False)
    """Whether the stream has all been received.

    This is set to `True` when one of
    [`stream`][pydantic_ai.result.StreamedRunResult.stream],
    [`stream_text`][pydantic_ai.result.StreamedRunResult.stream_text],
    [`stream_structured`][pydantic_ai.result.StreamedRunResult.stream_structured] or
    [`get_data`][pydantic_ai.result.StreamedRunResult.get_data] completes.
    """

    async def stream(self, *, debounce_by: float | None = 0.1) -> AsyncIterator[ResultData]:
        """Stream the response as an async iterable.

        The pydantic validator for structured data will be called in
        [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
        on each iteration.

        Args:
            debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured responses to reduce the overhead of
                performing validation as each token is received.

        Returns:
            An async iterable of the response data.
        """
        if isinstance(self._stream_response, models.StreamTextResponse):
            async for text in self.stream_text(debounce_by=debounce_by):
                yield cast(ResultData, text)
        else:
            async for structured_message, is_last in self.stream_structured(debounce_by=debounce_by):
                yield await self.validate_structured_result(structured_message, allow_partial=not is_last)

    async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]:
        """Stream the text result as an async iterable.

        !!! note
            This method will fail if the response is structured,
            e.g. if [`is_structured`][pydantic_ai.result.StreamedRunResult.is_structured] returns `True`.

        !!! note
            Result validators will NOT be called on the text result if `delta=True`.

        Args:
            delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
                up to the current point.
            debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured responses to reduce the overhead of
                performing validation as each token is received.
        """
        usage_checking_stream = _get_usage_checking_stream_response(
            self._stream_response, self._usage_limits, self.usage
        )

        with _logfire.span('response stream text') as lf_span:
            if isinstance(self._stream_response, models.StreamStructuredResponse):
                raise exceptions.UserError('stream_text() can only be used with text responses')
            if delta:
                async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                    async for _ in group_iter:
                        yield ''.join(self._stream_response.get())
                final_delta = ''.join(self._stream_response.get(final=True))
                if final_delta:
                    yield final_delta
            else:
                # a quick benchmark shows it's faster to build up a string with concat when we're
                # yielding at each step
                chunks: list[str] = []
                combined = ''
                async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                    async for _ in group_iter:
                        new = False
                        for chunk in self._stream_response.get():
                            chunks.append(chunk)
                            new = True
                        if new:
                            combined = await self._validate_text_result(''.join(chunks))
                            yield combined

                new = False
                for chunk in self._stream_response.get(final=True):
                    chunks.append(chunk)
                    new = True
                if new:
                    combined = await self._validate_text_result(''.join(chunks))
                    yield combined
                lf_span.set_attribute('combined_text', combined)
                await self._marked_completed(_messages.ModelResponse.from_text(combined))

    async def stream_structured(
        self, *, debounce_by: float | None = 0.1
    ) -> AsyncIterator[tuple[_messages.ModelResponse, bool]]:
        """Stream the response as an async iterable of Structured LLM Messages.

        !!! note
            This method will fail if the response is text,
            e.g. if [`is_structured`][pydantic_ai.result.StreamedRunResult.is_structured] returns `False`.

        Args:
            debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured responses to reduce the overhead of
                performing validation as each token is received.

        Returns:
            An async iterable of the structured response message and whether that is the last message.
        """
        usage_checking_stream = _get_usage_checking_stream_response(
            self._stream_response, self._usage_limits, self.usage
        )

        with _logfire.span('response stream structured') as lf_span:
            if isinstance(self._stream_response, models.StreamTextResponse):
                raise exceptions.UserError('stream_structured() can only be used with structured responses')
            else:
                # we should already have a message at this point, yield that first if it has any content
                msg = self._stream_response.get()
                for item in msg.parts:
                    if isinstance(item, _messages.ToolCallPart) and item.has_content():
                        yield msg, False
                        break
                async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                    async for _ in group_iter:
                        msg = self._stream_response.get()
                        for item in msg.parts:
                            if isinstance(item, _messages.ToolCallPart) and item.has_content():
                                yield msg, False
                                break
                msg = self._stream_response.get(final=True)
                yield msg, True
                lf_span.set_attribute('structured_response', msg)
                await self._marked_completed(msg)

    async def get_data(self) -> ResultData:
        """Stream the whole response, validate and return it."""
        usage_checking_stream = _get_usage_checking_stream_response(
            self._stream_response, self._usage_limits, self.usage
        )

        async for _ in usage_checking_stream:
            pass

        if isinstance(self._stream_response, models.StreamTextResponse):
            text = ''.join(self._stream_response.get(final=True))
            text = await self._validate_text_result(text)
            await self._marked_completed(_messages.ModelResponse.from_text(text))
            return cast(ResultData, text)
        else:
            message = self._stream_response.get(final=True)
            await self._marked_completed(message)
            return await self.validate_structured_result(message)

    @property
    def is_structured(self) -> bool:
        """Return whether the stream response contains structured data (as opposed to text)."""
        return isinstance(self._stream_response, models.StreamStructuredResponse)

    def usage(self) -> Usage:
        """Return the usage of the whole run.

        !!! note
            This won't return the full usage until the stream is finished.
        """
        return self._run_ctx.usage + self._stream_response.usage()

    def timestamp(self) -> datetime:
        """Get the timestamp of the response."""
        return self._stream_response.timestamp()

    async def validate_structured_result(
        self, message: _messages.ModelResponse, *, allow_partial: bool = False
    ) -> ResultData:
        """Validate a structured result message."""
        assert self._result_schema is not None, 'Expected _result_schema to not be None'
        assert self._result_tool_name is not None, 'Expected _result_tool_name to not be None'
        match = self._result_schema.find_named_tool(message.parts, self._result_tool_name)
        if match is None:
            raise exceptions.UnexpectedModelBehavior(
                f'Invalid message, unable to find tool: {self._result_schema.tool_names()}'
            )

        call, result_tool = match
        result_data = result_tool.validate(call, allow_partial=allow_partial, wrap_validation_errors=False)

        for validator in self._result_validators:
            result_data = await validator.validate(result_data, call, self._run_ctx)
        return result_data

    async def _validate_text_result(self, text: str) -> str:
        for validator in self._result_validators:
            text = await validator.validate(  # pyright: ignore[reportAssignmentType]
                text,  # pyright: ignore[reportArgumentType]
                None,
                self._run_ctx,
            )
        return text

    async def _marked_completed(self, message: _messages.ModelResponse) -> None:
        self.is_complete = True
        self._all_messages.append(message)
        await self._on_complete()

all_messages

all_messages(
    *, result_tool_return_content: str | None = None
) -> list[ModelMessage]

Return the history of messages.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    list[ModelMessage]: List of messages.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def all_messages(self, *, result_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return the history of _messages.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of messages.
    """
    # this is a method to be consistent with the other methods
    if result_tool_return_content is not None:
        raise NotImplementedError('Setting result tool return content is not supported for this result type.')
    return self._all_messages

all_messages_json

all_messages_json(
    *, result_tool_return_content: str | None = None
) -> bytes

Return all messages from all_messages as JSON bytes.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    bytes: JSON bytes representing the messages.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def all_messages_json(self, *, result_tool_return_content: str | None = None) -> bytes:
    """Return all messages from [`all_messages`][pydantic_ai.result._BaseRunResult.all_messages] as JSON bytes.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.all_messages(result_tool_return_content=result_tool_return_content)
    )

new_messages

new_messages(
    *, result_tool_return_content: str | None = None
) -> list[ModelMessage]

Return new messages associated with this run.

Messages from older runs are excluded.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    list[ModelMessage]: List of new messages.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def new_messages(self, *, result_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return new messages associated with this run.

    Messages from older runs are excluded.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of new messages.
    """
    return self.all_messages(result_tool_return_content=result_tool_return_content)[self._new_message_index :]

new_messages_json

new_messages_json(
    *, result_tool_return_content: str | None = None
) -> bytes

Return new messages from new_messages as JSON bytes.

Parameters:

    result_tool_return_content (str | None, default None): The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the result tool call if you want to continue the conversation and want to set the response to the result tool call. If None, the last message will not be modified.

Returns:

    bytes: JSON bytes representing the new messages.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def new_messages_json(self, *, result_tool_return_content: str | None = None) -> bytes:
    """Return new messages from [`new_messages`][pydantic_ai.result._BaseRunResult.new_messages] as JSON bytes.

    Args:
        result_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the result tool call if you want to continue
            the conversation and want to set the response to the result tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the new messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.new_messages(result_tool_return_content=result_tool_return_content)
    )

is_complete class-attribute instance-attribute

is_complete: bool = field(default=False, init=False)

Whether the stream has all been received.

This is set to True when one of stream, stream_text, stream_structured or get_data completes.

stream async

stream(
    *, debounce_by: float | None = 0.1
) -> AsyncIterator[ResultData]

Stream the response as an async iterable.

The pydantic validator for structured data will be called in partial mode on each iteration.

Parameters:

    debounce_by (float | None, default 0.1): By how much (if at all) to debounce/group the response chunks. None means no debouncing. Debouncing is particularly important for long structured responses to reduce the overhead of performing validation as each token is received.

Returns:

    AsyncIterator[ResultData]: An async iterable of the response data.
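A sketch of streaming a structured result with partial validation (a total=False TypedDict is used so incomplete data still validates; the names are illustrative):

import asyncio

from typing_extensions import TypedDict

from pydantic_ai import Agent

class Profile(TypedDict, total=False):
    name: str
    bio: str

agent = Agent('openai:gpt-4o', result_type=Profile)

async def main():
    async with agent.run_stream('Create a profile for a fictional author.') as result:
        async for profile in result.stream(debounce_by=0.2):
            # validated in partial mode, so keys may still be missing
            print(profile)

asyncio.run(main())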

Source code in pydantic_ai_slim/pydantic_ai/result.py
async def stream(self, *, debounce_by: float | None = 0.1) -> AsyncIterator[ResultData]:
    """Stream the response as an async iterable.

    The pydantic validator for structured data will be called in
    [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
    on each iteration.

    Args:
        debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured responses to reduce the overhead of
            performing validation as each token is received.

    Returns:
        An async iterable of the response data.
    """
    if isinstance(self._stream_response, models.StreamTextResponse):
        async for text in self.stream_text(debounce_by=debounce_by):
            yield cast(ResultData, text)
    else:
        async for structured_message, is_last in self.stream_structured(debounce_by=debounce_by):
            yield await self.validate_structured_result(structured_message, allow_partial=not is_last)

stream_text async

stream_text(
    *, delta: bool = False, debounce_by: float | None = 0.1
) -> AsyncIterator[str]

Stream the text result as an async iterable.

Note

This method will fail if the response is structured, e.g. if is_structured returns True.

Note

Result validators will NOT be called on the text result if delta=True.

Parameters:

    delta (bool, default False): If True, yield each chunk of text as it is received; if False (default), yield the full text up to the current point.

    debounce_by (float | None, default 0.1): By how much (if at all) to debounce/group the response chunks. None means no debouncing. Debouncing is particularly important for long structured responses to reduce the overhead of performing validation as each token is received.
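A minimal sketch of both modes (the model name and prompt are illustrative):

import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('Write a haiku about the sea.') as result:
        # delta=False (default): each yield is the full text so far, passed through result validators
        async for text in result.stream_text():
            print(text)

    async with agent.run_stream('Write a haiku about the sea.') as result:
        # delta=True: each yield is only the newly received text; result validators are not called
        async for chunk in result.stream_text(delta=True):
            print(chunk, end='')

asyncio.run(main())
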
Source code in pydantic_ai_slim/pydantic_ai/result.py
async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]:
    """Stream the text result as an async iterable.

    !!! note
        This method will fail if the response is structured,
        e.g. if [`is_structured`][pydantic_ai.result.StreamedRunResult.is_structured] returns `True`.

    !!! note
        Result validators will NOT be called on the text result if `delta=True`.

    Args:
        delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
            up to the current point.
        debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured responses to reduce the overhead of
            performing validation as each token is received.
    """
    usage_checking_stream = _get_usage_checking_stream_response(
        self._stream_response, self._usage_limits, self.usage
    )

    with _logfire.span('response stream text') as lf_span:
        if isinstance(self._stream_response, models.StreamStructuredResponse):
            raise exceptions.UserError('stream_text() can only be used with text responses')
        if delta:
            async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                async for _ in group_iter:
                    yield ''.join(self._stream_response.get())
            final_delta = ''.join(self._stream_response.get(final=True))
            if final_delta:
                yield final_delta
        else:
            # a quick benchmark shows it's faster to build up a string with concat when we're
            # yielding at each step
            chunks: list[str] = []
            combined = ''
            async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                async for _ in group_iter:
                    new = False
                    for chunk in self._stream_response.get():
                        chunks.append(chunk)
                        new = True
                    if new:
                        combined = await self._validate_text_result(''.join(chunks))
                        yield combined

            new = False
            for chunk in self._stream_response.get(final=True):
                chunks.append(chunk)
                new = True
            if new:
                combined = await self._validate_text_result(''.join(chunks))
                yield combined
            lf_span.set_attribute('combined_text', combined)
            await self._marked_completed(_messages.ModelResponse.from_text(combined))

stream_structured async

stream_structured(
    *, debounce_by: float | None = 0.1
) -> AsyncIterator[tuple[ModelResponse, bool]]

Stream the response as an async iterable of Structured LLM Messages.

Note

This method will fail if the response is text, e.g. if is_structured returns False.

Parameters:

    debounce_by (float | None, default 0.1): By how much (if at all) to debounce/group the response chunks. None means no debouncing. Debouncing is particularly important for long structured responses to reduce the overhead of performing validation as each token is received.

Returns:

    AsyncIterator[tuple[ModelResponse, bool]]: An async iterable of the structured response message and whether that is the last message.
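A sketch that consumes the raw structured messages and validates them explicitly, allowing partial data until the last message (the names are illustrative):

import asyncio

from typing_extensions import TypedDict

from pydantic_ai import Agent

class Forecast(TypedDict, total=False):
    city: str
    summary: str

agent = Agent('openai:gpt-4o', result_type=Forecast)

async def main():
    async with agent.run_stream('Forecast the weather for Paris tomorrow.') as result:
        async for message, is_last in result.stream_structured(debounce_by=0.2):
            data = await result.validate_structured_result(message, allow_partial=not is_last)
            print(data)

asyncio.run(main())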

Source code in pydantic_ai_slim/pydantic_ai/result.py
async def stream_structured(
    self, *, debounce_by: float | None = 0.1
) -> AsyncIterator[tuple[_messages.ModelResponse, bool]]:
    """Stream the response as an async iterable of Structured LLM Messages.

    !!! note
        This method will fail if the response is text,
        e.g. if [`is_structured`][pydantic_ai.result.StreamedRunResult.is_structured] returns `False`.

    Args:
        debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured responses to reduce the overhead of
            performing validation as each token is received.

    Returns:
        An async iterable of the structured response message and whether that is the last message.
    """
    usage_checking_stream = _get_usage_checking_stream_response(
        self._stream_response, self._usage_limits, self.usage
    )

    with _logfire.span('response stream structured') as lf_span:
        if isinstance(self._stream_response, models.StreamTextResponse):
            raise exceptions.UserError('stream_structured() can only be used with structured responses')
        else:
            # we should already have a message at this point, yield that first if it has any content
            msg = self._stream_response.get()
            for item in msg.parts:
                if isinstance(item, _messages.ToolCallPart) and item.has_content():
                    yield msg, False
                    break
            async with _utils.group_by_temporal(usage_checking_stream, debounce_by) as group_iter:
                async for _ in group_iter:
                    msg = self._stream_response.get()
                    for item in msg.parts:
                        if isinstance(item, _messages.ToolCallPart) and item.has_content():
                            yield msg, False
                            break
            msg = self._stream_response.get(final=True)
            yield msg, True
            lf_span.set_attribute('structured_response', msg)
            await self._marked_completed(msg)

get_data async

get_data() -> ResultData

Stream the whole response, validate and return it.
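A minimal sketch: consume the whole stream and return the final validated data in one call (model name and prompt are illustrative):

import asyncio

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('Summarise the plot of Hamlet in one sentence.') as result:
        data = await result.get_data()  # drains the stream, validates, marks the run complete
        print(data)
        print(result.is_complete)  # True

asyncio.run(main())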

Source code in pydantic_ai_slim/pydantic_ai/result.py
async def get_data(self) -> ResultData:
    """Stream the whole response, validate and return it."""
    usage_checking_stream = _get_usage_checking_stream_response(
        self._stream_response, self._usage_limits, self.usage
    )

    async for _ in usage_checking_stream:
        pass

    if isinstance(self._stream_response, models.StreamTextResponse):
        text = ''.join(self._stream_response.get(final=True))
        text = await self._validate_text_result(text)
        await self._marked_completed(_messages.ModelResponse.from_text(text))
        return cast(ResultData, text)
    else:
        message = self._stream_response.get(final=True)
        await self._marked_completed(message)
        return await self.validate_structured_result(message)

is_structured property

is_structured: bool

Return whether the stream response contains structured data (as opposed to text).

usage

usage() -> Usage

Return the usage of the whole run.

Note

This won't return the full usage until the stream is finished.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def usage(self) -> Usage:
    """Return the usage of the whole run.

    !!! note
        This won't return the full usage until the stream is finished.
    """
    return self._run_ctx.usage + self._stream_response.usage()

timestamp

timestamp() -> datetime

Get the timestamp of the response.

Source code in pydantic_ai_slim/pydantic_ai/result.py
def timestamp(self) -> datetime:
    """Get the timestamp of the response."""
    return self._stream_response.timestamp()

validate_structured_result async

validate_structured_result(
    message: ModelResponse, *, allow_partial: bool = False
) -> ResultData

Validate a structured result message.

Source code in pydantic_ai_slim/pydantic_ai/result.py
async def validate_structured_result(
    self, message: _messages.ModelResponse, *, allow_partial: bool = False
) -> ResultData:
    """Validate a structured result message."""
    assert self._result_schema is not None, 'Expected _result_schema to not be None'
    assert self._result_tool_name is not None, 'Expected _result_tool_name to not be None'
    match = self._result_schema.find_named_tool(message.parts, self._result_tool_name)
    if match is None:
        raise exceptions.UnexpectedModelBehavior(
            f'Invalid message, unable to find tool: {self._result_schema.tool_names()}'
        )

    call, result_tool = match
    result_data = result_tool.validate(call, allow_partial=allow_partial, wrap_validation_errors=False)

    for validator in self._result_validators:
        result_data = await validator.validate(result_data, call, self._run_ctx)
    return result_data