A2A · STREAMING
Streaming & SSE
장기 실행 Task를 위한 SSE 스트리밍 구현, 두 가지 이벤트 타입 처리, Push Notification 설정 방법을 다룹니다.
왜 스트리밍이 필요한가
웹 검색 + 요약처럼 수십 초 걸리는 Task에 동기 HTTP를 쓰면 클라이언트가 타임아웃됩니다. tasks/sendSubscribe는 Task를 서버에 접수한 뒤 SSE 스트림으로 진행 상황을 실시간으로 받습니다. 최종 결과도 같은 스트림으로 수신합니다.
SSE 이벤트 타입 2가지
| 이벤트 타입 | 언제 | 페이로드 |
|---|---|---|
| TaskStatusUpdateEvent | 상태 전이 시 (submitted→working→completed 등) | id, status(state, message), final: bool |
| TaskArtifactUpdateEvent | 결과물이 생성/갱신될 때 (스트리밍 출력 포함) | id, artifact(parts, index, lastChunk) |
서버 측 — SSE 스트림 생성
pythonFastAPI SSE 서버 구현
import asyncio, json, uuid
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
async def sse_event(event_type: str, data: dict) -> str:
"""SSE 이벤트 포맷으로 변환"""
return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
async def task_stream_generator(task_id: str, message: str):
# 1. submitted 알림
yield await sse_event("TaskStatusUpdateEvent", {
"id": task_id,
"status": {"state": "submitted"},
"final": False
})
# 2. working 알림
yield await sse_event("TaskStatusUpdateEvent", {
"id": task_id,
"status": {
"state": "working",
"message": {
"role": "agent",
"parts": [{"type": "text", "text": "분석을 시작합니다..."}]
}
},
"final": False
})
# 3. 스트리밍 Artifact 전송 (청크 단위)
tokens = ["분석 ", "결과: ", "긍정적 ", "감정 ", "감지됨"]
for i, token in enumerate(tokens):
is_last = (i == len(tokens) - 1)
yield await sse_event("TaskArtifactUpdateEvent", {
"id": task_id,
"artifact": {
"parts": [{"type": "text", "text": token}],
"index": 0,
"lastChunk": is_last,
"append": i > 0 # 첫 청크 외엔 이어붙이기
}
})
await asyncio.sleep(0.1)
# 4. completed 알림 (final=True → 스트림 종료)
yield await sse_event("TaskStatusUpdateEvent", {
"id": task_id,
"status": {"state": "completed"},
"final": True # ← 이 이벤트 후 SSE 스트림 종료
})
@app.post("/")
async def handle_request(request: Request):
body = await request.json()
if body.get("method") == "tasks/sendSubscribe":
params = body["params"]
task_id = params["id"]
message = params["message"]["parts"][0]["text"]
return StreamingResponse(
task_stream_generator(task_id, message),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
)
클라이언트 측 — SSE 수신
pythonhttpx-sse로 스트림 소비
import httpx, json, uuid
from httpx_sse import aconnect_sse # pip install httpx-sse
async def stream_task(agent_url: str, message: str) -> str:
task_id = str(uuid.uuid4())
payload = {
"jsonrpc": "2.0", "id": 1,
"method": "tasks/sendSubscribe",
"params": {
"id": task_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
artifact_text = ""
async with httpx.AsyncClient(timeout=120) as client:
async with aconnect_sse(client, "POST", agent_url, json=payload) as event_source:
async for sse in event_source.aiter_sse():
data = json.loads(sse.data)
if sse.event == "TaskStatusUpdateEvent":
state = data["status"]["state"]
print(f"[{state.upper()}]")
if data.get("final"):
break
elif sse.event == "TaskArtifactUpdateEvent":
for part in data["artifact"]["parts"]:
if part["type"] == "text":
print(part["text"], end="", flush=True)
artifact_text += part["text"]
return artifact_text
Push Notification 설정
클라이언트가 SSE를 유지할 수 없는 경우(예: 서버리스 환경), Webhook URL을 등록하면 Task 완료 시 HTTP POST로 결과를 받습니다.
pythonPush Notification 등록
# tasks/pushNotification/set 메서드로 Webhook 등록
push_payload = {
"jsonrpc": "2.0", "id": 2,
"method": "tasks/pushNotification/set",
"params": {
"id": task_id,
"pushNotificationConfig": {
"url": "https://my-app.example.com/webhook/a2a",
"authentication": {
"schemes": ["bearer"],
"credentials": "my-webhook-secret"
}
}
}
}
resp = await client.post(agent_url, json=push_payload)
# 이제 Task 완료/실패 시 webhook URL로 TaskStatusUpdateEvent POST