MINERVA BaseAgent 계약 패턴

ABC + Pydantic으로 에이전트 인터페이스를 정의한다

MINERVA의 모든 에이전트는 BaseAgent ABC를 상속하고 Pydantic 스키마(Query, Response, StreamEvent)를 사용한다. 이 계약 기반 설계로 에이전트 추가, 서빙 레이어 연결, A/B 실험 교체가 일관되게 이루어진다. ABC와 Pydantic의 결합 패턴, 구현 예시, 설계 이점을 정리한다.

Agent
저자

Kwangmin Kim

공개

2026년 05월 05일

1 왜 계약(Contract)이 필요한가

MINERVA에는 여러 에이전트가 있다: QnA Chatbot, Data Standardizer, 그리고 앞으로 추가될 에이전트들. 각 에이전트의 내부 로직은 다르지만, 외부에서 호출하는 방식은 동일해야 한다.

계약이 없으면 발생하는 문제:

  • 에이전트마다 메서드 이름이 다르다 (run, execute, process, generate)
  • 입출력 형식이 다르다 (dict, tuple, 커스텀 클래스)
  • 서빙 레이어가 에이전트마다 다른 코드를 작성해야 한다
  • 새 에이전트 추가 시 서빙 레이어, 프론트엔드까지 수정해야 한다

계약이 있으면:

  • 모든 에이전트가 run(Query) -> Response 인터페이스를 따른다
  • 서빙 레이어는 에이전트 종류를 몰라도 동일하게 호출할 수 있다
  • 새 에이전트는 계약만 구현하면 자동으로 서빙된다

2 계약 정의: contracts.py

2.1 Pydantic 스키마

import uuid
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field

class ConversationTurn(BaseModel):
    role: str           # "user" | "assistant"
    content: str

class Query(BaseModel):
    text: str
    history: list[ConversationTurn] = Field(default_factory=list)
    agent_params: dict[str, Any] = Field(default_factory=dict)

    # A/B 실험 맥락
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    experiment_id: Optional[str] = None
    arm_id: Optional[str] = None

class Citation(BaseModel):
    index: int                                    # 본문 [1], [2] 표시 번호
    content: str
    metadata: dict[str, Any] = Field(default_factory=dict)
    score: Optional[float] = None                 # 원 cosine — 로깅·실험·threshold 기준
    display_score: Optional[float] = None         # UI 표시용 보정 점수 (relevance.py)
    section: Optional[str] = None                 # 예: "§7.2.4"

class Response(BaseModel):
    text: str
    citations: list[Citation] = Field(default_factory=list)

    # 관찰성·분석
    run_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    model: Optional[str] = None                   # 실제 사용된 LLM deployment
    latency_ms: Optional[int] = None              # 첫 요청부터 done 까지
    ttft_ms: Optional[int] = None                 # Time to First Token
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    arm_id: Optional[str] = None                  # Query.arm_id 복사 (사후 분석용)

    # Agent별 자유 출력
    agent_data: dict[str, Any] = Field(default_factory=dict)

    timestamp: datetime = Field(default_factory=datetime.now)

class StreamEvent(BaseModel):
    type: str                                     # "token" | "citation" | "done" | "error"
    text: Optional[str] = None                    # type="token" 시 토큰 조각
    citation: Optional[Citation] = None           # type="citation" 시 인용
    response: Optional[Response] = None           # type="done" 시 최종 Response 전체
    error: Optional[str] = None                   # type="error" 시 메시지

