A2A · DISCOVERY

에이전트 탐색 & 레지스트리

🔍 Agent Card 탐색 패턴 📋 중앙 레지스트리 설계 🤖 LLM 기반 자동 라우팅

Agent Card를 조회하는 방법부터 중앙 레지스트리로 다수의 Agent를 관리하고, LLM이 자동으로 최적의 Agent를 선택하는 패턴까지 다룹니다.

탐색 계층 구조

A2A 탐색은 세 가지 수준으로 이뤄집니다. 단순 시스템은 정적 URL 목록으로 충분하지만, 동적으로 확장되는 멀티에이전트 시스템에서는 레지스트리와 LLM 기반 라우팅이 필요합니다.

Level 1. 정적 등록 환경변수 / 설정 파일로 Agent URL 하드코딩 소규모 고정 토폴로지 Level 2. 중앙 레지스트리 Agent Card 수집 서버 태그/기능별 검색 API 중규모 동적 토폴로지 Level 3. LLM 자동 라우팅 Agent Card를 LLM에 제공 의미론적 능력 매칭 대규모 동적 환경 탐색 흐름 (Level 2 기준) ① Agent 시작 Card를 레지스트리에 등록 ② 레지스트리 태그/기능 인덱싱 ③ Orchestrator 기능 기반 쿼리 ④ tasks/send 선택된 Agent 호출 Health check 실패 시 레지스트리에서 자동 제거 그림 1. A2A 에이전트 탐색 계층 구조

Agent Card 수동 조회

가장 기본적인 탐색 — 알려진 URL에서 Agent Card를 직접 조회하고 능력을 확인합니다.

pythondiscovery.py — Agent Card 조회 및 능력 검증
import httpx
from dataclasses import dataclass
from typing import Optional

@dataclass
class AgentInfo:
    name: str
    url: str
    description: str
    skills: list[dict]
    supports_streaming: bool
    auth_schemes: list[str]

async def fetch_agent_card(base_url: str, timeout: float = 5.0) -> Optional[AgentInfo]:
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(f"{base_url.rstrip('/')}/.well-known/agent.json")
            resp.raise_for_status()
            card = resp.json()
    except (httpx.HTTPError, ValueError) as e:
        print(f"[Discovery] {base_url} 조회 실패: {e}")
        return None

    return AgentInfo(
        name=card.get("name", "Unknown"),
        url=card.get("url", base_url),
        description=card.get("description", ""),
        skills=card.get("skills", []),
        supports_streaming=card.get("capabilities", {}).get("streaming", False),
        auth_schemes=card.get("authentication", {}).get("schemes", [])
    )

def agent_supports_skill(agent: AgentInfo, skill_id: str) -> bool:
    return any(s.get("id") == skill_id for s in agent.skills)

def agents_with_tag(agents: list[AgentInfo], tag: str) -> list[AgentInfo]:
    result = []
    for agent in agents:
        for skill in agent.skills:
            if tag in skill.get("tags", []):
                result.append(agent)
                break
    return result

# 사용 예시
async def discover_agents():
    known_urls = [
        "http://search-agent:8001",
        "http://code-agent:8002",
        "http://analysis-agent:8003",
    ]
    agents = [a for a in
              (await asyncio.gather(*[fetch_agent_card(u) for u in known_urls]))
              if a is not None]
    print(f"발견된 Agent: {[a.name for a in agents]}")
    return agents

중앙 레지스트리 서버 구현

다수의 Agent를 동적으로 관리하는 레지스트리 서버입니다. Agent가 시작 시 등록하고, 헬스체크로 생존 여부를 모니터링합니다.

pythonregistry_server.py — Agent 레지스트리 FastAPI 서버
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio, httpx, time

app = FastAPI(title="A2A Agent Registry")

class RegisterRequest(BaseModel):
    url: str           # Agent 기본 URL
    ttl: int = 300    # 초 단위 만료 시간

# 메모리 스토어 (프로덕션에서는 Redis 사용)
registry: dict[str, dict] = {}   # url → {card, registered_at, expires_at}

@app.post("/register")
async def register_agent(req: RegisterRequest, bg: BackgroundTasks):
    """Agent가 자신을 등록 — 시작 시 호출"""
    card = await fetch_card(req.url)
    if not card:
        raise HTTPException(400, f"Agent Card 조회 실패: {req.url}")

    registry[req.url] = {
        "card": card,
        "registered_at": time.time(),
        "expires_at": time.time() + req.ttl,
        "healthy": True
    }
    return {"status": "registered", "agent": card["name"]}

@app.delete("/register")
async def deregister_agent(url: str):
    """Agent 종료 시 등록 해제"""
    registry.pop(url, None)
    return {"status": "deregistered"}

@app.get("/agents")
async def list_agents(tag: str = None, skill_id: str = None, streaming: bool = None):
    """건강한 Agent 목록 반환. 필터 파라미터 지원"""
    now = time.time()
    results = []
    for url, entry in registry.items():
        if entry["expires_at"] < now or not entry["healthy"]:
            continue
        card = entry["card"]
        if streaming is not None:
            if card.get("capabilities", {}).get("streaming") != streaming:
                continue
        if skill_id and not any(s["id"] == skill_id for s in card.get("skills", [])):
            continue
        if tag and not any(
            tag in s.get("tags", []) for s in card.get("skills", [])
        ):
            continue
        results.append(card)
    return {"agents": results, "count": len(results)}

