pydantic_evals.online

Online evaluation — attach evaluators to live functions for automatic background evaluation.

This module provides the infrastructure for running evaluators on production (or staging) traffic. The same Evaluator instances used with Dataset.evaluate() work here; the difference is in how they are wired up (decorator vs. dataset), not in what they are.

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

OnErrorLocation module-attribute

OnErrorLocation = Literal['sink', 'on_max_concurrency']

The location within the online evaluation pipeline where an error occurred.

SamplingMode module-attribute

SamplingMode = Literal['independent', 'correlated']

Controls how per-evaluator sample rates interact across evaluators for a single call.

  • 'independent' (default): Each evaluator flips its own coin. With N evaluators each at rate r, the probability of any evaluation overhead is 1 − (1−r)^N.
  • 'correlated': A single random seed is generated per call and shared across evaluators. An evaluator runs when call_seed < rate, so lower-rate evaluators' calls are always a subset of higher-rate ones. The probability of any overhead equals max(rate_i).

SamplingContext dataclass

Context available when deciding whether to sample an evaluator.

Contains the information available before the decorated function runs — the evaluator instance, function inputs, config metadata, and a per-call random seed. The function's output and duration are not yet available at sampling time.

Source code in pydantic_evals/pydantic_evals/online.py
@dataclass(kw_only=True)
class SamplingContext:
    """Context available when deciding whether to sample an evaluator.

    Contains the information available *before* the decorated function runs — the evaluator
    instance, function inputs, config metadata, and a per-call random seed. The function's
    output and duration are not yet available at sampling time.
    """

    evaluator: Evaluator
    """The evaluator being sampled."""
    inputs: Any
    """The inputs to the decorated function."""
    metadata: dict[str, Any] | None
    """Metadata from the [`OnlineEvalConfig`][pydantic_evals.online.OnlineEvalConfig], if set."""
    call_seed: float
    """A uniform random value in [0, 1) generated once per decorated function call.

    Shared across all evaluators for the same call. In `'correlated'` sampling mode this is
    used automatically; in `'independent'` mode it is available for custom `sample_rate`
    callables that want to implement their own correlated logic.
    """

evaluator instance-attribute

evaluator: Evaluator

The evaluator being sampled.

inputs instance-attribute

inputs: Any

The inputs to the decorated function.

metadata instance-attribute

metadata: dict[str, Any] | None

Metadata from the OnlineEvalConfig, if set.

call_seed instance-attribute

call_seed: float

A uniform random value in [0, 1) generated once per decorated function call.

Shared across all evaluators for the same call. In 'correlated' sampling mode this is used automatically; in 'independent' mode it is available for custom sample_rate callables that want to implement their own correlated logic.
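The subset property can be seen concretely in a standalone simulation of `'correlated'` sampling (plain Python, no pydantic_evals imports; the rates are arbitrary):

```python
import random

LOW, HIGH = 0.01, 0.5  # per-evaluator sample rates
low_calls: set[int] = set()
high_calls: set[int] = set()

rng = random.Random(0)
for call in range(10_000):
    call_seed = rng.random()    # one shared uniform seed per call
    if call_seed < LOW:         # rare, expensive evaluator
        low_calls.add(call)
    if call_seed < HIGH:        # cheap, frequent evaluator
        high_calls.add(call)

# Lower-rate evaluators' calls are always a subset of higher-rate ones.
assert low_calls <= high_calls
```

The same shared-seed comparison is what a custom `sample_rate` callable can reproduce in `'independent'` mode by comparing against `call_seed` itself.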

OnMaxConcurrencyCallback module-attribute

OnMaxConcurrencyCallback = Callable[
    [EvaluatorContext], None | Awaitable[None]
]

Callback invoked when an evaluation is dropped due to concurrency limits.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async.

OnSamplingErrorCallback module-attribute

OnSamplingErrorCallback = Callable[
    [Exception, Evaluator], None
]

Callback invoked when a sample_rate callable raises an exception.

Called synchronously before the decorated function runs. Receives the exception and the evaluator whose sample_rate failed. Must be sync (not async). If set, the evaluator is skipped. If not set, the exception propagates to the caller.

OnErrorCallback module-attribute

OnErrorCallback = Callable[
    [
        Exception,
        EvaluatorContext,
        Evaluator,
        OnErrorLocation,
    ],
    None | Awaitable[None],
]

Callback invoked when an exception occurs in the online evaluation pipeline.

Receives the exception, the evaluator context, the evaluator instance, and a location string indicating where the error occurred. Can be sync or async.

disable_evaluation

disable_evaluation() -> Iterator[None]

Context manager to disable all online evaluation in the current context.

When active, decorated functions still execute normally but no evaluators are dispatched.

Source code in pydantic_evals/pydantic_evals/online.py
@contextmanager
def disable_evaluation() -> Iterator[None]:
    """Context manager to disable all online evaluation in the current context.

    When active, decorated functions still execute normally but no evaluators are dispatched.
    """
    token = _EVALUATION_DISABLED.set(True)
    try:
        yield
    finally:
        _EVALUATION_DISABLED.reset(token)

SpanReference dataclass

Identifies a span that evaluation results should be associated with.

Used by sinks to associate evaluation results with the original function execution span.

Source code in pydantic_evals/pydantic_evals/online.py
@dataclass(kw_only=True)
class SpanReference:
    """Identifies a span that evaluation results should be associated with.

    Used by sinks to associate evaluation results with the original function execution span.
    """

    trace_id: str
    """The trace ID of the span."""
    span_id: str
    """The span ID of the span."""

trace_id instance-attribute

trace_id: str

The trace ID of the span.

span_id instance-attribute

span_id: str

The span ID of the span.

SinkCallback module-attribute

SinkCallback = Callable[
    [
        Sequence[EvaluationResult],
        Sequence[EvaluatorFailure],
        EvaluatorContext,
    ],
    None | Awaitable[None],
]

Type alias for bare callables accepted wherever an EvaluationSink is expected.

Auto-wrapped in CallbackSink when passed as a sink parameter.

EvaluationSink

Bases: Protocol

Protocol for evaluation result destinations.

Implementations receive evaluation results and can send them to any backend (Logfire annotations, custom callback, stdout, etc.).

Source code in pydantic_evals/pydantic_evals/online.py
@runtime_checkable
class EvaluationSink(Protocol):
    """Protocol for evaluation result destinations.

    Implementations receive evaluation results and can send them to any backend
    (Logfire annotations, custom callback, stdout, etc.).
    """

    async def submit(
        self,
        *,
        results: Sequence[EvaluationResult],
        failures: Sequence[EvaluatorFailure],
        context: EvaluatorContext,
        span_reference: SpanReference | None,
    ) -> None:
        """Submit evaluation results to the sink.

        Args:
            results: Evaluation results from successful evaluator runs.
            failures: Failures from evaluator runs that raised exceptions.
            context: The full evaluator context for the function call.
            span_reference: Reference to the OTel span for the function call, if available.
        """
        ...

submit async

submit(
    *,
    results: Sequence[EvaluationResult],
    failures: Sequence[EvaluatorFailure],
    context: EvaluatorContext,
    span_reference: SpanReference | None
) -> None

Submit evaluation results to the sink.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results` | `Sequence[EvaluationResult]` | Evaluation results from successful evaluator runs. | *required* |
| `failures` | `Sequence[EvaluatorFailure]` | Failures from evaluator runs that raised exceptions. | *required* |
| `context` | `EvaluatorContext` | The full evaluator context for the function call. | *required* |
| `span_reference` | `SpanReference \| None` | Reference to the OTel span for the function call, if available. | *required* |
Source code in pydantic_evals/pydantic_evals/online.py
async def submit(
    self,
    *,
    results: Sequence[EvaluationResult],
    failures: Sequence[EvaluatorFailure],
    context: EvaluatorContext,
    span_reference: SpanReference | None,
) -> None:
    """Submit evaluation results to the sink.

    Args:
        results: Evaluation results from successful evaluator runs.
        failures: Failures from evaluator runs that raised exceptions.
        context: The full evaluator context for the function call.
        span_reference: Reference to the OTel span for the function call, if available.
    """
    ...
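A sink can be as small as a class with a matching async `submit` method. This sketch collects submissions in memory, which is handy in tests (`MemorySink` is an illustrative name, not part of the module; types are loosened so the example stands alone):

```python
import asyncio

class MemorySink:
    """Collects submitted results in memory instead of sending them anywhere."""

    def __init__(self) -> None:
        self.submissions: list[tuple[list, list]] = []

    async def submit(self, *, results, failures, context, span_reference) -> None:
        # Keyword-only parameters mirror the EvaluationSink protocol.
        self.submissions.append((list(results), list(failures)))

sink = MemorySink()
asyncio.run(sink.submit(results=['score'], failures=[], context=None, span_reference=None))
print(sink.submissions)  # [(['score'], [])]
```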

CallbackSink

An EvaluationSink that delegates to a user-provided callable.

The callback receives the results, failures, and context. The span_reference is not passed to the callback — use a custom EvaluationSink implementation if you need it.

Source code in pydantic_evals/pydantic_evals/online.py
class CallbackSink:
    """An `EvaluationSink` that delegates to a user-provided callable.

    The callback receives the results, failures, and context. The span_reference is not
    passed to the callback — use a custom `EvaluationSink` implementation if you need it.
    """

    def __init__(self, callback: SinkCallback) -> None:
        self.callback = callback

    async def submit(
        self,
        *,
        results: Sequence[EvaluationResult],
        failures: Sequence[EvaluatorFailure],
        context: EvaluatorContext,
        span_reference: SpanReference | None,
    ) -> None:
        _ = span_reference  # Not passed to callback; use a custom EvaluationSink if needed
        result = self.callback(results, failures, context)
        if inspect.isawaitable(result):
            await result

OnlineEvaluator dataclass

Wraps an Evaluator with per-evaluator online configuration.

Different evaluators often need different settings — a cheap heuristic should run on 100% of traffic while an expensive LLM judge might run on only 1%.

Source code in pydantic_evals/pydantic_evals/online.py
@dataclass(kw_only=True)
class OnlineEvaluator:
    """Wraps an `Evaluator` with per-evaluator online configuration.

    Different evaluators often need different settings — a cheap heuristic should
    run on 100% of traffic while an expensive LLM judge might run on only 1%.
    """

    evaluator: Evaluator
    """The evaluator to run."""
    sample_rate: float | Callable[[SamplingContext], float | bool] | None = None
    """Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.

    When a callable, it receives a [`SamplingContext`][pydantic_evals.online.SamplingContext]
    with the function inputs, config metadata, and evaluator name — but not the output or
    duration (which aren't available yet at sampling time).

    Defaults to `None`, which uses the config's `default_sample_rate` at each call.
    Set explicitly to override.
    """
    max_concurrency: int = 10
    """Maximum number of concurrent evaluations for this evaluator."""

    sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None = None
    """Override sink(s) for this evaluator. If `None`, the config's `default_sink` is used."""

    on_max_concurrency: OnMaxConcurrencyCallback | None = None
    """Called when an evaluation is dropped because `max_concurrency` was reached.

    Receives the `EvaluatorContext` that would have been evaluated. Can be sync or async.
    If `None` (the default), dropped evaluations are silently ignored.
    """
    on_sampling_error: OnSamplingErrorCallback | None = None
    """Called synchronously when a `sample_rate` callable raises an exception.

    Receives the exception and the evaluator. Must be sync (not async), since sampling
    runs before the decorated function. If set, the evaluator is skipped. If `None`,
    uses the config's `on_sampling_error` default. If neither is set, the exception
    propagates to the caller.
    """
    on_error: OnErrorCallback | None = None
    """Called when an exception occurs in a sink or on_max_concurrency callback.

    Receives the exception, evaluator context, evaluator instance, and a location string
    (`'sink'` or `'on_max_concurrency'`). Can be sync or async.
    If `None`, uses the config's `on_error` default. If neither is set, exceptions are
    silently suppressed.
    """

    def __post_init__(self) -> None:
        self.semaphore = threading.Semaphore(self.max_concurrency)

evaluator instance-attribute

evaluator: Evaluator

The evaluator to run.

sample_rate class-attribute instance-attribute

sample_rate: (
    float | Callable[[SamplingContext], float | bool] | None
) = None

Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.

When a callable, it receives a SamplingContext with the function inputs, config metadata, and evaluator name — but not the output or duration (which aren't available yet at sampling time).

Defaults to None, which uses the config's default_sample_rate at each call. Set explicitly to override.
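For instance, a `sample_rate` callable might boost sampling for a slice of traffic identified via config metadata. In this sketch the `'tier'` key is hypothetical, and `SimpleNamespace` stands in for a real `SamplingContext`:

```python
from types import SimpleNamespace

def tier_aware_rate(ctx) -> float:
    # ctx is a SamplingContext: evaluator, inputs, metadata, call_seed.
    if ctx.metadata and ctx.metadata.get('tier') == 'premium':
        return 0.5   # judge premium traffic heavily
    return 0.01      # light-touch sampling elsewhere

ctx = SimpleNamespace(metadata={'tier': 'premium'}, inputs={'x': 1})
print(tier_aware_rate(ctx))  # 0.5
```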

max_concurrency class-attribute instance-attribute

max_concurrency: int = 10

Maximum number of concurrent evaluations for this evaluator.
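The source above creates a `threading.Semaphore(max_concurrency)` per evaluator; the drop-when-full behaviour this enables looks roughly like the following standalone sketch (not the module's actual dispatch code):

```python
import threading

sem = threading.Semaphore(2)  # as if max_concurrency = 2

def try_dispatch(run) -> bool:
    # Non-blocking acquire: when the limit is reached, the evaluation
    # is dropped rather than queued behind in-flight ones.
    if not sem.acquire(blocking=False):
        return False  # here the pipeline would invoke on_max_concurrency
    try:
        run()
        return True
    finally:
        sem.release()

print(try_dispatch(lambda: None))  # True: a slot was free

sem.acquire()  # simulate two evaluations already in flight
sem.acquire()
print(try_dispatch(lambda: None))  # False: dropped
```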

sink class-attribute instance-attribute

sink: (
    EvaluationSink
    | Sequence[EvaluationSink | SinkCallback]
    | SinkCallback
    | None
) = None

Override sink(s) for this evaluator. If None, the config's default_sink is used.

on_max_concurrency class-attribute instance-attribute

on_max_concurrency: OnMaxConcurrencyCallback | None = None

Called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored.

on_sampling_error class-attribute instance-attribute

on_sampling_error: OnSamplingErrorCallback | None = None

Called synchronously when a sample_rate callable raises an exception.

Receives the exception and the evaluator. Must be sync (not async), since sampling runs before the decorated function. If set, the evaluator is skipped. If None, uses the config's on_sampling_error default. If neither is set, the exception propagates to the caller.

on_error class-attribute instance-attribute

on_error: OnErrorCallback | None = None

Called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string ('sink' or 'on_max_concurrency'). Can be sync or async. If None, uses the config's on_error default. If neither is set, exceptions are silently suppressed.

EvaluatorContextSource

Bases: Protocol

Protocol for retrieving stored evaluator contexts.

Implementations reconstruct EvaluatorContext objects from stored traces (e.g., Logfire). The batch method allows fetching contexts for multiple spans in a single call.

Source code in pydantic_evals/pydantic_evals/online.py
class EvaluatorContextSource(Protocol):
    """Protocol for retrieving stored evaluator contexts.

    Implementations reconstruct [`EvaluatorContext`][pydantic_evals.evaluators.EvaluatorContext]
    objects from stored traces (e.g., Logfire). The batch method allows fetching contexts
    for multiple spans in a single call.
    """

    async def fetch(self, span: SpanReference) -> EvaluatorContext:
        """Fetch an evaluator context for a single span.

        Args:
            span: Reference to the span to fetch context for.

        Returns:
            The evaluator context for the span.
        """
        return (await self.fetch_many([span]))[0]

    async def fetch_many(self, spans: Sequence[SpanReference]) -> list[EvaluatorContext]:
        """Fetch evaluator contexts for multiple spans in a single batch.

        Args:
            spans: References to the spans to fetch context for.

        Returns:
            Evaluator contexts in the same order as the input spans.
        """
        ...

fetch async

Fetch an evaluator context for a single span.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `span` | `SpanReference` | Reference to the span to fetch context for. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `EvaluatorContext` | The evaluator context for the span. |

Source code in pydantic_evals/pydantic_evals/online.py
async def fetch(self, span: SpanReference) -> EvaluatorContext:
    """Fetch an evaluator context for a single span.

    Args:
        span: Reference to the span to fetch context for.

    Returns:
        The evaluator context for the span.
    """
    return (await self.fetch_many([span]))[0]

fetch_many async

fetch_many(
    spans: Sequence[SpanReference],
) -> list[EvaluatorContext]

Fetch evaluator contexts for multiple spans in a single batch.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spans` | `Sequence[SpanReference]` | References to the spans to fetch context for. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[EvaluatorContext]` | Evaluator contexts in the same order as the input spans. |

Source code in pydantic_evals/pydantic_evals/online.py
async def fetch_many(self, spans: Sequence[SpanReference]) -> list[EvaluatorContext]:
    """Fetch evaluator contexts for multiple spans in a single batch.

    Args:
        spans: References to the spans to fetch context for.

    Returns:
        Evaluator contexts in the same order as the input spans.
    """
    ...
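An implementation only has to provide `fetch_many`; this in-memory sketch shows the shape (`DictContextSource` is an illustrative name, and a real source would query a trace backend such as Logfire rather than a dict):

```python
import asyncio
from types import SimpleNamespace

class DictContextSource:
    def __init__(self, contexts: dict) -> None:
        # Keyed by (trace_id, span_id).
        self.contexts = contexts

    async def fetch_many(self, spans) -> list:
        return [self.contexts[(s.trace_id, s.span_id)] for s in spans]

    async def fetch(self, span):
        # Single fetch delegates to the batch method, as in the protocol.
        return (await self.fetch_many([span]))[0]

src = DictContextSource({('t1', 's1'): 'ctx-1'})
span = SimpleNamespace(trace_id='t1', span_id='s1')
print(asyncio.run(src.fetch(span)))  # ctx-1
```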

run_evaluators async

run_evaluators(
    evaluators: Sequence[Evaluator],
    context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]

Run evaluators on a context and return results.

Useful for re-running evaluators from stored data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `evaluators` | `Sequence[Evaluator]` | The evaluators to run. | *required* |
| `context` | `EvaluatorContext` | The evaluator context to evaluate against. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[list[EvaluationResult], list[EvaluatorFailure]]` | A tuple of (results, failures). |

Source code in pydantic_evals/pydantic_evals/online.py
async def run_evaluators(
    evaluators: Sequence[Evaluator],
    context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]:
    """Run evaluators on a context and return results.

    Useful for re-running evaluators from stored data.

    Args:
        evaluators: The evaluators to run.
        context: The evaluator context to evaluate against.

    Returns:
        A tuple of (results, failures).
    """
    all_results: list[EvaluationResult] = []
    all_failures: list[EvaluatorFailure] = []

    async with anyio.create_task_group() as tg:
        results_by_index: dict[int, list[EvaluationResult] | EvaluatorFailure] = {}

        async def _run(idx: int, evaluator: Evaluator) -> None:
            results_by_index[idx] = await run_evaluator(evaluator, context)

        for i, evaluator in enumerate(evaluators):
            tg.start_soon(_run, i, evaluator)

    for i in range(len(evaluators)):
        result = results_by_index[i]
        if isinstance(result, EvaluatorFailure):
            all_failures.append(result)
        else:
            all_results.extend(result)

    return all_results, all_failures
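The same fan-out shape can be sketched with stdlib asyncio. Here each coroutine's exception is captured as a failure tuple instead of cancelling its siblings (in run_evaluators that capture happens inside run_evaluator):

```python
import asyncio

async def run_all(funcs):
    async def guarded(f):
        try:
            return ('result', await f())
        except Exception as exc:  # one failure must not cancel the others
            return ('failure', exc)
    # gather preserves input order, like the results_by_index bookkeeping above
    return await asyncio.gather(*(guarded(f) for f in funcs))

async def ok():
    return 1

async def boom():
    raise ValueError('bad output')

out = asyncio.run(run_all([ok, boom]))
print(out[0])     # ('result', 1)
print(out[1][0])  # failure
```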

OnlineEvalConfig dataclass

Holds cross-evaluator defaults for online evaluation.

Create instances for different evaluation configurations, or use the global DEFAULT_CONFIG via the module-level evaluate() and configure() functions.

Source code in pydantic_evals/pydantic_evals/online.py
@dataclass(kw_only=True)
class OnlineEvalConfig:
    """Holds cross-evaluator defaults for online evaluation.

    Create instances for different evaluation configurations, or use the global
    `DEFAULT_CONFIG` via the module-level `evaluate()` and `configure()` functions.
    """

    default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None = None
    """Default sink(s) for evaluators that don't specify their own."""
    default_sample_rate: float | Callable[[SamplingContext], float | bool] = 1.0
    """Default sample rate for evaluators that don't specify their own."""
    sampling_mode: SamplingMode = 'independent'
    """Controls how per-evaluator sample rates interact for a single call.

    - `'independent'` (default): each evaluator decides independently.
    - `'correlated'`: a shared random seed is used so that lower-rate evaluators'
      calls are a subset of higher-rate ones, minimising total overhead.

    See [`SamplingMode`][pydantic_evals.online.SamplingMode] for details.
    """
    enabled: bool = True
    """Whether online evaluation is enabled for this config."""
    metadata: dict[str, Any] | None = None
    """Optional metadata to include in evaluator contexts."""
    on_max_concurrency: OnMaxConcurrencyCallback | None = None
    """Default handler called when an evaluation is dropped because `max_concurrency` was reached.

    Receives the `EvaluatorContext` that would have been evaluated. Can be sync or async.
    If `None` (the default), dropped evaluations are silently ignored.
    Per-evaluator `OnlineEvaluator.on_max_concurrency` overrides this default.
    """
    on_sampling_error: OnSamplingErrorCallback | None = None
    """Default handler called synchronously when a `sample_rate` callable raises.

    Receives the exception and the evaluator. Must be sync (not async).
    If set, the evaluator is skipped. If `None` (the default), the exception
    propagates to the caller.
    Per-evaluator `OnlineEvaluator.on_sampling_error` overrides this default.
    """
    on_error: OnErrorCallback | None = None
    """Default handler called when an exception occurs in a sink or on_max_concurrency callback.

    Receives the exception, evaluator context, evaluator instance, and a location string
    (`'sink'` or `'on_max_concurrency'`). Can be sync or async.
    If `None` (the default), exceptions are silently suppressed.
    Per-evaluator `OnlineEvaluator.on_error` overrides this default.
    """

    def evaluate(
        self,
        *evaluators: Evaluator | OnlineEvaluator,
    ) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
        """Decorator to attach online evaluators to a function.

        Bare `Evaluator` instances are auto-wrapped in `OnlineEvaluator` at decoration time
        (so concurrency semaphores are shared across calls). Their `sample_rate` defaults to
        `None`, which resolves to the config's `default_sample_rate` at each call — so
        changes to the config after decoration take effect.

        Args:
            *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.

        Returns:
            A decorator that wraps the function with online evaluation.
        """
        online_evals = [e if isinstance(e, OnlineEvaluator) else OnlineEvaluator(evaluator=e) for e in evaluators]

        def decorator(func: Callable[_P, _R]) -> Callable[_P, _R]:
            if inspect.iscoroutinefunction(func):
                # ParamSpec can't distinguish async from sync return types — _wrap_async returns
                # Callable[_P, Awaitable[_R]] but the decorator signature expects Callable[_P, _R]
                return _wrap_async(func, online_evals, self)  # pyright: ignore[reportReturnType]
            else:
                return _wrap_sync(func, online_evals, self)

        return decorator

default_sink class-attribute instance-attribute

default_sink: (
    EvaluationSink
    | Sequence[EvaluationSink | SinkCallback]
    | SinkCallback
    | None
) = None

Default sink(s) for evaluators that don't specify their own.

default_sample_rate class-attribute instance-attribute

default_sample_rate: (
    float | Callable[[SamplingContext], float | bool]
) = 1.0

Default sample rate for evaluators that don't specify their own.

sampling_mode class-attribute instance-attribute

sampling_mode: SamplingMode = 'independent'

Controls how per-evaluator sample rates interact for a single call.

  • 'independent' (default): each evaluator decides independently.
  • 'correlated': a shared random seed is used so that lower-rate evaluators' calls are a subset of higher-rate ones, minimising total overhead.

See SamplingMode for details.

enabled class-attribute instance-attribute

enabled: bool = True

Whether online evaluation is enabled for this config.

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

Optional metadata to include in evaluator contexts.

on_max_concurrency class-attribute instance-attribute

on_max_concurrency: OnMaxConcurrencyCallback | None = None

Default handler called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored. Per-evaluator OnlineEvaluator.on_max_concurrency overrides this default.

on_sampling_error class-attribute instance-attribute

on_sampling_error: OnSamplingErrorCallback | None = None

Default handler called synchronously when a sample_rate callable raises.

Receives the exception and the evaluator. Must be sync (not async). If set, the evaluator is skipped. If None (the default), the exception propagates to the caller. Per-evaluator OnlineEvaluator.on_sampling_error overrides this default.

on_error class-attribute instance-attribute

on_error: OnErrorCallback | None = None

Default handler called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string ('sink' or 'on_max_concurrency'). Can be sync or async. If None (the default), exceptions are silently suppressed. Per-evaluator OnlineEvaluator.on_error overrides this default.

evaluate

evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function.

Bare Evaluator instances are auto-wrapped in OnlineEvaluator at decoration time (so concurrency semaphores are shared across calls). Their sample_rate defaults to None, which resolves to the config's default_sample_rate at each call — so changes to the config after decoration take effect.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*evaluators` | `Evaluator \| OnlineEvaluator` | Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances. | `()` |

Returns:

| Type | Description |
| --- | --- |
| `Callable[[Callable[_P, _R]], Callable[_P, _R]]` | A decorator that wraps the function with online evaluation. |

Source code in pydantic_evals/pydantic_evals/online.py
def evaluate(
    self,
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Decorator to attach online evaluators to a function.

    Bare `Evaluator` instances are auto-wrapped in `OnlineEvaluator` at decoration time
    (so concurrency semaphores are shared across calls). Their `sample_rate` defaults to
    `None`, which resolves to the config's `default_sample_rate` at each call — so
    changes to the config after decoration take effect.

    Args:
        *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.

    Returns:
        A decorator that wraps the function with online evaluation.
    """
    online_evals = [e if isinstance(e, OnlineEvaluator) else OnlineEvaluator(evaluator=e) for e in evaluators]

    def decorator(func: Callable[_P, _R]) -> Callable[_P, _R]:
        if inspect.iscoroutinefunction(func):
            # ParamSpec can't distinguish async from sync return types — _wrap_async returns
            # Callable[_P, Awaitable[_R]] but the decorator signature expects Callable[_P, _R]
            return _wrap_async(func, online_evals, self)  # pyright: ignore[reportReturnType]
        else:
            return _wrap_sync(func, online_evals, self)

    return decorator

DEFAULT_CONFIG module-attribute

DEFAULT_CONFIG = OnlineEvalConfig()

The global default OnlineEvalConfig instance.

Module-level functions like evaluate() and configure() delegate to this instance.

evaluate

evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function using the global default config.

Equivalent to DEFAULT_CONFIG.evaluate(...).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*evaluators` | `Evaluator \| OnlineEvaluator` | Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances. | `()` |

Returns:

| Type | Description |
| --- | --- |
| `Callable[[Callable[_P, _R]], Callable[_P, _R]]` | A decorator that wraps the function with online evaluation. |

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

Source code in pydantic_evals/pydantic_evals/online.py
def evaluate(*evaluators: Evaluator | OnlineEvaluator) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Decorator to attach online evaluators to a function using the global default config.

    Equivalent to `DEFAULT_CONFIG.evaluate(...)`.

    Args:
        *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.

    Returns:
        A decorator that wraps the function with online evaluation.

    Example:
    ```python
    from dataclasses import dataclass

    from pydantic_evals.evaluators import Evaluator, EvaluatorContext
    from pydantic_evals.online import evaluate


    @dataclass
    class IsNonEmpty(Evaluator):
        def evaluate(self, ctx: EvaluatorContext) -> bool:
            return bool(ctx.output)


    @evaluate(IsNonEmpty())
    async def my_function(x: int) -> int:
        return x
    ```
    """
    return DEFAULT_CONFIG.evaluate(*evaluators)

configure

configure(
    *,
    default_sink: (
        EvaluationSink
        | Sequence[EvaluationSink | SinkCallback]
        | SinkCallback
        | None
        | Unset
    ) = UNSET,
    default_sample_rate: (
        float
        | Callable[[SamplingContext], float | bool]
        | Unset
    ) = UNSET,
    sampling_mode: SamplingMode | Unset = UNSET,
    enabled: bool | Unset = UNSET,
    metadata: dict[str, Any] | None | Unset = UNSET,
    on_max_concurrency: (
        OnMaxConcurrencyCallback | None | Unset
    ) = UNSET,
    on_sampling_error: (
        OnSamplingErrorCallback | None | Unset
    ) = UNSET,
    on_error: OnErrorCallback | None | Unset = UNSET
) -> None

Configure the global default `OnlineEvalConfig`.

Only provided values are updated; unset arguments are ignored. Pass `None` explicitly to clear `default_sink`, `metadata`, `on_max_concurrency`, `on_sampling_error`, or `on_error`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `default_sink` | `EvaluationSink \| Sequence[EvaluationSink \| SinkCallback] \| SinkCallback \| None \| Unset` | Default sink(s) for evaluators. Pass `None` to clear. | `UNSET` |
| `default_sample_rate` | `float \| Callable[[SamplingContext], float \| bool] \| Unset` | Default sample rate for evaluators. | `UNSET` |
| `sampling_mode` | `SamplingMode \| Unset` | Sampling mode (`'independent'` or `'correlated'`). | `UNSET` |
| `enabled` | `bool \| Unset` | Whether online evaluation is enabled. | `UNSET` |
| `metadata` | `dict[str, Any] \| None \| Unset` | Metadata to include in evaluator contexts. Pass `None` to clear. | `UNSET` |
| `on_max_concurrency` | `OnMaxConcurrencyCallback \| None \| Unset` | Default handler for dropped evaluations. Pass `None` to clear. | `UNSET` |
| `on_sampling_error` | `OnSamplingErrorCallback \| None \| Unset` | Default handler for `sample_rate` exceptions. Pass `None` to clear. | `UNSET` |
| `on_error` | `OnErrorCallback \| None \| Unset` | Default handler for pipeline exceptions. Pass `None` to clear. | `UNSET` |
Source code in pydantic_evals/pydantic_evals/online.py
def configure(
    *,
    default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset = UNSET,
    default_sample_rate: float | Callable[[SamplingContext], float | bool] | Unset = UNSET,
    sampling_mode: SamplingMode | Unset = UNSET,
    enabled: bool | Unset = UNSET,
    metadata: dict[str, Any] | None | Unset = UNSET,
    on_max_concurrency: OnMaxConcurrencyCallback | None | Unset = UNSET,
    on_sampling_error: OnSamplingErrorCallback | None | Unset = UNSET,
    on_error: OnErrorCallback | None | Unset = UNSET,
) -> None:
    """Configure the global default `OnlineEvalConfig`.

    Only provided values are updated; unset arguments are ignored.
    Pass `None` explicitly to clear `default_sink`, `metadata`, `on_max_concurrency`,
    `on_sampling_error`, or `on_error`.

    Args:
        default_sink: Default sink(s) for evaluators. Pass `None` to clear.
        default_sample_rate: Default sample rate for evaluators.
        sampling_mode: Sampling mode (`'independent'` or `'correlated'`).
        enabled: Whether online evaluation is enabled.
        metadata: Metadata to include in evaluator contexts. Pass `None` to clear.
        on_max_concurrency: Default handler for dropped evaluations. Pass `None` to clear.
        on_sampling_error: Default handler for sample_rate exceptions. Pass `None` to clear.
        on_error: Default handler for pipeline exceptions. Pass `None` to clear.
    """
    if not isinstance(default_sink, Unset):
        DEFAULT_CONFIG.default_sink = default_sink
    if not isinstance(default_sample_rate, Unset):
        DEFAULT_CONFIG.default_sample_rate = default_sample_rate
    if not isinstance(sampling_mode, Unset):
        DEFAULT_CONFIG.sampling_mode = sampling_mode
    if not isinstance(enabled, Unset):
        DEFAULT_CONFIG.enabled = enabled
    if not isinstance(metadata, Unset):
        DEFAULT_CONFIG.metadata = metadata
    if not isinstance(on_max_concurrency, Unset):
        DEFAULT_CONFIG.on_max_concurrency = on_max_concurrency
    if not isinstance(on_sampling_error, Unset):
        DEFAULT_CONFIG.on_sampling_error = on_sampling_error
    if not isinstance(on_error, Unset):
        DEFAULT_CONFIG.on_error = on_error
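The difference between the two `sampling_mode` values can be sketched with a small, self-contained simulation (a toy model of the semantics described above, not the library's internal code):

```python
import random

def sample_independent(rates: list[float], rng: random.Random) -> list[bool]:
    """'independent': each evaluator flips its own coin."""
    return [rng.random() < r for r in rates]

def sample_correlated(rates: list[float], rng: random.Random) -> list[bool]:
    """'correlated': one random seed per call, shared across evaluators.
    An evaluator runs when call_seed < rate, so a lower-rate
    evaluator's calls are a subset of a higher-rate one's."""
    call_seed = rng.random()
    return [call_seed < r for r in rates]

rng = random.Random(0)
for _ in range(1000):
    low, high = sample_correlated([0.1, 0.5], rng)
    # Subset property: whenever the 10% evaluator runs, the 50% one does too.
    assert (not low) or high
```

In `'independent'` mode no such subset relationship holds; with N evaluators each at rate r, the chance that at least one runs on a given call is 1 − (1 − r)^N.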

wait_for_evaluations async

wait_for_evaluations(*, timeout: float = 30.0) -> None

Wait for all pending background evaluation tasks and threads to complete.

This is useful in tests to deterministically wait for background evaluators to finish instead of relying on timing-based sleeps.

For async decorated functions, evaluators run as tasks on the caller's event loop and are awaited directly. For sync decorated functions, evaluators run in background threads which are joined with the given timeout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `timeout` | `float` | Maximum seconds to wait for each background thread. Defaults to 30. | `30.0` |
Source code in pydantic_evals/pydantic_evals/online.py
async def wait_for_evaluations(*, timeout: float = 30.0) -> None:
    """Wait for all pending background evaluation tasks and threads to complete.

    This is useful in tests to deterministically wait for background evaluators
    to finish instead of relying on timing-based sleeps.

    For async decorated functions, evaluators run as tasks on the caller's event loop
    and are awaited directly. For sync decorated functions, evaluators run in background
    threads which are joined with the given timeout.

    Args:
        timeout: Maximum seconds to wait for each background thread. Defaults to 30.
    """
    with _background_lock:
        tasks_snapshot = list(_background_tasks)
        events_snapshot = list(_background_events)
        threads_snapshot = list(_background_threads)

    # Await async tasks (from async decorated functions on asyncio)
    for task in tasks_snapshot:
        try:
            await task
        except BaseException:  # pragma: no cover
            pass  # Exceptions are handled inside _dispatch_single_evaluator

    # Await trio events (from async decorated functions on trio)
    for event in events_snapshot:
        await event.wait()  # pragma: no cover

    # Join background threads (from sync decorated functions) without blocking the event loop
    if threads_snapshot:

        def _join_threads() -> None:
            for thread in threads_snapshot:
                thread.join(timeout=timeout)
                if thread.is_alive():  # pragma: no cover
                    warnings.warn(f'Background evaluation thread did not complete within {timeout:.1f}s timeout')

        await run_sync(_join_threads)
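The sync-function path can be illustrated with a minimal stdlib sketch (hypothetical names `_dispatch` and `wait_for_background`; the real module additionally tracks asyncio tasks and trio events, as shown above):

```python
import threading

_background_threads: list[threading.Thread] = []
_lock = threading.Lock()

def _dispatch(fn) -> None:
    """Toy version of dispatching an evaluator for a sync function:
    run it in a background thread and track the thread."""
    t = threading.Thread(target=fn, daemon=True)
    with _lock:
        _background_threads.append(t)
    t.start()

def wait_for_background(timeout: float = 30.0) -> None:
    """Join all tracked threads, mirroring the sync path of
    wait_for_evaluations: snapshot under the lock, then join."""
    with _lock:
        snapshot = list(_background_threads)
    for t in snapshot:
        t.join(timeout=timeout)

results: list[int] = []
_dispatch(lambda: results.append(42))
wait_for_background()
print(results)  # [42]
```

Snapshotting under the lock and joining outside it mirrors the real implementation, which avoids holding the lock while waiting on potentially slow threads.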