MINERVA Checkpointing과 Human-in-the-Loop

State 영속화로 중단·재개·사람 개입을 일급으로

잘 정의된 State는 “영속화”와 “사람 개입”의 토대다. LangGraph Checkpointer로 노드 단위 State를 저장·재개하고, interrupt로 사람 검토 지점을 그래프에 박는 방법을 정리한다. MINERVA의 Data Standardizer 승인 흐름, QnaChatbot의 인용 부족 검토, A/B 실험과 thread 통합까지 다룬다.

Agent
저자

Kwangmin Kim

공개

2026년 05월 06일

1 왜 Checkpointing과 HITL이 필요한가

15편에서 State를 잘 설계하면 노드 분해의 비용이 회수된다고 했다. State가 잘 정의되면 다음 두 능력이 자연스럽게 따라온다.

  1. Checkpointing — State를 외부 저장소에 영속화해 그래프 실행을 끊었다 이을 수 있다. 긴 LLM 호출·외부 API 실패에서 처음부터 다시 돌리지 않아도 된다.
  2. Human-in-the-Loop (HITL) — 특정 노드 직전·직후에서 그래프를 멈추고, 사람이 State를 검토·수정한 뒤 재개할 수 있다.

MINERVA의 두 에이전트는 모두 HITL이 필요하다.

에이전트 HITL 시나리오
Data Standardizer LLM이 제안한 표준 후보를 운영자가 검토·승인 후 마스터 데이터에 반영
QnA Chatbot 인용이 부족한 답변을 발화 데이터 분석가가 라벨링 — 후속 학습 데이터로
공통 — A/B 실험 신규 arm을 소규모 트래픽으로 노출 시 실패 답변 인간 검토

이 글에서는 Checkpointer 기초부터 MINERVA 적용까지를 코드 단위로 정리한다.

현재 MINERVA는 Checkpointer 미적용

이 글의 적용 코드는 Phase 11 LangGraph 전환 후 도입할 청사진이다. 현 시점에는 그래프 자체가 없으니 Checkpointer도 없다. 대신 09편에서 본 runs.jsonl(응답 기록)과 feedback.jsonl(사용자 피드백)이 부분적 감사 역할을 한다. 두 시스템의 책임 차이는 다음과 같다:

시스템 단위 목적 가시성
runs.jsonl / feedback.jsonl 요청 1건 사후 분석·메트릭·실험 응답 시점 1회 + 피드백 1회
LangGraph Checkpointer 노드 1단계 중단·재개·HITL·되돌림 노드마다 자동

Checkpointer는 응답 단위 로그를 대체하지 않고 보완한다. 노드별 스냅샷이 디스크 부담을 키우므로, 전환 시 runs.jsonl은 운영 메트릭 채널로 유지하고 Checkpointer는 HITL이 필요한 thread에만 활성화하는 방향이 합리적이다.

선행 학습

2 Checkpointer 기초

2.1 무엇을 저장하는가

Checkpointer는 노드 실행 직후의 State 스냅샷을 저장한다. 그래프가 다음 노드로 넘어가기 직전에 자동으로 한 번씩 저장된다. 저장 단위는 두 키로 식별된다.

의미
thread_id 한 사용자 또는 한 대화 세션 (멀티턴 묶음)
checkpoint_id 그래프 실행 중 한 시점의 스냅샷 ID (자동 생성, ULID)

같은 thread_id 안에 여러 checkpoint_id가 시간 순으로 쌓인다. 재개·되돌림이 모두 가능하다.

2.2 Checkpointer 종류

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
종류 용도 특성
MemorySaver 개발·테스트 프로세스 재시작 시 사라짐
SqliteSaver 단일 노드 PoC, 로컬 운영 파일 기반, 동시성 제한
PostgresSaver 멀티 인스턴스 프로덕션 동시 접근, 영속성, 운영 도구

MINERVA는 현재 단일 컨테이너로 운영되지만 50명·1000명 단계에서는 PostgresSaver로 가야 한다. 마이그레이션 비용은 connection string 한 줄이다.

2.3 기본 사용

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

memory = MemorySaver()
app = graph.compile(checkpointer=memory)

# thread_id를 config에 담아 호출
config = {"configurable": {"thread_id": "user-anon-9d3f2a"}}
result = app.invoke({"query": query}, config=config)

# 같은 thread_id로 다시 호출 — 이전 State에 이어서 실행 가능
next_result = app.invoke({"query": next_query}, config=config)

thread_id만 같으면 같은 State 스레드로 인식한다. MINERVA에서는 user_id를 그대로 쓰거나, 대화 단위로 conversation_id를 발급해 쓴다.

3 멀티턴 대화 — Checkpointer로 자연스럽게

15편에서 본 두 가지 히스토리 보관 방식 중 “방식 B (MessagesState)”는 Checkpointer와 자연스럽게 어울린다.

from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.messages import HumanMessage, AIMessage


class ChatState(MessagesState):
    parent_docs: list[Document]


def retrieve_node(state: ChatState) -> dict:
    last_user_msg = state["messages"][-1].content
    docs = retriever.invoke(last_user_msg)
    return {"parent_docs": docs}


def generate_node(state: ChatState) -> dict:
    context = "\n\n".join(d.page_content for d in state["parent_docs"])
    answer = llm.invoke([
        *state["messages"],
        HumanMessage(content=f"위 대화에 대해 다음 문서를 참고하여 답변하라.\n\n{context}"),
    ])
    return {"messages": [AIMessage(content=answer.content)]}


graph = StateGraph(ChatState)
graph.add_node("retrieve", retrieve_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", END)

app = graph.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "session-001"}}

# 첫 번째 턴
app.invoke({"messages": [HumanMessage(content="MINERVA가 뭐야?")]}, config=config)

# 두 번째 턴 — 이전 messages를 자동으로 이어받음
app.invoke({"messages": [HumanMessage(content="그럼 사용 사례는?")]}, config=config)

두 번째 호출에서 입력은 새 메시지 한 개만 넘긴다. Checkpointer가 이전 State를 불러오고, add_messages reducer가 새 메시지를 누적한다. 09편 — 상태 관리 해부에서 본 “프론트엔드가 history 전체를 매번 보냄” 패턴을 백엔드 Checkpointer로 옮길 수 있다.

4 interrupt — 노드 경계에서 멈추기

HITL의 핵심은 “사람이 검토할 지점”을 그래프 차원에서 명시하는 것이다. LangGraph는 두 가지 방식을 제공한다.

4.1 컴파일 시점에 정적 지정

app = graph.compile(
    checkpointer=memory,
    interrupt_before=["generate"],         # generate 노드 직전에 멈춤
    # interrupt_after=["build_response"],  # 또는 직후에 멈춤
)

4.2 실행 시점에 동적 지정 — Command(goto="...") 또는 interrupt()

LangGraph 0.2+에서는 노드 안에서 interrupt() 함수를 호출해 특정 입력이 들어올 때만 멈추도록 할 수 있다.

from langgraph.errors import GraphInterrupt

def review_node(state: AgentState) -> dict:
    if state["confidence"] < 0.6:
        raise GraphInterrupt({"reason": "low_confidence", "answer": state["answer"]})
    return {"approved": True}

이 노드는 confidence가 낮을 때만 그래프를 멈춘다. 호출자는 GraphInterrupt를 잡아 사람에게 검토를 요청하고, State를 업데이트한 뒤 재개한다.

5 HITL 흐름 — 멈춤·검토·재개의 사이클

[client] ──invoke──→ [graph] ──→ retrieve ──→ rerank ──→ ... ──→ [interrupt before generate]
                                                                              │
                                                                              ▼
                                                                        [멈춘 상태]
                                                                              │
                                                       사람이 State 검토·수정
                                                                              │
                                                                              ▼
