Skip to content

fasta2a

FastA2A

Bases: Starlette

The main class for the FastA2A library.

Source code in fasta2a/fasta2a/applications.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class FastA2A(Starlette):
    """The main class for the FastA2A library.

    A Starlette application that, on construction, additionally registers two routes:
    `/.well-known/agent.json` (the agent card) and `/` (the A2A JSON-RPC endpoint).
    """

    def __init__(
        self,
        *,
        storage: Storage,
        broker: Broker,
        # Agent card
        name: str | None = None,
        url: str = 'http://localhost:8000',
        version: str = '1.0.0',
        description: str | None = None,
        provider: Provider | None = None,
        skills: list[Skill] | None = None,
        # Starlette
        debug: bool = False,
        routes: Sequence[Route] | None = None,
        middleware: Sequence[Middleware] | None = None,
        exception_handlers: dict[Any, ExceptionHandler] | None = None,
        lifespan: Lifespan[FastA2A] | None = None,
    ):
        """Build the application.

        Args:
            storage: Handed to the `TaskManager` together with `broker`.
            broker: Handed to the `TaskManager` together with `storage`.
            name: Agent name used in the agent card; falls back to `'Agent'` when falsy.
            url: Agent URL used in the agent card.
            version: Agent version used in the agent card.
            description: Optional description; only put in the agent card when not `None`.
            provider: Optional provider; only put in the agent card when not `None`.
            skills: Skills listed in the agent card; falls back to an empty list when falsy.
            debug: Forwarded to `Starlette`.
            routes: Forwarded to `Starlette`.
            middleware: Forwarded to `Starlette`.
            exception_handlers: Forwarded to `Starlette`.
            lifespan: Forwarded to `Starlette`; `_default_lifespan` is used when `None`.
        """
        super().__init__(
            debug=debug,
            routes=routes,
            middleware=middleware,
            exception_handlers=exception_handlers,
            lifespan=_default_lifespan if lifespan is None else lifespan,
        )

        # Values that end up in the agent card.
        self.name = name if name else 'Agent'
        self.url = url
        self.version = version
        self.description = description
        self.provider = provider
        self.skills = skills if skills else []
        # NOTE: For now, I don't think there's any reason to support any other input/output modes.
        self.default_input_modes = ['application/json']
        self.default_output_modes = ['application/json']

        self.task_manager = TaskManager(broker=broker, storage=storage)

        # The serialized agent card is produced lazily by `_agent_card_endpoint`.
        self._agent_card_json_schema: bytes | None = None
        router = self.router
        router.add_route('/.well-known/agent.json', self._agent_card_endpoint, methods=['HEAD', 'GET', 'OPTIONS'])
        router.add_route('/', self._agent_run_endpoint, methods=['POST'])

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Refuse HTTP traffic while the task manager is not running, then delegate to Starlette.

        Raises:
            RuntimeError: For an `http` scope when `task_manager.is_running` is falsy.
        """
        if scope['type'] == 'http':
            if not self.task_manager.is_running:
                raise RuntimeError('TaskManager was not properly initialized.')
        await super().__call__(scope, receive, send)

    async def _agent_card_endpoint(self, request: Request) -> Response:
        """Serve the agent card as JSON, serializing it once and reusing the bytes afterwards."""
        payload = self._agent_card_json_schema
        if payload is None:
            card = AgentCard(
                name=self.name,
                url=self.url,
                version=self.version,
                skills=self.skills,
                default_input_modes=self.default_input_modes,
                default_output_modes=self.default_output_modes,
            )
            # Optional keys are only present when a value was configured.
            if self.description is not None:
                card['description'] = self.description
            if self.provider is not None:
                card['provider'] = self.provider
            payload = self._agent_card_json_schema = agent_card_ta.dump_json(card)
        return Response(content=payload, media_type='application/json')

    async def _agent_run_endpoint(self, request: Request) -> Response:
        """Main endpoint of the A2A server: validate the request body and dispatch on its method.

        The specification leaves room for implementation choices; the ones documented here are:

        1. On `tasks/send` the server always answers with either "submitted" or "failed",
            never with "completed" on the first message.
        2. A task can end in exactly three ways:
            2.1. It was "completed" successfully.
            2.2. It was "canceled".
            2.3. It "failed".
        3. The server will send a "working" on the first chunk on `tasks/pushNotification/get`.

        Raises:
            NotImplementedError: When the method is not `tasks/send`, `tasks/get` or `tasks/cancel`.
        """
        raw_body = await request.body()
        a2a_request = a2a_request_ta.validate_json(raw_body)
        method = a2a_request['method']

        if method == 'tasks/send':
            jsonrpc_response = await self.task_manager.send_task(a2a_request)
        elif method == 'tasks/get':
            jsonrpc_response = await self.task_manager.get_task(a2a_request)
        elif method == 'tasks/cancel':
            jsonrpc_response = await self.task_manager.cancel_task(a2a_request)
        else:
            raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.')

        serialized = a2a_response_ta.dump_json(jsonrpc_response, by_alias=True)
        return Response(content=serialized, media_type='application/json')

Broker dataclass

Bases: ABC

The broker class is in charge of scheduling the tasks.

The HTTP server uses the broker to schedule tasks.

The simple implementation is the InMemoryBroker, which is the broker that runs the tasks in the same process as the HTTP server. That said, this class can be extended to support remote workers.

Source code in fasta2a/fasta2a/broker.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class Broker(ABC):
    """The broker class is in charge of scheduling the tasks.

    The HTTP server uses the broker to schedule tasks.

    The simple implementation is the `InMemoryBroker`, which is the broker that
    runs the tasks in the same process as the HTTP server. That said, this class can be
    extended to support remote workers.

    Concrete brokers must implement every abstract method below, including the async
    context manager hooks (`__aenter__` / `__aexit__`).
    """

    @abstractmethod
    async def run_task(self, params: TaskSendParams) -> None:
        """Send a task to be executed by the worker.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # The message names the actual method so the failing call site is easy to find.
        raise NotImplementedError('run_task is not implemented yet.')

    @abstractmethod
    async def cancel_task(self, params: TaskIdParams) -> None:
        """Cancel a task.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError('cancel_task is not implemented yet.')

    @abstractmethod
    async def __aenter__(self) -> Self:
        """Enter the broker's async context; implementations return the broker itself."""
        ...

    @abstractmethod
    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
        """Leave the broker's async context."""
        ...

    @abstractmethod
    def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
        """Receive task operations from the broker.

        On a multi-worker setup, the broker will need to round-robin the task operations
        between the workers.

        Returns:
            An async iterator of `TaskOperation` items.
        """

run_task abstractmethod async

run_task(params: TaskSendParams) -> None

Send a task to be executed by the worker.

Source code in fasta2a/fasta2a/broker.py
30
31
32
33
@abstractmethod
async def run_task(self, params: TaskSendParams) -> None:
    """Send a task to be executed by the worker.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # The message names the actual method so the failing call site is easy to find.
    raise NotImplementedError('run_task is not implemented yet.')

cancel_task abstractmethod async

cancel_task(params: TaskIdParams) -> None

Cancel a task.

Source code in fasta2a/fasta2a/broker.py
35
36
37
38
@abstractmethod
async def cancel_task(self, params: TaskIdParams) -> None:
    """Cancel a task.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # The message names the actual method so the failing call site is easy to find.
    raise NotImplementedError('cancel_task is not implemented yet.')

receive_task_operations abstractmethod

receive_task_operations() -> AsyncIterator[TaskOperation]

Receive task operations from the broker.

On a multi-worker setup, the broker will need to round-robin the task operations between the workers.

Source code in fasta2a/fasta2a/broker.py
46
47
48
49
50
51
52
@abstractmethod
def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
    """Receive task operations from the broker.

    On a multi-worker setup, the broker will need to round-robin the task operations
    between the workers.

    Note that this is declared with a plain `def` (not `async def`); the annotated return
    value is the async iterator itself.

    Returns:
        An async iterator of `TaskOperation` items.
    """

Skill

Bases: TypedDict

Skills are a unit of capability that an agent can perform.

Source code in fasta2a/fasta2a/schema.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably multi-word keys such as `input_modes` get camelCase aliases
# (`inputModes`) — TODO confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class Skill(TypedDict):
    """Skills are a unit of capability that an agent can perform.

    Every key is required except `examples`, which is declared `NotRequired`.
    """

    id: str
    """A unique identifier for the skill."""

    name: str
    """Human readable name of the skill."""

    description: str
    """A human-readable description of the skill.

    It will be used by the client or a human as a hint to understand the skill.
    """

    tags: list[str]
    """Set of tag-words describing classes of capabilities for this specific skill.

    Examples: "cooking", "customer support", "billing".
    """

    examples: NotRequired[list[str]]
    """The set of example scenarios that the skill can perform.

    Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread")
    """

    input_modes: list[str]
    """Supported mime types for input data."""

    output_modes: list[str]
    """Supported mime types for output data."""

id instance-attribute

id: str

A unique identifier for the skill.

name instance-attribute

name: str

Human readable name of the skill.

description instance-attribute

description: str

A human-readable description of the skill.

It will be used by the client or a human as a hint to understand the skill.

tags instance-attribute

tags: list[str]

Set of tag-words describing classes of capabilities for this specific skill.

Examples: "cooking", "customer support", "billing".

examples instance-attribute

examples: NotRequired[list[str]]

The set of example scenarios that the skill can perform.

Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread")

input_modes instance-attribute

input_modes: list[str]

Supported mime types for input data.

output_modes instance-attribute

output_modes: list[str]

Supported mime types for output data.

Storage

Bases: ABC

A storage to retrieve and save tasks.

The storage is used to update the status of a task and to save the result of a task.

Source code in fasta2a/fasta2a/storage.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Storage(ABC):
    """A storage to retrieve and save tasks.

    The storage is used to update the status of a task and to save the result of a task.

    This is an abstract base class: `load_task`, `submit_task` and `update_task` are all
    abstract coroutines that a concrete storage has to implement.
    """

    @abstractmethod
    async def load_task(self, task_id: str, history_length: int | None = None) -> Task | None:
        """Load a task from storage.

        If the task is not found, return None.

        Args:
            task_id: Identifier of the task to load.
            history_length: Optional integer, `None` by default. NOTE(review): presumably caps how
                many history messages come back with the task — confirm with concrete storages.

        Returns:
            The stored task, or `None` when no task is found.
        """

    @abstractmethod
    async def submit_task(self, task_id: str, session_id: str, message: Message) -> Task:
        """Submit a task to storage.

        Args:
            task_id: Identifier of the task being submitted.
            session_id: Identifier of the session the task belongs to.
            message: The message submitted with the task.

        Returns:
            The task as held by the storage.
        """

    @abstractmethod
    async def update_task(
        self,
        task_id: str,
        state: TaskState,
        message: Message | None = None,
        artifacts: list[Artifact] | None = None,
    ) -> Task:
        """Update the state of a task.

        Args:
            task_id: Identifier of the task to update.
            state: The state to record for the task.
            message: Optional message passed along with the update (`None` by default).
            artifacts: Optional artifacts passed along with the update (`None` by default).

        Returns:
            The task as held by the storage.
        """

load_task abstractmethod async

load_task(
    task_id: str, history_length: int | None = None
) -> Task | None

Load a task from storage.

If the task is not found, return None.

Source code in fasta2a/fasta2a/storage.py
17
18
19
20
21
22
@abstractmethod
async def load_task(self, task_id: str, history_length: int | None = None) -> Task | None:
    """Load a task from storage.

    If the task is not found, return None.

    Args:
        task_id: Identifier of the task to load.
        history_length: Optional integer, `None` by default. NOTE(review): presumably caps how
            many history messages come back with the task — confirm with concrete storages.

    Returns:
        The stored task, or `None` when no task is found.
    """

submit_task abstractmethod async

submit_task(
    task_id: str, session_id: str, message: Message
) -> Task

Submit a task to storage.

Source code in fasta2a/fasta2a/storage.py
24
25
26
@abstractmethod
async def submit_task(self, task_id: str, session_id: str, message: Message) -> Task:
    """Submit a task to storage.

    Args:
        task_id: Identifier of the task being submitted.
        session_id: Identifier of the session the task belongs to.
        message: The message submitted with the task.

    Returns:
        The task as held by the storage.
    """

update_task abstractmethod async

update_task(
    task_id: str,
    state: TaskState,
    message: Message | None = None,
    artifacts: list[Artifact] | None = None,
) -> Task

Update the state of a task.

Source code in fasta2a/fasta2a/storage.py
28
29
30
31
32
33
34
35
36
@abstractmethod
async def update_task(
    self,
    task_id: str,
    state: TaskState,
    message: Message | None = None,
    artifacts: list[Artifact] | None = None,
) -> Task:
    """Update the state of a task.

    Args:
        task_id: Identifier of the task to update.
        state: The state to record for the task.
        message: Optional message passed along with the update (`None` by default).
        artifacts: Optional artifacts passed along with the update (`None` by default).

    Returns:
        The task as held by the storage.
    """

Worker dataclass

Bases: ABC

A worker is responsible for executing tasks.

Source code in fasta2a/fasta2a/worker.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@dataclass
class Worker(ABC):
    """Executes the task operations handed out by a broker.

    Subclasses supply the actual `run_task` / `cancel_task` logic and the conversion
    helpers (`build_message_history`, `build_artifacts`).
    """

    broker: Broker
    storage: Storage

    @asynccontextmanager
    async def run(self) -> AsyncIterator[None]:
        """Run the worker for the duration of the `async with` block.

        The receive loop is started inside an anyio task group, so the worker is available
        to receive commands from the broker; the group's cancel scope is cancelled once the
        block is left.
        """
        async with anyio.create_task_group() as task_group:
            task_group.start_soon(self._loop)
            yield
            task_group.cancel_scope.cancel()

    async def _loop(self) -> None:
        """Handle the broker's task operations one after another."""
        operations = self.broker.receive_task_operations()
        async for operation in operations:
            await self._handle_task_operation(operation)

    async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
        """Dispatch one operation to `run_task` / `cancel_task` inside a tracing span.

        Any `Exception` raised on the way results in the task being stored as 'failed'.
        """
        try:
            # Re-enter the span carried by the operation, then open a span for this operation.
            with use_span(task_operation['_current_span']), tracer.start_as_current_span(
                f'{task_operation["operation"]} task', attributes={'logfire.tags': ['fasta2a']}
            ):
                if task_operation['operation'] == 'run':
                    await self.run_task(task_operation['params'])
                elif task_operation['operation'] == 'cancel':
                    await self.cancel_task(task_operation['params'])
                else:
                    assert_never(task_operation)
        except Exception:
            failed_task_id = task_operation['params']['id']
            await self.storage.update_task(failed_task_id, state='failed')

    @abstractmethod
    async def run_task(self, params: TaskSendParams) -> None:
        """Execute the task described by `params`."""
        ...

    @abstractmethod
    async def cancel_task(self, params: TaskIdParams) -> None:
        """Cancel the task identified by `params`."""
        ...

    @abstractmethod
    def build_message_history(self, task_history: list[Message]) -> list[Any]:
        """Build a message history list from the task's `Message` history."""
        ...

    @abstractmethod
    def build_artifacts(self, result: Any) -> list[Artifact]:
        """Build the list of `Artifact`s for a result."""
        ...

run async

run() -> AsyncIterator[None]

Run the worker.

It connects to the broker, and it makes itself available to receive commands.

Source code in fasta2a/fasta2a/worker.py
28
29
30
31
32
33
34
35
36
37
@asynccontextmanager
async def run(self) -> AsyncIterator[None]:
    """Run the worker for the duration of the `async with` block.

    The receive loop is started inside an anyio task group, so the worker is available
    to receive commands from the broker; the group's cancel scope is cancelled once the
    block is left.
    """
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(self._loop)
        yield
        task_group.cancel_scope.cancel()

This module contains the schema for the agent card.

AgentCard

Bases: TypedDict

The card that describes an agent.

Source code in fasta2a/fasta2a/schema.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably multi-word keys such as `default_input_modes` get camelCase aliases
# (`defaultInputModes`) — TODO confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class AgentCard(TypedDict):
    """The card that describes an agent.

    Required keys: `name`, `default_input_modes`, `default_output_modes` and `skills`.
    Every other key is declared `NotRequired`.
    """

    name: str
    """Human readable name of the agent e.g. "Recipe Agent"."""

    description: NotRequired[str]
    """A human-readable description of the agent.

    Used to assist users and other agents in understanding what the agent can do.
    (e.g. "Agent that helps users with recipes and cooking.")
    """

    # TODO(Marcelo): The spec makes url required.
    url: NotRequired[str]
    """A URL to the address the agent is hosted at."""

    provider: NotRequired[Provider]
    """The service provider of the agent."""

    # TODO(Marcelo): The spec makes version required.
    version: NotRequired[str]
    """The version of the agent - format is up to the provider. (e.g. "1.0.0")"""

    documentation_url: NotRequired[str]
    """A URL to documentation for the agent."""

    capabilities: NotRequired[Capabilities]
    """The capabilities of the agent."""

    # TODO(Marcelo): The spec makes authentication required.
    authentication: NotRequired[Authentication]
    """The authentication schemes supported by the agent.

    Intended to match OpenAPI authentication structure.
    """

    default_input_modes: list[str]
    """Supported mime types for input data."""

    default_output_modes: list[str]
    """Supported mime types for output data."""

    skills: list[Skill]
    """The skills (units of capability) that the agent can perform."""

name instance-attribute

name: str

Human readable name of the agent e.g. "Recipe Agent".

description instance-attribute

description: NotRequired[str]

A human-readable description of the agent.

Used to assist users and other agents in understanding what the agent can do. (e.g. "Agent that helps users with recipes and cooking.")

url instance-attribute

url: NotRequired[str]

A URL to the address the agent is hosted at.

provider instance-attribute

provider: NotRequired[Provider]

The service provider of the agent.

version instance-attribute

version: NotRequired[str]

The version of the agent - format is up to the provider. (e.g. "1.0.0")

documentation_url instance-attribute

documentation_url: NotRequired[str]

A URL to documentation for the agent.

capabilities instance-attribute

capabilities: NotRequired[Capabilities]

The capabilities of the agent.

authentication instance-attribute

authentication: NotRequired[Authentication]

The authentication schemes supported by the agent.

Intended to match OpenAPI authentication structure.

default_input_modes instance-attribute

default_input_modes: list[str]

Supported mime types for input data.

default_output_modes instance-attribute

default_output_modes: list[str]

Supported mime types for output data.

Provider

Bases: TypedDict

The service provider of the agent.

Source code in fasta2a/fasta2a/schema.py
63
64
65
66
67
class Provider(TypedDict):
    """The service provider of the agent.

    Both keys are required.
    """

    organization: str
    """Name of the organization providing the agent."""

    url: str
    """A URL for the provider organization."""

Capabilities

Bases: TypedDict

The capabilities of the agent.

Source code in fasta2a/fasta2a/schema.py
70
71
72
73
74
75
76
77
78
79
80
81
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably keys such as `push_notifications` get camelCase aliases
# (`pushNotifications`) — TODO confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class Capabilities(TypedDict):
    """The capabilities of the agent.

    All three flags are declared `NotRequired`, so an empty dict is a valid value.
    """

    streaming: NotRequired[bool]
    """Whether the agent supports streaming."""

    push_notifications: NotRequired[bool]
    """Whether the agent can notify updates to client."""

    state_transition_history: NotRequired[bool]
    """Whether the agent exposes status change history for tasks."""

streaming instance-attribute

streaming: NotRequired[bool]

Whether the agent supports streaming.

push_notifications instance-attribute

push_notifications: NotRequired[bool]

Whether the agent can notify updates to client.

state_transition_history instance-attribute

state_transition_history: NotRequired[bool]

Whether the agent exposes status change history for tasks.

Authentication

Bases: TypedDict

The authentication schemes supported by the agent.

Source code in fasta2a/fasta2a/schema.py
84
85
86
87
88
89
90
91
92
# `to_camel` is configured as the pydantic alias generator for this TypedDict.
# NOTE(review): both keys are single words, so the aliases presumably equal the
# key names — confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class Authentication(TypedDict):
    """The authentication schemes supported by the agent.

    `schemes` is required; `credentials` is declared `NotRequired`.
    """

    schemes: list[str]
    """The authentication schemes supported by the agent. (e.g. "Basic", "Bearer")"""

    credentials: NotRequired[str]
    """The credentials a client should use for private cards."""

schemes instance-attribute

schemes: list[str]

The authentication schemes supported by the agent. (e.g. "Basic", "Bearer")

credentials instance-attribute

credentials: NotRequired[str]

The credentials a client should use for private cards.

Skill

Bases: TypedDict

Skills are a unit of capability that an agent can perform.

Source code in fasta2a/fasta2a/schema.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably multi-word keys such as `input_modes` get camelCase aliases
# (`inputModes`) — TODO confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class Skill(TypedDict):
    """Skills are a unit of capability that an agent can perform.

    Every key is required except `examples`, which is declared `NotRequired`.
    """

    id: str
    """A unique identifier for the skill."""

    name: str
    """Human readable name of the skill."""

    description: str
    """A human-readable description of the skill.

    It will be used by the client or a human as a hint to understand the skill.
    """

    tags: list[str]
    """Set of tag-words describing classes of capabilities for this specific skill.

    Examples: "cooking", "customer support", "billing".
    """

    examples: NotRequired[list[str]]
    """The set of example scenarios that the skill can perform.

    Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread")
    """

    input_modes: list[str]
    """Supported mime types for input data."""

    output_modes: list[str]
    """Supported mime types for output data."""

id instance-attribute

id: str

A unique identifier for the skill.

name instance-attribute

name: str

Human readable name of the skill.

description instance-attribute

description: str

A human-readable description of the skill.

It will be used by the client or a human as a hint to understand the skill.

tags instance-attribute

tags: list[str]

Set of tag-words describing classes of capabilities for this specific skill.

Examples: "cooking", "customer support", "billing".

examples instance-attribute

examples: NotRequired[list[str]]

The set of example scenarios that the skill can perform.

Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread")

input_modes instance-attribute

input_modes: list[str]

Supported mime types for input data.

output_modes instance-attribute

output_modes: list[str]

Supported mime types for output data.

Artifact

Bases: TypedDict

Agents generate Artifacts as an end result of a Task.

Artifacts are immutable, can be named, and can have multiple parts. A streaming response can append parts to existing Artifacts.

A single Task can generate many Artifacts. For example, "create a webpage" could create separate HTML and image Artifacts.

Source code in fasta2a/fasta2a/schema.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably `last_chunk` gets the camelCase alias `lastChunk` — TODO confirm
# against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class Artifact(TypedDict):
    """Agents generate Artifacts as an end result of a Task.

    Artifacts are immutable, can be named, and can have multiple parts. A streaming response can append parts to
    existing Artifacts.

    A single Task can generate many Artifacts. For example, "create a webpage" could create separate HTML and image
    Artifacts.

    Required keys: `parts` and `index`. Every other key is declared `NotRequired`.
    """

    name: NotRequired[str]
    """The name of the artifact."""

    description: NotRequired[str]
    """A description of the artifact."""

    parts: list[Part]
    """The parts that make up the artifact."""

    metadata: NotRequired[dict[str, Any]]
    """Metadata about the artifact."""

    index: int
    """The index of the artifact."""

    append: NotRequired[bool]
    """Whether to append this artifact to an existing one."""

    last_chunk: NotRequired[bool]
    """Whether this is the last chunk of the artifact."""

name instance-attribute

name: NotRequired[str]

The name of the artifact.

description instance-attribute

description: NotRequired[str]

A description of the artifact.

parts instance-attribute

parts: list[Part]

The parts that make up the artifact.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Metadata about the artifact.

index instance-attribute

index: int

The index of the artifact.

append instance-attribute

append: NotRequired[bool]

Whether to append this artifact to an existing one.

last_chunk instance-attribute

last_chunk: NotRequired[bool]

Whether this is the last chunk of the artifact.

PushNotificationConfig

Bases: TypedDict

Configuration for push notifications.

A2A supports a secure notification mechanism whereby an agent can notify a client of an update outside of a connected session via a PushNotificationService. Within and across enterprises, it is critical that the agent verifies the identity of the notification service, authenticates itself with the service, and presents an identifier that ties the notification to the executing Task.

The target server of the PushNotificationService should be considered a separate service, and is not guaranteed (or even expected) to be the client directly. This PushNotificationService is responsible for authenticating and authorizing the agent and for proxying the verified notification to the appropriate endpoint (which could be anything from a pub/sub queue, to an email inbox or other service, etc).

For contrived scenarios with isolated client-agent pairs (e.g. local service mesh in a contained VPC, etc.) or isolated environments without enterprise security concerns, the client may choose to simply open a port and act as its own PushNotificationService. Any enterprise implementation will likely have a centralized service that authenticates the remote agents with trusted notification credentials and can handle online/offline scenarios. (This should be thought of similarly to a mobile Push Notification Service).

Source code in fasta2a/fasta2a/schema.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# `to_camel` is configured as the pydantic alias generator for this TypedDict.
# NOTE(review): all keys are single words, so the aliases presumably equal the
# key names — confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class PushNotificationConfig(TypedDict):
    """Configuration for push notifications.

    A2A supports a secure notification mechanism whereby an agent can notify a client of an update
    outside of a connected session via a PushNotificationService. Within and across enterprises,
    it is critical that the agent verifies the identity of the notification service, authenticates
    itself with the service, and presents an identifier that ties the notification to the executing
    Task.

    The target server of the PushNotificationService should be considered a separate service, and
    is not guaranteed (or even expected) to be the client directly. This PushNotificationService is
    responsible for authenticating and authorizing the agent and for proxying the verified notification
    to the appropriate endpoint (which could be anything from a pub/sub queue, to an email inbox or
    other service, etc).

    For contrived scenarios with isolated client-agent pairs (e.g. local service mesh in a contained
    VPC, etc.) or isolated environments without enterprise security concerns, the client may choose to
    simply open a port and act as its own PushNotificationService. Any enterprise implementation will
    likely have a centralized service that authenticates the remote agents with trusted notification
    credentials and can handle online/offline scenarios. (This should be thought of similarly to a
    mobile Push Notification Service).

    Only `url` is required; `token` and `authentication` are declared `NotRequired`.
    """

    url: str
    """The URL to send push notifications to."""

    token: NotRequired[str]
    """Token unique to this task/session."""

    authentication: NotRequired[Authentication]
    """Authentication details for push notifications."""

url instance-attribute

url: str

The URL to send push notifications to.

token instance-attribute

token: NotRequired[str]

Token unique to this task/session.

authentication instance-attribute

authentication: NotRequired[Authentication]

Authentication details for push notifications.

TaskPushNotificationConfig

Bases: TypedDict

Configuration for task push notifications.

Source code in fasta2a/fasta2a/schema.py
197
198
199
200
201
202
203
204
205
# `to_camel` is configured as the pydantic alias generator for this TypedDict;
# presumably `push_notification_config` gets the camelCase alias
# `pushNotificationConfig` — TODO confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskPushNotificationConfig(TypedDict):
    """Configuration for task push notifications.

    Pairs a task id with a `PushNotificationConfig`; both keys are required.
    """

    id: str
    """The task id."""

    push_notification_config: PushNotificationConfig
    """The push notification configuration."""

id instance-attribute

id: str

The task id.

push_notification_config instance-attribute

push_notification_config: PushNotificationConfig

The push notification configuration.

Message

Bases: TypedDict

A Message contains any content that is not an Artifact.

This can include things like agent thoughts, user context, instructions, errors, status, or metadata.

All content from a client comes in the form of a Message. Agents send Messages to communicate status or to provide instructions (whereas generated results are sent as Artifacts).

A Message can have multiple parts to denote different pieces of content. For example, a user request could include a textual description from a user and then multiple files used as context from the client.

Source code in fasta2a/fasta2a/schema.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class Message(TypedDict):
    """A Message contains any content that is not an Artifact.

    This can include things like agent thoughts, user context, instructions, errors, status, or metadata.

    All content from a client comes in the form of a Message. Agents send Messages to communicate status or to provide
    instructions (whereas generated results are sent as Artifacts).

    A Message can have multiple parts to denote different pieces of content. For example, a user request could include
    a textual description from a user and then multiple files used as context from the client.

    Required keys: `role` and `parts`; `metadata` is declared `NotRequired`.
    """

    # Only the two literal values 'user' and 'agent' are allowed.
    role: Literal['user', 'agent']
    """The role of the message."""

    parts: list[Part]
    """The parts of the message."""

    metadata: NotRequired[dict[str, Any]]
    """Metadata about the message."""

role instance-attribute

role: Literal['user', 'agent']

The role of the message.

parts instance-attribute

parts: list[Part]

The parts of the message.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Metadata about the message.

TextPart

Bases: _BasePart

A part that contains text.

Source code in fasta2a/fasta2a/schema.py
236
237
238
239
240
241
242
243
class TextPart(_BasePart):
    """A part that contains text.

    Extends `_BasePart` with a literal `type` tag and the `text` payload.
    """

    # Always the literal 'text'.
    type: Literal['text']
    """The type of the part."""

    text: str
    """The text of the part."""

type instance-attribute

type: Literal['text']

The type of the part.

text instance-attribute

text: str

The text of the part.

FilePart

Bases: _BasePart

A part that contains a file.

Source code in fasta2a/fasta2a/schema.py
246
247
248
249
250
251
252
253
254
# `to_camel` is configured as the pydantic alias generator for this TypedDict.
# NOTE(review): the keys declared here are single words, so their aliases
# presumably equal the key names — confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class FilePart(_BasePart):
    """A part that contains a file.

    Extends `_BasePart` with a literal `type` tag and the `file` payload.
    """

    # Always the literal 'file'.
    type: Literal['file']
    """The type of the part."""

    file: File
    """The file of the part."""

type instance-attribute

type: Literal['file']

The type of the part.

file instance-attribute

file: File

The file of the part.

File module-attribute

# Union alias: a file is either a `_BinaryFile` or a `_URLFile`.
File: TypeAlias = Union[_BinaryFile, _URLFile]

A file is a binary file or a URL file.

DataPart

Bases: _BasePart

A part that contains data.

Source code in fasta2a/fasta2a/schema.py
288
289
290
291
292
293
294
295
296
# `to_camel` is configured as the pydantic alias generator for this TypedDict.
# NOTE(review): the keys declared here are single words, so their aliases
# presumably equal the key names — confirm against pydantic's alias generator docs.
@pydantic.with_config(config={'alias_generator': to_camel})
class DataPart(_BasePart):
    """A part that contains data.

    Extends `_BasePart` with a literal `type` tag and the `data` payload.
    """

    # Always the literal 'data'.
    type: Literal['data']
    """The type of the part."""

    data: dict[str, Any]
    """The data of the part."""

type instance-attribute

type: Literal['data']

The type of the part.

data instance-attribute

data: dict[str, Any]

The data of the part.

Part module-attribute

# Union of the three part kinds; the "type" field is declared as the discriminator.
Part = Annotated[
    Union[TextPart, FilePart, DataPart],
    Field(discriminator="type"),
]

A fully formed piece of content exchanged between a client and a remote agent as part of a Message or an Artifact.

Each Part has its own content type and metadata.

TaskState module-attribute

# Closed set of the seven state strings a task status can carry.
TaskState: TypeAlias = Literal[
    "submitted",
    "working",
    "input-required",
    "completed",
    "canceled",
    "failed",
    "unknown",
]

The possible states of a task.

TaskStatus

Bases: TypedDict

Status and accompanying message for a task.

Source code in fasta2a/fasta2a/schema.py
309
310
311
312
313
314
315
316
317
318
319
320
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskStatus(TypedDict):
    """Status and accompanying message for a task.

    Only `state` is required; `message` and `timestamp` may be omitted.
    """

    # Must be one of the string literals listed in `TaskState`.
    state: TaskState
    """The current state of the task."""

    # Optional key.
    message: NotRequired[Message]
    """Additional status updates for client."""

    # Optional key; typed as a plain string, so the ISO format is not enforced by the type.
    timestamp: NotRequired[str]
    """ISO datetime value of when the status was updated."""

state instance-attribute

state: TaskState

The current state of the task.

message instance-attribute

message: NotRequired[Message]

Additional status updates for client.

timestamp instance-attribute

timestamp: NotRequired[str]

ISO datetime value of when the status was updated.

Task

Bases: TypedDict

A Task is a stateful entity that allows Clients and Remote Agents to achieve a specific outcome.

Clients and Remote Agents exchange Messages within a Task. Remote Agents generate results as Artifacts. A Task is always created by a Client and the status is always determined by the Remote Agent.

Source code in fasta2a/fasta2a/schema.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class Task(TypedDict):
    """A Task is a stateful entity that allows Clients and Remote Agents to achieve a specific outcome.

    Clients and Remote Agents exchange Messages within a Task. Remote Agents generate results as Artifacts.
    A Task is always created by a Client and the status is always determined by the Remote Agent.

    Only `id` and `status` are required keys; every other key is `NotRequired`.
    """

    # Required key.
    id: str
    """Unique identifier for the task."""

    # Optional key.
    session_id: NotRequired[str]
    """Client-generated id for the session holding the task."""

    # Required key; a `TaskStatus` mapping (state plus optional message/timestamp).
    status: TaskStatus
    """Current status of the task."""

    # Optional key.
    history: NotRequired[list[Message]]
    """Optional history of messages."""

    # Optional key.
    artifacts: NotRequired[list[Artifact]]
    """Collection of artifacts created by the agent."""

    # Optional key; free-form mapping with string keys.
    metadata: NotRequired[dict[str, Any]]
    """Extension metadata."""

id instance-attribute

id: str

Unique identifier for the task.

session_id instance-attribute

session_id: NotRequired[str]

Client-generated id for the session holding the task.

status instance-attribute

status: TaskStatus

Current status of the task.

history instance-attribute

history: NotRequired[list[Message]]

Optional history of messages.

artifacts instance-attribute

artifacts: NotRequired[list[Artifact]]

Collection of artifacts created by the agent.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Extension metadata.

TaskStatusUpdateEvent

Bases: TypedDict

Sent by server during sendSubscribe or subscribe requests.

Source code in fasta2a/fasta2a/schema.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskStatusUpdateEvent(TypedDict):
    """Sent by server during sendSubscribe or subscribe requests.

    Carries the new `TaskStatus` for one task; `metadata` is the only optional key.
    """

    # Required key.
    id: str
    """The id of the task."""

    # Required key; a `TaskStatus` mapping.
    status: TaskStatus
    """The status of the task."""

    # Required key.
    final: bool
    """Indicates the end of the event stream."""

    # Optional key; free-form mapping with string keys.
    metadata: NotRequired[dict[str, Any]]
    """Extension metadata."""

id instance-attribute

id: str

The id of the task.

status instance-attribute

status: TaskStatus

The status of the task.

final instance-attribute

final: bool

Indicates the end of the event stream.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Extension metadata.

TaskArtifactUpdateEvent

Bases: TypedDict

Sent by server during sendSubscribe or subscribe requests.

Source code in fasta2a/fasta2a/schema.py
367
368
369
370
371
372
373
374
375
376
377
378
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskArtifactUpdateEvent(TypedDict):
    """Sent by server during sendSubscribe or subscribe requests.

    Carries one updated `Artifact` for one task; `metadata` is the only optional key.
    """

    # Required key.
    id: str
    """The id of the task."""

    # Required key.
    artifact: Artifact
    """The artifact that was updated."""

    # Optional key; free-form mapping with string keys.
    metadata: NotRequired[dict[str, Any]]
    """Extension metadata."""

id instance-attribute

id: str

The id of the task.

artifact instance-attribute

artifact: Artifact

The artifact that was updated.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Extension metadata.

TaskIdParams

Bases: TypedDict

Parameters for a task id.

Source code in fasta2a/fasta2a/schema.py
381
382
383
384
385
386
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskIdParams(TypedDict):
    """Parameters for a task id.

    Used as the `params` type of the `tasks/cancel`, `tasks/pushNotification/get` and
    `tasks/resubscribe` request aliases, and as the base of `TaskQueryParams`.
    """

    # Required key; the id of the task the request refers to.
    id: str
    # Optional key; presumably extension metadata, as on the other schema types — TODO confirm.
    metadata: NotRequired[dict[str, Any]]

TaskQueryParams

Bases: TaskIdParams

Query parameters for a task.

Source code in fasta2a/fasta2a/schema.py
389
390
391
392
393
394
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskQueryParams(TaskIdParams):
    """Query parameters for a task.

    Inherits `id` and `metadata` from `TaskIdParams` and adds `history_length`.
    Used as the `params` type of the `tasks/get` request alias.
    """

    # Optional key.
    history_length: NotRequired[int]
    """Number of recent messages to be retrieved."""

history_length instance-attribute

history_length: NotRequired[int]

Number of recent messages to be retrieved.

TaskSendParams

Bases: TypedDict

Sent by the client to the agent to create, continue, or restart a task.

Source code in fasta2a/fasta2a/schema.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# Fields are declared in snake_case; `to_camel` is registered as the alias generator.
@pydantic.with_config(config={'alias_generator': to_camel})
class TaskSendParams(TypedDict):
    """Sent by the client to the agent to create, continue, or restart a task.

    Only `id` and `message` are required keys. Used as the `params` type of the
    `tasks/send` and `tasks/sendSubscribe` request aliases.
    """

    # Required key; `A2AClient.send_task` fills this with a freshly generated UUID4 string.
    id: str
    """The id of the task."""

    # Optional key.
    session_id: NotRequired[str]
    """The server creates a new sessionId for new tasks if not set."""

    # Required key.
    message: Message
    """The message to send to the agent."""

    # Optional key.
    history_length: NotRequired[int]
    """Number of recent messages to be retrieved."""

    # Optional key.
    push_notification: NotRequired[PushNotificationConfig]
    """Where the server should send notifications when disconnected."""

    # Optional key; free-form mapping with string keys.
    metadata: NotRequired[dict[str, Any]]
    """Extension metadata."""

id instance-attribute

id: str

The id of the task.

session_id instance-attribute

session_id: NotRequired[str]

The server creates a new sessionId for new tasks if not set.

message instance-attribute

message: Message

The message to send to the agent.

history_length instance-attribute

history_length: NotRequired[int]

Number of recent messages to be retrieved.

push_notification instance-attribute

push_notification: NotRequired[PushNotificationConfig]

Where the server should send notifications when disconnected.

metadata instance-attribute

metadata: NotRequired[dict[str, Any]]

Extension metadata.

JSONRPCMessage

Bases: TypedDict

A JSON RPC message.

Source code in fasta2a/fasta2a/schema.py
420
421
422
423
424
425
426
427
class JSONRPCMessage(TypedDict):
    """A JSON RPC message.

    Common base for `JSONRPCRequest` and `JSONRPCResponse`; both keys are required.
    """

    # Only the literal string '2.0' is accepted.
    jsonrpc: Literal['2.0']
    """The JSON RPC version."""

    # The key is required but its value may be None; `A2AClient` sends `id=None`.
    id: int | str | None
    """The request id."""

jsonrpc instance-attribute

jsonrpc: Literal['2.0']

The JSON RPC version.

id instance-attribute

id: int | str | None

The request id.

JSONRPCRequest

Bases: JSONRPCMessage, Generic[Method, Params]

A JSON RPC request.

Source code in fasta2a/fasta2a/schema.py
434
435
436
437
438
439
440
441
class JSONRPCRequest(JSONRPCMessage, Generic[Method, Params]):
    """A JSON RPC request.

    Generic over the method name and the params payload. The concrete request aliases
    bind `Method` to a `Literal` method string (e.g. `'tasks/send'`) and `Params` to the
    matching params `TypedDict` (e.g. `TaskSendParams`).
    """

    # Required key.
    method: Method
    """The method to call."""

    # Required key.
    params: Params
    """The parameters to pass to the method."""

method instance-attribute

method: Method

The method to call.

params instance-attribute

params: Params

The parameters to pass to the method.

JSONRPCError

Bases: TypedDict, Generic[CodeT, MessageT]

A JSON RPC error.

Source code in fasta2a/fasta2a/schema.py
452
453
454
455
456
457
class JSONRPCError(TypedDict, Generic[CodeT, MessageT]):
    """A JSON RPC error.

    Generic over the error code and message. The concrete error aliases bind `CodeT`
    to a `Literal` integer (e.g. `-32700`) and `MessageT` to a `Literal` string
    (e.g. `'Invalid JSON payload'`).
    """

    # Required key; the error code.
    code: CodeT
    # Required key; the error message.
    message: MessageT
    # Optional key; unconstrained extra payload.
    data: NotRequired[Any]

JSONRPCResponse

Bases: JSONRPCMessage, Generic[ResultT, ErrorT]

A JSON RPC response.

Source code in fasta2a/fasta2a/schema.py
464
465
466
467
468
class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]):
    """A JSON RPC response.

    Generic over the success payload (`ResultT`) and the error payload (`ErrorT`).
    Both keys are `NotRequired` at the type level, so this type does not itself
    enforce that exactly one of them is present.
    """

    # Optional key; payload of a successful call.
    result: NotRequired[ResultT]
    # Optional key; payload of a failed call.
    error: NotRequired[ErrorT]

JSONParseError module-attribute

JSONParseError = JSONRPCError[
    Literal[-32700], Literal["Invalid JSON payload"]
]

A JSON RPC error for a parse error.

InvalidRequestError module-attribute

InvalidRequestError = JSONRPCError[
    Literal[-32600],
    Literal["Request payload validation error"],
]

A JSON RPC error for an invalid request.

MethodNotFoundError module-attribute

MethodNotFoundError = JSONRPCError[
    Literal[-32601], Literal["Method not found"]
]

A JSON RPC error for a method not found.

InvalidParamsError module-attribute

InvalidParamsError = JSONRPCError[
    Literal[-32602], Literal["Invalid parameters"]
]

A JSON RPC error for invalid parameters.

InternalError module-attribute

InternalError = JSONRPCError[
    Literal[-32603], Literal["Internal error"]
]

A JSON RPC error for an internal error.

TaskNotFoundError module-attribute

TaskNotFoundError = JSONRPCError[
    Literal[-32001], Literal["Task not found"]
]

A JSON RPC error for a task not found.

TaskNotCancelableError module-attribute

TaskNotCancelableError = JSONRPCError[
    Literal[-32002], Literal["Task not cancelable"]
]

A JSON RPC error indicating that the task cannot be canceled.

PushNotificationNotSupportedError module-attribute

PushNotificationNotSupportedError = JSONRPCError[
    Literal[-32003],
    Literal["Push notification not supported"],
]

A JSON RPC error indicating that push notifications are not supported.

UnsupportedOperationError module-attribute

UnsupportedOperationError = JSONRPCError[
    Literal[-32004],
    Literal["This operation is not supported"],
]

A JSON RPC error for an unsupported operation.

ContentTypeNotSupportedError module-attribute

ContentTypeNotSupportedError = JSONRPCError[
    Literal[-32005], Literal["Incompatible content types"]
]

A JSON RPC error for incompatible content types.

SendTaskRequest module-attribute

SendTaskRequest = JSONRPCRequest[
    Literal["tasks/send"], TaskSendParams
]

A JSON RPC request to send a task.

SendTaskResponse module-attribute

SendTaskResponse = JSONRPCResponse[
    Task, JSONRPCError[Any, Any]
]

A JSON RPC response to send a task.

SendTaskStreamingRequest module-attribute

SendTaskStreamingRequest = JSONRPCRequest[
    Literal["tasks/sendSubscribe"], TaskSendParams
]

A JSON RPC request to send a task and receive updates.

SendTaskStreamingResponse module-attribute

A JSON RPC response to send a task and receive updates.

GetTaskRequest module-attribute

GetTaskRequest = JSONRPCRequest[
    Literal["tasks/get"], TaskQueryParams
]

A JSON RPC request to get a task.

GetTaskResponse module-attribute

A JSON RPC response to get a task.

CancelTaskRequest module-attribute

CancelTaskRequest = JSONRPCRequest[
    Literal["tasks/cancel"], TaskIdParams
]

A JSON RPC request to cancel a task.

CancelTaskResponse module-attribute

A JSON RPC response to cancel a task.

SetTaskPushNotificationRequest module-attribute

SetTaskPushNotificationRequest = JSONRPCRequest[
    Literal["tasks/pushNotification/set"],
    TaskPushNotificationConfig,
]

A JSON RPC request to set a task push notification.

SetTaskPushNotificationResponse module-attribute

A JSON RPC response to set a task push notification.

GetTaskPushNotificationRequest module-attribute

GetTaskPushNotificationRequest = JSONRPCRequest[
    Literal["tasks/pushNotification/get"], TaskIdParams
]

A JSON RPC request to get a task push notification.

GetTaskPushNotificationResponse module-attribute

A JSON RPC response to get a task push notification.

ResubscribeTaskRequest module-attribute

ResubscribeTaskRequest = JSONRPCRequest[
    Literal["tasks/resubscribe"], TaskIdParams
]

A JSON RPC request to resubscribe to a task.

A2ARequest module-attribute

A JSON RPC request to the A2A server.

A2AResponse module-attribute

A JSON RPC response from the A2A server.

A2AClient

A client for the A2A protocol.

Source code in fasta2a/fasta2a/client.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class A2AClient:
    """Asynchronous JSON-RPC client for an A2A server.

    Every request is serialized with `a2a_request_ta` (using aliases) and POSTed
    to `/` on the underlying `httpx.AsyncClient`.
    """

    def __init__(self, base_url: str = 'http://localhost:8000', http_client: httpx.AsyncClient | None = None) -> None:
        """Set up the underlying HTTP client.

        Args:
            base_url: Base URL used for all requests.
            http_client: Optional existing `httpx.AsyncClient` to reuse. Its
                `base_url` is overwritten with `base_url`, including when
                `base_url` is left at its default.
        """
        if http_client is not None:
            self.http_client = http_client
            self.http_client.base_url = base_url
            return
        self.http_client = httpx.AsyncClient(base_url=base_url)

    async def send_task(
        self,
        message: Message,
        history_length: int | None = None,
        push_notification: PushNotificationConfig | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> SendTaskResponse:
        """Send a `tasks/send` request carrying `message` under a new task id.

        Args:
            message: The message to send.
            history_length: Included in the params only when not None.
            push_notification: Included in the params only when not None.
            metadata: Included in the params only when not None.

        Returns:
            The response body validated with `send_task_response_ta`.

        Raises:
            UnexpectedResponseError: If the HTTP status code is 400 or above.
        """
        # A fresh UUID4 string is generated as the task id for every call.
        params = TaskSendParams(message=message, id=str(uuid.uuid4()))
        if history_length is not None:
            params['history_length'] = history_length
        if push_notification is not None:
            params['push_notification'] = push_notification
        if metadata is not None:
            params['metadata'] = metadata

        request = SendTaskRequest(jsonrpc='2.0', id=None, method='tasks/send', params=params)
        reply = await self._post(request)
        return send_task_response_ta.validate_json(reply.content)

    async def get_task(self, task_id: str) -> GetTaskResponse:
        """Send a `tasks/get` request for `task_id`.

        Returns:
            The response body validated with `get_task_response_ta`.

        Raises:
            UnexpectedResponseError: If the HTTP status code is 400 or above.
        """
        request = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id})
        reply = await self._post(request)
        return get_task_response_ta.validate_json(reply.content)

    async def _post(self, payload: SendTaskRequest | GetTaskRequest) -> httpx.Response:
        """Serialize `payload`, POST it to `/` as JSON, and check the HTTP status."""
        body = a2a_request_ta.dump_json(payload, by_alias=True)
        reply = await self.http_client.post('/', content=body, headers={'Content-Type': 'application/json'})
        self._raise_for_status(reply)
        return reply

    def _raise_for_status(self, response: httpx.Response) -> None:
        """Raise `UnexpectedResponseError` when the status code is 400 or above."""
        if response.status_code < 400:
            return
        raise UnexpectedResponseError(response.status_code, response.text)

UnexpectedResponseError

Bases: Exception

An error raised when an unexpected response is received from the server.

Source code in fasta2a/fasta2a/client.py
73
74
75
76
77
78
class UnexpectedResponseError(Exception):
    """An error raised when an unexpected response is received from the server.

    Attributes:
        status_code: HTTP status code of the response.
        content: Body of the response, as text.
    """

    def __init__(self, status_code: int, content: str) -> None:
        """Store the response details and forward them to `Exception`.

        Args:
            status_code: HTTP status code of the response.
            content: Body of the response, as text.
        """
        # Forward both values so `args` and `str(exc)` are populated even when
        # the exception is constructed with keyword arguments.
        super().__init__(status_code, content)
        self.status_code = status_code
        self.content = content