1 본 글의 위치
08-0편 데이터 흐름 추적이 사용자 질문이 RunResponse로 변환되는 동기 호출 한 호흡을 추적했다. 본 글은 그 흐름 위에 얹히는 운영 계층 4가지를 다룬다.
- SSE 스트리밍 경로 — 동기와 갈라지는 분기점부터 done 이벤트까지
- 운영 관측성 — perf_counter timing 로그, runs.jsonl 메트릭, feedback.jsonl 사이드 채널
- 흐름에서 발견한 설계 취약점 — 바이브 코딩 결과물의 한계 진단
- 다른 에이전트의 흐름 — Data Standardizer Supervisor 패턴
마지막에 Phase C-2 LangGraph 분해 예고로 마무리한다.
- MINERVA 데이터 흐름 추적 (08-0) — 동기 호출 한 호흡의 단계별 추적
- MINERVA FastAPI 서빙 레이어 (04) — SSE StreamingResponse 구조
- MINERVA 상태 관리 해부 (09) — JSONL 4계층 영속화
2 SSE 스트리밍 경로
2.1 분기점: chain.invoke() vs chain.stream()
동기(/run)와 스트리밍(/stream)의 차이는 agent._prepare()를 공유한 뒤 LCEL chain 호출 방식에서 갈린다.
공통: _prepare() → docs, chain, chain_inputs
동기: chain.invoke(chain_inputs) → str (완료 후 한 번에)
스트리밍: chain.stream(chain_inputs) → Iterator[str] (토큰마다 yield)
2.2 스트리밍 이벤트 타입
2.3 스트리밍 구현
# src/agents/qna_chatbot/agent.py
def stream(self, query: Query) -> Iterator[StreamEvent]:
start = time.perf_counter()
# _prepare 단계 실패 → error 이벤트 1개 yield 후 종료
try:
docs, chain, chain_inputs, model_used = self._prepare(query)
except Exception as e:
yield StreamEvent(type="error", error=f"{type(e).__name__}: {e}")
return
t_prepared = time.perf_counter()
accumulated: list[str] = []
ttft_ms: Optional[int] = None
try:
for chunk in chain.stream(chain_inputs):
if chunk:
if ttft_ms is None:
# 첫 토큰 도달 시점 측정
ttft_ms = int((time.perf_counter() - start) * 1000)
accumulated.append(chunk)
yield StreamEvent(type="token", text=chunk)
except Exception as e:
yield StreamEvent(type="error", error=f"{type(e).__name__}: {e}")
return
# 전체 누적 텍스트로 Response 조립 후 done 이벤트
latency_ms = int((time.perf_counter() - start) * 1000)
yield StreamEvent(
type="done",
response=self._build_response(
"".join(accumulated), docs, query,
latency_ms=latency_ms, ttft_ms=ttft_ms,
model_used=model_used,
),
)2.4 SSE 전송 형식
# src/services/api/routers/qna_chatbot.py
def stream(req: RunRequest) -> StreamingResponse:
...
def event_generator():
final_response = None
for event in agent.stream(query):
yield f"data: {event.model_dump_json()}\n\n" # SSE 표준 포맷
if event.type == "done":
final_response = event.response
if final_response is not None:
log_run(...)
return StreamingResponse(event_generator(), media_type="text/event-stream")실제 SSE 스트림 (각 줄이 독립적인 JSON):
data: {"type":"token","text":"연","citation":null,"response":null,"error":null}
data: {"type":"token","text":"차","citation":null,"response":null,"error":null}
data: {"type":"token","text":"는","citation":null,"response":null,"error":null}
... (토큰 단위 반복)
data: {"type":"done","text":null,"citation":null,"response":{"text":"연차는 입사 1년...","citations":[...],"run_id":"a3f2b1...","model":"gpt-4.1","latency_ms":3240,"ttft_ms":1820,...},"error":null}
동기 경로와 스트리밍 경로 비교:
| 항목 | 동기 (/run) |
스트리밍 (/stream) |
|---|---|---|
| RAG (_prepare) | 동일 | 동일 |
| LLM 호출 | chain.invoke() |
chain.stream() |
| 응답 타입 | RunResponse { response } |
SSE data: {StreamEvent}\n\n |
| 인용 정보 | response.citations에 포함 |
done 이벤트의 response.citations에 포함 |
ttft_ms |
latency_ms와 동일 (블로킹) |
첫 non-empty 토큰 도달 시점 |
| 로깅 | run 완료 후 즉시 | done 이벤트 수신 후 |
3 운영 관측성
본편 단계 5에서 Response가 조립된 직후, 운영 관점의 부가 작업이 일어난다. 사용자 응답에 직접 영향을 주지 않지만 운영 디버깅·메트릭 수집의 토대다.
3.1 실측 타이밍 — [timing] 로그
_prepare()와 hybrid_search() 모두 time.perf_counter()로 단계별 지연을 측정해 [timing] 로그로 남긴다. 운영 환경에서 TTFT 회귀 진단의 1차 단서다.
[timing] hybrid_search: azure=420ms rerank/cosine=15ms map_parents=2ms children=12 parents=6
[timing] cosine: query_embed=0ms doc_embed=0ms(cached) numpy+sort=2ms n=12
[timing] prepare: init=0ms retrieve=437ms ctx+history=1ms chain=3ms docs=6 context_chars=4823 history_chars=412
[ttft] qna_chatbot total=1820ms (prepare=441ms llm_first_token=1379ms)
각 필드의 의미:
| 로그 prefix | 필드 | 의미 |
|---|---|---|
hybrid_search |
azure | Azure 검색 응답까지 (네트워크 + 인덱스) |
hybrid_search |
rerank/cosine | reranker 또는 cosine 유사도 계산 |
hybrid_search |
map_parents | child→parent 매핑 |
cosine |
query_embed | 0ms = _hybrid_search_with_vectors에서 이미 계산됨 |
cosine |
doc_embed | cached = vector 재사용, api = embed_documents 호출 |
prepare |
init | prompt/llm lazy 로드 (최초 요청만 100~500ms) |
prepare |
retrieve | _retrieve_docs() 전체 (hybrid_search 내부 합산) |
ttft |
total | start ~ 첫 token yield |
ttft |
llm_first_token | _prepared ~ 첫 token (LLM TTFT) |
prepare와 ttft.total - ttft.llm_first_token이 일치해야 한다(perf_counter 기준 동일).
3.2 메트릭 로깅 — record_from_response()
응답 후 log_run()이 단일 JSONL(data/runtime/runs.jsonl)에 한 줄을 추가한다. 스키마는 PostgreSQL 컬럼과 1:1 매핑되어 있어, 향후 PG INSERT 전환 시 변환 없이 import 가능하다.
# src/core/metrics_logger.py
def record_from_response(agent_name, query, response, *, extras=None):
return {
"timestamp": datetime.now().isoformat(timespec="seconds"),
"run_id": response.run_id, # uuid hex — 피드백 매칭 키
"agent_name": agent_name,
"user_id": query.user_id,
"session_id": query.session_id,
"query": query.text,
"answer": response.text,
"total_time_ms": response.latency_ms,
"ttft_ms": response.ttft_ms,
"success": True,
"model_name": response.model,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"citation_count": citation_count, # cite_meta 또는 본문 [N] 추정
"documents_retrieved": cite_meta,
"has_table": has_table(text), # 답변 본문 휴리스틱
"has_code_block": has_code_block(text),
"has_qna_format": has_qna_format(text),
"has_principle": has_principle(text),
"experiment_name": query.experiment_id,
"arm_id": response.arm_id or query.arm_id,
"extras": extras, # arm_overrides 스냅샷
}citation_count 보강 로직: len(response.citations)가 0이면 답변 본문의 [N] 마커를 정규식으로 세서 보강한다(count_citations()). LLM이 인용은 했지만 Citation 메타가 비어있는 경우(예: 검색 결과 빈 리스트인데 답변에 인용 형식이 등장)에 대한 fallback이다.
3.3 사용자 피드백 사이드 채널
runs.jsonl이 응답 시점에 한 줄 추가되면, 그 줄의 run_id가 사용자 피드백 매칭 키가 된다. 사용자가 응답 카드의 helpful/unhelpful 버튼을 누르면 별도 라우터(POST /feedback)가 data/runtime/feedback.jsonl에 4컬럼(timestamp/run_id/agent_name/helpful)을 append한다. 분석 시 pd.read_json(lines=True) 두 파일을 run_id로 left-join하면 응답·피드백·실험·비용을 한 DataFrame에서 비교할 수 있다.
# feedback.jsonl 예시
{"timestamp": "2026-05-05T10:24:01", "run_id": "a3f2b1c4...", "agent_name": "qna_chatbot", "helpful": true}이 사이드 채널이 핵심인 이유: 응답 시점에는 helpful 여부를 모르므로 두 JSONL을 분리한다. 응답 row를 사후 UPDATE하면 append-only 이점(동시 쓰기 안전, 스트리밍 처리)이 깨진다. 비슷한 이유로 비용도 사후 계산이다 — pricing.py의 모델별 단가 표를 runs.jsonl의 model_name + input_tokens + output_tokens에 적용해 호출별 USD를 분석 시점에 산출한다.
4 흐름에서 발견한 설계 취약점
08-0편의 동기 흐름과 본 글의 스트리밍·관측성을 종합하면 바이브 코딩 결과물의 몇 가지 취약점이 드러난다.
1. _prepare()가 검색·컨텍스트·chain을 한 번에 실행한다
검색 실패, 컨텍스트 조립 실패, chain 조립 실패를 구분하기 어렵다. 에러 로그가 나와도 어느 단계에서 실패했는지 파악하려면 코드를 깊이 읽어야 한다. Phase C-2에서 LangGraph Node로 분해하면 각 단계가 독립적인 에러 경계를 갖는다.
2. 스트리밍 에러 처리가 부분적이다
chain.stream() 도중 LLM 에러가 발생하면 이미 yield된 토큰은 클라이언트에 전달된 상태다. done 이벤트가 없으므로 클라이언트는 스트림이 불완전하게 끊겼다는 것을 error 이벤트를 받아야만 안다. 클라이언트가 error 이벤트를 처리하지 않으면 partial 텍스트가 그대로 표시될 수 있다. 이는 10편 에러 전파에서 다룬다.
3. Citation 파싱이 LLM 출력 텍스트 패턴에 의존한다
_filter_citations()는 답변에서 [N] 또는 [N, §...] 패턴을 정규식으로 찾는다. LLM이 다른 형식으로 출력하면 인용이 누락된다. 구조화된 출력(structured output)으로 개선이 필요하다.
4. Parent 매핑 실패 시 child fallback이 있지만 로그 외 알림이 없다
parent_id가 없거나 parent_store에 없는 child가 존재하면 해당 child를 직접 반환하는 fallback이 작동한다. 이 경우 LLM에 전달되는 컨텍스트가 child 크기(~400자)로 줄어들어 응답 품질이 저하될 수 있다. 현재는 print()로 경고만 남기고 별도 메트릭이 없다.
5 다른 에이전트의 흐름 — Data Standardizer는 Supervisor 패턴
08-0편은 QnA Chatbot의 단일 LCEL 체인 경로를 따라갔다. Data Standardizer 에이전트는 같은 BaseAgent 계약을 따르되 내부가 Supervisor + sub_agent 구조라서 데이터 흐름이 다르게 흘러간다.
query.text
│
▼ _resolve_mode(query) ← "data" | "code" 자동 감지
│
▼ RagRecommender.run(text, mode) ← sub_agent 1: 표준화 RAG 추천
│ (raw_text, docs)
│
▼ _apply_post_processing(raw_text) ← sub_agents/post_processing/{tables,code}
│ 물리명 부여 + ALBERT 도메인 분류 + 표 정렬
│ (도메인 캐시로 같은 한글 논리명 재예측 회피)
│
▼ _is_full_format(processed)?
│ ├─ True: DomainAuditor.audit_and_fix() ← sub_agent 2: 우선순위 감사
│ └─ False: skip (단건 모드만 LLM 비용 발생)
│
▼ Response(text=processed, agent_data={"mode": mode}, ...)
스트리밍 경로의 핵심 차이: 토큰은 RagRecommender의 LLM 출력을 그대로 yield하지만, post_processing은 표 전체가 모여야 일관되게 적용 가능하다 (물리명·도메인 분류·표 정렬은 행 단위가 아닌 표 단위 작업). 따라서 done 이벤트 시점에서 일괄 후처리한 최종 텍스트로 Response.text를 채우고, 클라이언트는 토큰 진행 중에는 raw 표를, 완료 후에는 정제 표를 본다.
자세한 sub_agent 구현 패턴은 에이전트 위임 (21)에서 다룬다.
6 Phase C-2 예고 — LangGraph Node 분해
08-0편의 _prepare() 안에 순차적으로 묶인 흐름을 LangGraph의 명시적 Node와 Edge로 분리하면 다음과 같이 된다.
AgentState: { query, docs, context, chain_inputs, answer, response }
[retrieve_node] query.text → docs (list[Document])
│
▼
[build_context_node] docs → context (str)
│
▼
[build_inputs_node] context + query → chain_inputs (dict)
│
▼
[generate_node] chain_inputs → answer (str)
│
▼
[build_response_node] answer + docs → response (Response)
│
▼
[END]
각 Node가 독립적으로 테스트 가능해지고, 조건부 Edge로 검색 실패 시 fallback 경로를 추가할 수 있다. 자세한 내용은 LangGraph 기초 (13)에서 다룬다.
7 정리
| 항목 | 핵심 |
|---|---|
| SSE 스트리밍 | 동기와 _prepare() 공유, chain.stream()으로 분기. error 이벤트 시 done 없이 종료 |
| 실측 timing | perf_counter 기반 [timing] 로그가 운영 TTFT 회귀 진단 1차 단서 |
| 메트릭 로깅 | runs.jsonl PG 호환 스키마, 응답 시점 1회 append. citation_count 정규식 fallback |
| 피드백 사이드 채널 | feedback.jsonl 별도 파일, run_id로 join. append-only 보존 + 비용 사후 계산 |
| 설계 취약점 | _prepare() 묶음, 스트리밍 부분 처리, Citation 정규식 의존, Parent fallback 무음 강등 |
| Data Standardizer 흐름 | Supervisor + sub_agent 정적 파이프라인. 토큰 yield + done 시점 일괄 후처리 |
| Phase C-2 전환 | _prepare() 5개 Node로 분해, 조건부 Edge로 폴백 |
본편 동기 흐름 위에 운영 계층을 얹어 시리즈 후반부(C-2 LangGraph 전환·C-9 관측성)의 토대를 마련한다.
8 관련 주제
선행 학습
- MINERVA 데이터 흐름 추적 (08-0) — 동기 호출 한 호흡
- MINERVA FastAPI 서빙 레이어 (04) — SSE StreamingResponse
- MINERVA 상태 관리 해부 (09) — JSONL 4계층 영속화
Phase C-1 후속 포스트
- 에러 전파 경로 분석 (10) — 부분 스트리밍 + 무음 강등 처리
- Config 의존성 추적 (11) — 환경변수·YAML·실험 override
Phase C-2 — LangGraph 전환
Phase C-3 연결
- 에이전트 위임 (21) — Data Standardizer Supervisor의 발전형