# 헬스체크 백그라운드 태스크
@app.on_event("startup")
async def start_health_check():
    asyncio.create_task(health_check_loop())

async def health_check_loop():
    while True:
        await asyncio.sleep(30)
        for url in list(registry):
            ok = await ping_agent(url)
            if url in registry:
                registry[url]["healthy"] = ok

async def ping_agent(url: str) -> bool:
    try:
        async with httpx.AsyncClient(timeout=3.0) as c:
            resp = await c.get(f"{url}/.well-known/agent.json")
            return resp.status_code == 200
    except:
        return False

async def fetch_card(url: str) -> dict | None:
    try:
        async with httpx.AsyncClient(timeout=5.0) as c:
            r = await c.get(f"{url.rstrip('/')}/.well-known/agent.json")
            return r.json() if r.status_code == 200 else None
    except:
        return None

LLM 기반 자동 라우팅

Agent Card의 descriptionskills를 LLM에 제공하면, LLM이 사용자 요청에 가장 적합한 Agent를 자동으로 선택합니다.

pythonllm_router.py — LLM 기반 Agent 자동 선택
import json
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-haiku-4-5-20251001")  # 라우팅은 경량 모델 충분

def format_agents_for_llm(agents: list[dict]) -> str:
    """Agent 목록을 LLM이 읽기 쉬운 형태로 변환"""
    lines = []
    for i, agent in enumerate(agents, 1):
        skills_str = ", ".join(
            f"{s['id']}({s.get('description','')})"
            for s in agent.get("skills", [])
        )
        lines.append(
            f"{i}. {agent['name']} — {agent.get('description', '')}\n"
            f"   Skills: {skills_str}"
        )
    return "\n".join(lines)

async def llm_select_agents(
    user_query: str,
    available_agents: list[dict],
    max_agents: int = 3
) -> list[str]:
    """LLM이 사용자 쿼리에 맞는 Agent를 선택하여 이름 목록 반환"""
    agent_list = format_agents_for_llm(available_agents)

    prompt = f"""당신은 AI Agent 라우터입니다. 사용자 요청을 처리하기에 가장 적합한 Agent를 선택하세요.

사용 가능한 Agent:
{agent_list}

사용자 요청: {user_query}

규칙:
- 최대 {max_agents}개 Agent 선택
- 꼭 필요한 Agent만 선택 (불필요한 Agent는 제외)
- 선택한 Agent 이름을 JSON 배열로만 답하세요

예시: ["Search Agent", "Analysis Agent"]"""

    response = await llm.ainvoke([{"role": "user", "content": prompt}])
    try:
        selected = json.loads(response.content)
        return [name for name in selected if isinstance(name, str)]
    except:
        return [available_agents[0]["name"]] if available_agents else []

# 레지스트리 + LLM 라우팅 통합
async def smart_route(user_query: str, registry_url: str) -> list[dict]:
    """레지스트리에서 Agent 목록을 조회하고 LLM으로 선택"""
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{registry_url}/agents")
        all_agents = resp.json()["agents"]

    selected_names = await llm_select_agents(user_query, all_agents)
    return [a for a in all_agents if a["name"] in selected_names]

Agent 자가 등록 패턴

Agent 서버가 시작될 때 레지스트리에 자동 등록하고, 종료될 때 등록을 해제합니다. FastAPI lifecycle events를 활용합니다.

pythonself_registration.py — FastAPI lifespan으로 자가 등록
from contextlib import asynccontextmanager
import httpx, os, asyncio

REGISTRY_URL = os.getenv("REGISTRY_URL", "http://registry:8000")
SELF_URL     = os.getenv("AGENT_URL",    "http://search-agent:8001")
HEARTBEAT_INTERVAL = 60  # TTL보다 짧게
TTL = 300

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 시작: 레지스트리 등록
    await register_self()
    heartbeat_task = asyncio.create_task(heartbeat_loop())

    yield  # ← 서버 실행 중

    # 종료: 등록 해제
    heartbeat_task.cancel()
    await deregister_self()

async def register_self():
    async with httpx.AsyncClient() as c:
        try:
            await c.post(
                f"{REGISTRY_URL}/register",
                json={"url": SELF_URL, "ttl": TTL}
            )
            print(f"[Registry] 등록 완료: {SELF_URL}")
        except httpx.HTTPError as e:
            print(f"[Registry] 등록 실패 (계속 실행): {e}")

async def deregister_self():
    async with httpx.AsyncClient() as c:
        await c.delete(f"{REGISTRY_URL}/register", params={"url": SELF_URL})

async def heartbeat_loop():
    """TTL이 만료되기 전에 주기적으로 재등록"""
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        await register_self()

app = FastAPI(lifespan=lifespan)
💡 프로덕션 레지스트리 고려사항
  • 스토리지 — 인메모리 대신 Redis 사용 (재시작에도 등록 유지)
  • 네임스페이스 — 환경(dev/prod)별로 레지스트리를 분리하거나 태그로 구분
  • 인증 — 레지스트리 등록 API에 API Key 또는 mTLS 적용
  • 카드 캐싱 — 레지스트리가 Agent Card를 캐시해 탐색 지연 최소화
  • 버전 라우팅 — Agent Card의 version 필드로 카나리 배포 구현