[client] ──update_state──→ [graph] ──invoke(None)──→ generate ──→ build_response ──→ END

코드로 풀면 다음과 같다.

config = {"configurable": {"thread_id": "review-001"}}

# 1. 첫 invoke — interrupt 지점까지 실행
result = app.invoke({"query": query}, config=config)
# generate 직전에 멈춤 — result는 chain_inputs까지 채워진 상태

# 2. 사람이 검토 — 현재 State 조회
state_snapshot = app.get_state(config)
print(state_snapshot.values["chain_inputs"]["context"])
# 운영자가 컨텍스트를 보고 일부 문서를 제외하고 싶다고 판단

# 3. State 수정 — 사람이 결정한 변경 반영
app.update_state(config, {"chain_inputs": modified_inputs})

# 4. 재개 — 입력으로 None을 넘기면 멈춘 지점부터 이어 실행
final = app.invoke(None, config=config)
print(final["response"])

get_stateupdate_stateinvoke(None)이 HITL의 표준 사이클이다. 사람이 State를 수정한 흔적은 Checkpointer에 새 checkpoint_id로 기록되어 감사 추적이 자연스럽게 된다.

6 MINERVA 적용 — Data Standardizer 승인 흐름

가장 명확한 HITL 사례는 표준화 에이전트다. LLM이 제안한 표준 후보를 운영자가 승인해야 마스터 데이터에 반영된다.

class StandardizerState(TypedDict):
    raw_term: str                       # 입력 — 원시 용어
    candidates: list[dict]              # LLM이 제안한 표준 후보들
    approved: Optional[dict]            # 운영자가 선택한 후보
    persisted: bool                     # 마스터 DB 반영 여부


def propose_candidates_node(state):
    candidates = llm_propose(state["raw_term"])     # LLM 호출
    return {"candidates": candidates}


def persist_node(state):
    write_to_master(state["approved"])
    return {"persisted": True}


graph = StateGraph(StandardizerState)
graph.add_node("propose", propose_candidates_node)
graph.add_node("persist", persist_node)
graph.add_edge(START, "propose")
graph.add_edge("propose", "persist")
graph.add_edge("persist", END)

app = graph.compile(
    checkpointer=PostgresSaver.from_conn_string(connection_string),
    interrupt_before=["persist"],         # 마스터 DB 반영 직전에 멈춤
)

운영 흐름은 다음과 같다.

1. POST /standardize { "raw_term": "고객명" }
   → 백엔드: app.invoke({...}, config={thread_id})
   → propose 노드 실행 → persist 직전에 interrupt
   → 응답: { "candidates": [...], "thread_id": "..." }

2. 운영자가 UI에서 후보 중 하나 선택
   → POST /standardize/approve { "thread_id": "...", "approved": {...} }
   → 백엔드: app.update_state(config, {"approved": chosen})
            app.invoke(None, config=config)
   → persist 노드 실행 → END
   → 응답: { "persisted": true }

핵심은 interrupt_before=["persist"] 한 줄로 “DB 반영은 사람 승인 후”라는 안전 정책이 그래프에 박힌다는 점이다. 코드 어디에도 if approved: write() 같은 분기를 직접 쓸 필요가 없다.

7 QnaChatbot 적용 — 인용 부족 시 검토

QnaChatbot에서는 인용이 부족한 답변을 발화 데이터 분석가가 검토하도록 만들 수 있다.

def grade_citations_node(state: QnaState) -> dict:
    cited = extract_cited_indices(state["answer"])
    if len(cited) < 2 and state["query"].agent_params.get("review_low_citation", False):
        # 분석 모드 트래픽에 한해 인터럽트
        raise GraphInterrupt({
            "reason": "low_citation",
            "answer": state["answer"],
            "candidate_docs": [d.page_content for d in state["parent_docs"]],
        })
    return {"citation_count": len(cited)}

