MINERVA RAG Chain 분해

_prepare() 하나를 7개 LangGraph 노드로 갈라 놓기

Phase B의 LCEL Chain은 _prepare() 한 함수 안에 검색·리랭크·Parent 매핑·컨텍스트 조립·생성·응답 후처리가 묶여 있다. 이 함수를 LangGraph의 일급 노드로 분해해 단위 테스트, 검색 실패 폴백, A/B arm 교체, 단계별 관찰성을 어떻게 얻는지 코드로 정리한다. 13편(LangGraph 기초)의 매핑을 실제 분해 코드로 옮기고, 15편(State 설계) 직전의 실용 단계다.

Agent
저자

Kwangmin Kim

공개

2026년 05월 06일

1 분해의 목표

13편에서 LangGraph가 무엇이고 왜 필요한지를 다뤘다. 이번 편은 그 이론을 MINERVA의 실제 코드에 적용한다.

분해 후 얻는 것은 네 가지다.

  1. 노드별 단위 테스트_prepare() 한 함수를 통째로 mock하지 않고, 노드 입출력만 mock하면 된다.
  2. 검색 실패 폴백retrieve_children 노드가 빈 결과를 반환할 때 conditional edge로 web search 노드로 우회한다.
  3. A/B arm을 노드 교체로 표현 — Reranker A/B를 두 가지 노드(rerank_cosine, rerank_flashrank)로 두고 arm에 따라 컴파일된 그래프를 바꾼다.
  4. 단계별 관찰성app.stream() 또는 astream_events()가 노드별 입출력을 자동으로 토해낸다. [timing] 로그를 직접 찍을 필요가 없다.
선행 학습

2 분해 전 — 현재 _prepare() 흐름

src/agents/qna_chatbot/agent.py의 핵심부는 다음과 같다.

def _prepare(self, query: Query):
    self._ensure_initialized()                                     # (1) lazy init
    document_id = query.agent_params.get("document_id", ...)
    source_filter = query.agent_params.get("source_filter")
    docs = self._retrieve_docs(query.text, document_id, source_filter=source_filter)  # (2)~(4)
    chain_inputs = self._build_chain_inputs(query, docs)            # (5)
    llm, model_used = self._get_llm_for_query(query)
    chain = self._prompt | llm | StrOutputParser()
    return docs, chain, chain_inputs, model_used

def run(self, query: Query) -> Response:
    docs, chain, chain_inputs, model_used = self._prepare(query)
    answer = chain.invoke(chain_inputs)                             # (6) generate
    return self._build_response(answer, docs, query, ...)           # (7) response

_retrieve_docs()는 안에서 다시 세 단계로 갈린다 (src/core/rag/retriever.py).

def similarity_search(self, query, k=4, **kwargs):
    child_docs = self.vectorstore.similarity_search(query, k=k * 2, **kwargs)  # (2) child 검색
    if self._reranker_type in ("flashrank", "cross_encoder"):
        child_docs = self._rerank_children(child_docs, query)                  # (3) rerank
    elif self._embeddings is not None:
        child_docs = self._compute_cosine_scores(child_docs, query)            # (3') cosine
    parents = self._map_to_parents_with_child_score(child_docs, k)             # (4) parent 매핑
    return parents

따라서 7개 단계가 함수 안에 누적되어 있다. 단계마다 입출력 타입은 다음과 같다.

단계 입력 출력 책임
(1) ensure_init (없음) (없음) prompt·LLM·retriever lazy 로드
(2) retrieve_children str list[Document] (child, k2~k3개) hybrid/mmr/similarity 검색
(3) rerank list[Document] list[Document] (child + score) flashrank 또는 cosine
(4) map_to_parents list[Document] (child) list[Document] (parent, k개) parent dedup + score 전파
(5) build_inputs Query, list[Document] dict context 직렬화 + chat_history 포맷
(6) generate dict str LLM 호출
(7) build_response str, list[Document], Query Response 인용 필터 + Pydantic 조립

각 행이 LangGraph의 일급 노드 후보다.

3 State 정의 — 단계 입출력을 한 자료 구조로

먼저 노드 사이를 흘러다닐 State를 정의한다. State 설계의 깊은 패턴은 15편에서 다루므로, 여기서는 분해에 필요한 최소 형태만 둔다.

from typing import Annotated, TypedDict
from operator import add
from langchain_core.documents import Document
from core.contracts import Query, Response


class QnaState(TypedDict):
    # 입력 — 그래프 시작 시 채워짐
    query: Query

    # 검색 단계 누적
    child_docs: list[Document]
    parent_docs: list[Document]

    # 생성 단계
    chain_inputs: dict
    answer: str

    # 출력
    response: Response

    # 관찰성 — 노드가 이어붙임
    timings: Annotated[list[dict], add]
    log_messages: Annotated[list[str], add]

Annotated[..., add]로 표시한 두 필드는 노드 반환값이 누적된다. 나머지는 가장 최근 반환값으로 덮어쓴다.

4 노드 분해

각 노드를 state -> dict 시그니처로 작성한다. 모든 노드는 agent 인스턴스를 closure로 캡처한다 — 클래스 메서드 자원을 그대로 재사용하기 위해서다.

4.1 ensure_init_node

import time

def make_ensure_init_node(agent):
    def ensure_init_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        agent._ensure_initialized()
        return {
            "timings": [{"node": "ensure_init", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": ["initialized"],
        }
    return ensure_init_node

이미 _ensure_initialized()가 idempotent하므로 별도 가드는 필요 없다.

4.2 retrieve_children_node

def make_retrieve_children_node(agent):
    def retrieve_children_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        query = state["query"]
        document_id = query.agent_params.get("document_id", agent._default_document_id)
        source_filter = query.agent_params.get("source_filter")

        retriever = agent._get_retriever(document_id)
        k = agent.config.retrieval.k
        fetch_k = k * 3 if source_filter and source_filter != "all" else k

        if agent.config.retrieval.search_type == "hybrid":
            child_docs = retriever.vectorstore.hybrid_search(query=query.text, k=fetch_k * 2)
        elif agent.config.retrieval.search_type == "mmr":
            child_docs = retriever.vectorstore.max_marginal_relevance_search(
                query=query.text, k=fetch_k * 2,
                lambda_mult=agent.config.retrieval.mmr_diversity,
            )
        else:
            child_docs = retriever.vectorstore.similarity_search(query=query.text, k=fetch_k * 2)

        return {
            "child_docs": child_docs,
            "timings": [{"node": "retrieve_children", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"retrieved {len(child_docs)} child docs"],
        }
    return retrieve_children_node

기존 _retrieve_docs()에서 rerank·parent 매핑을 떼어 내고, child 검색만 남긴다.

4.3 rerank_node — 두 가지 변형

A/B 실험을 노드 교체로 표현하는 핵심 부분이다. 같은 시그니처의 노드 두 개를 만든다.

def make_rerank_flashrank_node(agent):
    def rerank_flashrank_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
        reranked = retriever._rerank_children(state["child_docs"], state["query"].text)
        return {
            "child_docs": reranked,
            "timings": [{"node": "rerank_flashrank", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"flashrank reranked {len(reranked)} docs"],
        }
    return rerank_flashrank_node


def make_rerank_cosine_node(agent):
    def rerank_cosine_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
        scored = retriever._compute_cosine_scores(state["child_docs"], state["query"].text)
        return {
            "child_docs": scored,
            "timings": [{"node": "rerank_cosine", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"cosine scored {len(scored)} docs"],
        }
    return rerank_cosine_node

두 노드는 같은 입력을 받고 같은 형태의 출력을 낸다. 그래프 컴파일 시점에 어느 쪽을 등록할지만 바꾸면 된다.

4.4 map_to_parents_node

def make_map_to_parents_node(agent):
    def map_to_parents_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        retriever = agent._get_retriever(state["query"].agent_params.get("document_id"))
        k = agent.config.retrieval.k
        parents = retriever._map_to_parents_with_child_score(state["child_docs"], k)
        return {
            "parent_docs": parents,
            "timings": [{"node": "map_to_parents", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"mapped to {len(parents)} parents"],
        }
    return map_to_parents_node

4.5 build_inputs_node

def make_build_inputs_node(agent):
    def build_inputs_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        chain_inputs = agent._build_chain_inputs(state["query"], state["parent_docs"])
        return {
            "chain_inputs": chain_inputs,
            "timings": [{"node": "build_inputs", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [
                f"context={len(chain_inputs['context'])}c "
                f"history={len(chain_inputs['chat_history'])}c"
            ],
        }
    return build_inputs_node

기존 _build_chain_inputs()를 그대로 재사용한다 — Phase 10.84에서 추출해 둔 덕분에 LCEL 의존이 없다.

4.6 generate_node

from langchain_core.output_parsers import StrOutputParser

def make_generate_node(agent):
    def generate_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        llm, model_used = agent._get_llm_for_query(state["query"])
        chain = agent._prompt | llm | StrOutputParser()
        answer = chain.invoke(state["chain_inputs"])
        return {
            "answer": answer,
            "timings": [{
                "node": "generate",
                "ms": int((time.perf_counter() - t0) * 1000),
                "model": model_used,
            }],
            "log_messages": [f"generated {len(answer)}c answer"],
        }
    return generate_node

4.7 build_response_node

def make_build_response_node(agent):
    def build_response_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        total_ms = sum(t["ms"] for t in state["timings"])
        response = agent._build_response(
            state["answer"],
            state["parent_docs"],
            state["query"],
            latency_ms=total_ms,
            ttft_ms=total_ms,
            model_used=next((t.get("model") for t in state["timings"] if t.get("model")), None),
        )
        return {
            "response": response,
            "timings": [{"node": "build_response", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"response built with {len(response.citations)} citations"],
        }
    return build_response_node

5 그래프 조립

A/B arm에 따라 다른 그래프를 컴파일하는 빌더 함수를 만든다.

from langgraph.graph import StateGraph, START, END


def build_qna_graph(agent, reranker: str = "cosine"):
    graph = StateGraph(QnaState)

    graph.add_node("ensure_init", make_ensure_init_node(agent))
    graph.add_node("retrieve_children", make_retrieve_children_node(agent))

    if reranker == "flashrank":
        graph.add_node("rerank", make_rerank_flashrank_node(agent))
    else:
        graph.add_node("rerank", make_rerank_cosine_node(agent))

    graph.add_node("map_to_parents", make_map_to_parents_node(agent))
    graph.add_node("build_inputs", make_build_inputs_node(agent))
    graph.add_node("generate", make_generate_node(agent))
    graph.add_node("build_response", make_build_response_node(agent))

    graph.add_edge(START, "ensure_init")
    graph.add_edge("ensure_init", "retrieve_children")
    graph.add_edge("retrieve_children", "rerank")
    graph.add_edge("rerank", "map_to_parents")
    graph.add_edge("map_to_parents", "build_inputs")
    graph.add_edge("build_inputs", "generate")
    graph.add_edge("generate", "build_response")
    graph.add_edge("build_response", END)

    return graph.compile()

A/B 실험 arm 결정 후 호출은 다음과 같이 바뀐다.

# Phase B (Chain 시대)
agent = QnaChatbotAgent(...)
response = agent.run(query)

# Phase C-2 (Graph 시대)
app = build_qna_graph(agent, reranker=arm_config.reranker)
final_state = app.invoke({"query": query})
response = final_state["response"]

6 검색 실패 폴백 — Conditional Edge

retrieve_children이 빈 결과를 반환할 때 web search 노드로 우회한다. Chain에서는 함수 안 if 사슬로 처리해야 했지만, 그래프에서는 conditional edge로 표현된다.

def make_web_search_node(web_tool):
    def web_search_node(state: QnaState) -> dict:
        t0 = time.perf_counter()
        results = web_tool.invoke(state["query"].text)
        docs = [Document(page_content=r["snippet"], metadata={"source_name": "web"}) for r in results[:5]]
        return {
            "child_docs": docs,
            "parent_docs": docs,        # web 결과는 child=parent
            "timings": [{"node": "web_search", "ms": int((time.perf_counter() - t0) * 1000)}],
            "log_messages": [f"web fallback: {len(docs)} results"],
        }
    return web_search_node


def route_after_retrieve(state: QnaState) -> str:
    if not state["child_docs"]:
        return "fallback"
    return "ok"


def build_qna_graph_with_fallback(agent, web_tool, reranker="cosine"):
    graph = StateGraph(QnaState)
    # ... (이전과 동일하게 노드 등록)
    graph.add_node("web_search", make_web_search_node(web_tool))

    graph.add_edge(START, "ensure_init")
    graph.add_edge("ensure_init", "retrieve_children")
    graph.add_conditional_edges(
        "retrieve_children",
        route_after_retrieve,
        {"ok": "rerank", "fallback": "web_search"},
    )
    graph.add_edge("rerank", "map_to_parents")
    graph.add_edge("web_search", "build_inputs")    # rerank·parent 우회
    graph.add_edge("map_to_parents", "build_inputs")
    graph.add_edge("build_inputs", "generate")
    graph.add_edge("generate", "build_response")
    graph.add_edge("build_response", END)
    return graph.compile()

폴백 흐름은 이렇다.

START → ensure_init → retrieve_children ──ok──→ rerank → map_to_parents ─┐
                              │                                          ▼
                              └──fallback──→ web_search ──────────→ build_inputs
                                                                          │
                                                                          ▼
                                                                       generate
                                                                          │
                                                                          ▼
                                                                     build_response
                                                                          │
                                                                          ▼
                                                                         END

검색 실패라는 한 가지 분기를 함수 안 if가 아니라 그래프 구조로 명시했다. 폴백 경로가 추가되어도 기존 노드는 변경되지 않는다.

7 노드별 단위 테스트

Chain 시대에는 _prepare()를 테스트하려면 prompt·llm·retriever를 전부 mock해야 했다. 분해 후에는 노드 입출력만 mock한다.

def test_build_inputs_node_handles_empty_history():
    fake_agent = Mock()
    fake_agent._build_chain_inputs.return_value = {
        "context": "sample context", "chat_history": "", "question": "Q?",
    }
    node = make_build_inputs_node(fake_agent)

    state = {
        "query": Query(text="Q?", history=[]),
        "parent_docs": [Document(page_content="...")],
        "timings": [], "log_messages": [],
    }
    result = node(state)

    assert result["chain_inputs"]["chat_history"] == ""
    assert result["timings"][0]["node"] == "build_inputs"
    fake_agent._build_chain_inputs.assert_called_once()


def test_route_after_retrieve_picks_fallback_on_empty():
    state = {"child_docs": [], "query": Query(text="Q?", history=[])}
    assert route_after_retrieve(state) == "fallback"


def test_route_after_retrieve_picks_ok_on_results():
    state = {"child_docs": [Document(page_content="x")], "query": Query(text="Q?", history=[])}
    assert route_after_retrieve(state) == "ok"

각 노드는 State dict 한 개를 받고 dict 한 개를 반환한다. 외부 의존이 명시적이라 mock 표면적이 좁다.

8 단계별 관찰성 — app.stream() 사용

[timing] 로그를 직접 찍지 않아도 LangGraph가 노드 단위 출력을 토해낸다.

app = build_qna_graph(agent, reranker="cosine")

for chunk in app.stream({"query": query}):
    # chunk는 {노드_이름: 노드_반환값} 형태
    for node_name, output in chunk.items():
        if "timings" in output:
            for t in output["timings"]:
                print(f"[graph] {node_name}: {t['ms']}ms")

출력 예 — 운영 디버깅에서 어느 단계에서 시간을 잃는지 즉시 보인다.

[graph] ensure_init: 2ms
[graph] retrieve_children: 187ms
[graph] rerank: 412ms
[graph] map_to_parents: 8ms
[graph] build_inputs: 1ms
[graph] generate: 2630ms
[graph] build_response: 4ms

토큰 단위 스트리밍은 astream_events()로, LLM 노드의 on_chat_model_stream 이벤트를 SSE로 그대로 흘려보낼 수 있다. 자세한 흐름은 16편 — Checkpointing과 HITL에서 다룬다.

9 분해의 트레이드오프

분해는 공짜가 아니다. 다음 비용을 받아들여야 한다.

항목 Chain Graph (분해 후)
코드 라인 수 ~80 (_prepare + run) ~250 (7 노드 + State + 빌더)
함수 시그니처 통일성 자유 state -> dict 강제
디버깅 — 한 단계 들어가기 함수 추적 노드 closure + State 추적
새 단계 추가 함수 안 한 줄 노드 정의 + 등록 + 엣지
A/B arm 분기 처리 빌더 함수 인자
실패 폴백 if 사슬 conditional edge
노드별 timing 수동 로그 app.stream() 자동
단위 테스트 함수 통째 mock 노드 단위 mock

분해는 “수정 빈도가 낮은 코드”보다 “수정 빈도가 높고, 분기·폴백·관찰성이 자라는 코드”에서 회수가 빠르다. MINERVA의 RAG 파이프라인은 후자다.

Data Standardizer는 같은 분해를 적용하기 어렵다

이 분해는 QnA Chatbot이 단일 LCEL 체인이라 단계가 일직선이라 가능했다. Data Standardizer는 이미 RagRecommenderpost_processing/{tables,code}DomainAuditor(조건부) 4개 sub_agent의 정적 파이프라인이라, 같은 방식으로 평탄화하면 노드가 12개 이상으로 폭증한다(post_processing의 도메인 분류·물리명 부여·표 정렬을 다시 펼치면).

13편 callout에서 정리했듯, Data Standardizer는 (1) sub_agent를 sub-graph로 캡슐화하거나 (2) Phase C-3에서 ToolNode로 흡수하는 두 길이 자연스럽다. 본 14편의 평탄 분해 패턴은 QnA 같은 선형 체인 전용이고, supervisor 패턴 분해는 시리즈 후반부의 에이전트 위임에서 다룬다.

10 자주 발생하는 오류 패턴

WRONG:

def make_retrieve_node(agent):
    config = agent.config        # 빌더 호출 시점에 캡처
    def retrieve_node(state):
        return retriever_with(config).invoke(state["query"].text)
    return retrieve_node

CORRECT:

def make_retrieve_node(agent):
    def retrieve_node(state):
        config = agent.config    # 노드 호출 시점에 참조
        return retriever_with(config).invoke(state["query"].text)
    return retrieve_node

빌더 호출 시점에 agent.config를 캡처하면 핫 리로드(설정 변경)가 무시된다. 노드 함수 내부에서 참조해 호출마다 최신 값을 읽는다.

WRONG:

def retrieve_children_node(state):
    return {"child_doc": [...]}        # State 정의에는 child_docs (s 빠짐)

CORRECT:

def retrieve_children_node(state):
    return {"child_docs": [...]}       # State 키와 일치

TypedDict는 정의되지 않은 키를 반환해도 런타임 에러를 내지 않고 조용히 무시한다. State 정의와 노드 반환의 키가 일치하는지는 정적 검사(mypy, pyright) 또는 단위 테스트로 보강해야 한다.

WRONG:

def rerank_node(state):
    state["child_docs"].sort(key=lambda d: d.metadata["score"], reverse=True)  # 입력 변경
    return {"child_docs": state["child_docs"]}

CORRECT:

def rerank_node(state):
    sorted_docs = sorted(state["child_docs"], key=lambda d: d.metadata["score"], reverse=True)
    return {"child_docs": sorted_docs}

입력 State를 직접 변경하면 LangGraph의 reducer가 정상 작동하지 않을 수 있다. 새 자료 구조로 반환한다.

11 다음 편 예고 — State 설계의 깊이

이번 편에서는 분해에 필요한 최소 State만 정의했다. 다음 편 15편 — State 설계에서는 다음을 다룬다.

  • Annotated[..., reducer] 패턴: 누적, 덮어쓰기, 커스텀 병합
  • Query/Response 같은 Pydantic 객체를 State에 어떻게 보관하는가
  • 멀티 에이전트 위임 시 부모/자식 그래프의 State 분리 (Subgraph)
  • 대화 히스토리, 도구 호출 이력의 State 표현

12 정리

항목 핵심
분해 단위 _prepare() → 7개 일급 노드 (ensure_init, retrieve_children, rerank, map_to_parents, build_inputs, generate, build_response)
State 최소 형태 Query·docs·chain_inputs·answer·response + 누적 timings/log
A/B arm 같은 시그니처의 노드 2개 + 빌더 함수 인자로 교체
검색 실패 폴백 conditional edge → web_search 노드
단위 테스트 노드별 State 입출력 mock — 표면적 좁아짐
관찰성 app.stream()이 노드 단위 timing 자동 토출

분해는 코드 양을 늘리지만 표현력·테스트성·관찰성을 함께 산다. 50명 협업, 150+ 스킬 환경에서 이 비용은 회수된다. 다음 편에서 State 설계 패턴을 깊게 다룬다.

13 관련 주제

선행 학습

후속 주제

다른 카테고리 연결

Subscribe

Enjoy this blog? Get notified of new posts by email: