A2A · STREAMING

Streaming & SSE

📡 SSE 이벤트 구조 💻 서버/클라이언트 코드 🔔 Push Notification

장기 실행 Task를 위한 SSE 스트리밍 구현, 두 가지 이벤트 타입 처리, Push Notification 설정 방법을 다룹니다.

왜 스트리밍이 필요한가

웹 검색 + 요약처럼 수십 초 걸리는 Task에 동기 HTTP를 쓰면 클라이언트가 타임아웃됩니다. tasks/sendSubscribe는 Task를 서버에 접수한 뒤 SSE 스트림으로 진행 상황을 실시간으로 받습니다. 최종 결과도 같은 스트림으로 수신합니다.

SSE 이벤트 타입 2가지

이벤트 타입언제페이로드
TaskStatusUpdateEvent 상태 전이 시 (submitted→working→completed 등) id, status(state, message), final: bool
TaskArtifactUpdateEvent 결과물이 생성/갱신될 때 (스트리밍 출력 포함) id, artifact(parts, index, lastChunk)

서버 측 — SSE 스트림 생성

pythonFastAPI SSE 서버 구현
import asyncio, json, uuid
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def sse_event(event_type: str, data: dict) -> str:
    """SSE 이벤트 포맷으로 변환"""
    return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

async def task_stream_generator(task_id: str, message: str):
    # 1. submitted 알림
    yield await sse_event("TaskStatusUpdateEvent", {
        "id": task_id,
        "status": {"state": "submitted"},
        "final": False
    })

    # 2. working 알림
    yield await sse_event("TaskStatusUpdateEvent", {
        "id": task_id,
        "status": {
            "state": "working",
            "message": {
                "role": "agent",
                "parts": [{"type": "text", "text": "분석을 시작합니다..."}]
            }
        },
        "final": False
    })

    # 3. 스트리밍 Artifact 전송 (청크 단위)
    tokens = ["분석 ", "결과: ", "긍정적 ", "감정 ", "감지됨"]
    for i, token in enumerate(tokens):
        is_last = (i == len(tokens) - 1)
        yield await sse_event("TaskArtifactUpdateEvent", {
            "id": task_id,
            "artifact": {
                "parts": [{"type": "text", "text": token}],
                "index": 0,
                "lastChunk": is_last,
                "append": i > 0  # 첫 청크 외엔 이어붙이기
            }
        })
        await asyncio.sleep(0.1)

    # 4. completed 알림 (final=True → 스트림 종료)
    yield await sse_event("TaskStatusUpdateEvent", {
        "id": task_id,
        "status": {"state": "completed"},
        "final": True  # ← 이 이벤트 후 SSE 스트림 종료
    })

@app.post("/")
async def handle_request(request: Request):
    body = await request.json()
    if body.get("method") == "tasks/sendSubscribe":
        params = body["params"]
        task_id = params["id"]
        message = params["message"]["parts"][0]["text"]
        return StreamingResponse(
            task_stream_generator(task_id, message),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
        )

클라이언트 측 — SSE 수신

pythonhttpx-sse로 스트림 소비
import httpx, json, uuid
from httpx_sse import aconnect_sse  # pip install httpx-sse

async def stream_task(agent_url: str, message: str) -> str:
    task_id = str(uuid.uuid4())
    payload = {
        "jsonrpc": "2.0", "id": 1,
        "method": "tasks/sendSubscribe",
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
    }

    artifact_text = ""

    async with httpx.AsyncClient(timeout=120) as client:
        async with aconnect_sse(client, "POST", agent_url, json=payload) as event_source:
            async for sse in event_source.aiter_sse():
                data = json.loads(sse.data)

                if sse.event == "TaskStatusUpdateEvent":
                    state = data["status"]["state"]
                    print(f"[{state.upper()}]")
                    if data.get("final"):
                        break

                elif sse.event == "TaskArtifactUpdateEvent":
                    for part in data["artifact"]["parts"]:
                        if part["type"] == "text":
                            print(part["text"], end="", flush=True)
                            artifact_text += part["text"]

    return artifact_text

Push Notification 설정

클라이언트가 SSE를 유지할 수 없는 경우(예: 서버리스 환경), Webhook URL을 등록하면 Task 완료 시 HTTP POST로 결과를 받습니다.

pythonPush Notification 등록
# tasks/pushNotification/set 메서드로 Webhook 등록
push_payload = {
    "jsonrpc": "2.0", "id": 2,
    "method": "tasks/pushNotification/set",
    "params": {
        "id": task_id,
        "pushNotificationConfig": {
            "url": "https://my-app.example.com/webhook/a2a",
            "authentication": {
                "schemes": ["bearer"],
                "credentials": "my-webhook-secret"
            }
        }
    }
}
resp = await client.post(agent_url, json=push_payload)
# 이제 Task 완료/실패 시 webhook URL로 TaskStatusUpdateEvent POST