Skip to content

pydantic_ai.models.function

A model controlled by a local function.

FunctionModel is similar to TestModel, but allows greater control over the model's behavior.

Its primary use case is for more advanced unit testing than is possible with TestModel.

FunctionModel dataclass

Bases: Model

A model controlled by a local function.

Apart from __init__, all methods are private or match those of the base class.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@dataclass(init=False)
class FunctionModel(Model):
    """A model controlled by a local function.

    Apart from `__init__`, all methods are private or match those of the base class.
    """

    function: FunctionDef | None = None
    stream_function: StreamFunctionDef | None = None

    @overload
    def __init__(self, function: FunctionDef) -> None: ...

    @overload
    def __init__(self, *, stream_function: StreamFunctionDef) -> None: ...

    @overload
    def __init__(self, function: FunctionDef, *, stream_function: StreamFunctionDef) -> None: ...

    def __init__(self, function: FunctionDef | None = None, *, stream_function: StreamFunctionDef | None = None):
        """Initialize a `FunctionModel`.

        Either `function` or `stream_function` must be provided, providing both is allowed.

        Args:
            function: The function to call for non-streamed requests.
            stream_function: The function to call for streamed requests.
        """
        if function is None and stream_function is None:
            raise TypeError('Either `function` or `stream_function` must be provided')
        self.function = function
        self.stream_function = stream_function

    async def agent_model(
        self,
        function_tools: Mapping[str, AbstractToolDefinition],
        allow_text_result: bool,
        result_tools: Sequence[AbstractToolDefinition] | None,
    ) -> AgentModel:
        result_tools = list(result_tools) if result_tools is not None else None
        return FunctionAgentModel(
            self.function, self.stream_function, AgentInfo(function_tools, allow_text_result, result_tools)
        )

    def name(self) -> str:
        labels: list[str] = []
        if self.function is not None:
            labels.append(self.function.__name__)
        if self.stream_function is not None:
            labels.append(f'stream-{self.stream_function.__name__}')
        return f'function:{",".join(labels)}'

__init__

__init__(function: FunctionDef) -> None
__init__(*, stream_function: StreamFunctionDef) -> None
__init__(
    function: FunctionDef,
    *,
    stream_function: StreamFunctionDef
) -> None
__init__(
    function: FunctionDef | None = None,
    *,
    stream_function: StreamFunctionDef | None = None
)

Initialize a FunctionModel.

Either function or stream_function must be provided, providing both is allowed.

Parameters:

Name Type Description Default
function FunctionDef | None

The function to call for non-streamed requests.

None
stream_function StreamFunctionDef | None

The function to call for streamed requests.

None
Source code in pydantic_ai_slim/pydantic_ai/models/function.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, function: FunctionDef | None = None, *, stream_function: StreamFunctionDef | None = None):
    """Initialize a `FunctionModel`.

    Either `function` or `stream_function` must be provided, providing both is allowed.

    Args:
        function: The function to call for non-streamed requests.
        stream_function: The function to call for streamed requests.
    """
    if function is None and stream_function is None:
        raise TypeError('Either `function` or `stream_function` must be provided')
    self.function = function
    self.stream_function = stream_function

AgentInfo dataclass

Information about an agent.

This is passed as the second to functions used within FunctionModel.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@dataclass(frozen=True)
class AgentInfo:
    """Information about an agent.

    This is passed as the second to functions used within [`FunctionModel`][pydantic_ai.models.function.FunctionModel].
    """

    function_tools: Mapping[str, AbstractToolDefinition]
    """The function tools available on this agent.

    These are the tools registered via the [`tool`][pydantic_ai.Agent.tool] and
    [`tool_plain`][pydantic_ai.Agent.tool_plain] decorators.
    """
    allow_text_result: bool
    """Whether a plain text result is allowed."""
    result_tools: list[AbstractToolDefinition] | None
    """The tools that can called as the final result of the run."""

function_tools instance-attribute

The function tools available on this agent.

These are the tools registered via the tool and tool_plain decorators.

allow_text_result instance-attribute

allow_text_result: bool

Whether a plain text result is allowed.

result_tools instance-attribute

result_tools: list[AbstractToolDefinition] | None

The tools that can called as the final result of the run.

DeltaToolCall dataclass

Incremental change to a tool call.

Used to describe a chunk when streaming structured responses.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
 99
100
101
102
103
104
105
106
107
108
109
@dataclass
class DeltaToolCall:
    """Incremental change to a tool call.

    Used to describe a chunk when streaming structured responses.
    """

    name: str | None = None
    """Incremental change to the name of the tool."""
    json_args: str | None = None
    """Incremental change to the arguments as JSON"""

name class-attribute instance-attribute

name: str | None = None

Incremental change to the name of the tool.

json_args class-attribute instance-attribute

json_args: str | None = None

Incremental change to the arguments as JSON

DeltaToolCalls module-attribute

DeltaToolCalls: TypeAlias = dict[int, DeltaToolCall]

A mapping of tool call IDs to incremental changes.

FunctionDef module-attribute

A function used to generate a non-streamed response.

StreamFunctionDef module-attribute

A function used to generate a streamed response.

While this is defined as having return type of AsyncIterator[Union[str, DeltaToolCalls]], it should really be considered as Union[AsyncIterator[str], AsyncIterator[DeltaToolCalls],

E.g. you need to yield all text or all DeltaToolCalls, not mix them.

FunctionAgentModel dataclass

Bases: AgentModel

Implementation of AgentModel for FunctionModel.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@dataclass
class FunctionAgentModel(AgentModel):
    """Implementation of `AgentModel` for [FunctionModel][pydantic_ai.models.function.FunctionModel]."""

    function: FunctionDef | None
    stream_function: StreamFunctionDef | None
    agent_info: AgentInfo

    async def request(self, messages: list[Message]) -> tuple[ModelAnyResponse, result.Cost]:
        assert self.function is not None, 'FunctionModel must receive a `function` to support non-streamed requests'
        if inspect.iscoroutinefunction(self.function):
            response = await self.function(messages, self.agent_info)
        else:
            response_ = await _utils.run_in_executor(self.function, messages, self.agent_info)
            response = cast(ModelAnyResponse, response_)
        # TODO is `messages` right here? Should it just be new messages?
        return response, _estimate_cost(chain(messages, [response]))

    @asynccontextmanager
    async def request_stream(self, messages: list[Message]) -> AsyncIterator[EitherStreamedResponse]:
        assert (
            self.stream_function is not None
        ), 'FunctionModel must receive a `stream_function` to support streamed requests'
        response_stream = self.stream_function(messages, self.agent_info)
        try:
            first = await response_stream.__anext__()
        except StopAsyncIteration as e:
            raise ValueError('Stream function must return at least one item') from e

        if isinstance(first, str):
            text_stream = cast(AsyncIterator[str], response_stream)
            yield FunctionStreamTextResponse(first, text_stream)
        else:
            structured_stream = cast(AsyncIterator[DeltaToolCalls], response_stream)
            yield FunctionStreamStructuredResponse(first, structured_stream)

FunctionStreamTextResponse dataclass

Bases: StreamTextResponse

Implementation of StreamTextResponse for FunctionModel.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@dataclass
class FunctionStreamTextResponse(StreamTextResponse):
    """Implementation of `StreamTextResponse` for [FunctionModel][pydantic_ai.models.function.FunctionModel]."""

    _next: str | None
    _iter: AsyncIterator[str]
    _timestamp: datetime = field(default_factory=_utils.now_utc, init=False)
    _buffer: list[str] = field(default_factory=list, init=False)

    async def __anext__(self) -> None:
        if self._next is not None:
            self._buffer.append(self._next)
            self._next = None
        else:
            self._buffer.append(await self._iter.__anext__())

    def get(self, *, final: bool = False) -> Iterable[str]:
        yield from self._buffer
        self._buffer.clear()

    def cost(self) -> result.Cost:
        return result.Cost()

    def timestamp(self) -> datetime:
        return self._timestamp

FunctionStreamStructuredResponse dataclass

Bases: StreamStructuredResponse

Implementation of StreamStructuredResponse for FunctionModel.

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@dataclass
class FunctionStreamStructuredResponse(StreamStructuredResponse):
    """Implementation of `StreamStructuredResponse` for [FunctionModel][pydantic_ai.models.function.FunctionModel]."""

    _next: DeltaToolCalls | None
    _iter: AsyncIterator[DeltaToolCalls]
    _delta_tool_calls: dict[int, DeltaToolCall] = field(default_factory=dict)
    _timestamp: datetime = field(default_factory=_utils.now_utc)

    async def __anext__(self) -> None:
        if self._next is not None:
            tool_call = self._next
            self._next = None
        else:
            tool_call = await self._iter.__anext__()

        for key, new in tool_call.items():
            if current := self._delta_tool_calls.get(key):
                current.name = _utils.add_optional(current.name, new.name)
                current.json_args = _utils.add_optional(current.json_args, new.json_args)
            else:
                self._delta_tool_calls[key] = new

    def get(self, *, final: bool = False) -> ModelStructuredResponse:
        calls: list[ToolCall] = []
        for c in self._delta_tool_calls.values():
            if c.name is not None and c.json_args is not None:
                calls.append(ToolCall.from_json(c.name, c.json_args))

        return ModelStructuredResponse(calls, timestamp=self._timestamp)

    def cost(self) -> result.Cost:
        return result.Cost()

    def timestamp(self) -> datetime:
        return self._timestamp