이 노드는 운영 일반 트래픽에서는 그냥 통과하고, review_low_citation 플래그가 켜진 분석용 트래픽에서만 멈춘다. 분석가는 답변·후보 문서를 보고 라벨을 달거나 답변을 수정하고, 그 결과는 Phase C-5(발화 데이터 분석)의 학습 데이터로 흘러간다.

8 A/B 실험과 thread 통합

06편 — A/B 실험 프레임워크에서 sticky_hash로 사용자별 arm을 결정했다. Checkpointer 환경에서는 thread_id에 arm 정보를 함께 담아 두면 재개 시에도 동일 arm이 유지된다.

# 호출 시점에 arm 결정
arm_id = sticky_hash_assign(user_id, experiment="reranker_ab_test")

config = {
    "configurable": {
        "thread_id": user_id,
        "experiment": "reranker_ab_test",
        "arm": arm_id,
    }
}

# 그래프는 arm에 따라 다르게 컴파일된 인스턴스를 사용
app = build_qna_graph_with_arm(agent, arm=arm_id, checkpointer=memory)
result = app.invoke({"query": query}, config=config)

같은 thread를 다시 호출할 때 arm이 바뀌면 안 된다. arm 일관성은 thread_id + 별도 매핑(redis 또는 사용자 메타)으로 보장한다. MINERVA의 06편이 사용하는 sticky_hash는 deterministic이므로 arm 매핑을 별도 저장하지 않아도 같은 user_id면 같은 arm이 결정된다.

A/B 분기를 conditional edge로 빼면 그래프 한 개가 모든 arm을 표현할 수도 있다.

def route_by_arm(state, config):
    arm = config["configurable"]["arm"]
    return f"rerank_{arm}"     # rerank_cosine 또는 rerank_flashrank


graph.add_node("rerank_cosine", make_rerank_cosine_node(agent))
graph.add_node("rerank_flashrank", make_rerank_flashrank_node(agent))
graph.add_conditional_edges(
    "retrieve_children",
    route_by_arm,
    {"rerank_cosine": "rerank_cosine", "rerank_flashrank": "rerank_flashrank"},
)
graph.add_edge("rerank_cosine", "map_to_parents")
graph.add_edge("rerank_flashrank", "map_to_parents")

14편의 “빌더 함수에서 노드를 바꿔 끼우기”와 차이점은 다음과 같다.

방식 컴파일 단위 변경 비용 관찰성
빌더 함수 (14편) arm마다 다른 그래프 인스턴스 arm 추가 시 빌더 분기 arm별 독립 실행 흔적
Conditional Edge 그래프 한 개 + 라우팅 arm 추가 시 노드 + 매핑 한 줄 thread당 라우팅 흔적이 State에 남음

소규모 실험은 빌더, 다수 arm·동적 추가는 conditional edge가 자연스럽다.

9 운영 고려 — TTL, 동시성, 정리

Checkpointer는 “그냥 켜면 다 되는” 도구가 아니다. 다음을 결정한다.

9.1 TTL (Time-to-Live)

활성 thread는 보존하지만 영구히 둘 수는 없다. 정책 예시.

데이터 TTL 근거
진행 중 thread 7일 일주일 안에 재개되지 않으면 죽은 thread
완료 thread (END 도달) 30일 디버깅·감사
HITL 대기 thread 30일 운영자 처리 시간 여유
실패 thread 90일 사후 분석 (postmortem)

PostgresSaver는 직접 정리 작업을 제공하지 않는다. cron job으로 checkpoints 테이블을 주기적으로 청소한다.

9.2 동시성 — 같은 thread 동시 호출 방지

같은 thread_id로 동시에 두 호출이 들어오면 race condition이 생긴다. MINERVA는 다음 두 가지로 보호한다.

  • 프론트엔드 — 중복 클릭 방지: 메시지 전송 후 입력란 비활성화 (이미 09편 참조).
  • 백엔드 — thread 락: Postgres advisory lock 또는 in-memory dict[thread_id, asyncio.Lock]. 두 번째 요청은 첫 번째가 끝날 때까지 대기.

