# pydantic_ai — Concurrency
## ConcurrencyLimitedModel

Bases: `WrapperModel`

A model wrapper that limits concurrent requests to the underlying model.
This wrapper applies concurrency limiting at the model level, ensuring that the number of concurrent requests to the model does not exceed the configured limit. This is useful for:
- Respecting API rate limits
- Managing resource usage
- Sharing a concurrency pool across multiple models
Example usage:

```python
from pydantic_ai import Agent
from pydantic_ai.models.concurrency import ConcurrencyLimitedModel

# Limit to 5 concurrent requests
model = ConcurrencyLimitedModel('openai:gpt-4o', limiter=5)
agent = Agent(model)

# Or share a limiter across multiple models
from pydantic_ai import ConcurrencyLimiter  # noqa E402

shared_limiter = ConcurrencyLimiter(max_running=10, name='openai-pool')
model1 = ConcurrencyLimitedModel('openai:gpt-4o', limiter=shared_limiter)
model2 = ConcurrencyLimitedModel('openai:gpt-4o-mini', limiter=shared_limiter)
```
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
### __init__

```python
__init__(
    wrapped: Model | KnownModelName,
    limiter: int | ConcurrencyLimit | AbstractConcurrencyLimiter,
)
```
Initialize the ConcurrencyLimitedModel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `wrapped` | `Model \| KnownModelName` | The model to wrap, either a `Model` instance or a known model name. | *required* |
| `limiter` | `int \| ConcurrencyLimit \| AbstractConcurrencyLimiter` | The concurrency limit configuration. Can be an `int`, a `ConcurrencyLimit`, or an `AbstractConcurrencyLimiter`. | *required* |
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
### request `async`

```python
request(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> ModelResponse
```
Make a request to the model with concurrency limiting.
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
### count_tokens `async`

```python
count_tokens(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> RequestUsage
```
Count tokens with concurrency limiting.
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
### request_stream `async`

```python
request_stream(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    run_context: RunContext[Any] | None = None,
) -> AsyncIterator[StreamedResponse]
```
Make a streaming request to the model with concurrency limiting.
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
## limit_model_concurrency

Wrap a model with concurrency limiting. This is a convenience function; if `limiter` is `None`, the model is returned unchanged.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `Model \| KnownModelName` | The model to wrap. | *required* |
| `limiter` | `AnyConcurrencyLimit` | The concurrency limit configuration. | *required* |
Returns:

| Type | Description |
|---|---|
| `Model` | The wrapped model with concurrency limiting, or the original model if `limiter` is `None`. |
Example:

```python
from pydantic_ai.models.concurrency import limit_model_concurrency

model = limit_model_concurrency('openai:gpt-4o', limiter=5)
```
Source code in `pydantic_ai_slim/pydantic_ai/models/concurrency.py`
## AbstractConcurrencyLimiter

Bases: `ABC`

Abstract base class for concurrency limiters.
Subclass this to create custom concurrency limiters (e.g., Redis-backed distributed limiters).
Example:

```python
from pydantic_ai.concurrency import AbstractConcurrencyLimiter


class RedisConcurrencyLimiter(AbstractConcurrencyLimiter):
    def __init__(self, redis_client, key: str, max_running: int):
        self._redis = redis_client
        self._key = key
        self._max_running = max_running

    async def acquire(self, source: str) -> None:
        # Implement Redis-based distributed locking
        ...

    def release(self) -> None:
        # Release the Redis lock
        ...
```
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### acquire `abstractmethod` `async`

```python
acquire(source: str) -> None
```
Acquire a slot, waiting if necessary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str` | Identifier for observability (e.g., `'model:gpt-4o'`). | *required* |
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### release `abstractmethod`

```python
release() -> None
```
Release a slot.
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
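To make the acquire/release contract concrete, here is a minimal in-memory sketch backed by `asyncio.Semaphore`. `LocalLimiter` is illustrative only and not part of pydantic_ai (the library's own implementation wraps an anyio limiter):

```python
import asyncio


class LocalLimiter:
    """Illustrative in-memory limiter honoring the acquire/release contract."""

    def __init__(self, max_running: int):
        self._sem = asyncio.Semaphore(max_running)

    async def acquire(self, source: str) -> None:
        # Wait until a slot is free; `source` identifies the caller
        # (e.g., 'model:gpt-4o') for logging/observability.
        await self._sem.acquire()

    def release(self) -> None:
        # Return the slot so a waiting operation can proceed.
        self._sem.release()
```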
## ConcurrencyLimiter

Bases: `AbstractConcurrencyLimiter`

A concurrency limiter that tracks waiting operations for observability.
This class wraps an `anyio.CapacityLimiter` and tracks the number of waiting operations. When an operation has to wait to acquire a slot, a span is created for observability purposes.
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### __init__

```python
__init__(
    max_running: int,
    *,
    max_queued: int | None = None,
    name: str | None = None,
    tracer: Tracer | None = None,
)
```
Initialize the ConcurrencyLimiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_running` | `int` | Maximum number of concurrent operations. | *required* |
| `max_queued` | `int \| None` | Maximum queue depth before raising `ConcurrencyLimitExceeded`. | `None` |
| `name` | `str \| None` | Optional name for this limiter, used for observability when sharing a limiter across multiple models or agents. | `None` |
| `tracer` | `Tracer \| None` | OpenTelemetry tracer for span creation. | `None` |
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### from_limit `classmethod`

```python
from_limit(
    limit: int | ConcurrencyLimit,
    *,
    name: str | None = None,
    tracer: Tracer | None = None,
) -> Self
```
Create a ConcurrencyLimiter from a ConcurrencyLimit configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int \| ConcurrencyLimit` | Either an `int` for simple limiting or a `ConcurrencyLimit` for full config. | *required* |
| `name` | `str \| None` | Optional name for this limiter, used for observability. | `None` |
| `tracer` | `Tracer \| None` | OpenTelemetry tracer for span creation. | `None` |
Returns:

| Type | Description |
|---|---|
| `Self` | A configured `ConcurrencyLimiter`. |
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### acquire `async`

```python
acquire(source: str) -> None
```
Acquire a slot, creating a span if waiting is required.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `str` | Identifier for the source of this acquisition (e.g., `'agent:my-agent'` or `'model:gpt-4'`). | *required* |
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
### release

```python
release() -> None
```
Release a slot.
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
## ConcurrencyLimit

Configuration for concurrency limiting with optional backpressure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_running` | `int` | Maximum number of concurrent operations allowed. | *required* |
| `max_queued` | `int \| None` | Maximum number of operations waiting in the queue. If `None`, the queue is unlimited. If exceeded, raises `ConcurrencyLimitExceeded`. | `None` |
Source code in `pydantic_ai_slim/pydantic_ai/concurrency.py`
## AnyConcurrencyLimit

Type alias for concurrency limit configuration.

Can be:

- An `int`: Simple limit on concurrent operations (unlimited queue).
- A `ConcurrencyLimit`: Full configuration with optional backpressure.
- An `AbstractConcurrencyLimiter`: A pre-created limiter instance for sharing across multiple models/agents.
- `None`: No concurrency limiting (default).
## ConcurrencyLimitExceeded

Bases: `AgentRunError`

Error raised when the concurrency queue depth exceeds `max_queued`.
Source code in `pydantic_ai_slim/pydantic_ai/exceptions.py`