PROD · HITL
Human-in-the-Loop
고위험 작업은 사람의 승인을 거쳐야 합니다. 언제 에스컬레이션할지 결정하고, 비동기 승인 흐름과 감사 추적 시스템을 구현합니다.
에스컬레이션 정책 설계
모든 작업에 HITL을 적용하면 자동화 이점이 사라집니다. 위험도 기반 에스컬레이션 매트릭스로 필요한 경우에만 사람을 개입시킵니다.
| 위험도 | 예시 | 자동 처리 | HITL 필요 |
|---|---|---|---|
| 🟢 낮음 | 정보 조회, 문서 요약, FAQ 답변 | ✅ 즉시 | ❌ |
| 🟡 중간 | 이메일 초안, 데이터 분석, 코드 생성 | ✅ 기본 | 사용자 옵션 |
| 🟠 높음 | 외부 API 호출, DB 업데이트, 파일 삭제 | ⚠️ 재시도 후 | ✅ 필수 |
| 🔴 치명 | 결제 처리, 계정 삭제, 대량 데이터 변경 | ❌ | ✅ 다중 승인 |
pythonescalation.py — 위험도 기반 자동 에스컬레이션
from enum import Enum
from langchain_anthropic import ChatAnthropic
from pydantic import BaseModel
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
# 도구 → 위험도 매핑
TOOL_RISK = {
"web_search": RiskLevel.LOW,
"read_file": RiskLevel.LOW,
"write_file": RiskLevel.HIGH,
"delete_file": RiskLevel.HIGH,
"send_email": RiskLevel.HIGH,
"db_query": RiskLevel.MEDIUM,
"db_write": RiskLevel.HIGH,
"process_payment": RiskLevel.CRITICAL,
"delete_account": RiskLevel.CRITICAL,
}
class ActionRisk(BaseModel):
risk_level: str
reason: str
requires_approval: bool
async def assess_action_risk(
tool_name: str,
tool_args: dict,
context: str = ""
) -> ActionRisk:
"""도구 호출의 위험도를 평가"""
base_risk = TOOL_RISK.get(tool_name, RiskLevel.MEDIUM)
# 고위험 작업은 항상 승인 필요
if base_risk == RiskLevel.CRITICAL:
return ActionRisk(
risk_level="critical",
reason=f"{tool_name}은 고위험 작업입니다",
requires_approval=True
)
# HIGH 위험 도구는 LLM으로 컨텍스트 기반 추가 평가
if base_risk == RiskLevel.HIGH:
llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
structured = llm.with_structured_output(ActionRisk)
return await structured.ainvoke([{
"role": "user",
"content": f"""다음 도구 호출의 위험도를 평가하세요.
도구: {tool_name}
인수: {tool_args}
컨텍스트: {context}
위험도: low/medium/high/critical 중 하나
사람 승인 필요 여부: True/False"""
}])
return ActionRisk(
risk_level=base_risk.value,
reason="표준 위험도 매핑",
requires_approval=False
)
비동기 HITL API
승인 대기 중 Agent를 차단하지 않고 FastAPI + WebSocket으로 비동기 승인 흐름을 구현합니다.
pythonapproval_api.py — 비동기 승인 서버
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio, uuid, time
app = FastAPI()
# 승인 대기 큐 (프로덕션에서는 Redis)
pending: dict[str, dict] = {}
decisions: dict[str, dict] = {}
class ApprovalRequest(BaseModel):
tool_name: str
tool_args: dict
context: str
risk_level: str
timeout_seconds: int = 300
@app.post("/approval/request")
async def request_approval(req: ApprovalRequest) -> dict:
"""Agent → 승인 서버: 승인 요청 등록"""
request_id = str(uuid.uuid4())
pending[request_id] = {
"id": request_id,
"tool_name": req.tool_name,
"tool_args": req.tool_args,
"context": req.context,
"risk_level": req.risk_level,
"created_at": time.time(),
"expires_at": time.time() + req.timeout_seconds,
"status": "pending"
}
return {"request_id": request_id, "status": "pending"}
@app.post("/approval/{request_id}/decide")
async def decide_approval(
request_id: str,
decision: str, # "approve" | "reject"
reviewer_id: str,
comment: str = ""
) -> dict:
"""검토자 → 승인 서버: 결정 기록"""
if request_id not in pending:
raise HTTPException(404, "요청을 찾을 수 없습니다")
entry = pending.pop(request_id)
decisions[request_id] = {
**entry,
"decision": decision,
"reviewer_id": reviewer_id,
"comment": comment,
"decided_at": time.time(),
"status": decision
}
return {"status": "recorded", "decision": decision}
# Agent가 폴링으로 결정을 기다림
@app.get("/approval/{request_id}/status")
async def get_decision(request_id: str) -> dict:
if request_id in decisions:
return decisions[request_id]
if request_id in pending:
entry = pending[request_id]
if time.time() > entry["expires_at"]:
pending.pop(request_id)
return {"status": "timeout"}
return {"status": "pending"}
raise HTTPException(404, "요청을 찾을 수 없습니다")
LangGraph HITL 통합
pythonhitl_node.py — 위험도 평가 + 승인 대기 노드
import httpx, asyncio
APPROVAL_SERVER = "http://approval-server:8080"
async def hitl_tool_node(state) -> dict:
"""도구 실행 전 위험도 평가 → 필요 시 승인 대기"""
last = state["messages"][-1]
tool_messages = []
for call in last.tool_calls:
risk = await assess_action_risk(call["name"], call["args"])
if risk.requires_approval:
# 승인 요청 등록
async with httpx.AsyncClient() as c:
resp = await c.post(f"{APPROVAL_SERVER}/approval/request", json={
"tool_name": call["name"],
"tool_args": call["args"],
"context": str(state["messages"][-3:]),
"risk_level": risk.risk_level
})
request_id = resp.json()["request_id"]
# 폴링으로 결정 대기 (최대 5분)
decision = await wait_for_decision(request_id, timeout=300)
if decision != "approve":
tool_messages.append(ToolMessage(
tool_call_id=call["id"],
content=f"[BLOCKED] 작업이 승인되지 않았습니다: {decision}"
))
continue
# 승인됐거나 저위험 → 실행
result = await execute_tool_async(call["name"], call["args"], call["id"])
tool_messages.append(result)
return {"messages": tool_messages}
async def wait_for_decision(request_id: str, timeout: int = 300) -> str:
deadline = asyncio.get_event_loop().time() + timeout
async with httpx.AsyncClient() as c:
while asyncio.get_event_loop().time() < deadline:
resp = await c.get(f"{APPROVAL_SERVER}/approval/{request_id}/status")
data = resp.json()
if data["status"] != "pending":
return data["status"]
await asyncio.sleep(5) # 5초마다 폴링
return "timeout"