1 분해의 목표
13편에서 LangGraph가 무엇이고 왜 필요한지를 다뤘다. 이번 편은 그 이론을 MINERVA의 실제 코드에 적용한다.
분해 후 얻는 것은 네 가지다.
- 노드별 단위 테스트 —
_prepare()한 함수를 통째로 mock하지 않고, 노드 입출력만 mock하면 된다. - 검색 실패 폴백 —
retrieve_children노드가 빈 결과를 반환할 때 conditional edge로 web search 노드로 우회한다. - A/B arm을 노드 교체로 표현 — Reranker A/B를 두 가지 노드(
rerank_cosine,rerank_flashrank)로 두고 arm에 따라 컴파일된 그래프를 바꾼다. - 단계별 관찰성 —
app.stream()또는astream_events()가 노드별 입출력을 자동으로 토해낸다.[timing]로그를 직접 찍을 필요가 없다.
- MINERVA LangGraph 기초 — Node·Edge·State 개념과 Chain/Graph 비교
- MINERVA 데이터 흐름 추적 —
_prepare()단계별 변환 - MINERVA RAG 파이프라인 설계 — Hybrid Search + Parent-Child Chunking
2 분해 전 — 현재 _prepare() 흐름
src/agents/qna_chatbot/agent.py의 핵심부는 다음과 같다.
def _prepare(self, query: Query):
self._ensure_initialized() # (1) lazy init
document_id = query.agent_params.get("document_id", ...)
source_filter = query.agent_params.get("source_filter")
docs = self._retrieve_docs(query.text, document_id, source_filter=source_filter) # (2)~(4)
chain_inputs = self._build_chain_inputs(query, docs) # (5)
llm, model_used = self._get_llm_for_query(query)
chain = self._prompt | llm | StrOutputParser()
return docs, chain, chain_inputs, model_used
def run(self, query: Query) -> Response:
docs, chain, chain_inputs, model_used = self._prepare(query)
answer = chain.invoke(chain_inputs) # (6) generate
return self._build_response(answer, docs, query, ...) # (7) response_retrieve_docs()는 안에서 다시 세 단계로 갈린다 (src/core/rag/retriever.py).
def similarity_search(self, query, k=4, **kwargs):
child_docs = self.vectorstore.similarity_search(query, k=k * 2, **kwargs) # (2) child 검색
if self._reranker_type in ("flashrank", "cross_encoder"):
child_docs = self._rerank_children(child_docs, query) # (3) rerank
elif self._embeddings is not None:
child_docs = self._compute_cosine_scores(child_docs, query) # (3') cosine
parents = self._map_to_parents_with_child_score(child_docs, k) # (4) parent 매핑
return parents따라서 7개 단계가 함수 안에 누적되어 있다. 단계마다 입출력 타입은 다음과 같다.
| 단계 | 입력 | 출력 | 책임 |
|---|---|---|---|
| (1) ensure_init | (없음) | (없음) | prompt·LLM·retriever lazy 로드 |
| (2) retrieve_children | str |
list[Document] (child, k2~k3개) |
hybrid/mmr/similarity 검색 |
| (3) rerank | list[Document] |
list[Document] (child + score) |
flashrank 또는 cosine |
| (4) map_to_parents | list[Document] (child) |
list[Document] (parent, k개) |
parent dedup + score 전파 |
| (5) build_inputs | Query, list[Document] |
dict |
context 직렬화 + chat_history 포맷 |
| (6) generate | dict |
str |
LLM 호출 |
| (7) build_response | str, list[Document], Query |
Response |
인용 필터 + Pydantic 조립 |
각 행이 LangGraph의 일급 노드 후보다.
3 State 정의 — 단계 입출력을 한 자료 구조로
먼저 노드 사이를 흘러다닐 State를 정의한다. State 설계의 깊은 패턴은 15편에서 다루므로, 여기서는 분해에 필요한 최소 형태만 둔다.
from typing import Annotated, TypedDict
from operator import add
from langchain_core.documents import Document
from core.contracts import Query, Response
class QnaState(TypedDict):
# 입력 — 그래프 시작 시 채워짐
query: Query
# 검색 단계 누적
child_docs: list[Document]
parent_docs: list[Document]
# 생성 단계
chain_inputs: dict
answer: str
# 출력
response: Response
# 관찰성 — 노드가 이어붙임
timings: Annotated[list[dict], add]
log_messages: Annotated[list[str], add]Annotated[..., add]로 표시한 두 필드는 노드 반환값이 누적된다. 나머지는 가장 최근 반환값으로 덮어쓴다.
4 노드 분해
각 노드를 state -> dict 시그니처로 작성한다. 모든 노드는 agent 인스턴스를 closure로 캡처한다 — 클래스 메서드 자원을 그대로 재사용하기 위해서다.
4.1 ensure_init_node
import time
def make_ensure_init_node(agent):
def ensure_init_node(state: QnaState) -> dict:
t0 = time.perf_counter()
agent._ensure_initialized()
return {
"timings": [{"node": "ensure_init", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": ["initialized"],
}
return ensure_init_node이미 _ensure_initialized()가 idempotent하므로 별도 가드는 필요 없다.
4.2 retrieve_children_node
def make_retrieve_children_node(agent):
def retrieve_children_node(state: QnaState) -> dict:
t0 = time.perf_counter()
query = state["query"]
document_id = query.agent_params.get("document_id", agent._default_document_id)
source_filter = query.agent_params.get("source_filter")
retriever = agent._get_retriever(document_id)
k = agent.config.retrieval.k
fetch_k = k * 3 if source_filter and source_filter != "all" else k
if agent.config.retrieval.search_type == "hybrid":
child_docs = retriever.vectorstore.hybrid_search(query=query.text, k=fetch_k * 2)
elif agent.config.retrieval.search_type == "mmr":
child_docs = retriever.vectorstore.max_marginal_relevance_search(
query=query.text, k=fetch_k * 2,
lambda_mult=agent.config.retrieval.mmr_diversity,
)
else:
child_docs = retriever.vectorstore.similarity_search(query=query.text, k=fetch_k * 2)
return {
"child_docs": child_docs,
"timings": [{"node": "retrieve_children", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"retrieved {len(child_docs)} child docs"],
}
return retrieve_children_node기존 _retrieve_docs()에서 rerank·parent 매핑을 떼어 내고, child 검색만 남긴다.
4.3 rerank_node — 두 가지 변형
A/B 실험을 노드 교체로 표현하는 핵심 부분이다. 같은 시그니처의 노드 두 개를 만든다.
def make_rerank_flashrank_node(agent):
def rerank_flashrank_node(state: QnaState) -> dict:
t0 = time.perf_counter()
retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
reranked = retriever._rerank_children(state["child_docs"], state["query"].text)
return {
"child_docs": reranked,
"timings": [{"node": "rerank_flashrank", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"flashrank reranked {len(reranked)} docs"],
}
return rerank_flashrank_node
def make_rerank_cosine_node(agent):
def rerank_cosine_node(state: QnaState) -> dict:
t0 = time.perf_counter()
retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
scored = retriever._compute_cosine_scores(state["child_docs"], state["query"].text)
return {
"child_docs": scored,
"timings": [{"node": "rerank_cosine", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"cosine scored {len(scored)} docs"],
}
return rerank_cosine_node두 노드는 같은 입력을 받고 같은 형태의 출력을 낸다. 그래프 컴파일 시점에 어느 쪽을 등록할지만 바꾸면 된다.
4.4 map_to_parents_node
def make_map_to_parents_node(agent):
def map_to_parents_node(state: QnaState) -> dict:
t0 = time.perf_counter()
retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
k = agent.config.retrieval.k
parents = retriever._map_to_parents_with_child_score(state["child_docs"], k)
return {
"parent_docs": parents,
"timings": [{"node": "map_to_parents", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"mapped to {len(parents)} parents"],
}
return map_to_parents_node4.5 build_inputs_node
def make_build_inputs_node(agent):
def build_inputs_node(state: QnaState) -> dict:
t0 = time.perf_counter()
chain_inputs = agent._build_chain_inputs(state["query"], state["parent_docs"])
return {
"chain_inputs": chain_inputs,
"timings": [{"node": "build_inputs", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [
f"context={len(chain_inputs['context'])}c "
f"history={len(chain_inputs['chat_history'])}c"
],
}
return build_inputs_node기존 _build_chain_inputs()를 그대로 재사용한다 — Phase 10.84에서 추출해 둔 덕분에 LCEL 의존이 없다.
4.6 generate_node
from langchain_core.output_parsers import StrOutputParser
def make_generate_node(agent):
def generate_node(state: QnaState) -> dict:
t0 = time.perf_counter()
llm, model_used = agent._get_llm_for_query(state["query"])
chain = agent._prompt | llm | StrOutputParser()
answer = chain.invoke(state["chain_inputs"])
return {
"answer": answer,
"timings": [{
"node": "generate",
"ms": int((time.perf_counter() - t0) * 1000),
"model": model_used,
}],
"log_messages": [f"generated {len(answer)}c answer"],
}
return generate_node4.7 build_response_node
def make_build_response_node(agent):
def build_response_node(state: QnaState) -> dict:
t0 = time.perf_counter()
total_ms = sum(t["ms"] for t in state["timings"])
response = agent._build_response(
state["answer"],
state["parent_docs"],
state["query"],
latency_ms=total_ms,
ttft_ms=total_ms,
model_used=next((t.get("model") for t in state["timings"] if t.get("model")), None),
)
return {
"response": response,
"timings": [{"node": "build_response", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"response built with {len(response.citations)} citations"],
}
return build_response_node5 그래프 조립
A/B arm에 따라 다른 그래프를 컴파일하는 빌더 함수를 만든다.
from langgraph.graph import StateGraph, START, END
def build_qna_graph(agent, reranker: str = "cosine"):
graph = StateGraph(QnaState)
graph.add_node("ensure_init", make_ensure_init_node(agent))
graph.add_node("retrieve_children", make_retrieve_children_node(agent))
if reranker == "flashrank":
graph.add_node("rerank", make_rerank_flashrank_node(agent))
else:
graph.add_node("rerank", make_rerank_cosine_node(agent))
graph.add_node("map_to_parents", make_map_to_parents_node(agent))
graph.add_node("build_inputs", make_build_inputs_node(agent))
graph.add_node("generate", make_generate_node(agent))
graph.add_node("build_response", make_build_response_node(agent))
graph.add_edge(START, "ensure_init")
graph.add_edge("ensure_init", "retrieve_children")
graph.add_edge("retrieve_children", "rerank")
graph.add_edge("rerank", "map_to_parents")
graph.add_edge("map_to_parents", "build_inputs")
graph.add_edge("build_inputs", "generate")
graph.add_edge("generate", "build_response")
graph.add_edge("build_response", END)
return graph.compile()A/B 실험 arm 결정 후 호출은 다음과 같이 바뀐다.
6 검색 실패 폴백 — Conditional Edge
retrieve_children이 빈 결과를 반환할 때 web search 노드로 우회한다. Chain에서는 함수 안 if 사슬로 처리해야 했지만, 그래프에서는 conditional edge로 표현된다.
def make_web_search_node(web_tool):
def web_search_node(state: QnaState) -> dict:
t0 = time.perf_counter()
results = web_tool.invoke(state["query"].text)
docs = [Document(page_content=r["snippet"], metadata={"source_name": "web"}) for r in results[:5]]
return {
"child_docs": docs,
"parent_docs": docs, # web 결과는 child=parent
"timings": [{"node": "web_search", "ms": int((time.perf_counter() - t0) * 1000)}],
"log_messages": [f"web fallback: {len(docs)} results"],
}
return web_search_node
def route_after_retrieve(state: QnaState) -> str:
if not state["child_docs"]:
return "fallback"
return "ok"
def build_qna_graph_with_fallback(agent, web_tool, reranker="cosine"):
graph = StateGraph(QnaState)
# ... (이전과 동일하게 노드 등록)
graph.add_node("web_search", make_web_search_node(web_tool))
graph.add_edge(START, "ensure_init")
graph.add_edge("ensure_init", "retrieve_children")
graph.add_conditional_edges(
"retrieve_children",
route_after_retrieve,
{"ok": "rerank", "fallback": "web_search"},
)
graph.add_edge("rerank", "map_to_parents")
graph.add_edge("web_search", "build_inputs") # rerank·parent 우회
graph.add_edge("map_to_parents", "build_inputs")
graph.add_edge("build_inputs", "generate")
graph.add_edge("generate", "build_response")
graph.add_edge("build_response", END)
return graph.compile()폴백 흐름은 이렇다.
START → ensure_init → retrieve_children ──ok──→ rerank → map_to_parents ─┐
│ ▼
└──fallback──→ web_search ──────────→ build_inputs
│
▼
generate
│
▼
build_response
│
▼
END
검색 실패라는 한 가지 분기를 함수 안 if가 아니라 그래프 구조로 명시했다. 폴백 경로가 추가되어도 기존 노드는 변경되지 않는다.
7 노드별 단위 테스트
Chain 시대에는 _prepare()를 테스트하려면 prompt·llm·retriever를 전부 mock해야 했다. 분해 후에는 노드 입출력만 mock한다.
def test_build_inputs_node_handles_empty_history():
fake_agent = Mock()
fake_agent._build_chain_inputs.return_value = {
"context": "sample context", "chat_history": "", "question": "Q?",
}
node = make_build_inputs_node(fake_agent)
state = {
"query": Query(text="Q?", history=[]),
"parent_docs": [Document(page_content="...")],
"timings": [], "log_messages": [],
}
result = node(state)
assert result["chain_inputs"]["chat_history"] == ""
assert result["timings"][0]["node"] == "build_inputs"
fake_agent._build_chain_inputs.assert_called_once()
def test_route_after_retrieve_picks_fallback_on_empty():
state = {"child_docs": [], "query": Query(text="Q?", history=[])}
assert route_after_retrieve(state) == "fallback"
def test_route_after_retrieve_picks_ok_on_results():
state = {"child_docs": [Document(page_content="x")], "query": Query(text="Q?", history=[])}
assert route_after_retrieve(state) == "ok"각 노드는 State dict 한 개를 받고 dict 한 개를 반환한다. 외부 의존이 명시적이라 mock 표면적이 좁다.
8 단계별 관찰성 — app.stream() 사용
[timing] 로그를 직접 찍지 않아도 LangGraph가 노드 단위 출력을 토해낸다.
app = build_qna_graph(agent, reranker="cosine")
for chunk in app.stream({"query": query}):
# chunk는 {노드_이름: 노드_반환값} 형태
for node_name, output in chunk.items():
if "timings" in output:
for t in output["timings"]:
print(f"[graph] {node_name}: {t['ms']}ms")출력 예 — 운영 디버깅에서 어느 단계에서 시간을 잃는지 즉시 보인다.
[graph] ensure_init: 2ms
[graph] retrieve_children: 187ms
[graph] rerank: 412ms
[graph] map_to_parents: 8ms
[graph] build_inputs: 1ms
[graph] generate: 2630ms
[graph] build_response: 4ms
토큰 단위 스트리밍은 astream_events()로, LLM 노드의 on_chat_model_stream 이벤트를 SSE로 그대로 흘려보낼 수 있다. 자세한 흐름은 16편 — Checkpointing과 HITL에서 다룬다.
9 분해의 트레이드오프
분해는 공짜가 아니다. 다음 비용을 받아들여야 한다.
| 항목 | Chain | Graph (분해 후) |
|---|---|---|
| 코드 라인 수 | ~80 (_prepare + run) |
~250 (7 노드 + State + 빌더) |
| 함수 시그니처 통일성 | 자유 | state -> dict 강제 |
| 디버깅 — 한 단계 들어가기 | 함수 추적 | 노드 closure + State 추적 |
| 새 단계 추가 | 함수 안 한 줄 | 노드 정의 + 등록 + 엣지 |
| A/B arm | 분기 처리 | 빌더 함수 인자 |
| 실패 폴백 | if 사슬 |
conditional edge |
| 노드별 timing | 수동 로그 | app.stream() 자동 |
| 단위 테스트 | 함수 통째 mock | 노드 단위 mock |
분해는 “수정 빈도가 낮은 코드”보다 “수정 빈도가 높고, 분기·폴백·관찰성이 자라는 코드”에서 회수가 빠르다. MINERVA의 RAG 파이프라인은 후자다.
이 분해는 QnA Chatbot이 단일 LCEL 체인이라 단계가 일직선이라 가능했다. Data Standardizer는 이미 RagRecommender → post_processing/{tables,code} → DomainAuditor(조건부) 4개 sub_agent의 정적 파이프라인이라, 같은 방식으로 평탄화하면 노드가 12개 이상으로 폭증한다(post_processing의 도메인 분류·물리명 부여·표 정렬을 다시 펼치면).
13편 callout에서 정리했듯, Data Standardizer는 (1) sub_agent를 sub-graph로 캡슐화하거나 (2) Phase C-3에서 ToolNode로 흡수하는 두 길이 자연스럽다. 본 14편의 평탄 분해 패턴은 QnA 같은 선형 체인 전용이고, supervisor 패턴 분해는 시리즈 후반부의 에이전트 위임에서 다룬다.
10 자주 발생하는 오류 패턴
def make_retrieve_node(agent):
config = agent.config # 빌더 호출 시점에 캡처
def retrieve_node(state):
return retriever_with(config).invoke(state["query"].text)
return retrieve_nodeCORRECT:
def make_retrieve_node(agent):
def retrieve_node(state):
config = agent.config # 노드 호출 시점에 참조
return retriever_with(config).invoke(state["query"].text)
return retrieve_node빌더 호출 시점에 agent.config를 캡처하면 핫 리로드(설정 변경)가 무시된다. 노드 함수 내부에서 참조해 호출마다 최신 값을 읽는다.
CORRECT:
TypedDict는 정의되지 않은 키를 반환해도 런타임 에러를 내지 않고 조용히 무시한다. State 정의와 노드 반환의 키가 일치하는지는 정적 검사(mypy, pyright) 또는 단위 테스트로 보강해야 한다.
def rerank_node(state):
state["child_docs"].sort(key=lambda d: d.metadata["score"], reverse=True) # 입력 변경
return {"child_docs": state["child_docs"]}CORRECT:
def rerank_node(state):
sorted_docs = sorted(state["child_docs"], key=lambda d: d.metadata["score"], reverse=True)
return {"child_docs": sorted_docs}입력 State를 직접 변경하면 LangGraph의 reducer가 정상 작동하지 않을 수 있다. 새 자료 구조로 반환한다.
11 다음 편 예고 — State 설계의 깊이
이번 편에서는 분해에 필요한 최소 State만 정의했다. 다음 편 15편 — State 설계에서는 다음을 다룬다.
Annotated[..., reducer]패턴: 누적, 덮어쓰기, 커스텀 병합Query/Response같은 Pydantic 객체를 State에 어떻게 보관하는가- 멀티 에이전트 위임 시 부모/자식 그래프의 State 분리 (Subgraph)
- 대화 히스토리, 도구 호출 이력의 State 표현
12 정리
| 항목 | 핵심 |
|---|---|
| 분해 단위 | _prepare() → 7개 일급 노드 (ensure_init, retrieve_children, rerank, map_to_parents, build_inputs, generate, build_response) |
| State 최소 형태 | Query·docs·chain_inputs·answer·response + 누적 timings/log |
| A/B arm | 같은 시그니처의 노드 2개 + 빌더 함수 인자로 교체 |
| 검색 실패 폴백 | conditional edge → web_search 노드 |
| 단위 테스트 | 노드별 State 입출력 mock — 표면적 좁아짐 |
| 관찰성 | app.stream()이 노드 단위 timing 자동 토출 |
분해는 코드 양을 늘리지만 표현력·테스트성·관찰성을 함께 산다. 50명 협업, 150+ 스킬 환경에서 이 비용은 회수된다. 다음 편에서 State 설계 패턴을 깊게 다룬다.
13 관련 주제
선행 학습
후속 주제
- State 설계 — TypedDict와 reducer 패턴
- Checkpointing과 Human-in-the-Loop
- BaseAgent 계약 v2 — LangGraph 호환 인터페이스
다른 카테고리 연결
- LangGraph Building Graphs — 그래프 빌드의 일반 절차
- LangGraph Naive RAG — 본 7노드 분해와 비교 가능한 기본형
- LangGraph Branching — Conditional Edge 심화 (검색 실패 폴백 등)
- LangGraph Adaptive RAG — 폴백·재검색 패턴
- LangChain to LangGraph 마이그레이션 — 일반적 전환 가이드