MINERVA FastAPI 서빙 레이어

라우터, 캐싱, warmup, SSE 스트리밍

MINERVA의 FastAPI 서빙 레이어는 에이전트 코어와 프론트엔드를 연결한다. 7개 라우터 모듈, lifespan warmup, 에이전트 캐싱, SSE 스트리밍 응답, A/B 실험 연동, CORS 설정을 다룬다.

Agent
저자

Kwangmin Kim

공개

2026년 05월 05일

1 서빙 레이어의 역할

서빙 레이어는 3-Layer 아키텍처의 중간 계층이다. 프론트엔드(React)의 HTTP 요청을 받아 에이전트 코어를 호출하고, 결과를 HTTP 응답으로 변환한다.

React (Layer 1)
    │ HTTP 요청
    ▼
FastAPI (Layer 2 — 이 포스트)
    │ BaseAgent.run() / .stream()
    ▼
Agent Core (Layer 3)

서빙 레이어가 담당하는 것:

  • HTTP 요청/응답 처리 (Pydantic 검증 포함)
  • 에이전트 라이프사이클 관리 (초기화, 캐싱, 정리)
  • SSE 스트리밍 전송
  • CORS 미들웨어
  • A/B 실험 요청 라우팅

서빙 레이어가 담당하지 않는 것:

  • RAG 검색 로직 (Agent Core의 책임)
  • LLM 호출 (Agent Core의 책임)
  • UI 렌더링 (Frontend의 책임)

2 main.py 구조

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: 에이전트 초기화
    print("에이전트 초기화 중...")
    agents = {}
    agents["qna_chatbot"] = QnaChatbotAgent(load_config("qna_chatbot"))
    agents["data_standardizer"] = DataStandardizerAgent(load_config("data_standardizer"))

    # Warmup: cold start 방지
    for agent in agents.values():
        agent.warmup()

    app.state.agents = agents
    app.state.agent_cache = AgentCache()

    yield

    # Shutdown: 리소스 정리
    print("리소스 정리 중...")

app = FastAPI(title="MINERVA API", lifespan=lifespan)

# CORS 미들웨어
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 라우터 등록
app.include_router(health.router)
app.include_router(qna_chatbot.router)
app.include_router(data_standardizer.router)
app.include_router(monitoring.router)
app.include_router(records.router)
app.include_router(feedback.router)
app.include_router(ab.router)

3 Lifespan과 Warmup

3.1 왜 Warmup이 필요한가

AI Agent 서버는 첫 요청 시 다음 작업이 발생한다:

  • 벡터 인덱스 로딩 (FAISS 파일 → 메모리)
  • BM25 인덱스 구축 (문서 토크나이징)
  • LLM 연결 확인 (Azure OpenAI 인증)
  • Reranker 모델 로딩

이 초기화가 첫 사용자 요청에서 발생하면 10~30초의 지연이 생긴다. Lifespan의 startup 단계에서 미리 수행하면 사용자는 즉시 응답을 받을 수 있다.

3.2 Warmup 구현

class QnaChatbotAgent(BaseAgent):
    def warmup(self):
        """서버 시작 시 호출 — cold start 방지"""
        # 벡터 인덱스 로딩
        self.pipeline.retriever.load_index()

        # BM25 인덱스 구축
        self.pipeline.retriever.build_bm25_index()

        # 테스트 쿼리로 전체 파이프라인 검증
        test_query = Query(text="테스트")
        _ = self.run(test_query)

        print(f"[{self.name}] warmup 완료")

테스트 쿼리를 실행하면 LLM 연결, 임베딩 모델 로딩 등 지연 초기화(lazy initialization)되는 컴포넌트까지 모두 준비된다.

4 라우터 구조

7개 라우터가 각각 하나의 도메인을 담당한다.

라우터 Prefix 역할
health.py /, /health 서버 상태·서비스 정보 확인
qna_chatbot.py /agents/qna_chatbot QnA 채팅 실행/스트리밍, 문서 목록·목차 조회
data_standardizer.py /agents/data_standardizer 데이터/코드 표준화 (mode 파라미터 자동 감지)
monitoring.py /monitoring 사용량·지연 시간·인용률 등 10개 지표 집계
records.py /records 개별 호출 드릴다운 (agent/status/metric/days/q 필터)
feedback.py /feedback, /feedback/comment 사용자 피드백 수집 (helpful/unhelpful + 텍스트 코멘트)
ab.py /monitoring/ab A/B 실험 목록 + arm별 메트릭·통계 검정

각 에이전트 라우터에는 핵심 실행 엔드포인트(/run, /stream) 외에 부가 정보 엔드포인트가 추가된다. 예를 들어 qna_chatbot에는 /documents(메타 목록)와 /documents/{id}/outline(문서 목차)가 있어 프론트엔드의 참조 패널이 호출한다.

거버넌스 프롬프트 매니페스트 (Phase 10.92)

QnA 라우터는 문서 출처별로 시스템 프롬프트를 다르게 적용한다. DEFAULT_DOCUMENTS_PROMPTS = {"DAMA": "governance"} 같은 매핑으로, DAMA 거버넌스 문서가 검색 결과에 포함되면 거버넌스 전용 프롬프트가 활성화된다. 이 분기는 라우터에서 RAG 결과를 본 뒤 결정되므로, 단일 에이전트가 다중 도메인 톤을 동적으로 전환할 수 있다.

4.1 에이전트 라우터 패턴

모든 에이전트 라우터는 동일한 패턴을 따른다.

# routers/qna_chatbot.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/agents/qna_chatbot", tags=["QnA Chatbot"])

@router.post("/run")
def run(request: RunRequest, raw_request: Request):
    agent = _get_agent(raw_request, request)
    query = Query(
        text=request.text,
        history=request.history,
        user_id=request.user_id,
    )
    response = agent.run(query)
    return RunResponse(
        response=response,
        experiment_id=request.agent_params.get("experiment_id"),
        arm_id=request.agent_params.get("arm_id"),
    )

@router.post("/stream")
def stream(request: RunRequest, raw_request: Request):
    agent = _get_agent(raw_request, request)
    query = Query(text=request.text, history=request.history)

    async def generate():
        async for event in agent.stream(query):
            yield f"event: {event.event}\ndata: {event.data}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

def _get_agent(raw_request: Request, request: RunRequest) -> BaseAgent:
    """A/B 실험이 있으면 해당 arm의 에이전트를, 없으면 기본 에이전트를 반환"""
    experiment_id = request.agent_params.get("experiment_id")
    arm_id = request.agent_params.get("arm_id")

    if experiment_id and arm_id:
        return raw_request.app.state.agent_cache.get_or_create(
            QnaChatbotAgent, experiment_id, arm_id,
            load_experiment_config(experiment_id, arm_id),
        )

    return raw_request.app.state.agents["qna_chatbot"]

_get_agent가 A/B 실험 연동의 핵심이다. 실험 파라미터가 있으면 해당 설정으로 생성된 에이전트를, 없으면 기본 에이전트를 반환한다.

5 SSE 스트리밍

5.1 StreamingResponse

from fastapi.responses import StreamingResponse

@router.post("/stream")
def stream(request: RunRequest, raw_request: Request):
    agent = _get_agent(raw_request, request)
    query = Query(text=request.text, history=request.history)

    async def generate():
        try:
            async for event in agent.stream(query):
                # 클라이언트 연결 해제 확인
                if await raw_request.is_disconnected():
                    break
                yield f"event: {event.event}\ndata: {event.data}\n\n"
        except Exception as e:
            yield f'event: error\ndata: {{"message": "{str(e)}"}}\n\n'

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

주요 처리:

  • is_disconnected(): 사용자가 페이지를 떠나면 스트리밍을 중단한다. LLM 호출 비용을 절약한다
  • try/except: 스트리밍 중 에러가 발생하면 error 이벤트로 클라이언트에 전달한다
  • Cache-Control: no-cache: 브라우저가 SSE 응답을 캐시하지 않도록 한다

6 에이전트 캐싱

A/B 실험에서 같은 설정의 에이전트를 매 요청마다 초기화하면 벡터 인덱스 로딩 등으로 수 초가 소요된다. (experiment_name, arm_id) 키로 캐싱하여 초기화를 1회로 제한한다.

import threading

class AgentCache:
    def __init__(self):
        self._cache: dict[tuple[str, str], BaseAgent] = {}
        self._lock = threading.Lock()

    def get_or_create(
        self,
        agent_class: type[BaseAgent],
        experiment_name: str,
        arm_id: str,
        config: RAGConfig,
    ) -> BaseAgent:
        key = (experiment_name, arm_id)

        with self._lock:
            if key not in self._cache:
                agent = agent_class(config)
                agent.warmup()
                self._cache[key] = agent

        return self._cache[key]

    def invalidate(self, experiment_name: str):
        """실험 종료 시 해당 에이전트들을 캐시에서 제거"""
        with self._lock:
            keys_to_remove = [
                k for k in self._cache if k[0] == experiment_name
            ]
            for key in keys_to_remove:
                del self._cache[key]

threading.Lock()이 필요한 이유: uvicorn은 멀티워커로 실행될 수 있고, 같은 arm에 대한 첫 요청이 동시에 도착하면 에이전트가 두 번 생성될 수 있다.

7 에러 처리

from fastapi import HTTPException

@router.post("/run")
def run(request: RunRequest, raw_request: Request):
    try:
        agent = _get_agent(raw_request, request)
        query = Query(text=request.text, history=request.history)
        response = agent.run(query)
        return RunResponse(response=response)

    except KeyError as e:
        raise HTTPException(status_code=404, detail=f"에이전트를 찾을 수 없다: {e}")

    except TimeoutError:
        raise HTTPException(status_code=503, detail="LLM 응답 시간 초과")

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"내부 오류: {str(e)}")

에러 종류에 따라 적절한 HTTP 상태 코드를 반환한다. 프론트엔드는 상태 코드에 따라 다른 에러 메시지를 표시한다.

8 전체 파일 구조

src/services/api/
├── main.py              # FastAPI 앱, lifespan, CORS, 라우터 등록
├── schemas.py           # RunRequest, RunResponse 등 HTTP 스키마
└── routers/
    ├── health.py        # GET /, GET /health
    ├── qna_chatbot.py   # POST /agents/qna_chatbot/{run,stream}
    │                    # GET  /agents/qna_chatbot/documents
    │                    # GET  /agents/qna_chatbot/documents/{id}/outline
    ├── data_standardizer.py  # POST /agents/data_standardizer/{run,stream}
    ├── monitoring.py    # GET  /monitoring/metrics  (10개 지표 집계)
    ├── records.py       # GET  /records  (drill-down)
    ├── feedback.py      # POST /feedback, /feedback/comment
    └── ab.py            # GET  /monitoring/ab/experiments
                         # GET  /monitoring/ab/{experiment}  (arm별 메트릭+통계)

각 파일이 하나의 도메인만 담당하므로, 새 기능을 추가할 때 다른 파일을 수정할 필요가 없다. monitoringab가 같은 /monitoring 트리 아래로 들어가는 이유는 운영 대시보드에서 사용량 지표와 A/B 결과를 한 화면에서 보여주기 위함이다 — 이는 곧 프론트엔드 Monitoring.tsxTests.tsx가 같은 prefix의 데이터를 소비한다는 뜻이다.

9 관련 주제

선행 지식 (Phase A)

후속 주제

다른 카테고리 연결

Subscribe

Enjoy this blog? Get notified of new posts by email: