A2A · ORCHESTRATION
Orchestrator 설계
여러 원격 A2A Agent를 조율하는 Orchestrator의 핵심 책임, 태스크 분해·위임·집계 패턴, 장애 시나리오 처리를 설계합니다.
Orchestrator의 4가지 책임
태스크 분해
복잡한 사용자 요청을 원자적 서브태스크로 분해. LLM 추론으로 의존 관계 파악
Agent 선택
Agent Card의 skills, capabilities를 분석해 각 서브태스크에 최적 Agent 배정
실행 조율
독립 태스크는 병렬로, 의존성 있는 태스크는 순차로 실행. 진행 상황 추적
결과 통합
각 Agent Artifact를 수집하고 최종 응답으로 합성. 최종 사용자에게 전달
Orchestrator 아키텍처 패턴
그림 1. Orchestrator가 3개 Agent에게 병렬로 Task 위임하고 결과 통합
순차 vs 병렬 실행 구현
python병렬 + 순차 혼합 Orchestrator
import asyncio, httpx, uuid, json
class A2AClient:
def __init__(self, agent_url: str, token: str = ""):
self.url = agent_url
self.headers = {"Authorization": f"Bearer {token}"} if token else {}
async def send_task(self, message: str, skill_id: str = None) -> dict:
payload = {
"jsonrpc": "2.0", "id": 1,
"method": "tasks/send",
"params": {
"id": str(uuid.uuid4()),
"skill": skill_id, # 특정 Skill 지정 (선택)
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(self.url, json=payload, headers=self.headers)
result = resp.json()["result"]
if result["status"]["state"] == "failed":
raise RuntimeError(f"Task failed: {result['status']['message']}")
return result
async def orchestrate(user_query: str) -> str:
search = A2AClient("http://search-agent:8001")
code = A2AClient("http://code-agent:8002")
report = A2AClient("http://report-agent:8003")
# 1단계: 검색과 코드 생성 병렬 실행
search_task, code_task = await asyncio.gather(
search.send_task(f"{user_query}에 대해 검색해줘"),
code.send_task(f"{user_query} 분석 코드 작성해줘"),
return_exceptions=True # 하나가 실패해도 계속 진행
)
# 실패 처리
search_text = _extract_text(search_task) if not isinstance(search_task, Exception) else "검색 실패"
code_text = _extract_text(code_task) if not isinstance(code_task, Exception) else "코드 생성 실패"
# 2단계: 결과를 받아 최종 보고서 생성 (순차)
final = await report.send_task(
f"다음 결과로 최종 보고서를 작성해줘:\n검색: {search_text}\n코드: {code_text}"
)
return _extract_text(final)
def _extract_text(task: dict) -> str:
for artifact in task.get("artifacts", []):
for part in artifact.get("parts", []):
if part["type"] == "text":
return part["text"]
return ""
장애 처리 전략
✓ 전략
Fallback Agent
주 Agent 실패 시 동일 Skill을 제공하는 예비 Agent로 자동 전환
✓ 전략
Partial Success
일부 Agent 실패해도 성공한 결과만으로 최종 응답 생성. return_exceptions=True 활용
✓ 전략
Circuit Breaker
연속 실패한 Agent는 일정 시간 호출 차단. 복구 후 자동 재연결
✓ 전략
Timeout Budget
전체 Orchestration에 최대 시간 예산 설정. 개별 Agent timeout = 전체 / 의존 단계 수