A2A · LANGGRAPH

LangGraph + A2A 구현

🔗 A2A 서버로 LangGraph 🌐 A2A 클라이언트 연동 🔀 멀티에이전트 그래프

LangGraph Agent를 A2A 서버로 노출하고, 다른 LangGraph Agent가 A2A 클라이언트로 호출하는 완전한 멀티에이전트 시스템을 구축합니다.

전체 구조 개요

LangGraph와 A2A를 결합하면 각 Agent가 독립적인 LangGraph 그래프로 구현되면서, A2A 프로토콜을 통해 서로 협력하는 시스템이 됩니다. 각 Agent는 A2A 서버 역할(incoming task 수신)과 A2A 클라이언트 역할(다른 agent에게 task 위임)을 동시에 수행할 수 있습니다.

👤 사용자 요청 🎭 Orchestrator (LangGraph) StateGraph + A2AClientNode route_task() → [search_node | code_node | analysis_node] → aggregate() A2A SERVER 🔍 Search Agent LangGraph ReAct Tavily + DuckDuckGo Tools A2A SERVER 💻 Code Agent LangGraph ReAct Python Sandbox + Linter A2A SERVER 📊 Analysis Agent LangGraph + SSE Stream Pandas + Matplotlib A2A SERVER 각 Agent: FastAPI + LangGraph + A2A SDK — 독립 배포 가능 그림 1. LangGraph + A2A 멀티에이전트 아키텍처 — 각 노드가 독립 A2A 서버

LangGraph Agent를 A2A 서버로 노출

LangGraph로 만든 ReAct Agent를 A2A 서버로 래핑합니다. FastAPI가 A2A JSON-RPC 요청을 수신하고, LangGraph 그래프를 실행한 뒤 결과를 Task 형식으로 반환합니다.

pythonsearch_agent_server.py — LangGraph Agent를 A2A 서버로
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
from langchain_community.tools.tavily_search import TavilySearchResults
import uuid, json, asyncio

app = FastAPI()

# LangGraph ReAct Agent 구성
llm = ChatAnthropic(model="claude-opus-4-7")
tools = [TavilySearchResults(max_results=5)]
graph = create_react_agent(llm, tools)

AGENT_CARD = {
    "name": "Search Agent",
    "description": "LangGraph ReAct 기반 웹 검색 에이전트",
    "url": "http://localhost:8001",
    "version": "1.0.0",
    "capabilities": {
        "streaming": True,
        "pushNotifications": False,
        "stateTransitionHistory": False
    },
    "authentication": {"schemes": ["none"]},
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/plain", "application/json"],
    "skills": [{
        "id": "web-search",
        "name": "Web Search",
        "description": "최신 정보를 웹에서 검색하고 요약합니다",
        "examples": ["2025년 AI 트렌드 검색해줘", "LangGraph 최신 버전 기능 알려줘"]
    }]
}

@app.get("/.well-known/agent.json")
async def agent_card():
    return JSONResponse(AGENT_CARD, headers={"Cache-Control": "max-age=3600"})

@app.post("/")
async def handle_rpc(request: Request):
    body = await request.json()
    method = body.get("method")
    params = body.get("params", {})
    rpc_id = body.get("id")

    if method == "tasks/send":
        result = await run_task(params)
        return JSONResponse({
            "jsonrpc": "2.0", "id": rpc_id, "result": result
        })

    elif method == "tasks/sendSubscribe":
        return StreamingResponse(
            stream_task(params, rpc_id),
            media_type="text/event-stream"
        )

    return JSONResponse({
        "jsonrpc": "2.0", "id": rpc_id,
        "error": {"code": -32601, "message": "Method not found"}
    })

async def run_task(params: dict) -> dict:
    task_id = params.get("id", str(uuid.uuid4()))
    user_message = extract_message(params)

    # LangGraph 실행
    state = await graph.ainvoke({
        "messages": [{"role": "user", "content": user_message}]
    })
    answer = state["messages"][-1].content

    return {
        "id": task_id,
        "status": {"state": "completed"},
        "artifacts": [{
            "name": "search-result",
            "parts": [{"type": "text", "text": answer}]
        }]
    }

async def stream_task(params: dict, rpc_id: str):
    task_id = params.get("id", str(uuid.uuid4()))
    user_message = extract_message(params)

    def emit(event: dict) -> str:
        return f"data: {json.dumps(event)}\n\n"

    # submitted 이벤트
    yield emit({
        "jsonrpc": "2.0", "id": rpc_id,
        "result": {"id": task_id, "status": {"state": "submitted"}, "final": False}
    })

    # LangGraph astream_events로 진행 상황 스트리밍
    async for event in graph.astream_events(
        {"messages": [{"role": "user", "content": user_message}]},
        version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"].content
            if chunk:
                yield emit({
                    "jsonrpc": "2.0", "id": rpc_id,
                    "result": {
                        "id": task_id,
                        "artifact": {
                            "name": "search-result",
                            "parts": [{"type": "text", "text": chunk}],
                            "append": True
                        },
                        "final": False
                    }
                })
        elif kind == "on_tool_start":
            yield emit({
                "jsonrpc": "2.0", "id": rpc_id,
                "result": {
                    "id": task_id,
                    "status": {
                        "state": "working",
                        "message": {
                            "role": "agent",
                            "parts": [{"type": "text", "text": f"검색 중: {event['name']}"}]
                        }
                    },
                    "final": False
                }
            })

    # completed 이벤트
    yield emit({
        "jsonrpc": "2.0", "id": rpc_id,
        "result": {"id": task_id, "status": {"state": "completed"}, "final": True}
    })

def extract_message(params: dict) -> str:
    for msg in params.get("message", {}).get("parts", []):
        if msg.get("type") == "text":
            return msg["text"]
    return ""

LangGraph Orchestrator — A2A 클라이언트 노드

Orchestrator 자체도 LangGraph 그래프로 구현합니다. 각 원격 Agent 호출은 LangGraph 노드로 표현되며, 조건부 엣지로 라우팅을 처리합니다.

pythonorchestrator_graph.py — LangGraph 기반 Orchestrator
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated, Literal
from operator import add
import asyncio, httpx, uuid, json

# ─── 상태 정의 ───────────────────────────────────────
class OrchestratorState(TypedDict):
    user_query: str
    plan: str
    search_result: str
    code_result: str
    analysis_result: str
    final_answer: str
    next_agent: Literal["search", "code", "analysis", "aggregate"]

llm = ChatAnthropic(model="claude-opus-4-7")

AGENT_URLS = {
    "search":   "http://search-agent:8001",
    "code":     "http://code-agent:8002",
    "analysis": "http://analysis-agent:8003",
}

# ─── A2A 호출 헬퍼 ───────────────────────────────────
async def call_a2a_agent(agent_name: str, query: str) -> str:
    url = AGENT_URLS[agent_name]
    task_id = str(uuid.uuid4())
    payload = {
        "jsonrpc": "2.0", "id": "1", "method": "tasks/send",
        "params": {
            "id": task_id,
            "message": {"role": "user", "parts": [{"type": "text", "text": query}]}
        }
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(url, json=payload)
        result = resp.json()["result"]
        for artifact in result.get("artifacts", []):
            for part in artifact.get("parts", []):
                if part["type"] == "text":
                    return part["text"]
    return ""

# ─── 노드 함수 ───────────────────────────────────────
async def planner_node(state: OrchestratorState) -> dict:
    """어떤 Agent를 호출할지 결정"""
    response = await llm.ainvoke([{
        "role": "user",
        "content": f"""다음 질문을 처리하기 위해 어떤 에이전트가 필요한지 JSON으로 답하세요.
질문: {state['user_query']}
형식: {{"agents": ["search", "code", "analysis"], "reason": "..."}}
search: 웹 검색 필요, code: 코드 실행 필요, analysis: 데이터 분석 필요"""
    }])
    plan_text = response.content
    return {"plan": plan_text}

async def search_node(state: OrchestratorState) -> dict:
    result = await call_a2a_agent("search", state["user_query"])
    return {"search_result": result}

async def code_node(state: OrchestratorState) -> dict:
    result = await call_a2a_agent("code", state["user_query"])
    return {"code_result": result}

async def analysis_node(state: OrchestratorState) -> dict:
    result = await call_a2a_agent("analysis", state["user_query"])
    return {"analysis_result": result}

async def aggregate_node(state: OrchestratorState) -> dict:
    parts = []
    if state.get("search_result"):   parts.append(f"[검색 결과]\n{state['search_result']}")
    if state.get("code_result"):     parts.append(f"[코드 결과]\n{state['code_result']}")
    if state.get("analysis_result"): parts.append(f"[분석 결과]\n{state['analysis_result']}")

    response = await llm.ainvoke([{
        "role": "user",
        "content": f"다음 결과들을 통합해 최종 답변을 작성하세요:\n\n{chr(10).join(parts)}"
    }])
    return {"final_answer": response.content}

# ─── 그래프 구성 ─────────────────────────────────────
builder = StateGraph(OrchestratorState)
builder.add_node("planner",   planner_node)
builder.add_node("search",    search_node)
builder.add_node("code",      code_node)
builder.add_node("analysis",  analysis_node)
builder.add_node("aggregate", aggregate_node)

builder.set_entry_point("planner")

# 플래너 → 병렬 실행 (실제로는 Send API 사용)
builder.add_edge("planner", "search")
builder.add_edge("planner", "code")
builder.add_edge("planner", "analysis")
builder.add_edge("search",   "aggregate")
builder.add_edge("code",     "aggregate")
builder.add_edge("analysis", "aggregate")
builder.add_edge("aggregate", END)

orchestrator_graph = builder.compile()

LangGraph Send API — 동적 병렬 실행

플래너가 실행 시점에 어떤 Agent를 호출할지 결정하는 동적 패턴에는 LangGraph의 Send API를 사용합니다.

pythondynamic_dispatch.py — Send API로 동적 병렬 위임
from langgraph.types import Send
import json

async def dynamic_router(state: OrchestratorState) -> list[Send]:
    """플래너 결과를 파싱해 필요한 Agent에만 Send"""
    try:
        plan = json.loads(state["plan"])
        agents = plan.get("agents", [])
    except:
        agents = ["search"]  # 기본값

    sends = []
    for agent in agents:
        if agent in ("search", "code", "analysis"):
            sends.append(Send(agent, {"user_query": state["user_query"]}))

    # 해당 Agent가 없으면 바로 aggregate로
    return sends if sends else [Send("aggregate", state)]

# 그래프에서 conditional_edges로 등록
builder.add_conditional_edges(
    "planner",
    dynamic_router,
    ["search", "code", "analysis", "aggregate"]
)
💡 LangGraph Send API vs asyncio.gather()
  • asyncio.gather() — Python 레벨 병렬화. LangGraph 그래프 외부에서 직접 제어할 때 적합
  • Send API — LangGraph 그래프 내부 병렬화. 체크포인팅, 재시도, 디버깅이 LangGraph 생태계와 통합됨
  • 프로덕션에서는 Send API가 LangSmith 추적, 중단/재개 기능과 자동 통합되어 더 유리

상태 공유 패턴 — A2A 컨텍스트 전달

멀티턴 대화에서 Orchestrator가 이전 결과를 Sub-Agent에 전달해야 할 때, Task의 message 파트에 컨텍스트를 포함합니다.

pythoncontext_passing.py — 이전 결과를 다음 Agent에 전달
async def analysis_node_with_context(state: OrchestratorState) -> dict:
    """검색 결과를 컨텍스트로 분석 Agent에 전달"""
    context_query = f"""다음 검색 결과를 바탕으로 데이터를 분석해주세요.

[원본 질문]
{state['user_query']}

[검색 결과 컨텍스트]
{state.get('search_result', '없음')}

위 결과를 통계적으로 분석하고 인사이트를 도출해주세요."""

    payload = {
        "jsonrpc": "2.0", "id": "1", "method": "tasks/send",
        "params": {
            "id": str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [
                    {"type": "text", "text": context_query},
                    # 구조화 데이터는 data 파트로 전달
                    {"type": "data", "data": {
                        "source": "search_agent",
                        "raw_results": state.get("search_result", "")
                    }}
                ]
            }
        }
    }
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(AGENT_URLS["analysis"], json=payload)
        result_text = extract_text_from_task(resp.json())
    return {"analysis_result": result_text}