각 모델의 역할:

  • Query: 에이전트에 전달하는 입력. text만 필수이다. agent_params는 에이전트별 자유 입력(예: Data Standardizer의 mode: "data" | "code")을 담는다.
  • Citation: 답변의 근거 문서. index는 본문의 [1]·[2]와 매칭되는 번호다. score는 원 cosine similarity(로깅·실험에 사용), display_score는 한국어 임베딩의 0.5 대가 “매우 관련”임을 반영하는 UI 보정 점수다 — 두 점수를 분리해 원 점수를 보존한다.
  • Response: 에이전트가 반환하는 출력. run_id는 자동 생성되며 사용자 피드백 매칭 키로 쓰인다. latency_ms는 전체 완료 시간, ttft_ms는 첫 토큰까지의 시간으로 사용자 체감 응답성에 직접 영향을 미친다. agent_data는 에이전트별 자유 출력(예: Standardizer의 {"physical_name": ...})이다.
  • StreamEvent: SSE 이벤트의 분리 필드 표현. event/data 문자열 페어가 아니라 type 필드로 분기하고 페이로드는 타입별 전용 필드(text/citation/response/error)에 담는다 — Pydantic 직렬화로 자동 검증된다.

2.2 ABC 인터페이스

from abc import ABC, abstractmethod
from typing import Iterator, Optional

class BaseAgent(ABC):
    """모든 Agent의 공통 계약.

    Subclass 필수: name (식별자), run(query) → Response.
    선택: domain (도메인 공유 자원 키), stream(query) (override 시 토큰 단위).
    """

    name: str = ""                              # subclass가 반드시 덮어씀
    domain: Optional[str] = None                # 예: "standardization"

    @abstractmethod
    def run(self, query: Query) -> Response:
        """동기 실행 — 전체 응답을 한 번에 반환한다."""
        ...

    def stream(self, query: Query) -> Iterator[StreamEvent]:
        """스트리밍 실행. 기본 구현은 run()을 감싸 done 이벤트만 1개 yield.

        실제 토큰 스트리밍이 필요한 Agent는 이 메서드를 override한다.
        """
        response = self.run(query)
        yield StreamEvent(type="done", response=response)

핵심 설계 결정 두 가지:

  • run만 추상: stream은 기본 구현을 제공한다. 토큰 스트리밍이 어렵거나 불필요한 에이전트(배치 표준화, 정적 분석 등)도 run만 구현하면 SSE 라우터가 동작한다 — done 이벤트 한 번으로 응답이 전달된다. 토큰 단위 UX가 필요한 QnA Chatbot이나 Data Standardizer만 stream을 override한다.
  • domain 클래스 속성: 같은 도메인(예: "standardization")에 속한 에이전트들이 RAG 인덱스·약어 사전·도메인 분류기 같은 무거운 자원을 공유할 수 있게 하는 키다. 인스턴스화 시점에 도메인별 자원이 주입된다.

3 에이전트 구현

3.1 QnA Chatbot — 단일 LCEL 체인

import time
from typing import Iterator

class QnaChatbotAgent(BaseAgent):
    name = "qna_chatbot"
    domain = "standardization"

    def __init__(self, config: RAGConfig):
        self.pipeline = RAGPipeline(config)
        self.llm = self._init_llm(config.llm)

    def run(self, query: Query) -> Response:
        start = time.time()
        ttft_start = time.time()

        # RAG 파이프라인 실행 (검색·리랭크·parent 매핑)
        docs, chain, chain_inputs, model_used = self._prepare(query)
        answer = chain.invoke(chain_inputs)
        ttft_ms = int((time.time() - ttft_start) * 1000)

        return Response(
            text=answer,
            citations=self._build_citations(docs, answer),
            model=model_used,
            ttft_ms=ttft_ms,
            latency_ms=int((time.time() - start) * 1000),
            arm_id=query.arm_id,
        )

    def stream(self, query: Query) -> Iterator[StreamEvent]:
        start = time.time()
        docs, chain, chain_inputs, model_used = self._prepare(query)

        accumulated = ""
        for chunk in chain.stream(chain_inputs):
            accumulated += chunk
            yield StreamEvent(type="token", text=chunk)

        # 본문 마커([1, §...])를 보고 실제 인용된 문서만 추리는 사후 필터링
        citations = self._build_citations(docs, accumulated)
        for cit in citations:
            yield StreamEvent(type="citation", citation=cit)

        # 완료 시 Response 전체를 done 이벤트로 — 클라이언트는 latency·tokens·model을 한 번에 수신
        yield StreamEvent(
            type="done",
            response=Response(
                text=accumulated,
                citations=citations,
                model=model_used,
                latency_ms=int((time.time() - start) * 1000),
                arm_id=query.arm_id,
            ),
        )

3.2 Data Standardizer — Supervisor 패턴

QnA가 단일 LCEL 체인이라면, Data Standardizer는 하위 에이전트(sub_agent)를 조율하는 Supervisor다. 같은 BaseAgent 계약을 따르지만 내부는 여러 단계로 분해된다.

class DataStandardizerAgent(BaseAgent):
    name = "data_standardizer"
    domain = "standardization"

    def __init__(self, config: RAGConfig):
        self._config = config
        self._recommender = None      # RagRecommender — lazy init
        self._auditor = None          # DomainAuditor — lazy init
        self._domain_cache: dict[str, tuple[str, float] | None] = {}

    def run(self, query: Query) -> Response:
        start = time.time()
        recommender = self._ensure_recommender()
        mode = _resolve_mode(query)               # "data" | "code" 자동 감지

        # 1. RAG 추천 (sub_agent)
        raw_text, docs = recommender.run(query.text, mode=mode)

        # 2. 후처리 — 물리명 생성 + ALBERT 도메인 분류 (sub_agents/post_processing)
        processed = _apply_post_processing(raw_text, domain_cache=self._domain_cache)

        # 3. 단건 모드일 때만 도메인 감사 (LLM 비용 절약)
        if _is_full_format(processed):
            processed = self._ensure_auditor().audit_and_fix(processed)

        return Response(
            text=processed,
            citations=self._build_citations(docs, processed),
            latency_ms=int((time.time() - start) * 1000),
            arm_id=query.arm_id,
            agent_data={"mode": mode},            # 자유 필드로 mode 노출
        )

    def stream(self, query: Query) -> Iterator[StreamEvent]:
        recommender = self._ensure_recommender()
        mode = _resolve_mode(query)

        # 토큰은 그대로 흘려 보내되 — 후처리는 done 시점에 일괄
        raw_text = ""
        docs = []
        for item in recommender.stream(query.text, mode=mode):
            if isinstance(item, str):
                raw_text += item
                yield StreamEvent(type="token", text=item)
            else:
                _, docs = item                    # 마지막 (full_text, docs)

        processed = _apply_post_processing(raw_text, domain_cache=self._domain_cache)
        if _is_full_format(processed):
            processed = self._ensure_auditor().audit_and_fix(processed)

        yield StreamEvent(
            type="done",
            response=Response(text=processed, citations=self._build_citations(docs, processed),
                              arm_id=query.arm_id, agent_data={"mode": mode}),
        )

토큰 스트리밍은 사용자에게 즉시 응답을 보여주지만, 후처리(표 정렬, 물리명 부여, 도메인 분류)는 표 전체를 봐야 일관되게 적용된다. 그래서 토큰은 raw 그대로 yield하고, done 이벤트 시점에 정제된 최종 텍스트를 Response로 전달한다.

4 서빙 레이어에서의 활용

계약 덕분에 서빙 레이어는 에이전트 종류를 몰라도 동일한 코드로 처리한다.

4.1 라우터

# routers/qna_chatbot.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/agents/qna_chatbot", tags=["QnA Chatbot"])

@router.post("/run")
def run(request: RunRequest, raw_request: Request):
    agent = raw_request.app.state.agents["qna_chatbot"]
    query = Query(text=request.text, history=request.history, user_id=request.user_id)
    response = agent.run(query)
    return RunResponse(response=response)

