Skip to content

pydantic_ai.models.gemini

Custom interface to the generativelanguage.googleapis.com API using HTTPX and [Pydantic](https://docs.pydantic.dev/latest/.

The Google SDK for interacting with the generativelanguage.googleapis.com API google-generativeai reads like it was written by a Java developer who thought they knew everything about OOP, spent 30 minutes trying to learn Python, gave up and decided to build the library to prove how horrible Python is. It also doesn't use httpx for HTTP requests, and tries to implement tool calling itself, but doesn't use Pydantic or equivalent for validation.

We therefore implement support for the API directly.

Despite these shortcomings, the Gemini model is actually quite powerful and very fast.

Setup

For details on how to set up authentication with this model, see model configuration for Gemini.

GeminiModelName module-attribute

GeminiModelName = Literal[
    "gemini-1.5-flash",
    "gemini-1.5-flash-8b",
    "gemini-1.5-pro",
    "gemini-1.0-pro",
]

Named Gemini models.

See the Gemini API docs for a full list.

GeminiModel dataclass

Bases: Model

A model that uses Gemini via generativelanguage.googleapis.com API.

This is implemented from scratch rather than using a dedicated SDK, good API documentation is available here.

Apart from __init__, all methods are private or match those of the base class.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@dataclass(init=False)
class GeminiModel(Model):
    """A model that uses Gemini via `generativelanguage.googleapis.com` API.

    This is implemented from scratch rather than using a dedicated SDK, good API documentation is
    available [here](https://ai.google.dev/api).

    Apart from `__init__`, all methods are private or match those of the base class.
    """

    model_name: GeminiModelName
    auth: AuthProtocol
    http_client: AsyncHTTPClient
    url: str

    def __init__(
        self,
        model_name: GeminiModelName,
        *,
        api_key: str | None = None,
        http_client: AsyncHTTPClient | None = None,
        url_template: str = 'https://generativelanguage.googleapis.com/v1beta/models/{model}:',
    ):
        """Initialize a Gemini model.

        Args:
            model_name: The name of the model to use.
            api_key: The API key to use for authentication, if not provided, the `GEMINI_API_KEY` environment variable
                will be used if available.
            http_client: An existing `httpx.AsyncClient` to use for making HTTP requests.
            url_template: The URL template to use for making requests, you shouldn't need to change this,
                docs [here](https://ai.google.dev/gemini-api/docs/quickstart?lang=rest#make-first-request),
                `model` is substituted with the model name, and `function` is added to the end of the URL.
        """
        self.model_name = model_name
        if api_key is None:
            if env_api_key := os.getenv('GEMINI_API_KEY'):
                api_key = env_api_key
            else:
                raise exceptions.UserError('API key must be provided or set in the GEMINI_API_KEY environment variable')
        self.auth = ApiKeyAuth(api_key)
        self.http_client = http_client or cached_async_http_client()
        self.url = url_template.format(model=model_name)

    async def agent_model(
        self,
        function_tools: Mapping[str, AbstractToolDefinition],
        allow_text_result: bool,
        result_tools: Sequence[AbstractToolDefinition] | None,
    ) -> GeminiAgentModel:
        return GeminiAgentModel(
            http_client=self.http_client,
            model_name=self.model_name,
            auth=self.auth,
            url=self.url,
            function_tools=function_tools,
            allow_text_result=allow_text_result,
            result_tools=result_tools,
        )

    def name(self) -> str:
        return self.model_name

__init__

__init__(
    model_name: GeminiModelName,
    *,
    api_key: str | None = None,
    http_client: AsyncClient | None = None,
    url_template: str = "https://generativelanguage.googleapis.com/v1beta/models/{model}:"
)

Initialize a Gemini model.

Parameters:

Name Type Description Default
model_name GeminiModelName

The name of the model to use.

required
api_key str | None

The API key to use for authentication, if not provided, the GEMINI_API_KEY environment variable will be used if available.

None
http_client AsyncClient | None

An existing httpx.AsyncClient to use for making HTTP requests.

None
url_template str

The URL template to use for making requests, you shouldn't need to change this, docs here, model is substituted with the model name, and function is added to the end of the URL.

'https://generativelanguage.googleapis.com/v1beta/models/{model}:'
Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    model_name: GeminiModelName,
    *,
    api_key: str | None = None,
    http_client: AsyncHTTPClient | None = None,
    url_template: str = 'https://generativelanguage.googleapis.com/v1beta/models/{model}:',
):
    """Initialize a Gemini model.

    Args:
        model_name: The name of the model to use.
        api_key: The API key to use for authentication, if not provided, the `GEMINI_API_KEY` environment variable
            will be used if available.
        http_client: An existing `httpx.AsyncClient` to use for making HTTP requests.
        url_template: The URL template to use for making requests, you shouldn't need to change this,
            docs [here](https://ai.google.dev/gemini-api/docs/quickstart?lang=rest#make-first-request),
            `model` is substituted with the model name, and `function` is added to the end of the URL.
    """
    self.model_name = model_name
    if api_key is None:
        if env_api_key := os.getenv('GEMINI_API_KEY'):
            api_key = env_api_key
        else:
            raise exceptions.UserError('API key must be provided or set in the GEMINI_API_KEY environment variable')
    self.auth = ApiKeyAuth(api_key)
    self.http_client = http_client or cached_async_http_client()
    self.url = url_template.format(model=model_name)

AuthProtocol

Bases: Protocol

Abstract definition for Gemini authentication.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
111
112
113
114
class AuthProtocol(Protocol):
    """Abstract definition for Gemini authentication."""

    async def headers(self) -> dict[str, str]: ...

ApiKeyAuth dataclass

Authentication using an API key for the X-Goog-Api-Key header.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
117
118
119
120
121
122
123
124
125
@dataclass
class ApiKeyAuth:
    """Authentication using an API key for the `X-Goog-Api-Key` header."""

    api_key: str

    async def headers(self) -> dict[str, str]:
        # https://cloud.google.com/docs/authentication/api-keys-use#using-with-rest
        return {'X-Goog-Api-Key': self.api_key}

GeminiAgentModel dataclass

Bases: AgentModel

Implementation of AgentModel for Gemini models.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
@dataclass(init=False)
class GeminiAgentModel(AgentModel):
    """Implementation of `AgentModel` for Gemini models."""

    http_client: AsyncHTTPClient
    model_name: GeminiModelName
    auth: AuthProtocol
    tools: _GeminiTools | None
    tool_config: _GeminiToolConfig | None
    url: str

    def __init__(
        self,
        http_client: AsyncHTTPClient,
        model_name: GeminiModelName,
        auth: AuthProtocol,
        url: str,
        function_tools: Mapping[str, AbstractToolDefinition],
        allow_text_result: bool,
        result_tools: Sequence[AbstractToolDefinition] | None,
    ):
        check_allow_model_requests()
        tools = [_function_from_abstract_tool(t) for t in function_tools.values()]
        if result_tools is not None:
            tools += [_function_from_abstract_tool(t) for t in result_tools]

        if allow_text_result:
            tool_config = None
        else:
            tool_config = _tool_config([t['name'] for t in tools])

        self.http_client = http_client
        self.model_name = model_name
        self.auth = auth
        self.tools = _GeminiTools(function_declarations=tools) if tools else None
        self.tool_config = tool_config
        self.url = url

    async def request(self, messages: list[Message]) -> tuple[ModelAnyResponse, result.Cost]:
        async with self._make_request(messages, False) as http_response:
            response = _gemini_response_ta.validate_json(await http_response.aread())
        return self._process_response(response), _metadata_as_cost(response)

    @asynccontextmanager
    async def request_stream(self, messages: list[Message]) -> AsyncIterator[EitherStreamedResponse]:
        async with self._make_request(messages, True) as http_response:
            yield await self._process_streamed_response(http_response)

    @asynccontextmanager
    async def _make_request(self, messages: list[Message], streamed: bool) -> AsyncIterator[HTTPResponse]:
        contents: list[_GeminiContent] = []
        sys_prompt_parts: list[_GeminiTextPart] = []
        for m in messages:
            either_content = self._message_to_gemini(m)
            if left := either_content.left:
                sys_prompt_parts.append(left.value)
            else:
                contents.append(either_content.right)

        request_data = _GeminiRequest(contents=contents)
        if sys_prompt_parts:
            request_data['system_instruction'] = _GeminiTextContent(role='user', parts=sys_prompt_parts)
        if self.tools is not None:
            request_data['tools'] = self.tools
        if self.tool_config is not None:
            request_data['tool_config'] = self.tool_config

        url = self.url + ('streamGenerateContent' if streamed else 'generateContent')

        headers = {
            'Content-Type': 'application/json',
            'User-Agent': get_user_agent(),
            **await self.auth.headers(),
        }

        request_json = _gemini_request_ta.dump_json(request_data, by_alias=True)

        async with self.http_client.stream('POST', url, content=request_json, headers=headers) as r:
            if r.status_code != 200:
                await r.aread()
                raise exceptions.UnexpectedModelBehavior(f'Unexpected response from gemini {r.status_code}', r.text)
            yield r

    @staticmethod
    def _process_response(response: _GeminiResponse) -> ModelAnyResponse:
        either = _extract_response_parts(response)
        if left := either.left:
            return _structured_response_from_parts(left.value)
        else:
            return ModelTextResponse(content=''.join(part['text'] for part in either.right))

    @staticmethod
    async def _process_streamed_response(http_response: HTTPResponse) -> EitherStreamedResponse:
        """Process a streamed response, and prepare a streaming response to return."""
        aiter_bytes = http_response.aiter_bytes()
        start_response: _GeminiResponse | None = None
        content = bytearray()

        async for chunk in aiter_bytes:
            content.extend(chunk)
            responses = _gemini_streamed_response_ta.validate_json(
                content,
                experimental_allow_partial='trailing-strings',
            )
            if responses:
                last = responses[-1]
                if last['candidates'] and last['candidates'][0]['content']['parts']:
                    start_response = last
                    break

        if start_response is None:
            raise UnexpectedModelBehavior('Streamed response ended without content or tool calls')

        if _extract_response_parts(start_response).is_left():
            return GeminiStreamStructuredResponse(_content=content, _stream=aiter_bytes)
        else:
            return GeminiStreamTextResponse(_json_content=content, _stream=aiter_bytes)

    @staticmethod
    def _message_to_gemini(m: Message) -> _utils.Either[_GeminiTextPart, _GeminiContent]:
        """Convert a message to a _GeminiTextPart for "system_instructions" or _GeminiContent for "contents"."""
        if m.role == 'system':
            # SystemPrompt ->
            return _utils.Either(left=_GeminiTextPart(text=m.content))
        elif m.role == 'user':
            # UserPrompt ->
            return _utils.Either(right=_content_user_text(m.content))
        elif m.role == 'tool-return':
            # ToolReturn ->
            return _utils.Either(right=_content_function_return(m))
        elif m.role == 'retry-prompt':
            # RetryPrompt ->
            return _utils.Either(right=_content_function_retry(m))
        elif m.role == 'model-text-response':
            # ModelTextResponse ->
            return _utils.Either(right=_content_model_text(m.content))
        elif m.role == 'model-structured-response':
            # ModelStructuredResponse ->
            return _utils.Either(right=_content_function_call(m))
        else:
            assert_never(m)

GeminiStreamTextResponse dataclass

Bases: StreamTextResponse

Implementation of StreamTextResponse for the Gemini model.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
@dataclass
class GeminiStreamTextResponse(StreamTextResponse):
    """Implementation of `StreamTextResponse` for the Gemini model."""

    _json_content: bytearray
    _stream: AsyncIterator[bytes]
    _position: int = 0
    _timestamp: datetime = field(default_factory=_utils.now_utc, init=False)
    _cost: result.Cost = field(default_factory=result.Cost, init=False)

    async def __anext__(self) -> None:
        chunk = await self._stream.__anext__()
        self._json_content.extend(chunk)

    def get(self, *, final: bool = False) -> Iterable[str]:
        if final:
            all_items = pydantic_core.from_json(self._json_content)
            new_items = all_items[self._position :]
            self._position = len(all_items)
            new_responses = _gemini_streamed_response_ta.validate_python(new_items)
        else:
            all_items = pydantic_core.from_json(self._json_content, allow_partial=True)
            new_items = all_items[self._position : -1]
            self._position = len(all_items) - 1
            new_responses = _gemini_streamed_response_ta.validate_python(
                new_items, experimental_allow_partial='trailing-strings'
            )
        for r in new_responses:
            self._cost += _metadata_as_cost(r)
            parts = r['candidates'][0]['content']['parts']
            if _all_text_parts(parts):
                for part in parts:
                    yield part['text']
            else:
                raise UnexpectedModelBehavior(
                    'Streamed response with unexpected content, expected all parts to be text'
                )

    def cost(self) -> result.Cost:
        return self._cost

    def timestamp(self) -> datetime:
        return self._timestamp

GeminiStreamStructuredResponse dataclass

Bases: StreamStructuredResponse

Implementation of StreamStructuredResponse for the Gemini model.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
@dataclass
class GeminiStreamStructuredResponse(StreamStructuredResponse):
    """Implementation of `StreamStructuredResponse` for the Gemini model."""

    _content: bytearray
    _stream: AsyncIterator[bytes]
    _timestamp: datetime = field(default_factory=_utils.now_utc, init=False)
    _cost: result.Cost = field(default_factory=result.Cost, init=False)

    async def __anext__(self) -> None:
        chunk = await self._stream.__anext__()
        self._content.extend(chunk)

    def get(self, *, final: bool = False) -> ModelStructuredResponse:
        """Get the `ModelStructuredResponse` at this point.

        NOTE: It's not clear how the stream of responses should be combined because Gemini seems to always
        reply with a single response, when returning a structured data.

        I'm therefore assuming that each part contains a complete tool call, and not trying to combine data from
        separate parts.
        """
        responses = _gemini_streamed_response_ta.validate_json(
            self._content,
            experimental_allow_partial='off' if final else 'trailing-strings',
        )
        combined_parts: list[_GeminiFunctionCallPart] = []
        self._cost = result.Cost()
        for r in responses:
            self._cost += _metadata_as_cost(r)
            candidate = r['candidates'][0]
            parts = candidate['content']['parts']
            if _all_function_call_parts(parts):
                combined_parts.extend(parts)
            elif not candidate.get('finish_reason'):
                # you can get an empty text part along with the finish_reason, so we ignore that case
                raise UnexpectedModelBehavior(
                    'Streamed response with unexpected content, expected all parts to be function calls'
                )
        return _structured_response_from_parts(combined_parts, timestamp=self._timestamp)

    def cost(self) -> result.Cost:
        return self._cost

    def timestamp(self) -> datetime:
        return self._timestamp

get

get(*, final: bool = False) -> ModelStructuredResponse

Get the ModelStructuredResponse at this point.

NOTE: It's not clear how the stream of responses should be combined because Gemini seems to always reply with a single response, when returning a structured data.

I'm therefore assuming that each part contains a complete tool call, and not trying to combine data from separate parts.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def get(self, *, final: bool = False) -> ModelStructuredResponse:
    """Get the `ModelStructuredResponse` at this point.

    NOTE: It's not clear how the stream of responses should be combined because Gemini seems to always
    reply with a single response, when returning a structured data.

    I'm therefore assuming that each part contains a complete tool call, and not trying to combine data from
    separate parts.
    """
    responses = _gemini_streamed_response_ta.validate_json(
        self._content,
        experimental_allow_partial='off' if final else 'trailing-strings',
    )
    combined_parts: list[_GeminiFunctionCallPart] = []
    self._cost = result.Cost()
    for r in responses:
        self._cost += _metadata_as_cost(r)
        candidate = r['candidates'][0]
        parts = candidate['content']['parts']
        if _all_function_call_parts(parts):
            combined_parts.extend(parts)
        elif not candidate.get('finish_reason'):
            # you can get an empty text part along with the finish_reason, so we ignore that case
            raise UnexpectedModelBehavior(
                'Streamed response with unexpected content, expected all parts to be function calls'
            )
    return _structured_response_from_parts(combined_parts, timestamp=self._timestamp)