A2A · LANGGRAPH
LangGraph + A2A 구현
LangGraph Agent를 A2A 서버로 노출하고, 다른 LangGraph Agent가 A2A 클라이언트로 호출하는 완전한 멀티에이전트 시스템을 구축합니다.
전체 구조 개요
LangGraph와 A2A를 결합하면 각 Agent가 독립적인 LangGraph 그래프로 구현되면서, A2A 프로토콜을 통해 서로 협력하는 시스템이 됩니다. 각 Agent는 A2A 서버 역할(incoming task 수신)과 A2A 클라이언트 역할(다른 agent에게 task 위임)을 동시에 수행할 수 있습니다.
그림 1. LangGraph + A2A 멀티에이전트 아키텍처 — 각 노드가 독립 A2A 서버
LangGraph Agent를 A2A 서버로 노출
LangGraph로 만든 ReAct Agent를 A2A 서버로 래핑합니다. FastAPI가 A2A JSON-RPC 요청을 수신하고, LangGraph 그래프를 실행한 뒤 결과를 Task 형식으로 반환합니다.
pythonsearch_agent_server.py — LangGraph Agent를 A2A 서버로
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
from langchain_community.tools.tavily_search import TavilySearchResults
import uuid, json, asyncio
app = FastAPI()
# LangGraph ReAct Agent 구성
llm = ChatAnthropic(model="claude-opus-4-7")
tools = [TavilySearchResults(max_results=5)]
graph = create_react_agent(llm, tools)
AGENT_CARD = {
"name": "Search Agent",
"description": "LangGraph ReAct 기반 웹 검색 에이전트",
"url": "http://localhost:8001",
"version": "1.0.0",
"capabilities": {
"streaming": True,
"pushNotifications": False,
"stateTransitionHistory": False
},
"authentication": {"schemes": ["none"]},
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["text/plain", "application/json"],
"skills": [{
"id": "web-search",
"name": "Web Search",
"description": "최신 정보를 웹에서 검색하고 요약합니다",
"examples": ["2025년 AI 트렌드 검색해줘", "LangGraph 최신 버전 기능 알려줘"]
}]
}
@app.get("/.well-known/agent.json")
async def agent_card():
return JSONResponse(AGENT_CARD, headers={"Cache-Control": "max-age=3600"})
@app.post("/")
async def handle_rpc(request: Request):
body = await request.json()
method = body.get("method")
params = body.get("params", {})
rpc_id = body.get("id")
if method == "tasks/send":
result = await run_task(params)
return JSONResponse({
"jsonrpc": "2.0", "id": rpc_id, "result": result
})
elif method == "tasks/sendSubscribe":
return StreamingResponse(
stream_task(params, rpc_id),
media_type="text/event-stream"
)
return JSONResponse({
"jsonrpc": "2.0", "id": rpc_id,
"error": {"code": -32601, "message": "Method not found"}
})
async def run_task(params: dict) -> dict:
task_id = params.get("id", str(uuid.uuid4()))
user_message = extract_message(params)
# LangGraph 실행
state = await graph.ainvoke({
"messages": [{"role": "user", "content": user_message}]
})
answer = state["messages"][-1].content
return {
"id": task_id,
"status": {"state": "completed"},
"artifacts": [{
"name": "search-result",
"parts": [{"type": "text", "text": answer}]
}]
}
async def stream_task(params: dict, rpc_id: str):
task_id = params.get("id", str(uuid.uuid4()))
user_message = extract_message(params)
def emit(event: dict) -> str:
return f"data: {json.dumps(event)}\n\n"
# submitted 이벤트
yield emit({
"jsonrpc": "2.0", "id": rpc_id,
"result": {"id": task_id, "status": {"state": "submitted"}, "final": False}
})
# LangGraph astream_events로 진행 상황 스트리밍
async for event in graph.astream_events(
{"messages": [{"role": "user", "content": user_message}]},
version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
chunk = event["data"]["chunk"].content
if chunk:
yield emit({
"jsonrpc": "2.0", "id": rpc_id,
"result": {
"id": task_id,
"artifact": {
"name": "search-result",
"parts": [{"type": "text", "text": chunk}],
"append": True
},
"final": False
}
})
elif kind == "on_tool_start":
yield emit({
"jsonrpc": "2.0", "id": rpc_id,
"result": {
"id": task_id,
"status": {
"state": "working",
"message": {
"role": "agent",
"parts": [{"type": "text", "text": f"검색 중: {event['name']}"}]
}
},
"final": False
}
})
# completed 이벤트
yield emit({
"jsonrpc": "2.0", "id": rpc_id,
"result": {"id": task_id, "status": {"state": "completed"}, "final": True}
})
def extract_message(params: dict) -> str:
for msg in params.get("message", {}).get("parts", []):
if msg.get("type") == "text":
return msg["text"]
return ""
LangGraph Orchestrator — A2A 클라이언트 노드
Orchestrator 자체도 LangGraph 그래프로 구현합니다. 각 원격 Agent 호출은 LangGraph 노드로 표현되며, 조건부 엣지로 라우팅을 처리합니다.
pythonorchestrator_graph.py — LangGraph 기반 Orchestrator
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated, Literal
from operator import add
import asyncio, httpx, uuid, json
# ─── 상태 정의 ───────────────────────────────────────
class OrchestratorState(TypedDict):
user_query: str
plan: str
search_result: str
code_result: str
analysis_result: str
final_answer: str
next_agent: Literal["search", "code", "analysis", "aggregate"]
llm = ChatAnthropic(model="claude-opus-4-7")
AGENT_URLS = {
"search": "http://search-agent:8001",
"code": "http://code-agent:8002",
"analysis": "http://analysis-agent:8003",
}
# ─── A2A 호출 헬퍼 ───────────────────────────────────
async def call_a2a_agent(agent_name: str, query: str) -> str:
url = AGENT_URLS[agent_name]
task_id = str(uuid.uuid4())
payload = {
"jsonrpc": "2.0", "id": "1", "method": "tasks/send",
"params": {
"id": task_id,
"message": {"role": "user", "parts": [{"type": "text", "text": query}]}
}
}
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(url, json=payload)
result = resp.json()["result"]
for artifact in result.get("artifacts", []):
for part in artifact.get("parts", []):
if part["type"] == "text":
return part["text"]
return ""
# ─── 노드 함수 ───────────────────────────────────────
async def planner_node(state: OrchestratorState) -> dict:
"""어떤 Agent를 호출할지 결정"""
response = await llm.ainvoke([{
"role": "user",
"content": f"""다음 질문을 처리하기 위해 어떤 에이전트가 필요한지 JSON으로 답하세요.
질문: {state['user_query']}
형식: {{"agents": ["search", "code", "analysis"], "reason": "..."}}
search: 웹 검색 필요, code: 코드 실행 필요, analysis: 데이터 분석 필요"""
}])
plan_text = response.content
return {"plan": plan_text}
async def search_node(state: OrchestratorState) -> dict:
result = await call_a2a_agent("search", state["user_query"])
return {"search_result": result}
async def code_node(state: OrchestratorState) -> dict:
result = await call_a2a_agent("code", state["user_query"])
return {"code_result": result}
async def analysis_node(state: OrchestratorState) -> dict:
result = await call_a2a_agent("analysis", state["user_query"])
return {"analysis_result": result}
async def aggregate_node(state: OrchestratorState) -> dict:
parts = []
if state.get("search_result"): parts.append(f"[검색 결과]\n{state['search_result']}")
if state.get("code_result"): parts.append(f"[코드 결과]\n{state['code_result']}")
if state.get("analysis_result"): parts.append(f"[분석 결과]\n{state['analysis_result']}")
response = await llm.ainvoke([{
"role": "user",
"content": f"다음 결과들을 통합해 최종 답변을 작성하세요:\n\n{chr(10).join(parts)}"
}])
return {"final_answer": response.content}
# ─── 그래프 구성 ─────────────────────────────────────
builder = StateGraph(OrchestratorState)
builder.add_node("planner", planner_node)
builder.add_node("search", search_node)
builder.add_node("code", code_node)
builder.add_node("analysis", analysis_node)
builder.add_node("aggregate", aggregate_node)
builder.set_entry_point("planner")
# 플래너 → 병렬 실행 (실제로는 Send API 사용)
builder.add_edge("planner", "search")
builder.add_edge("planner", "code")
builder.add_edge("planner", "analysis")
builder.add_edge("search", "aggregate")
builder.add_edge("code", "aggregate")
builder.add_edge("analysis", "aggregate")
builder.add_edge("aggregate", END)
orchestrator_graph = builder.compile()
LangGraph Send API — 동적 병렬 실행
플래너가 실행 시점에 어떤 Agent를 호출할지 결정하는 동적 패턴에는 LangGraph의 Send API를 사용합니다.
pythondynamic_dispatch.py — Send API로 동적 병렬 위임
from langgraph.types import Send
import json
async def dynamic_router(state: OrchestratorState) -> list[Send]:
"""플래너 결과를 파싱해 필요한 Agent에만 Send"""
try:
plan = json.loads(state["plan"])
agents = plan.get("agents", [])
except:
agents = ["search"] # 기본값
sends = []
for agent in agents:
if agent in ("search", "code", "analysis"):
sends.append(Send(agent, {"user_query": state["user_query"]}))
# 해당 Agent가 없으면 바로 aggregate로
return sends if sends else [Send("aggregate", state)]
# 그래프에서 conditional_edges로 등록
builder.add_conditional_edges(
"planner",
dynamic_router,
["search", "code", "analysis", "aggregate"]
)
💡 LangGraph Send API vs asyncio.gather()
- asyncio.gather() — Python 레벨 병렬화. LangGraph 그래프 외부에서 직접 제어할 때 적합
- Send API — LangGraph 그래프 내부 병렬화. 체크포인팅, 재시도, 디버깅이 LangGraph 생태계와 통합됨
- 프로덕션에서는 Send API가 LangSmith 추적, 중단/재개 기능과 자동 통합되어 더 유리
상태 공유 패턴 — A2A 컨텍스트 전달
멀티턴 대화에서 Orchestrator가 이전 결과를 Sub-Agent에 전달해야 할 때, Task의 message 파트에 컨텍스트를 포함합니다.
pythoncontext_passing.py — 이전 결과를 다음 Agent에 전달
async def analysis_node_with_context(state: OrchestratorState) -> dict:
"""검색 결과를 컨텍스트로 분석 Agent에 전달"""
context_query = f"""다음 검색 결과를 바탕으로 데이터를 분석해주세요.
[원본 질문]
{state['user_query']}
[검색 결과 컨텍스트]
{state.get('search_result', '없음')}
위 결과를 통계적으로 분석하고 인사이트를 도출해주세요."""
payload = {
"jsonrpc": "2.0", "id": "1", "method": "tasks/send",
"params": {
"id": str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [
{"type": "text", "text": context_query},
# 구조화 데이터는 data 파트로 전달
{"type": "data", "data": {
"source": "search_agent",
"raw_results": state.get("search_result", "")
}}
]
}
}
}
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(AGENT_URLS["analysis"], json=payload)
result_text = extract_text_from_task(resp.json())
return {"analysis_result": result_text}