PROD · HITL

Human-in-the-Loop

🚦 에스컬레이션 정책 🔔 비동기 승인 API 📋 감사 로그

고위험 작업은 사람의 승인을 거쳐야 합니다. 언제 에스컬레이션할지 결정하고, 비동기 승인 흐름과 감사 추적 시스템을 구현합니다.

에스컬레이션 정책 설계

모든 작업에 HITL을 적용하면 자동화 이점이 사라집니다. 위험도 기반 에스컬레이션 매트릭스로 필요한 경우에만 사람을 개입시킵니다.

위험도예시자동 처리HITL 필요
🟢 낮음정보 조회, 문서 요약, FAQ 답변✅ 즉시
🟡 중간이메일 초안, 데이터 분석, 코드 생성✅ 기본사용자 옵션
🟠 높음외부 API 호출, DB 업데이트, 파일 삭제⚠️ 재시도 후✅ 필수
🔴 치명결제 처리, 계정 삭제, 대량 데이터 변경✅ 다중 승인
pythonescalation.py — 위험도 기반 자동 에스컬레이션
from enum import Enum
from langchain_anthropic import ChatAnthropic
from pydantic import BaseModel

class RiskLevel(Enum):
    LOW      = "low"
    MEDIUM   = "medium"
    HIGH     = "high"
    CRITICAL = "critical"

# 도구 → 위험도 매핑
TOOL_RISK = {
    "web_search":       RiskLevel.LOW,
    "read_file":        RiskLevel.LOW,
    "write_file":       RiskLevel.HIGH,
    "delete_file":      RiskLevel.HIGH,
    "send_email":       RiskLevel.HIGH,
    "db_query":         RiskLevel.MEDIUM,
    "db_write":         RiskLevel.HIGH,
    "process_payment":  RiskLevel.CRITICAL,
    "delete_account":   RiskLevel.CRITICAL,
}

class ActionRisk(BaseModel):
    risk_level: str
    reason: str
    requires_approval: bool

async def assess_action_risk(
    tool_name: str,
    tool_args: dict,
    context: str = ""
) -> ActionRisk:
    """도구 호출의 위험도를 평가"""
    base_risk = TOOL_RISK.get(tool_name, RiskLevel.MEDIUM)

    # 고위험 작업은 항상 승인 필요
    if base_risk == RiskLevel.CRITICAL:
        return ActionRisk(
            risk_level="critical",
            reason=f"{tool_name}은 고위험 작업입니다",
            requires_approval=True
        )

    # HIGH 위험 도구는 LLM으로 컨텍스트 기반 추가 평가
    if base_risk == RiskLevel.HIGH:
        llm = ChatAnthropic(model="claude-haiku-4-5-20251001")
        structured = llm.with_structured_output(ActionRisk)
        return await structured.ainvoke([{
            "role": "user",
            "content": f"""다음 도구 호출의 위험도를 평가하세요.

도구: {tool_name}
인수: {tool_args}
컨텍스트: {context}

위험도: low/medium/high/critical 중 하나
사람 승인 필요 여부: True/False"""
        }])

    return ActionRisk(
        risk_level=base_risk.value,
        reason="표준 위험도 매핑",
        requires_approval=False
    )

비동기 HITL API

승인 대기 중 Agent를 차단하지 않고 FastAPI + WebSocket으로 비동기 승인 흐름을 구현합니다.

pythonapproval_api.py — 비동기 승인 서버
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio, uuid, time

app = FastAPI()

# 승인 대기 큐 (프로덕션에서는 Redis)
pending: dict[str, dict] = {}
decisions: dict[str, dict] = {}

class ApprovalRequest(BaseModel):
    tool_name: str
    tool_args: dict
    context: str
    risk_level: str
    timeout_seconds: int = 300

@app.post("/approval/request")
async def request_approval(req: ApprovalRequest) -> dict:
    """Agent → 승인 서버: 승인 요청 등록"""
    request_id = str(uuid.uuid4())
    pending[request_id] = {
        "id":         request_id,
        "tool_name":  req.tool_name,
        "tool_args":  req.tool_args,
        "context":    req.context,
        "risk_level": req.risk_level,
        "created_at": time.time(),
        "expires_at": time.time() + req.timeout_seconds,
        "status":     "pending"
    }
    return {"request_id": request_id, "status": "pending"}

@app.post("/approval/{request_id}/decide")
async def decide_approval(
    request_id: str,
    decision: str,   # "approve" | "reject"
    reviewer_id: str,
    comment: str = ""
) -> dict:
    """검토자 → 승인 서버: 결정 기록"""
    if request_id not in pending:
        raise HTTPException(404, "요청을 찾을 수 없습니다")

    entry = pending.pop(request_id)
    decisions[request_id] = {
        **entry,
        "decision":    decision,
        "reviewer_id": reviewer_id,
        "comment":     comment,
        "decided_at":  time.time(),
        "status":      decision
    }
    return {"status": "recorded", "decision": decision}

# Agent가 폴링으로 결정을 기다림
@app.get("/approval/{request_id}/status")
async def get_decision(request_id: str) -> dict:
    if request_id in decisions:
        return decisions[request_id]
    if request_id in pending:
        entry = pending[request_id]
        if time.time() > entry["expires_at"]:
            pending.pop(request_id)
            return {"status": "timeout"}
        return {"status": "pending"}
    raise HTTPException(404, "요청을 찾을 수 없습니다")

LangGraph HITL 통합

pythonhitl_node.py — 위험도 평가 + 승인 대기 노드
import httpx, asyncio

APPROVAL_SERVER = "http://approval-server:8080"

async def hitl_tool_node(state) -> dict:
    """도구 실행 전 위험도 평가 → 필요 시 승인 대기"""
    last = state["messages"][-1]
    tool_messages = []

    for call in last.tool_calls:
        risk = await assess_action_risk(call["name"], call["args"])

        if risk.requires_approval:
            # 승인 요청 등록
            async with httpx.AsyncClient() as c:
                resp = await c.post(f"{APPROVAL_SERVER}/approval/request", json={
                    "tool_name":  call["name"],
                    "tool_args":  call["args"],
                    "context":    str(state["messages"][-3:]),
                    "risk_level": risk.risk_level
                })
                request_id = resp.json()["request_id"]

            # 폴링으로 결정 대기 (최대 5분)
            decision = await wait_for_decision(request_id, timeout=300)

            if decision != "approve":
                tool_messages.append(ToolMessage(
                    tool_call_id=call["id"],
                    content=f"[BLOCKED] 작업이 승인되지 않았습니다: {decision}"
                ))
                continue

        # 승인됐거나 저위험 → 실행
        result = await execute_tool_async(call["name"], call["args"], call["id"])
        tool_messages.append(result)

    return {"messages": tool_messages}

async def wait_for_decision(request_id: str, timeout: int = 300) -> str:
    deadline = asyncio.get_event_loop().time() + timeout
    async with httpx.AsyncClient() as c:
        while asyncio.get_event_loop().time() < deadline:
            resp = await c.get(f"{APPROVAL_SERVER}/approval/{request_id}/status")
            data = resp.json()
            if data["status"] != "pending":
                return data["status"]
            await asyncio.sleep(5)  # 5초마다 폴링
    return "timeout"