A2A · ORCHESTRATION

Orchestrator 설계

📐 아키텍처 패턴 💻 순차/병렬 실행 🔧 장애 처리 전략

여러 원격 A2A Agent를 조율하는 Orchestrator의 핵심 책임, 태스크 분해·위임·집계 패턴, 장애 시나리오 처리를 설계합니다.

Orchestrator의 4가지 책임

🗂️
태스크 분해
복잡한 사용자 요청을 원자적 서브태스크로 분해. LLM 추론으로 의존 관계 파악
🎯
Agent 선택
Agent Card의 skills, capabilities를 분석해 각 서브태스크에 최적 Agent 배정
🔄
실행 조율
독립 태스크는 병렬로, 의존성 있는 태스크는 순차로 실행. 진행 상황 추적
🧩
결과 통합
각 Agent Artifact를 수집하고 최종 응답으로 합성. 최종 사용자에게 전달

Orchestrator 아키텍처 패턴

🎭 Orchestrator Agent 1. 분해 2. 선택 3. 실행 4. 통합 Agent Card 조회 → tasks/send / tasks/sendSubscribe GET /agent.json GET /agent.json GET /agent.json 🔍 검색 Agent skills: [web-search] streaming: true 💻 코드 Agent skills: [code-exec] streaming: false 📊 분석 Agent skills: [data-analysis] streaming: true Artifact: 검색 결과 Artifact: 실행 코드 Artifact: 분석 보고서 → Orchestrator가 3개 Artifact를 통합해 최종 응답 생성 그림 1. Orchestrator가 3개 Agent에게 병렬로 Task 위임하고 결과 통합

순차 vs 병렬 실행 구현

python병렬 + 순차 혼합 Orchestrator
import asyncio, httpx, uuid, json

class A2AClient:
    def __init__(self, agent_url: str, token: str = ""):
        self.url = agent_url
        self.headers = {"Authorization": f"Bearer {token}"} if token else {}

    async def send_task(self, message: str, skill_id: str = None) -> dict:
        payload = {
            "jsonrpc": "2.0", "id": 1,
            "method": "tasks/send",
            "params": {
                "id": str(uuid.uuid4()),
                "skill": skill_id,  # 특정 Skill 지정 (선택)
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message}]
                }
            }
        }
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(self.url, json=payload, headers=self.headers)
            result = resp.json()["result"]
            if result["status"]["state"] == "failed":
                raise RuntimeError(f"Task failed: {result['status']['message']}")
            return result


async def orchestrate(user_query: str) -> str:
    search = A2AClient("http://search-agent:8001")
    code   = A2AClient("http://code-agent:8002")
    report = A2AClient("http://report-agent:8003")

    # 1단계: 검색과 코드 생성 병렬 실행
    search_task, code_task = await asyncio.gather(
        search.send_task(f"{user_query}에 대해 검색해줘"),
        code.send_task(f"{user_query} 분석 코드 작성해줘"),
        return_exceptions=True  # 하나가 실패해도 계속 진행
    )

    # 실패 처리
    search_text = _extract_text(search_task) if not isinstance(search_task, Exception) else "검색 실패"
    code_text   = _extract_text(code_task)   if not isinstance(code_task, Exception)   else "코드 생성 실패"

    # 2단계: 결과를 받아 최종 보고서 생성 (순차)
    final = await report.send_task(
        f"다음 결과로 최종 보고서를 작성해줘:\n검색: {search_text}\n코드: {code_text}"
    )
    return _extract_text(final)

def _extract_text(task: dict) -> str:
    for artifact in task.get("artifacts", []):
        for part in artifact.get("parts", []):
            if part["type"] == "text":
                return part["text"]
    return ""

장애 처리 전략

✓ 전략
Fallback Agent
주 Agent 실패 시 동일 Skill을 제공하는 예비 Agent로 자동 전환
✓ 전략
Partial Success
일부 Agent 실패해도 성공한 결과만으로 최종 응답 생성. return_exceptions=True 활용
✓ 전략
Circuit Breaker
연속 실패한 Agent는 일정 시간 호출 차단. 복구 후 자동 재연결
✓ 전략
Timeout Budget
전체 Orchestration에 최대 시간 예산 설정. 개별 Agent timeout = 전체 / 의존 단계 수