# Retry Strategies
Handle transient failures in tasks and evaluators with automatic retry logic.
## Overview
LLM-based systems can experience transient failures:
- Rate limits
- Network timeouts
- Temporary API outages
- Context length errors
Pydantic Evals supports retry configuration for both:
- **Task execution** - the function being evaluated
- **Evaluator execution** - the evaluators themselves
## Basic Retry Configuration

Pass a retry configuration to `evaluate()` or `evaluate_sync()` using Tenacity parameters:
```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def my_function(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(
    task=my_function,
    retry_task={'stop': stop_after_attempt(3)},
    retry_evaluators={'stop': stop_after_attempt(2)},
)
```
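The same keyword arguments work with the async `evaluate()` method. A minimal sketch, assuming you're calling it from async code:

```python
import asyncio

from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def my_function(inputs: str) -> str:
    return f'Result: {inputs}'


async def main():
    dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])
    # Same retry kwargs as evaluate_sync(), but awaited
    report = await dataset.evaluate(
        task=my_function,
        retry_task={'stop': stop_after_attempt(3)},
    )


asyncio.run(main())
```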
## Retry Configuration Options

Retry configurations use Tenacity and support the same options as Pydantic AI's `RetryConfig`:
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def my_function(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

retry_config = {
    'stop': stop_after_attempt(3),  # Stop after 3 attempts
    'wait': wait_exponential(multiplier=1, min=1, max=10),  # Exponential backoff: 1s, 2s, 4s, 8s (capped at 10s)
    'reraise': True,  # Re-raise the original exception after exhausting retries
}

dataset.evaluate_sync(
    task=my_function,
    retry_task=retry_config,
)
```
### Common Parameters

The retry configuration accepts any parameters from the Tenacity `retry` decorator. Common ones include:
| Parameter | Type | Description |
|---|---|---|
| `stop` | `StopBaseT` | Stop strategy (e.g., `stop_after_attempt(3)`, `stop_after_delay(60)`) |
| `wait` | `WaitBaseT` | Wait strategy (e.g., `wait_exponential()`, `wait_fixed(2)`) |
| `retry` | `RetryBaseT` | Retry condition (e.g., `retry_if_exception_type(TimeoutError)`) |
| `reraise` | `bool` | Whether to reraise the original exception (default: `False`) |
| `before_sleep` | `Callable` | Callback invoked before sleeping between retries |
See the Tenacity documentation for all available options.
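For instance, the `before_sleep` hook can log each retry as it happens. A minimal sketch using Tenacity's built-in `before_sleep_log` helper (the logger name is arbitrary):

```python
import logging

from tenacity import before_sleep_log, stop_after_attempt, wait_fixed

logger = logging.getLogger(__name__)

retry_config = {
    'stop': stop_after_attempt(3),
    'wait': wait_fixed(2),  # Wait a fixed 2s between attempts
    # Log a warning before each sleep between retry attempts
    'before_sleep': before_sleep_log(logger, logging.WARNING),
    'reraise': True,
}
```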
## Task Retries
Retry the task function when it fails:
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


async def call_llm(inputs: str) -> str:
    return f'LLM response to: {inputs}'


async def flaky_llm_task(inputs: str) -> str:
    """This might hit rate limits or timeout."""
    response = await call_llm(inputs)
    return response


dataset = Dataset(cases=[Case(inputs='test')])

report = dataset.evaluate_sync(
    task=flaky_llm_task,
    retry_task={
        'stop': stop_after_attempt(5),  # Try up to 5 times
        'wait': wait_exponential(multiplier=1, min=1, max=30),  # Exponential backoff, capped at 30s
        'reraise': True,
    },
)
```
### When Task Retries Trigger
Retries trigger when the task raises an exception:
```python
class RateLimitError(Exception):
    pass


class ValidationError(Exception):
    pass


async def call_api(inputs: str) -> str:
    return f'API response: {inputs}'


async def my_task(inputs: str) -> str:
    try:
        return await call_api(inputs)
    except RateLimitError:
        # Will trigger retry
        raise
    except ValidationError:
        # Will also trigger retry
        raise
```
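By default, Tenacity retries on any exception. If you'd rather fail fast on permanent errors like `ValidationError` above, you can narrow the condition with a `retry` entry. A sketch reusing the classes defined above:

```python
from tenacity import retry_if_exception_type, stop_after_attempt

from pydantic_evals import Case, Dataset

dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(
    task=my_task,
    retry_task={
        'stop': stop_after_attempt(3),
        # Only RateLimitError triggers a retry; other exceptions propagate immediately
        'retry': retry_if_exception_type(RateLimitError),
    },
)
```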
### Exponential Backoff

When using `wait_exponential()`, delays increase exponentially:

- Attempt 1: immediate
- Attempt 2: ~1s delay (`multiplier * 2^0`)
- Attempt 3: ~2s delay (`multiplier * 2^1`)
- Attempt 4: ~4s delay (`multiplier * 2^2`)
- Attempt 5: ~8s delay (`multiplier * 2^3`, capped at max)

The actual delay depends on the `multiplier`, `min`, and `max` parameters passed to `wait_exponential()`.
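As a back-of-the-envelope illustration of that schedule (this mirrors the description above rather than Tenacity's internals):

```python
# Approximate delays for wait_exponential(multiplier=1, min=1, max=10)
multiplier, min_wait, max_wait = 1, 1, 10

for attempt in range(1, 6):
    # Delay before the next attempt, clamped between min and max
    delay = min(max(multiplier * 2 ** (attempt - 1), min_wait), max_wait)
    print(f'delay after attempt {attempt}: {delay}s')
# delays: 1s, 2s, 4s, 8s, 10s (16s capped at max=10)
```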
## Evaluator Retries
Retry evaluators when they fail:
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset
from pydantic_evals.evaluators import LLMJudge


def my_task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[
        # LLMJudge might hit rate limits
        LLMJudge(rubric='Response is accurate'),
    ],
)

report = dataset.evaluate_sync(
    task=my_task,
    retry_evaluators={
        'stop': stop_after_attempt(3),
        'wait': wait_exponential(multiplier=1, min=0.5, max=10),
        'reraise': True,
    },
)
```
### When Evaluator Retries Trigger
Retries trigger when an evaluator raises an exception:
```python
from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext


async def external_api_call(output: str) -> bool:
    return len(output) > 0


@dataclass
class APIEvaluator(Evaluator):
    async def evaluate(self, ctx: EvaluatorContext) -> bool:
        # If this raises an exception, retry logic will trigger
        result = await external_api_call(ctx.output)
        return result
```
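Wiring a custom evaluator up with retries looks the same as with the built-in ones. A sketch reusing `APIEvaluator` from above:

```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def my_task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[APIEvaluator()],
)

report = dataset.evaluate_sync(
    task=my_task,
    retry_evaluators={'stop': stop_after_attempt(3)},
)
```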
### Evaluator Failures

If an evaluator fails after all retries, it's recorded as an `EvaluatorFailure`:
```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(task, retry_evaluators={'stop': stop_after_attempt(3)})

# Check for evaluator failures
for case in report.cases:
    if case.evaluator_failures:
        for failure in case.evaluator_failures:
            print(f'Evaluator {failure.name} failed: {failure.error_message}')
#> (No output - no evaluator failures in this case)
```
View evaluator failures in reports:
```python
from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(task)
#> ✅ case_0 ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% (0/0)

report.print(include_evaluator_failures=True)
"""
 Evaluation Summary:
        task
┏━━━━━━━━━━┳━━━━━━━━━━┓
┃ Case ID  ┃ Duration ┃
┡━━━━━━━━━━╇━━━━━━━━━━┩
│ Case 1   │     10ms │
├──────────┼──────────┤
│ Averages │     10ms │
└──────────┴──────────┘
"""
```
## Combining Task and Evaluator Retries
You can configure both independently:
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def flaky_task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

report = dataset.evaluate_sync(
    task=flaky_task,
    retry_task={
        'stop': stop_after_attempt(5),  # Retry task up to 5 times
        'wait': wait_exponential(multiplier=1, min=1, max=30),
        'reraise': True,
    },
    retry_evaluators={
        'stop': stop_after_attempt(3),  # Retry evaluators up to 3 times
        'wait': wait_exponential(multiplier=1, min=0.5, max=10),
        'reraise': True,
    },
)
```
## Practical Examples

### Rate Limit Handling
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset
from pydantic_evals.evaluators import LLMJudge


async def expensive_llm_call(inputs: str) -> str:
    return f'LLM response: {inputs}'


async def llm_task(inputs: str) -> str:
    """Task that might hit rate limits."""
    return await expensive_llm_call(inputs)


dataset = Dataset(
    cases=[Case(inputs='test')],
    evaluators=[
        LLMJudge(rubric='Quality check'),  # Also might hit rate limits
    ],
)

# Generous retries for rate limits
report = dataset.evaluate_sync(
    task=llm_task,
    retry_task={
        'stop': stop_after_attempt(10),  # Rate limits can take multiple retries
        'wait': wait_exponential(multiplier=2, min=2, max=60),  # Start at 2s, exponential up to 60s
        'reraise': True,
    },
    retry_evaluators={
        'stop': stop_after_attempt(5),
        'wait': wait_exponential(multiplier=2, min=2, max=30),
        'reraise': True,
    },
)
```
### Network Timeout Handling
```python
import httpx
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


async def api_task(inputs: str) -> str:
    """Task that calls an external API which might time out."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post('https://api.example.com', json={'input': inputs})
        response.raise_for_status()  # Raise on HTTP errors so they also trigger retries
        return response.text


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Quick retries for network issues
report = dataset.evaluate_sync(
    task=api_task,
    retry_task={
        'stop': stop_after_attempt(4),  # A few quick retries
        'wait': wait_exponential(multiplier=0.5, min=0.5, max=5),  # Fast retry, capped at 5s
        'reraise': True,
    },
)
```
### Context Length Handling
```python
from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


class ContextLengthError(Exception):
    pass


async def llm_call(inputs: str, max_tokens: int = 8000) -> str:
    return f'LLM response: {inputs[:100]}'


async def smart_llm_task(inputs: str) -> str:
    """Task that might exceed context length."""
    try:
        return await llm_call(inputs, max_tokens=8000)
    except ContextLengthError:
        # Retry with shorter context
        truncated_inputs = inputs[:4000]
        return await llm_call(truncated_inputs, max_tokens=4000)


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Don't retry context length errors (handle them in the task)
report = dataset.evaluate_sync(
    task=smart_llm_task,
    retry_task={'stop': stop_after_attempt(1)},  # No retries, we handle it
)
```
## Retry vs Error Handling

Use retries for:

- Transient failures (rate limits, timeouts)
- Network issues
- Temporary service outages
- Recoverable errors

Use error handling for:

- Validation errors
- Logic errors
- Permanent failures
- Expected error conditions
```python
class RateLimitError(Exception):
    pass


async def llm_call(inputs: str) -> str:
    return f'LLM response: {inputs}'


def is_valid(result: str) -> bool:
    return len(result) > 0


async def smart_task(inputs: str) -> str:
    """Handle expected errors, let retries handle transient failures."""
    try:
        result = await llm_call(inputs)
        # Validate output (don't retry validation errors)
        if not is_valid(result):
            return 'ERROR: Invalid output format'
        return result
    except RateLimitError:
        # Let retry logic handle this
        raise
    except ValueError as e:
        # Don't retry - this is a permanent error
        return f'ERROR: {e}'
```
## Troubleshooting

### "Still failing after retries"

Increase the retry attempts, or check whether the error is actually retriable:
```python
import logging

from tenacity import stop_after_attempt

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


# Add logging to see what's failing
logging.basicConfig(level=logging.DEBUG)

dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Tenacity logs retry attempts
report = dataset.evaluate_sync(task, retry_task={'stop': stop_after_attempt(5)})
```
"Evaluations taking too long"
Reduce retry attempts or wait times:
```python
from tenacity import stop_after_attempt, wait_exponential

# Faster retries
retry_config = {
    'stop': stop_after_attempt(3),  # Fewer attempts
    'wait': wait_exponential(multiplier=0.1, min=0.1, max=2),  # Quick retries, capped at 2s
    'reraise': True,
}
```
"Hitting rate limits despite retries"
Increase delays or use max_concurrency:
```python
from tenacity import stop_after_attempt, wait_exponential

from pydantic_evals import Case, Dataset


def task(inputs: str) -> str:
    return f'Result: {inputs}'


dataset = Dataset(cases=[Case(inputs='test')], evaluators=[])

# Longer delays
retry_config = {
    'stop': stop_after_attempt(5),
    'wait': wait_exponential(multiplier=5, min=5, max=60),  # Start at 5s, exponential up to 60s
    'reraise': True,
}

# Also reduce concurrency
report = dataset.evaluate_sync(
    task=task,
    retry_task=retry_config,
    max_concurrency=2,  # Only 2 concurrent tasks
)
```
## Next Steps

- **Concurrency & Performance** - Optimize evaluation performance
- **Logfire Integration** - View retries in Logfire