9.3 마이그레이션 — Memory에서 Postgres로

# 개발
checkpointer = MemorySaver()

# 운영
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(os.environ["POSTGRES_URL"])
checkpointer.setup()        # 첫 실행 시 테이블 생성

코드 변경은 두 줄이다. 다만 운영 환경에서는 connection pool, retry, telemetry를 추가로 감싸야 한다.

10 자주 발생하는 오류 패턴

WRONG:

app = graph.compile(checkpointer=memory)
result = app.invoke({"query": query})           # config 없이 호출

CORRECT:

app = graph.compile(checkpointer=memory)
config = {"configurable": {"thread_id": user_id}}
result = app.invoke({"query": query}, config=config)

Checkpointer를 등록한 그래프는 thread_id 없이 호출하면 ValueError를 발생시킨다. config에 반드시 thread_id를 담는다.

WRONG:

# interrupt에서 멈춘 후 재개
final = app.invoke({"query": query}, config=config)   # 입력을 다시 넘김

CORRECT:

# interrupt에서 멈춘 후 재개 — None을 넘김
final = app.invoke(None, config=config)

재개 시 입력에 새 값을 넘기면 그래프는 처음부터 다시 시작한다. 멈춘 지점부터 이어 실행하려면 None을 넘긴다.

WRONG:

# log_messages가 Annotated[list, add]인데 update_state로 통째로 교체 시도
app.update_state(config, {"log_messages": ["수정됨"]})
# reducer가 작동해 [...이전 + "수정됨"]이 됨 — 의도와 다를 수 있음

CORRECT:

# 의도: log_messages 전체 교체
app.update_state(config, {"log_messages": ["수정됨"]}, as_node="manual_edit")
# 또는 reducer를 우회하려면 새 thread를 만든다

update_state도 reducer를 거친다. 누적 필드를 강제 교체하려면 as_node를 명시하거나 별도 패턴을 써야 한다.

11 다음 편 예고 — BaseAgent 계약 v2

13~16편에서 LangGraph 기초·노드 분해·State 설계·Checkpointing/HITL을 다뤘다. Phase C-2의 마무리는 Phase B의 02-1편 BaseAgent 계약 v2이며, 위 결과물을 BaseAgent 계약으로 묶는다. 번호상 v1과 인접 배치되어 있지만 내용상 본 16편의 후속이다.

  • v1 (현재): run(query) -> Response / stream(query) -> Iterator[StreamEvent]
  • v2 (LangGraph 호환): graph 속성 노출, astream_events로 SSE, Checkpointer 옵션, Subgraph로의 분해 지원
  • v1 에이전트가 v2 프레임워크에서도 동작하도록 어댑터 패턴

12 정리

항목 핵심
Checkpointer 종류 Memory(개발), SQLite(PoC), Postgres(운영)
식별 키 thread_id(세션) + checkpoint_id(스냅샷)
멀티턴 MessagesState + Checkpointer로 자동 누적
Interrupt 컴파일 시점 정적 / 실행 시점 동적 (GraphInterrupt)
HITL 사이클 invoke → 멈춤 → get_state → update_state → invoke(None)
MINERVA 적용 Standardizer 승인, QnA 분석 라벨링, A/B arm 일관성
운영 TTL 정책, thread 락, Postgres 마이그레이션 두 줄

Checkpointing과 HITL은 그래프를 “장시간 실행 + 사람 협업”의 무대로 격상시킨다. 1.5명 PoC에서는 사치였던 영속화·감사·승인이 50명·1000명 단계에서는 전제가 된다. 다음 편에서 BaseAgent 계약 v2로 이 모든 것을 묶는다.

13 관련 주제

선행 학습

후속 주제

다른 카테고리 연결

Subscribe

Enjoy this blog? Get notified of new posts by email: