I Actually Built It. Here’s Every Line That Matters — and Every Line That Broke First.
A2A Protocol, Part 2: Real Code, Real Failures, Real Production Lessons

If you haven’t read Part 1, here’s the 30-second version:
We built an agentic workflow for a logistics client. When a second client came along, we almost copy-pasted the entire stack. My lead engineer stopped us. We spent a month researching A2A — an open protocol that lets agents built on different frameworks discover each other, delegate tasks, and return typed results over plain HTTP. The idea: one shared set of agents, many customer orchestrators on top.
Part 1 was the theory. This is Part 2. The code. Two validation cases we ran before we trusted any of this in production. And the specific things that broke.
The Architecture — Corrected
Here is the full picture. Read it carefully, because the key insight is that the orchestrator is both a server and a client simultaneously.

Why the orchestrator must be an A2A server too:
If the orchestrator is only a client (a plain Python script), it cannot be discovered by anything above it. It cannot receive tasks in a structured lifecycle. It cannot emit failed events upstream when something goes wrong inside it. It is a dead end in the mesh.
A2A only governs boundaries. The orchestrator has two boundaries: one facing inward (it calls shared agents as a client) and one facing outward (it receives tasks as a server). Both must be implemented.
This means Customer A’s orchestrator and Customer B’s orchestrator are each fully deployed A2A services — with their own Agent Cards, their own AgentExecutor, and their own OAuth2 middleware. The routing logic lives inside their executor, not in standalone scripts.
Project Structure
project/
├── shared_agent/ # deployed once, reused across all customers
│ ├── agent_card.py
│ ├── executor.py
│ ├── agentic_logic.py
│ ├── auth.py
│ └── main.py
│
├── orchestrator/ # one deployment per customer — this IS the A2A server
│ ├── agent_card.py
│ ├── executor.py # routes tasks to shared agents via A2AClient
│ ├── routers/
│ │ ├── rule_router.py
│ │ ├── llm_router.py
│ │ └── hybrid_router.py
│ ├── auth.py
│ └── main.py
│
└── common/
└── a2a_client.py # shared A2AClient wrapper used by orchestrator
Setup
pip install a2a-sdk uvicorn starlette httpx python-jose[cryptography] openai
Part 1: The Shared Agent (Server Only)
The shared agent is a pure A2A server. It does one job well. It knows nothing about which customer is calling it or why. It receives a task, runs its pipeline, and returns a result.
Step 1 — Shared Agent Card
Write skill descriptions like API contracts. The orchestrator’s routing logic — and any LLM making routing decisions — reads these descriptions. Vague descriptions cause wrong routing.
# shared_agent/agent_card.py
from a2a.types import AgentCard, AgentSkill, AgentCapabilities
def build_agent_card(host: str, port: int) -> AgentCard:
return AgentCard(
name="AnomalyDetectionAgent",
description=(
"Detects anomalies in structured time-series or tabular data. "
"Returns a structured report with flagged records and confidence scores."
),
url=f"http://{host}:{port}",
version="1.0.0",
skills=[
AgentSkill(
id="detect_anomalies",
name="Detect Anomalies",
description=(
"Accepts structured data (JSON or CSV-formatted text) and returns "
"a list of anomalous records with scores. "
"Use for: outlier detection, fraud signals, sensor fault detection."
),
inputModes=["text", "data"],
outputModes=["data"],
examples=[
"Detect anomalies in this JSON array of shipment records.",
"Flag outliers in the following sensor readings.",
],
),
AgentSkill(
id="validate_input",
name="Validate Input",
description=(
"Validates input against defined business rules and schema constraints. "
"Use for: schema validation, business rule checks, pre-processing guards."
),
inputModes=["text", "data"],
outputModes=["data"],
),
],
capabilities=AgentCapabilities(streaming=True),
defaultInputModes=["text"],
defaultOutputModes=["data"],
)
Step 2 — Abstract Your Agentic Logic
Keep your actual logic completely decoupled from A2A. Define an interface. The A2A layer never imports your LangGraph graph or Semantic Kernel directly — it only calls the interface. When you swap frameworks, A2A doesn’t change.
# shared_agent/agentic_logic.py
from abc import ABC, abstractmethod
from typing import Any
class AgenticPipeline(ABC):
"""
The abstraction boundary between A2A and your actual agent logic.
Implement this with whatever you have:
LangGraph: graph.ainvoke({"messages": [HumanMessage(content=task_input)]})
Semantic Kernel: kernel.invoke(plugin_name, task_input)
LangChain: chain.ainvoke({"input": task_input})
Pure Python: any deterministic logic, no LLM required
A2A doesn't care which. That's the point.
"""
@abstractmethod
async def run(
self, task_input: str, context: dict[str, Any]
) -> dict[str, Any]:
"""
Returns a dict with at minimum:
{"result": str, "status": "success" | "error"}
"""
raise NotImplementedError
class ConcreteAgenticPipeline(AgenticPipeline):
def __init__(self):
# Initialize your pipeline here.
# e.g. self.graph = build_langgraph_graph()
pass
async def run(
self, task_input: str, context: dict[str, Any]
) -> dict[str, Any]:
# ── REPLACE THIS BLOCK WITH YOUR LOGIC ───────────────────────────────
#
# LangGraph example:
# result = await self.graph.ainvoke(
# {"messages": [HumanMessage(content=task_input)]}
# )
# return {"result": result["output"], "status": "success"}
#
result = f"Processed: {task_input}" # placeholder - replace this
return {"result": result, "status": "success"}
# ─────────────────────────────────────────────────────────────────────
Step 3 — Shared Agent Executor
The try/except is not optional. If your pipeline raises and there is no except block, the event queue never receives a failedevent. The client’s SSE connection stays open forever — a zombie task with no way to close it.
Every code path through execute() must end with final=True.
# shared_agent/executor.py
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
TaskState, TaskStatus,
TaskStatusUpdateEvent, TaskArtifactUpdateEvent,
Artifact, TextPart, Part,
)
from shared_agent.agentic_logic import AgenticPipeline
class SharedAgentExecutor(AgentExecutor):
def __init__(self, pipeline: AgenticPipeline):
self.pipeline = pipeline
async def execute(
self, context: RequestContext, event_queue: EventQueue
) -> None:
task_id = context.task_id
task_input = self._extract_text(context)
# 1. Signal working
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(state=TaskState.working),
final=False,
)
)
try:
# 2. Run pipeline
output = await self.pipeline.run(
task_input=task_input,
context={
"task_id": task_id,
"metadata": context.task.metadata or {},
},
)
# 3. Emit artifact
await event_queue.enqueue_event(
TaskArtifactUpdateEvent(
taskId=task_id,
artifact=Artifact(
artifactId="result",
parts=[Part(root=TextPart(text=output["result"]))],
),
lastChunk=True,
)
)
# 4. Signal completed
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(state=TaskState.completed),
final=True,
)
)
except Exception as exc:
# 5. Signal failed - without this the client waits forever
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(
state=TaskState.failed,
message={"error": str(exc)},
),
final=True,
)
)
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
# Must be a real implementation - a stub leaves SSE connections open
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=context.task_id,
status=TaskStatus(state=TaskState.cancelled),
final=True,
)
)
def _extract_text(self, context: RequestContext) -> str:
for part in context.message.parts:
if hasattr(part.root, "text"):
return part.root.text
return ""
Step 4 — OAuth2 Middleware
The Agent Card endpoint must always be public. Clients need to read it before they know what token to request. If you protect it, discovery breaks entirely and no agent can onboard without manual intervention.
# shared_agent/auth.py
import os
import httpx
from jose import jwt, JWTError
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
KEYCLOAK_URL = os.getenv("KEYCLOAK_URL", "http://localhost:8080")
REALM = os.getenv("KEYCLOAK_REALM", "agent-realm")
AUDIENCE = os.getenv("KEYCLOAK_AUDIENCE", "a2a-server")
REQUIRED_ROLE = os.getenv("REQUIRED_ROLE", "agent-access")
JWKS_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/certs"
_jwks_cache: dict = {} # add TTL + key-rotation refresh before prod
async def _get_jwks() -> dict:
global _jwks_cache
if not _jwks_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(JWKS_URL)
resp.raise_for_status()
_jwks_cache = resp.json()
return _jwks_cache
async def validate_token(token: str) -> dict:
jwks = await _get_jwks()
try:
header = jwt.get_unverified_header(token)
key = next((k for k in jwks["keys"] if k["kid"] == header["kid"]), None)
if not key:
raise ValueError("Signing key not found in JWKS")
claims = jwt.decode(token, key, algorithms=["RS256"], audience=AUDIENCE)
roles = claims.get("realm_access", {}).get("roles", [])
if REQUIRED_ROLE not in roles:
raise ValueError(f"Required role '{REQUIRED_ROLE}' not present")
return claims
except JWTError as exc:
raise ValueError(f"Token validation failed: {exc}") from exc
class OAuth2Middleware(BaseHTTPMiddleware):
UNPROTECTED_PATHS = {"/.well-known/agent.json"} # always public - do not add auth here
async def dispatch(self, request: Request, call_next):
if request.url.path in self.UNPROTECTED_PATHS:
return await call_next(request)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return JSONResponse(status_code=401, content={"error": "Missing Authorization header"})
token = auth_header.removeprefix("Bearer ").strip()
try:
claims = await validate_token(token)
request.state.token_claims = claims
except ValueError as exc:
return JSONResponse(status_code=401, content={"error": str(exc)})
return await call_next(request)
Step 5 — Shared Agent Server Entry Point
# shared_agent/main.py
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from shared_agent.agent_card import build_agent_card
from shared_agent.executor import SharedAgentExecutor
from shared_agent.agentic_logic import ConcreteAgenticPipeline
from shared_agent.auth import OAuth2Middleware
HOST = "0.0.0.0"
PORT = 8002 # shared agent runs on 8002; orchestrator runs on 8001
def create_app():
pipeline = ConcreteAgenticPipeline()
executor = SharedAgentExecutor(pipeline=pipeline)
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(), # swap for Redis before prod
)
agent_card = build_agent_card(host="localhost", port=PORT)
app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=handler,
).build()
app.add_middleware(OAuth2Middleware)
return app
if __name__ == "__main__":
uvicorn.run(create_app(), host=HOST, port=PORT)
Part 2: The Orchestrator (Server + Client)
This is where the original article was incomplete.
The orchestrator is not a standalone Python script. It is a fully deployed A2A service. It has its own Agent Card, its own AgentExecutor, its own OAuth2 middleware, and its own main.py. The routing logic lives inside its AgentExecutor.execute() method — that is the correct place for it.
When its executor runs, it makes outbound calls to shared agents using A2AClient. It is simultaneously an A2A server (facing the external caller) and an A2A client (facing shared agents).
Step 6 — Common A2A Client Wrapper
This is the shared code that both orchestrators (Customer A, Customer B) use to call any shared agent. It handles auth and message construction.
# common/a2a_client.py
import uuid
import os
import httpx
from a2a.client import A2AClient
from a2a.types import (
SendMessageRequest, MessageSendParams,
Message, TextPart, Part,
)
KEYCLOAK_URL = os.getenv("KEYCLOAK_URL", "http://localhost:8080")
REALM = os.getenv("KEYCLOAK_REALM", "agent-realm")
CLIENT_ID = os.getenv("A2A_CLIENT_ID", "a2a-client")
CLIENT_SECRET = os.getenv("A2A_CLIENT_SECRET", "your-client-secret")
async def get_access_token() -> str:
"""
OAuth2 Client Credentials flow - server-to-server auth.
No user involved. The orchestrator authenticates as itself.
"""
token_url = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
async with httpx.AsyncClient() as client:
resp = await client.post(
token_url,
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
)
resp.raise_for_status()
return resp.json()["access_token"]
async def call_agent(agent_url: str, task_text: str, token: str) -> str:
"""
Send a task to any A2A-compliant server and return the result text.
Works identically regardless of what framework is running inside the server.
"""
async with httpx.AsyncClient() as http_client:
a2a_client = A2AClient(httpx_client=http_client, url=agent_url)
request = SendMessageRequest(
id=str(uuid.uuid4()),
params=MessageSendParams(
message=Message(
role="user",
messageId=str(uuid.uuid4()),
parts=[Part(root=TextPart(text=task_text))],
)
),
)
response = await a2a_client.send_message(
request,
headers={"Authorization": f"Bearer {token}"},
)
task = response.root
for artifact in getattr(task.result, "artifacts", []):
for part in artifact.parts:
if hasattr(part.root, "text"):
return part.root.text
return str(task)
Step 7 — Routing Logic (Inside the Orchestrator)
The routing logic is not standalone scripts anymore. It is a module that the orchestrator’s executor calls internally. The interfaces are identical — only where they live has changed.
# orchestrator/routers/rule_router.py
import re
from dataclasses import dataclass
from typing import Callable
# Registry maps agent_id → URL of the deployed shared agent
AGENT_REGISTRY: dict[str, str] = {
"anomaly_detection": "http://anomaly-agent:8002",
"contract_lookup": "http://contract-agent:8003",
"escalation_draft": "http://escalation-agent:8004",
}
@dataclass
class RoutingRule:
name: str
condition: Callable[[str], bool]
agent_id: str
priority: int # lower = evaluated first; first match wins
ROUTING_RULES: list[RoutingRule] = [
RoutingRule(
name="validation_task",
condition=lambda t: any(
kw in t.lower()
for kw in ["validate", "check", "verify", "is valid", "confirm"]
),
agent_id="anomaly_detection",
priority=1,
),
RoutingRule(
name="analysis_task",
condition=lambda t: any(
kw in t.lower()
for kw in ["analyze", "analyse", "summarize", "summarise", "report on"]
),
agent_id="anomaly_detection",
priority=2,
),
RoutingRule(
name="contract_task",
condition=lambda t: any(
kw in t.lower()
for kw in ["contract", "sla", "agreement", "clause", "terms"]
),
agent_id="contract_lookup",
priority=3,
),
RoutingRule(
name="escalation_task",
condition=lambda t: any(
kw in t.lower()
for kw in ["escalate", "draft escalation", "notify manager", "urgent issue"]
),
agent_id="escalation_draft",
priority=4,
),
RoutingRule(
name="structured_data_task",
condition=lambda t: bool(re.search(r"{.*?}", t, re.DOTALL)),
agent_id="anomaly_detection",
priority=5,
),
# Catch-all - always last; triggers LLM fallback in the hybrid router
RoutingRule(
name="default",
condition=lambda _: True,
agent_id="anomaly_detection",
priority=999,
),
]
def resolve_agent(task: str) -> tuple[str, str]:
"""Returns (agent_id, rule_name) for the first matching rule."""
for rule in sorted(ROUTING_RULES, key=lambda r: r.priority):
if rule.condition(task):
return rule.agent_id, rule.name
raise ValueError("No routing rule matched - add a catch-all rule")
# orchestrator/routers/llm_router.py
import json
import httpx
from openai import AsyncOpenAI
from orchestrator.routers.rule_router import AGENT_REGISTRY
llm = AsyncOpenAI() # set OPENAI_API_KEY in env
async def _fetch_agent_card(agent_url: str) -> dict:
"""Agent Cards are public - no auth needed."""
async with httpx.AsyncClient() as client:
resp = await client.get(f"{agent_url}/.well-known/agent.json")
resp.raise_for_status()
return resp.json()
def _build_routing_prompt(task: str, cards: list[dict]) -> str:
card_summaries = "nn".join(
f"Agent ID: {c['_registry_id']}n"
f"Name: {c.get('name')}n"
f"Description: {c.get('description')}n"
f"Skills:n" + "n".join(
f" - [{s['id']}] {s['description']}"
for s in c.get("skills", [])
)
for c in cards
)
return f"""You are a task router for an agentic system.
Given the task below and the available agents, return the agent_id of the
most appropriate agent. If no agent is a strong match, return the agent
with the broadest scope.
TASK:
{task}
AVAILABLE AGENTS:
{card_summaries}
Respond with JSON only, no other text:
{{"agent_id": "<id>", "skill_id": "<skill_id>", "reason": "<one sentence>"}}"""
async def resolve_agent_llm(task: str) -> tuple[str, str]:
"""
Returns (agent_id, reason) by reading all Agent Cards at runtime
and asking the LLM to pick the best match.
Use temperature=0 - routing must be deterministic.
"""
cards = []
for agent_id, url in AGENT_REGISTRY.items():
try:
card = await _fetch_agent_card(url)
card["_registry_id"] = agent_id
cards.append(card)
except Exception as exc:
print(f"[LLM Router] Warning: could not reach '{agent_id}': {exc}")
if not cards:
raise RuntimeError("No agents reachable - cannot route task")
response = await llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": _build_routing_prompt(task, cards)}],
response_format={"type": "json_object"},
temperature=0,
)
decision = json.loads(response.choices[0].message.content)
chosen_id = decision.get("agent_id")
if chosen_id not in AGENT_REGISTRY:
raise ValueError(
f"LLM chose unknown agent '{chosen_id}'. "
f"Available: {list(AGENT_REGISTRY.keys())}"
)
return chosen_id, decision.get("reason", "")
# orchestrator/routers/hybrid_router.py
from orchestrator.routers.rule_router import resolve_agent, AGENT_REGISTRY
from orchestrator.routers.llm_router import resolve_agent_llm
async def resolve(task: str) -> tuple[str, str]:
"""
Fast path: rule-based for known task shapes (~0ms, zero LLM cost).
Slow path: LLM fallback for anything unclassified.
Returns (agent_url, routing_method) so the caller knows which path fired.
"""
agent_id, matched_rule = resolve_agent(task)
if matched_rule != "default":
agent_url = AGENT_REGISTRY[agent_id]
print(f"[Hybrid Router] Fast path → agent: '{agent_id}' rule: '{matched_rule}'")
return agent_url, f"rule:{matched_rule}"
else:
agent_id, reason = await resolve_agent_llm(task)
agent_url = AGENT_REGISTRY[agent_id]
print(f"[Hybrid Router] Slow path → agent: '{agent_id}' reason: '{reason}'")
return agent_url, f"llm:{reason}"
Step 8 — Orchestrator Agent Card
The orchestrator’s Agent Card describes what the orchestrator offers to external callers. Its skills are higher-level than the shared agents’ skills — they describe business workflows, not technical capabilities.
# orchestrator/agent_card.py
from a2a.types import AgentCard, AgentSkill, AgentCapabilities
def build_agent_card(host: str, port: int) -> AgentCard:
return AgentCard(
name="CustomerAOrchestrator",
description=(
"Orchestrates logistics workflow tasks for Customer A. "
"Routes incoming tasks to the appropriate shared agent and returns results. "
"Handles anomaly detection, contract lookup, and escalation drafting."
),
url=f"http://{host}:{port}",
version="1.0.0",
skills=[
AgentSkill(
id="process_logistics_task",
name="Process Logistics Task",
description=(
"Accepts a natural language logistics task and routes it to the "
"correct shared agent. Returns a structured result. "
"Use for: shipment analysis, anomaly flagging, contract queries, "
"escalation drafting."
),
inputModes=["text"],
outputModes=["text", "data"],
examples=[
"Detect anomalies in this week's shipment data.",
"Look up the SLA terms for contract #4821.",
"Draft an escalation for the delayed Route 7 shipments.",
],
),
],
capabilities=AgentCapabilities(streaming=True),
defaultInputModes=["text"],
defaultOutputModes=["text"],
)
Step 9 — Orchestrator Executor (The Critical Missing Piece)
This is the class that was entirely absent from the original article. The orchestrator’s executor receives a task from an external caller, runs the hybrid router to pick a shared agent, calls that agent via A2AClient, and streams the result back upstream.
It has the same structure as the shared agent’s executor — working → artifact → completed — because it is also an A2A server. The only difference is what happens in the middle: instead of running a local pipeline, it makes an outbound A2A call.
# orchestrator/executor.pyfrom a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
TaskState, TaskStatus,
TaskStatusUpdateEvent, TaskArtifactUpdateEvent,
Artifact, TextPart, Part,
)
from orchestrator.routers.hybrid_router import resolve
from common.a2a_client import get_access_token, call_agent
class OrchestratorExecutor(AgentExecutor):
"""
The orchestrator's AgentExecutor.
Facing inward: receives A2A tasks from external callers via A2A server stack.
Facing outward: calls shared agents via A2AClient.
This is what makes the orchestrator a full A2A citizen instead of
a plain Python script. Without this class, the orchestrator cannot
participate in the mesh as a server - it can only be a leaf client.
"""
async def execute(
self, context: RequestContext, event_queue: EventQueue
) -> None:
task_id = context.task_id
task_input = self._extract_text(context)
# 1. Signal working to the upstream caller
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(state=TaskState.working),
final=False,
)
)
try:
# 2. Resolve which shared agent should handle this task
agent_url, routing_method = await resolve(task_input)
# 3. Get a token and call the shared agent via A2AClient
token = await get_access_token()
result = await call_agent(
agent_url=agent_url,
task_text=task_input,
token=token,
)
# 4. Emit the shared agent's result as our artifact upstream
await event_queue.enqueue_event(
TaskArtifactUpdateEvent(
taskId=task_id,
artifact=Artifact(
artifactId="result",
parts=[Part(root=TextPart(text=result))],
),
lastChunk=True,
)
)
# 5. Signal completed to the upstream caller
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(state=TaskState.completed),
final=True,
)
)
except Exception as exc:
# 6. Signal failed upstream - without this the caller waits forever
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=task_id,
status=TaskStatus(
state=TaskState.failed,
message={"error": str(exc)},
),
final=True,
)
)
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
taskId=context.task_id,
status=TaskStatus(state=TaskState.cancelled),
final=True,
)
)
def _extract_text(self, context: RequestContext) -> str:
for part in context.message.parts:
if hasattr(part.root, "text"):
return part.root.text
return ""
Step 10 — Orchestrator Server Entry Point
# orchestrator/main.py
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from orchestrator.agent_card import build_agent_card
from orchestrator.executor import OrchestratorExecutor
from shared_agent.auth import OAuth2Middleware # reuse the same middleware
HOST = "0.0.0.0"
PORT = 8001 # orchestrator is the entry point; shared agents are on higher ports
def create_app():
executor = OrchestratorExecutor()
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(), # swap for Redis before prod
)
agent_card = build_agent_card(host="localhost", port=PORT)
app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=handler,
).build()
app.add_middleware(OAuth2Middleware)
return app
if __name__ == "__main__":
uvicorn.run(create_app(), host=HOST, port=PORT)
Validation Case 1: The Task Lifecycle Bug We Didn’t Expect
What we tested: Does the full A2A task lifecycle work — submitted → working → completed — with the artifact arriving before the final status event?
What we ran:
import asyncio, uuid, httpx
from a2a.client import A2AClient
from a2a.types import SendMessageRequest, MessageSendParams, Message, TextPart, Part
async def validate_lifecycle(agent_url: str, token: str):
async with httpx.AsyncClient() as http:
client = A2AClient(httpx_client=http, url=agent_url)
request = SendMessageRequest(
id=str(uuid.uuid4()),
params=MessageSendParams(
message=Message(
role="user",
messageId=str(uuid.uuid4()),
parts=[Part(root=TextPart(text="test task"))],
)
),
)
response = await client.send_message(
request,
headers={"Authorization": f"Bearer {token}"},
)
task = response.root
assert task is not None, "Task result should not be None"
print(f"Final state: {task.result}")
print("✓ Lifecycle validation passed")
asyncio.run(validate_lifecycle("http://localhost:8001", "your-token-here"))
What broke: We deliberately raised an exception inside the pipeline to test error handling. The test hung. The SSE connection stayed open until we killed the process.
The bug: No except block. The exception propagated out of execute() and the event queue never received a failed event. The client had no signal to close the connection.
The fix: The try/except in both executor files. Every code path must end with final=True. This applies to both the shared agent executor and the orchestrator executor equally.
Validation Case 2: The Auth Boundary We Almost Got Wrong
What we tested: Does the server correctly reject bad tokens while serving the Agent Card publicly?
async def validate_auth_boundary(agent_url: str, valid_token: str):
async with httpx.AsyncClient() as client:
# Agent Card must be public (no auth)
card = await client.get(f"{agent_url}/.well-known/agent.json")
assert card.status_code == 200
print("✓ Agent Card: public")
# No auth → 401
r = await client.post(f"{agent_url}/message/send", json={})
assert r.status_code == 401
print("✓ No token: 401")
# Garbage token → 401
r = await client.post(
f"{agent_url}/message/send",
headers={"Authorization": "Bearer garbage.token.here"},
json={},
)
assert r.status_code == 401
print("✓ Invalid token: 401")
# Valid token, correct role → auth passes
r = await client.post(
f"{agent_url}/message/send",
headers={"Authorization": f"Bearer {valid_token}"},
json={}, # incomplete body - testing auth only, not the task
)
assert r.status_code != 401
print("✓ Valid token: auth passed")
What nearly went wrong: In an early version of the middleware we accidentally added /.well-known/agent.json to the protected paths. The Agent Card returned 401.
The consequence: any client agent trying to discover skills would hit the 401 before it had any way of knowing what token to request. Discovery was permanently broken for unauthenticated callers.
The Agent Card must always be public. It contains no sensitive data. Protecting it is not a security gain — it is a discovery failure.
Running It End to End
# Terminal 1: start the shared agent
KEYCLOAK_URL=http://localhost:8080
KEYCLOAK_REALM=agent-realm
python -m shared_agent.main
# Terminal 2: start the orchestrator
KEYCLOAK_URL=http://localhost:8080
KEYCLOAK_REALM=agent-realm
A2A_CLIENT_SECRET=your-secret
python -m orchestrator.main
# Terminal 3: verify both Agent Cards are public
curl http://localhost:8001/.well-known/agent.json | jq . # orchestrator card
curl http://localhost:8002/.well-known/agent.json | jq . # shared agent card
# Terminal 4: run the official A2A inspector against the orchestrator
npx @a2aproject/a2a-inspector http://localhost:8001
# Terminal 5: send a task to the orchestrator (it routes internally to a shared agent)
A2A_CLIENT_SECRET=your-secret python -c "
import asyncio, uuid, httpx
from a2a.client import A2AClient
from a2a.types import SendMessageRequest, MessageSendParams, Message, TextPart, Part
from common.a2a_client import get_access_token
async def main():
token = await get_access_token()
async with httpx.AsyncClient() as http:
client = A2AClient(httpx_client=http, url='http://localhost:8001')
req = SendMessageRequest(
id=str(uuid.uuid4()),
params=MessageSendParams(
message=Message(
role='user',
messageId=str(uuid.uuid4()),
parts=[Part(root=TextPart(text='Validate this JSON payload.'))]
)
)
)
resp = await client.send_message(req, headers={'Authorization': f'Bearer {token}'})
print(resp.root)
asyncio.run(main())
"
What Changed From the Original Architecture

The shared agent layer did not change. Every fix was in the orchestrator. That is the right result: the shared agents are stable, reusable infrastructure. The orchestrators are the thin, customer-specific layer on top.
What’s Coming in Part 3
Part 2 was local. Part 3 is production.
Next article: deploying this on AKS with Azure APIM as the gateway — rate limiting, scope enforcement, and what a real multi-agent mesh looks like when five agents are in the picture instead of one.
I’ll also cover:
- The one architectural decision we made in Part 2 that we had to undo in Part 3
- How we handle agent versioning when a shared agent needs to change without breaking existing orchestrators
- What observability looks like across an A2A mesh — tracing a task from the orchestrator all the way through three agents and back
Part 3 drops in next weeks.
I Actually Built It. Here’s Every Line That Matters — and Every Line That Broke First. was originally published in Towards AI on Medium, where people are continuing the conversation by highlighting and responding to this story.