1 왜 계약(Contract)이 필요한가
MINERVA에는 여러 에이전트가 있다: QnA Chatbot, Data Standardizer, 그리고 앞으로 추가될 에이전트들. 각 에이전트의 내부 로직은 다르지만, 외부에서 호출하는 방식은 동일해야 한다.
계약이 없으면 발생하는 문제:
- 에이전트마다 메서드 이름이 다르다 (
run,execute,process,generate) - 입출력 형식이 다르다 (dict, tuple, 커스텀 클래스)
- 서빙 레이어가 에이전트마다 다른 코드를 작성해야 한다
- 새 에이전트 추가 시 서빙 레이어, 프론트엔드까지 수정해야 한다
계약이 있으면:
- 모든 에이전트가
run(Query) -> Response인터페이스를 따른다 - 서빙 레이어는 에이전트 종류를 몰라도 동일하게 호출할 수 있다
- 새 에이전트는 계약만 구현하면 자동으로 서빙된다
2 계약 정의: contracts.py
2.1 Pydantic 스키마
import uuid
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
class ConversationTurn(BaseModel):
role: str # "user" | "assistant"
content: str
class Query(BaseModel):
text: str
history: list[ConversationTurn] = Field(default_factory=list)
agent_params: dict[str, Any] = Field(default_factory=dict)
# A/B 실험 맥락
user_id: Optional[str] = None
session_id: Optional[str] = None
experiment_id: Optional[str] = None
arm_id: Optional[str] = None
class Citation(BaseModel):
index: int # 본문 [1], [2] 표시 번호
content: str
metadata: dict[str, Any] = Field(default_factory=dict)
score: Optional[float] = None # 원 cosine — 로깅·실험·threshold 기준
display_score: Optional[float] = None # UI 표시용 보정 점수 (relevance.py)
section: Optional[str] = None # 예: "§7.2.4"
class Response(BaseModel):
text: str
citations: list[Citation] = Field(default_factory=list)
# 관찰성·분석
run_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
model: Optional[str] = None # 실제 사용된 LLM deployment
latency_ms: Optional[int] = None # 첫 요청부터 done 까지
ttft_ms: Optional[int] = None # Time to First Token
input_tokens: Optional[int] = None
output_tokens: Optional[int] = None
arm_id: Optional[str] = None # Query.arm_id 복사 (사후 분석용)
# Agent별 자유 출력
agent_data: dict[str, Any] = Field(default_factory=dict)
timestamp: datetime = Field(default_factory=datetime.now)
class StreamEvent(BaseModel):
type: str # "token" | "citation" | "done" | "error"
text: Optional[str] = None # type="token" 시 토큰 조각
citation: Optional[Citation] = None # type="citation" 시 인용
response: Optional[Response] = None # type="done" 시 최종 Response 전체
error: Optional[str] = None # type="error" 시 메시지각 모델의 역할:
- Query: 에이전트에 전달하는 입력.
text만 필수이다.agent_params는 에이전트별 자유 입력(예: Data Standardizer의mode: "data" | "code")을 담는다. - Citation: 답변의 근거 문서.
index는 본문의[1]·[2]와 매칭되는 번호다.score는 원 cosine similarity(로깅·실험에 사용),display_score는 한국어 임베딩의 0.5 대가 “매우 관련”임을 반영하는 UI 보정 점수다 — 두 점수를 분리해 원 점수를 보존한다. - Response: 에이전트가 반환하는 출력.
run_id는 자동 생성되며 사용자 피드백 매칭 키로 쓰인다.latency_ms는 전체 완료 시간,ttft_ms는 첫 토큰까지의 시간으로 사용자 체감 응답성에 직접 영향을 미친다.agent_data는 에이전트별 자유 출력(예: Standardizer의{"physical_name": ...})이다. - StreamEvent: SSE 이벤트의 분리 필드 표현.
event/data문자열 페어가 아니라type필드로 분기하고 페이로드는 타입별 전용 필드(text/citation/response/error)에 담는다 — Pydantic 직렬화로 자동 검증된다.
2.2 ABC 인터페이스
from abc import ABC, abstractmethod
from typing import Iterator, Optional
class BaseAgent(ABC):
"""모든 Agent의 공통 계약.
Subclass 필수: name (식별자), run(query) → Response.
선택: domain (도메인 공유 자원 키), stream(query) (override 시 토큰 단위).
"""
name: str = "" # subclass가 반드시 덮어씀
domain: Optional[str] = None # 예: "standardization"
@abstractmethod
def run(self, query: Query) -> Response:
"""동기 실행 — 전체 응답을 한 번에 반환한다."""
...
def stream(self, query: Query) -> Iterator[StreamEvent]:
"""스트리밍 실행. 기본 구현은 run()을 감싸 done 이벤트만 1개 yield.
실제 토큰 스트리밍이 필요한 Agent는 이 메서드를 override한다.
"""
response = self.run(query)
yield StreamEvent(type="done", response=response)핵심 설계 결정 두 가지:
run만 추상:stream은 기본 구현을 제공한다. 토큰 스트리밍이 어렵거나 불필요한 에이전트(배치 표준화, 정적 분석 등)도run만 구현하면 SSE 라우터가 동작한다 — done 이벤트 한 번으로 응답이 전달된다. 토큰 단위 UX가 필요한 QnA Chatbot이나 Data Standardizer만stream을 override한다.domain클래스 속성: 같은 도메인(예:"standardization")에 속한 에이전트들이 RAG 인덱스·약어 사전·도메인 분류기 같은 무거운 자원을 공유할 수 있게 하는 키다. 인스턴스화 시점에 도메인별 자원이 주입된다.
3 에이전트 구현
3.1 QnA Chatbot — 단일 LCEL 체인
import time
from typing import Iterator
class QnaChatbotAgent(BaseAgent):
name = "qna_chatbot"
domain = "standardization"
def __init__(self, config: RAGConfig):
self.pipeline = RAGPipeline(config)
self.llm = self._init_llm(config.llm)
def run(self, query: Query) -> Response:
start = time.time()
ttft_start = time.time()
# RAG 파이프라인 실행 (검색·리랭크·parent 매핑)
docs, chain, chain_inputs, model_used = self._prepare(query)
answer = chain.invoke(chain_inputs)
ttft_ms = int((time.time() - ttft_start) * 1000)
return Response(
text=answer,
citations=self._build_citations(docs, answer),
model=model_used,
ttft_ms=ttft_ms,
latency_ms=int((time.time() - start) * 1000),
arm_id=query.arm_id,
)
def stream(self, query: Query) -> Iterator[StreamEvent]:
start = time.time()
docs, chain, chain_inputs, model_used = self._prepare(query)
accumulated = ""
for chunk in chain.stream(chain_inputs):
accumulated += chunk
yield StreamEvent(type="token", text=chunk)
# 본문 마커([1, §...])를 보고 실제 인용된 문서만 추리는 사후 필터링
citations = self._build_citations(docs, accumulated)
for cit in citations:
yield StreamEvent(type="citation", citation=cit)
# 완료 시 Response 전체를 done 이벤트로 — 클라이언트는 latency·tokens·model을 한 번에 수신
yield StreamEvent(
type="done",
response=Response(
text=accumulated,
citations=citations,
model=model_used,
latency_ms=int((time.time() - start) * 1000),
arm_id=query.arm_id,
),
)3.2 Data Standardizer — Supervisor 패턴
QnA가 단일 LCEL 체인이라면, Data Standardizer는 하위 에이전트(sub_agent)를 조율하는 Supervisor다. 같은 BaseAgent 계약을 따르지만 내부는 여러 단계로 분해된다.
class DataStandardizerAgent(BaseAgent):
name = "data_standardizer"
domain = "standardization"
def __init__(self, config: RAGConfig):
self._config = config
self._recommender = None # RagRecommender — lazy init
self._auditor = None # DomainAuditor — lazy init
self._domain_cache: dict[str, tuple[str, float] | None] = {}
def run(self, query: Query) -> Response:
start = time.time()
recommender = self._ensure_recommender()
mode = _resolve_mode(query) # "data" | "code" 자동 감지
# 1. RAG 추천 (sub_agent)
raw_text, docs = recommender.run(query.text, mode=mode)
# 2. 후처리 — 물리명 생성 + ALBERT 도메인 분류 (sub_agents/post_processing)
processed = _apply_post_processing(raw_text, domain_cache=self._domain_cache)
# 3. 단건 모드일 때만 도메인 감사 (LLM 비용 절약)
if _is_full_format(processed):
processed = self._ensure_auditor().audit_and_fix(processed)
return Response(
text=processed,
citations=self._build_citations(docs, processed),
latency_ms=int((time.time() - start) * 1000),
arm_id=query.arm_id,
agent_data={"mode": mode}, # 자유 필드로 mode 노출
)
def stream(self, query: Query) -> Iterator[StreamEvent]:
recommender = self._ensure_recommender()
mode = _resolve_mode(query)
# 토큰은 그대로 흘려 보내되 — 후처리는 done 시점에 일괄
raw_text = ""
docs = []
for item in recommender.stream(query.text, mode=mode):
if isinstance(item, str):
raw_text += item
yield StreamEvent(type="token", text=item)
else:
_, docs = item # 마지막 (full_text, docs)
processed = _apply_post_processing(raw_text, domain_cache=self._domain_cache)
if _is_full_format(processed):
processed = self._ensure_auditor().audit_and_fix(processed)
yield StreamEvent(
type="done",
response=Response(text=processed, citations=self._build_citations(docs, processed),
arm_id=query.arm_id, agent_data={"mode": mode}),
)토큰 스트리밍은 사용자에게 즉시 응답을 보여주지만, 후처리(표 정렬, 물리명 부여, 도메인 분류)는 표 전체를 봐야 일관되게 적용된다. 그래서 토큰은 raw 그대로 yield하고, done 이벤트 시점에 정제된 최종 텍스트를 Response로 전달한다.
4 서빙 레이어에서의 활용
계약 덕분에 서빙 레이어는 에이전트 종류를 몰라도 동일한 코드로 처리한다.
4.1 라우터
# routers/qna_chatbot.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
router = APIRouter(prefix="/agents/qna_chatbot", tags=["QnA Chatbot"])
@router.post("/run")
def run(request: RunRequest, raw_request: Request):
agent = raw_request.app.state.agents["qna_chatbot"]
query = Query(text=request.text, history=request.history, user_id=request.user_id)
response = agent.run(query)
return RunResponse(response=response)
@router.post("/stream")
def stream(request: RunRequest, raw_request: Request):
agent = raw_request.app.state.agents["qna_chatbot"]
query = Query(text=request.text, history=request.history)
def generate():
for event in agent.stream(query):
# type별 페이로드를 SSE 문자열로 직렬화 — sse-starlette가 처리하도록 위임 가능
yield {"event": event.type, "data": event.model_dump_json()}
return EventSourceResponse(generate())4.2 Pydantic 스키마 (서빙용)
# schemas.py — 서빙 레이어의 HTTP 스키마
from pydantic import BaseModel
class RunRequest(BaseModel):
text: str
history: list[dict] = []
user_id: str | None = None
agent_params: dict = {}
class RunResponse(BaseModel):
response: Response # contracts.py의 Response 재사용
experiment_id: str | None = None
arm_id: str | None = NoneRunRequest(HTTP 요청)와 Query(에이전트 입력)를 분리하는 이유는 HTTP 요청에는 agent_params 같은 서빙 관련 필드가 포함되지만, 에이전트 코어에는 불필요하기 때문이다. 서빙 레이어가 RunRequest에서 Query를 추출하여 에이전트에 전달한다.
5 에이전트 캐싱
A/B 실험에서 같은 설정의 에이전트를 매 요청마다 생성하면 비효율적이다. (experiment_name, arm_id) 키로 에이전트 인스턴스를 캐싱한다.
import threading
class AgentCache:
def __init__(self):
self._cache: dict[tuple[str, str], BaseAgent] = {}
self._lock = threading.Lock()
def get_or_create(
self,
agent_class: type[BaseAgent],
experiment_name: str,
arm_id: str,
config: RAGConfig,
) -> BaseAgent:
key = (experiment_name, arm_id)
with self._lock:
if key not in self._cache:
self._cache[key] = agent_class(config)
return self._cache[key]threading.Lock()으로 동시 요청에서 같은 에이전트가 두 번 생성되는 것을 방지한다.
6 새 에이전트 추가 체크리스트
계약 기반 설계 덕분에 새 에이전트를 추가하는 절차가 표준화된다.
| 단계 | 파일 | 작업 |
|---|---|---|
| 1 | src/agents/new_agent/agent.py |
BaseAgent 상속, run과 stream 구현 |
| 2 | data/configs/new_agent.yaml |
에이전트 설정 파일 생성 |
| 3 | src/services/api/routers/new_agent.py |
APIRouter 생성 (/agents/new_agent/run, /stream) |
| 4 | src/services/api/main.py |
app.include_router(new_agent.router) 추가 |
| 5 | frontend/src/pages/NewAgentPage.tsx |
페이지 컴포넌트 생성 |
| 6 | frontend/src/main.tsx |
라우트 추가 |
에이전트 코어, 서빙, 프론트엔드 각각에 파일을 하나씩 추가하면 완성이다. 기존 코드를 수정할 필요가 거의 없다.
7 설계 이점 정리
| 원칙 | 구현 | 이점 |
|---|---|---|
| 계약 기반 | BaseAgent ABC + domain 속성 |
에이전트 추가 시 기존 코드 수정 불필요, 도메인 자원 공유 |
| 스키마 고정 | Pydantic Query/Response/Citation/StreamEvent |
타입 검증 자동화, OpenAPI 문서 생성 |
| 관심사 분리 | RunRequest(HTTP) vs Query(에이전트) |
서빙과 비즈니스 로직 독립 |
| 실험 호환 | 캐시 키에 arm_id 포함, Response.arm_id로 사후 분석 |
같은 에이전트의 다른 설정 동시 운영 |
| 이중 인터페이스 | run 추상 + stream 기본 구현 |
토큰 UX가 필요 없는 에이전트도 SSE 라우터 호환 |
| 관측성 내장 | run_id 자동 생성, latency_ms/ttft_ms 분리 |
사용자 피드백 매칭 + UX 체감 응답성 분리 측정 |
| 자유 출력 | Response.agent_data dict |
에이전트별 고유 데이터(mode, physical_name 등)를 계약 손대지 않고 노출 |
8 관련 주제
선행 지식
- Pydantic 심화 – Field, validator, BaseSettings
- Python ABC – 추상 베이스 클래스
후속 주제
- BaseAgent 계약 v2 – LangGraph 호환 인터페이스와 v1 어댑터 – 본 v1 위에 그래프·State·Checkpointer 능력을 더한 확장
- RAG 파이프라인 설계 – BaseAgent.run() 내부의 검색 파이프라인
- FastAPI 서빙 레이어 – 계약을 HTTP로 노출하는 서빙 구조
- A/B 실험 프레임워크 – 에이전트 캐싱과 실험 설정 override
다른 카테고리 연결
- MINERVA 아키텍처 개요 – 3-Layer 전체 구조