@router.post("/stream")
def stream(request: RunRequest, raw_request: Request):
    agent = raw_request.app.state.agents["qna_chatbot"]
    query = Query(text=request.text, history=request.history)

    def generate():
        for event in agent.stream(query):
            # type별 페이로드를 SSE 문자열로 직렬화 — sse-starlette가 처리하도록 위임 가능
            yield {"event": event.type, "data": event.model_dump_json()}

    return EventSourceResponse(generate())

4.2 Pydantic 스키마 (서빙용)

# schemas.py — 서빙 레이어의 HTTP 스키마
from pydantic import BaseModel

class RunRequest(BaseModel):
    text: str
    history: list[dict] = []
    user_id: str | None = None
    agent_params: dict = {}

class RunResponse(BaseModel):
    response: Response            # contracts.py의 Response 재사용
    experiment_id: str | None = None
    arm_id: str | None = None

RunRequest(HTTP 요청)와 Query(에이전트 입력)를 분리하는 이유는 HTTP 요청에는 agent_params 같은 서빙 관련 필드가 포함되지만, 에이전트 코어에는 불필요하기 때문이다. 서빙 레이어가 RunRequest에서 Query를 추출하여 에이전트에 전달한다.

5 에이전트 캐싱

A/B 실험에서 같은 설정의 에이전트를 매 요청마다 생성하면 비효율적이다. (experiment_name, arm_id) 키로 에이전트 인스턴스를 캐싱한다.

import threading

class AgentCache:
    def __init__(self):
        self._cache: dict[tuple[str, str], BaseAgent] = {}
        self._lock = threading.Lock()

    def get_or_create(
        self,
        agent_class: type[BaseAgent],
        experiment_name: str,
        arm_id: str,
        config: RAGConfig,
    ) -> BaseAgent:
        key = (experiment_name, arm_id)

        with self._lock:
            if key not in self._cache:
                self._cache[key] = agent_class(config)

        return self._cache[key]

threading.Lock()으로 동시 요청에서 같은 에이전트가 두 번 생성되는 것을 방지한다.

6 새 에이전트 추가 체크리스트

계약 기반 설계 덕분에 새 에이전트를 추가하는 절차가 표준화된다.

단계 파일 작업
1 src/agents/new_agent/agent.py BaseAgent 상속, runstream 구현
2 data/configs/new_agent.yaml 에이전트 설정 파일 생성
3 src/services/api/routers/new_agent.py APIRouter 생성 (/agents/new_agent/run, /stream)
4 src/services/api/main.py app.include_router(new_agent.router) 추가
5 frontend/src/pages/NewAgentPage.tsx 페이지 컴포넌트 생성
6 frontend/src/main.tsx 라우트 추가

에이전트 코어, 서빙, 프론트엔드 각각에 파일을 하나씩 추가하면 완성이다. 기존 코드를 수정할 필요가 거의 없다.

7 설계 이점 정리

원칙 구현 이점
계약 기반 BaseAgent ABC + domain 속성 에이전트 추가 시 기존 코드 수정 불필요, 도메인 자원 공유
스키마 고정 Pydantic Query/Response/Citation/StreamEvent 타입 검증 자동화, OpenAPI 문서 생성
관심사 분리 RunRequest(HTTP) vs Query(에이전트) 서빙과 비즈니스 로직 독립
실험 호환 캐시 키에 arm_id 포함, Response.arm_id로 사후 분석 같은 에이전트의 다른 설정 동시 운영
이중 인터페이스 run 추상 + stream 기본 구현 토큰 UX가 필요 없는 에이전트도 SSE 라우터 호환
관측성 내장 run_id 자동 생성, latency_ms/ttft_ms 분리 사용자 피드백 매칭 + UX 체감 응답성 분리 측정
자유 출력 Response.agent_data dict 에이전트별 고유 데이터(mode, physical_name 등)를 계약 손대지 않고 노출

8 관련 주제

선행 지식

후속 주제

다른 카테고리 연결

Subscribe

Enjoy this blog? Get notified of new posts by email: