1 왜 Checkpointing과 HITL이 필요한가
15편에서 State를 잘 설계하면 노드 분해의 비용이 회수된다고 했다. State가 잘 정의되면 다음 두 능력이 자연스럽게 따라온다.
- Checkpointing — State를 외부 저장소에 영속화해 그래프 실행을 끊었다 이을 수 있다. 긴 LLM 호출·외부 API 실패에서 처음부터 다시 돌리지 않아도 된다.
- Human-in-the-Loop (HITL) — 특정 노드 직전·직후에서 그래프를 멈추고, 사람이 State를 검토·수정한 뒤 재개할 수 있다.
MINERVA의 두 에이전트는 모두 HITL이 필요하다.
| 에이전트 | HITL 시나리오 |
|---|---|
| Data Standardizer | LLM이 제안한 표준 후보를 운영자가 검토·승인 후 마스터 데이터에 반영 |
| QnA Chatbot | 인용이 부족한 답변을 발화 데이터 분석가가 라벨링 — 후속 학습 데이터로 |
| 공통 — A/B 실험 | 신규 arm을 소규모 트래픽으로 노출 시 실패 답변 인간 검토 |
이 글에서는 Checkpointer 기초부터 MINERVA 적용까지를 코드 단위로 정리한다.
이 글의 적용 코드는 Phase 11 LangGraph 전환 후 도입할 청사진이다. 현 시점에는 그래프 자체가 없으니 Checkpointer도 없다. 대신 09편에서 본 runs.jsonl(응답 기록)과 feedback.jsonl(사용자 피드백)이 부분적 감사 역할을 한다. 두 시스템의 책임 차이는 다음과 같다:
| 시스템 | 단위 | 목적 | 가시성 |
|---|---|---|---|
runs.jsonl / feedback.jsonl |
요청 1건 | 사후 분석·메트릭·실험 | 응답 시점 1회 + 피드백 1회 |
| LangGraph Checkpointer | 노드 1단계 | 중단·재개·HITL·되돌림 | 노드마다 자동 |
Checkpointer는 응답 단위 로그를 대체하지 않고 보완한다. 노드별 스냅샷이 디스크 부담을 키우므로, 전환 시 runs.jsonl은 운영 메트릭 채널로 유지하고 Checkpointer는 HITL이 필요한 thread에만 활성화하는 방향이 합리적이다.
- MINERVA RAG Chain 분해 — 7 노드 그래프
- MINERVA State 설계 — TypedDict와 reducer
- MINERVA 상태 관리 해부 — 4계층 상태 분포
- MINERVA A/B 실험 프레임워크 — sticky_hash와 arm 할당
2 Checkpointer 기초
2.1 무엇을 저장하는가
Checkpointer는 노드 실행 직후의 State 스냅샷을 저장한다. 그래프가 다음 노드로 넘어가기 직전에 자동으로 한 번씩 저장된다. 저장 단위는 두 키로 식별된다.
| 키 | 의미 |
|---|---|
thread_id |
한 사용자 또는 한 대화 세션 (멀티턴 묶음) |
checkpoint_id |
그래프 실행 중 한 시점의 스냅샷 ID (자동 생성, ULID) |
같은 thread_id 안에 여러 checkpoint_id가 시간 순으로 쌓인다. 재개·되돌림이 모두 가능하다.
2.2 Checkpointer 종류
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver| 종류 | 용도 | 특성 |
|---|---|---|
MemorySaver |
개발·테스트 | 프로세스 재시작 시 사라짐 |
SqliteSaver |
단일 노드 PoC, 로컬 운영 | 파일 기반, 동시성 제한 |
PostgresSaver |
멀티 인스턴스 프로덕션 | 동시 접근, 영속성, 운영 도구 |
MINERVA는 현재 단일 컨테이너로 운영되지만 50명·1000명 단계에서는 PostgresSaver로 가야 한다. 마이그레이션 비용은 connection string 한 줄이다.
2.3 기본 사용
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
# thread_id를 config에 담아 호출
config = {"configurable": {"thread_id": "user-anon-9d3f2a"}}
result = app.invoke({"query": query}, config=config)
# 같은 thread_id로 다시 호출 — 이전 State에 이어서 실행 가능
next_result = app.invoke({"query": next_query}, config=config)thread_id만 같으면 같은 State 스레드로 인식한다. MINERVA에서는 user_id를 그대로 쓰거나, 대화 단위로 conversation_id를 발급해 쓴다.
3 멀티턴 대화 — Checkpointer로 자연스럽게
15편에서 본 두 가지 히스토리 보관 방식 중 “방식 B (MessagesState)”는 Checkpointer와 자연스럽게 어울린다.
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.messages import HumanMessage, AIMessage
class ChatState(MessagesState):
parent_docs: list[Document]
def retrieve_node(state: ChatState) -> dict:
last_user_msg = state["messages"][-1].content
docs = retriever.invoke(last_user_msg)
return {"parent_docs": docs}
def generate_node(state: ChatState) -> dict:
context = "\n\n".join(d.page_content for d in state["parent_docs"])
answer = llm.invoke([
*state["messages"],
HumanMessage(content=f"위 대화에 대해 다음 문서를 참고하여 답변하라.\n\n{context}"),
])
return {"messages": [AIMessage(content=answer.content)]}
graph = StateGraph(ChatState)
graph.add_node("retrieve", retrieve_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", END)
app = graph.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "session-001"}}
# 첫 번째 턴
app.invoke({"messages": [HumanMessage(content="MINERVA가 뭐야?")]}, config=config)
# 두 번째 턴 — 이전 messages를 자동으로 이어받음
app.invoke({"messages": [HumanMessage(content="그럼 사용 사례는?")]}, config=config)두 번째 호출에서 입력은 새 메시지 한 개만 넘긴다. Checkpointer가 이전 State를 불러오고, add_messages reducer가 새 메시지를 누적한다. 09편 — 상태 관리 해부에서 본 “프론트엔드가 history 전체를 매번 보냄” 패턴을 백엔드 Checkpointer로 옮길 수 있다.
4 interrupt — 노드 경계에서 멈추기
HITL의 핵심은 “사람이 검토할 지점”을 그래프 차원에서 명시하는 것이다. LangGraph는 두 가지 방식을 제공한다.
4.1 컴파일 시점에 정적 지정
4.2 실행 시점에 동적 지정 — Command(goto="...") 또는 interrupt()
LangGraph 0.2+에서는 노드 안에서 interrupt() 함수를 호출해 특정 입력이 들어올 때만 멈추도록 할 수 있다.
from langgraph.errors import GraphInterrupt
def review_node(state: AgentState) -> dict:
if state["confidence"] < 0.6:
raise GraphInterrupt({"reason": "low_confidence", "answer": state["answer"]})
return {"approved": True}이 노드는 confidence가 낮을 때만 그래프를 멈춘다. 호출자는 GraphInterrupt를 잡아 사람에게 검토를 요청하고, State를 업데이트한 뒤 재개한다.
5 HITL 흐름 — 멈춤·검토·재개의 사이클
[client] ──invoke──→ [graph] ──→ retrieve ──→ rerank ──→ ... ──→ [interrupt before generate]
│
▼
[멈춘 상태]
│
사람이 State 검토·수정
│
▼
[client] ──update_state──→ [graph] ──invoke(None)──→ generate ──→ build_response ──→ END
코드로 풀면 다음과 같다.
config = {"configurable": {"thread_id": "review-001"}}
# 1. 첫 invoke — interrupt 지점까지 실행
result = app.invoke({"query": query}, config=config)
# generate 직전에 멈춤 — result는 chain_inputs까지 채워진 상태
# 2. 사람이 검토 — 현재 State 조회
state_snapshot = app.get_state(config)
print(state_snapshot.values["chain_inputs"]["context"])
# 운영자가 컨텍스트를 보고 일부 문서를 제외하고 싶다고 판단
# 3. State 수정 — 사람이 결정한 변경 반영
app.update_state(config, {"chain_inputs": modified_inputs})
# 4. 재개 — 입력으로 None을 넘기면 멈춘 지점부터 이어 실행
final = app.invoke(None, config=config)
print(final["response"])get_state → update_state → invoke(None)이 HITL의 표준 사이클이다. 사람이 State를 수정한 흔적은 Checkpointer에 새 checkpoint_id로 기록되어 감사 추적이 자연스럽게 된다.
6 MINERVA 적용 — Data Standardizer 승인 흐름
가장 명확한 HITL 사례는 표준화 에이전트다. LLM이 제안한 표준 후보를 운영자가 승인해야 마스터 데이터에 반영된다.
class StandardizerState(TypedDict):
raw_term: str # 입력 — 원시 용어
candidates: list[dict] # LLM이 제안한 표준 후보들
approved: Optional[dict] # 운영자가 선택한 후보
persisted: bool # 마스터 DB 반영 여부
def propose_candidates_node(state):
candidates = llm_propose(state["raw_term"]) # LLM 호출
return {"candidates": candidates}
def persist_node(state):
write_to_master(state["approved"])
return {"persisted": True}
graph = StateGraph(StandardizerState)
graph.add_node("propose", propose_candidates_node)
graph.add_node("persist", persist_node)
graph.add_edge(START, "propose")
graph.add_edge("propose", "persist")
graph.add_edge("persist", END)
app = graph.compile(
checkpointer=PostgresSaver.from_conn_string(connection_string),
interrupt_before=["persist"], # 마스터 DB 반영 직전에 멈춤
)운영 흐름은 다음과 같다.
1. POST /standardize { "raw_term": "고객명" }
→ 백엔드: app.invoke({...}, config={thread_id})
→ propose 노드 실행 → persist 직전에 interrupt
→ 응답: { "candidates": [...], "thread_id": "..." }
2. 운영자가 UI에서 후보 중 하나 선택
→ POST /standardize/approve { "thread_id": "...", "approved": {...} }
→ 백엔드: app.update_state(config, {"approved": chosen})
app.invoke(None, config=config)
→ persist 노드 실행 → END
→ 응답: { "persisted": true }
핵심은 interrupt_before=["persist"] 한 줄로 “DB 반영은 사람 승인 후”라는 안전 정책이 그래프에 박힌다는 점이다. 코드 어디에도 if approved: write() 같은 분기를 직접 쓸 필요가 없다.
7 QnaChatbot 적용 — 인용 부족 시 검토
QnaChatbot에서는 인용이 부족한 답변을 발화 데이터 분석가가 검토하도록 만들 수 있다.
def grade_citations_node(state: QnaState) -> dict:
cited = extract_cited_indices(state["answer"])
if len(cited) < 2 and state["query"].agent_params.get("review_low_citation", False):
# 분석 모드 트래픽에 한해 인터럽트
raise GraphInterrupt({
"reason": "low_citation",
"answer": state["answer"],
"candidate_docs": [d.page_content for d in state["parent_docs"]],
})
return {"citation_count": len(cited)}이 노드는 운영 일반 트래픽에서는 그냥 통과하고, review_low_citation 플래그가 켜진 분석용 트래픽에서만 멈춘다. 분석가는 답변·후보 문서를 보고 라벨을 달거나 답변을 수정하고, 그 결과는 Phase C-5(발화 데이터 분석)의 학습 데이터로 흘러간다.
8 A/B 실험과 thread 통합
06편 — A/B 실험 프레임워크에서 sticky_hash로 사용자별 arm을 결정했다. Checkpointer 환경에서는 thread_id에 arm 정보를 함께 담아 두면 재개 시에도 동일 arm이 유지된다.
# 호출 시점에 arm 결정
arm_id = sticky_hash_assign(user_id, experiment="reranker_ab_test")
config = {
"configurable": {
"thread_id": user_id,
"experiment": "reranker_ab_test",
"arm": arm_id,
}
}
# 그래프는 arm에 따라 다르게 컴파일된 인스턴스를 사용
app = build_qna_graph_with_arm(agent, arm=arm_id, checkpointer=memory)
result = app.invoke({"query": query}, config=config)같은 thread를 다시 호출할 때 arm이 바뀌면 안 된다. arm 일관성은 thread_id + 별도 매핑(redis 또는 사용자 메타)으로 보장한다. MINERVA의 06편이 사용하는 sticky_hash는 deterministic이므로 arm 매핑을 별도 저장하지 않아도 같은 user_id면 같은 arm이 결정된다.
A/B 분기를 conditional edge로 빼면 그래프 한 개가 모든 arm을 표현할 수도 있다.
def route_by_arm(state, config):
arm = config["configurable"]["arm"]
return f"rerank_{arm}" # rerank_cosine 또는 rerank_flashrank
graph.add_node("rerank_cosine", make_rerank_cosine_node(agent))
graph.add_node("rerank_flashrank", make_rerank_flashrank_node(agent))
graph.add_conditional_edges(
"retrieve_children",
route_by_arm,
{"rerank_cosine": "rerank_cosine", "rerank_flashrank": "rerank_flashrank"},
)
graph.add_edge("rerank_cosine", "map_to_parents")
graph.add_edge("rerank_flashrank", "map_to_parents")14편의 “빌더 함수에서 노드를 바꿔 끼우기”와 차이점은 다음과 같다.
| 방식 | 컴파일 단위 | 변경 비용 | 관찰성 |
|---|---|---|---|
| 빌더 함수 (14편) | arm마다 다른 그래프 인스턴스 | arm 추가 시 빌더 분기 | arm별 독립 실행 흔적 |
| Conditional Edge | 그래프 한 개 + 라우팅 | arm 추가 시 노드 + 매핑 한 줄 | thread당 라우팅 흔적이 State에 남음 |
소규모 실험은 빌더, 다수 arm·동적 추가는 conditional edge가 자연스럽다.
9 운영 고려 — TTL, 동시성, 정리
Checkpointer는 “그냥 켜면 다 되는” 도구가 아니다. 다음을 결정한다.
9.1 TTL (Time-to-Live)
활성 thread는 보존하지만 영구히 둘 수는 없다. 정책 예시.
| 데이터 | TTL | 근거 |
|---|---|---|
| 진행 중 thread | 7일 | 일주일 안에 재개되지 않으면 죽은 thread |
| 완료 thread (END 도달) | 30일 | 디버깅·감사 |
| HITL 대기 thread | 30일 | 운영자 처리 시간 여유 |
| 실패 thread | 90일 | 사후 분석 (postmortem) |
PostgresSaver는 직접 정리 작업을 제공하지 않는다. cron job으로 checkpoints 테이블을 주기적으로 청소한다.
9.2 동시성 — 같은 thread 동시 호출 방지
같은 thread_id로 동시에 두 호출이 들어오면 race condition이 생긴다. MINERVA는 다음 두 가지로 보호한다.
- 프론트엔드 — 중복 클릭 방지: 메시지 전송 후 입력란 비활성화 (이미 09편 참조).
- 백엔드 — thread 락: Postgres advisory lock 또는 in-memory
dict[thread_id, asyncio.Lock]. 두 번째 요청은 첫 번째가 끝날 때까지 대기.
9.3 마이그레이션 — Memory에서 Postgres로
# 개발
checkpointer = MemorySaver()
# 운영
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(os.environ["POSTGRES_URL"])
checkpointer.setup() # 첫 실행 시 테이블 생성코드 변경은 두 줄이다. 다만 운영 환경에서는 connection pool, retry, telemetry를 추가로 감싸야 한다.
10 자주 발생하는 오류 패턴
CORRECT:
app = graph.compile(checkpointer=memory)
config = {"configurable": {"thread_id": user_id}}
result = app.invoke({"query": query}, config=config)Checkpointer를 등록한 그래프는 thread_id 없이 호출하면 ValueError를 발생시킨다. config에 반드시 thread_id를 담는다.
CORRECT:
재개 시 입력에 새 값을 넘기면 그래프는 처음부터 다시 시작한다. 멈춘 지점부터 이어 실행하려면 None을 넘긴다.
# log_messages가 Annotated[list, add]인데 update_state로 통째로 교체 시도
app.update_state(config, {"log_messages": ["수정됨"]})
# reducer가 작동해 [...이전 + "수정됨"]이 됨 — 의도와 다를 수 있음CORRECT:
# 의도: log_messages 전체 교체
app.update_state(config, {"log_messages": ["수정됨"]}, as_node="manual_edit")
# 또는 reducer를 우회하려면 새 thread를 만든다update_state도 reducer를 거친다. 누적 필드를 강제 교체하려면 as_node를 명시하거나 별도 패턴을 써야 한다.
11 다음 편 예고 — BaseAgent 계약 v2
13~16편에서 LangGraph 기초·노드 분해·State 설계·Checkpointing/HITL을 다뤘다. Phase C-2의 마무리는 Phase B의 02-1편 BaseAgent 계약 v2이며, 위 결과물을 BaseAgent 계약으로 묶는다. 번호상 v1과 인접 배치되어 있지만 내용상 본 16편의 후속이다.
- v1 (현재):
run(query) -> Response/stream(query) -> Iterator[StreamEvent] - v2 (LangGraph 호환):
graph속성 노출,astream_events로 SSE, Checkpointer 옵션, Subgraph로의 분해 지원 - v1 에이전트가 v2 프레임워크에서도 동작하도록 어댑터 패턴
12 정리
| 항목 | 핵심 |
|---|---|
| Checkpointer 종류 | Memory(개발), SQLite(PoC), Postgres(운영) |
| 식별 키 | thread_id(세션) + checkpoint_id(스냅샷) |
| 멀티턴 | MessagesState + Checkpointer로 자동 누적 |
| Interrupt | 컴파일 시점 정적 / 실행 시점 동적 (GraphInterrupt) |
| HITL 사이클 | invoke → 멈춤 → get_state → update_state → invoke(None) |
| MINERVA 적용 | Standardizer 승인, QnA 분석 라벨링, A/B arm 일관성 |
| 운영 | TTL 정책, thread 락, Postgres 마이그레이션 두 줄 |
Checkpointing과 HITL은 그래프를 “장시간 실행 + 사람 협업”의 무대로 격상시킨다. 1.5명 PoC에서는 사치였던 영속화·감사·승인이 50명·1000명 단계에서는 전제가 된다. 다음 편에서 BaseAgent 계약 v2로 이 모든 것을 묶는다.
13 관련 주제
선행 학습
후속 주제
다른 카테고리 연결