A2A · DISCOVERY
에이전트 탐색 & 레지스트리
Agent Card를 조회하는 방법부터 중앙 레지스트리로 다수의 Agent를 관리하고, LLM이 자동으로 최적의 Agent를 선택하는 패턴까지 다룹니다.
탐색 계층 구조
A2A 탐색은 세 가지 수준으로 이뤄집니다. 단순 시스템은 정적 URL 목록으로 충분하지만, 동적으로 확장되는 멀티에이전트 시스템에서는 레지스트리와 LLM 기반 라우팅이 필요합니다.
그림 1. A2A 에이전트 탐색 계층 구조
Agent Card 수동 조회
가장 기본적인 탐색 — 알려진 URL에서 Agent Card를 직접 조회하고 능력을 확인합니다.
pythondiscovery.py — Agent Card 조회 및 능력 검증
import httpx
from dataclasses import dataclass
from typing import Optional
@dataclass
class AgentInfo:
name: str
url: str
description: str
skills: list[dict]
supports_streaming: bool
auth_schemes: list[str]
async def fetch_agent_card(base_url: str, timeout: float = 5.0) -> Optional[AgentInfo]:
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(f"{base_url.rstrip('/')}/.well-known/agent.json")
resp.raise_for_status()
card = resp.json()
except (httpx.HTTPError, ValueError) as e:
print(f"[Discovery] {base_url} 조회 실패: {e}")
return None
return AgentInfo(
name=card.get("name", "Unknown"),
url=card.get("url", base_url),
description=card.get("description", ""),
skills=card.get("skills", []),
supports_streaming=card.get("capabilities", {}).get("streaming", False),
auth_schemes=card.get("authentication", {}).get("schemes", [])
)
def agent_supports_skill(agent: AgentInfo, skill_id: str) -> bool:
return any(s.get("id") == skill_id for s in agent.skills)
def agents_with_tag(agents: list[AgentInfo], tag: str) -> list[AgentInfo]:
result = []
for agent in agents:
for skill in agent.skills:
if tag in skill.get("tags", []):
result.append(agent)
break
return result
# 사용 예시
async def discover_agents():
known_urls = [
"http://search-agent:8001",
"http://code-agent:8002",
"http://analysis-agent:8003",
]
agents = [a for a in
(await asyncio.gather(*[fetch_agent_card(u) for u in known_urls]))
if a is not None]
print(f"발견된 Agent: {[a.name for a in agents]}")
return agents
중앙 레지스트리 서버 구현
다수의 Agent를 동적으로 관리하는 레지스트리 서버입니다. Agent가 시작 시 등록하고, 헬스체크로 생존 여부를 모니터링합니다.
pythonregistry_server.py — Agent 레지스트리 FastAPI 서버
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio, httpx, time
app = FastAPI(title="A2A Agent Registry")
class RegisterRequest(BaseModel):
url: str # Agent 기본 URL
ttl: int = 300 # 초 단위 만료 시간
# 메모리 스토어 (프로덕션에서는 Redis 사용)
registry: dict[str, dict] = {} # url → {card, registered_at, expires_at}
@app.post("/register")
async def register_agent(req: RegisterRequest, bg: BackgroundTasks):
"""Agent가 자신을 등록 — 시작 시 호출"""
card = await fetch_card(req.url)
if not card:
raise HTTPException(400, f"Agent Card 조회 실패: {req.url}")
registry[req.url] = {
"card": card,
"registered_at": time.time(),
"expires_at": time.time() + req.ttl,
"healthy": True
}
return {"status": "registered", "agent": card["name"]}
@app.delete("/register")
async def deregister_agent(url: str):
"""Agent 종료 시 등록 해제"""
registry.pop(url, None)
return {"status": "deregistered"}
@app.get("/agents")
async def list_agents(tag: str = None, skill_id: str = None, streaming: bool = None):
"""건강한 Agent 목록 반환. 필터 파라미터 지원"""
now = time.time()
results = []
for url, entry in registry.items():
if entry["expires_at"] < now or not entry["healthy"]:
continue
card = entry["card"]
if streaming is not None:
if card.get("capabilities", {}).get("streaming") != streaming:
continue
if skill_id and not any(s["id"] == skill_id for s in card.get("skills", [])):
continue
if tag and not any(
tag in s.get("tags", []) for s in card.get("skills", [])
):
continue
results.append(card)
return {"agents": results, "count": len(results)}
# 헬스체크 백그라운드 태스크
@app.on_event("startup")
async def start_health_check():
asyncio.create_task(health_check_loop())
async def health_check_loop():
while True:
await asyncio.sleep(30)
for url in list(registry):
ok = await ping_agent(url)
if url in registry:
registry[url]["healthy"] = ok
async def ping_agent(url: str) -> bool:
try:
async with httpx.AsyncClient(timeout=3.0) as c:
resp = await c.get(f"{url}/.well-known/agent.json")
return resp.status_code == 200
except:
return False
async def fetch_card(url: str) -> dict | None:
try:
async with httpx.AsyncClient(timeout=5.0) as c:
r = await c.get(f"{url.rstrip('/')}/.well-known/agent.json")
return r.json() if r.status_code == 200 else None
except:
return None
LLM 기반 자동 라우팅
Agent Card의 description과 skills를 LLM에 제공하면, LLM이 사용자 요청에 가장 적합한 Agent를 자동으로 선택합니다.
pythonllm_router.py — LLM 기반 Agent 자동 선택
import json
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-haiku-4-5-20251001") # 라우팅은 경량 모델 충분
def format_agents_for_llm(agents: list[dict]) -> str:
"""Agent 목록을 LLM이 읽기 쉬운 형태로 변환"""
lines = []
for i, agent in enumerate(agents, 1):
skills_str = ", ".join(
f"{s['id']}({s.get('description','')})"
for s in agent.get("skills", [])
)
lines.append(
f"{i}. {agent['name']} — {agent.get('description', '')}\n"
f" Skills: {skills_str}"
)
return "\n".join(lines)
async def llm_select_agents(
user_query: str,
available_agents: list[dict],
max_agents: int = 3
) -> list[str]:
"""LLM이 사용자 쿼리에 맞는 Agent를 선택하여 이름 목록 반환"""
agent_list = format_agents_for_llm(available_agents)
prompt = f"""당신은 AI Agent 라우터입니다. 사용자 요청을 처리하기에 가장 적합한 Agent를 선택하세요.
사용 가능한 Agent:
{agent_list}
사용자 요청: {user_query}
규칙:
- 최대 {max_agents}개 Agent 선택
- 꼭 필요한 Agent만 선택 (불필요한 Agent는 제외)
- 선택한 Agent 이름을 JSON 배열로만 답하세요
예시: ["Search Agent", "Analysis Agent"]"""
response = await llm.ainvoke([{"role": "user", "content": prompt}])
try:
selected = json.loads(response.content)
return [name for name in selected if isinstance(name, str)]
except:
return [available_agents[0]["name"]] if available_agents else []
# 레지스트리 + LLM 라우팅 통합
async def smart_route(user_query: str, registry_url: str) -> list[dict]:
"""레지스트리에서 Agent 목록을 조회하고 LLM으로 선택"""
async with httpx.AsyncClient() as client:
resp = await client.get(f"{registry_url}/agents")
all_agents = resp.json()["agents"]
selected_names = await llm_select_agents(user_query, all_agents)
return [a for a in all_agents if a["name"] in selected_names]
Agent 자가 등록 패턴
Agent 서버가 시작될 때 레지스트리에 자동 등록하고, 종료될 때 등록을 해제합니다. FastAPI lifecycle events를 활용합니다.
pythonself_registration.py — FastAPI lifespan으로 자가 등록
from contextlib import asynccontextmanager
import httpx, os, asyncio
REGISTRY_URL = os.getenv("REGISTRY_URL", "http://registry:8000")
SELF_URL = os.getenv("AGENT_URL", "http://search-agent:8001")
HEARTBEAT_INTERVAL = 60 # TTL보다 짧게
TTL = 300
@asynccontextmanager
async def lifespan(app: FastAPI):
# 시작: 레지스트리 등록
await register_self()
heartbeat_task = asyncio.create_task(heartbeat_loop())
yield # ← 서버 실행 중
# 종료: 등록 해제
heartbeat_task.cancel()
await deregister_self()
async def register_self():
async with httpx.AsyncClient() as c:
try:
await c.post(
f"{REGISTRY_URL}/register",
json={"url": SELF_URL, "ttl": TTL}
)
print(f"[Registry] 등록 완료: {SELF_URL}")
except httpx.HTTPError as e:
print(f"[Registry] 등록 실패 (계속 실행): {e}")
async def deregister_self():
async with httpx.AsyncClient() as c:
await c.delete(f"{REGISTRY_URL}/register", params={"url": SELF_URL})
async def heartbeat_loop():
"""TTL이 만료되기 전에 주기적으로 재등록"""
while True:
await asyncio.sleep(HEARTBEAT_INTERVAL)
await register_self()
app = FastAPI(lifespan=lifespan)
💡 프로덕션 레지스트리 고려사항
- 스토리지 — 인메모리 대신 Redis 사용 (재시작에도 등록 유지)
- 네임스페이스 — 환경(dev/prod)별로 레지스트리를 분리하거나 태그로 구분
- 인증 — 레지스트리 등록 API에 API Key 또는 mTLS 적용
- 카드 캐싱 — 레지스트리가 Agent Card를 캐시해 탐색 지연 최소화
- 버전 라우팅 — Agent Card의
version필드로 카나리 배포 구현