LangChain to LangGraph

RAG 구현 - LangChain에서 LangGraph로 전환

LangChain 체인에서 LangGraph 상태 머신으로 RAG 시스템을 전환하는 방법을 다룬다.

AI
RAG
LangChain
LangGraph
저자

Kwangmin Kim

공개

2025년 11월 06일

1 LangChain vs LangGraph

1.1 LangChain의 한계

LangChain (LCEL):
- 선형적인 체인 구조
- 단방향 데이터 흐름
- 조건부 로직 구현 어려움
- 복잡한 워크플로우 제한적

# LangChain 체인 예시  
chain = (  
    {"context": retriever, "question": RunnablePassthrough()}  
    | prompt  
    | llm  
    | StrOutputParser()  
)  

문제점:
- 검색 실패 시 재시도 불가
- 답변 품질 검증 없음
- 대화 히스토리 관리 복잡
- 멀티턴 대화 구현 어려움

1.2 LangGraph의 장점

LangGraph:
- 상태 기반 그래프 구조
- 조건부 라우팅
- 루프 및 재시도 로직
- 복잡한 워크플로우 구현

주요 개념:
- State: 상태 관리 (TypedDict)
- Nodes: 작업 단위 (함수)
- Edges: 노드 간 연결
- Conditional Edges: 조건부 분기

2 환경 설정

2.1 설치

pip install langgraph  
pip install langchain-openai  
pip install azure-search-documents  

2.2 환경 변수

.env 파일:

AZURE_OPENAI_ENDPOINT=https://openai-rag-prod.openai.azure.com/  
AZURE_OPENAI_API_KEY=your-key  
AZURE_OPENAI_DEPLOYMENT=gpt-4  
AZURE_OPENAI_API_VERSION=2024-02-01  

AZURE_SEARCH_ENDPOINT=https://search-rag-prod.search.windows.net  
AZURE_SEARCH_API_KEY=your-key  
AZURE_SEARCH_INDEX_NAME=rag-documents  

3 기본 LangChain RAG

먼저 기본 LangChain 체인을 구현한다.

from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI  
from langchain_community.vectorstores.azuresearch import AzureSearch  
from langchain_core.prompts import ChatPromptTemplate  
from langchain_core.output_parsers import StrOutputParser  
from langchain_core.runnables import RunnablePassthrough  
from dotenv import load_dotenv  
import os  

load_dotenv()  

# Embeddings  
embeddings = AzureOpenAIEmbeddings(  
    azure_deployment="text-embedding-3-small",  
    openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),  
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),  
    api_key=os.getenv("AZURE_OPENAI_API_KEY")  
)  

# Vector Store  
vector_store = AzureSearch(  
    azure_search_endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),  
    azure_search_key=os.getenv("AZURE_SEARCH_API_KEY"),  
    index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"),  
    embedding_function=embeddings.embed_query  
)  

# Retriever  
retriever = vector_store.as_retriever(search_kwargs={"k": 3})  

# LLM  
llm = AzureChatOpenAI(  
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),  
    openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),  
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),  
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
    temperature=0  
)  

# Prompt  
prompt = ChatPromptTemplate.from_template(  
    """다음 컨텍스트를 참고하여 질문에 답변하세요.  

컨텍스트:  
{context}  

질문: {question}  

답변:"""  
)  

# Chain  
def format_docs(docs):  
    return "\n\n".join([doc.page_content for doc in docs])  

rag_chain = (  
    {"context": retriever | format_docs, "question": RunnablePassthrough()}  
    | prompt  
    | llm  
    | StrOutputParser()  
)  

# 실행  
answer = rag_chain.invoke("Azure AI Search란 무엇인가?")  
print(answer)  

4 LangGraph로 전환

4.1 상태 정의

from typing import TypedDict, List  
from langchain_core.documents import Document  

class RAGState(TypedDict):  
    """RAG 상태"""  
    question: str  # 사용자 질문  
    context: List[Document]  # 검색된 문서  
    answer: str  # 생성된 답변  
    retrieval_success: bool  # 검색 성공 여부  

4.2 노드 정의

def retrieve(state: RAGState) -> RAGState:  
    """문서 검색 노드"""  
    question = state["question"]  
    
    # 검색 실행  
    docs = retriever.invoke(question)  
    
    # 검색 성공 여부 확인  
    success = len(docs) > 0  
    
    return {  
        **state,  
        "context": docs,  
        "retrieval_success": success  
    }  

def generate(state: RAGState) -> RAGState:  
    """답변 생성 노드"""  
    question = state["question"]  
    context = state["context"]  
    
    # 컨텍스트 포맷팅  
    context_text = "\n\n".join([doc.page_content for doc in context])  
    
    # 프롬프트 생성  
    messages = prompt.invoke({  
        "context": context_text,  
        "question": question  
    })  
    
    # 답변 생성  
    response = llm.invoke(messages)  
    
    return {  
        **state,  
        "answer": response.content  
    }  

def no_context_fallback(state: RAGState) -> RAGState:  
    """컨텍스트 없을 때 기본 답변"""  
    return {  
        **state,  
        "answer": "관련 정보를 찾을 수 없습니다. 질문을 다시 입력해주세요."  
    }  

4.3 그래프 구성

from langgraph.graph import StateGraph, END  

# 그래프 생성  
workflow = StateGraph(RAGState)  

# 노드 추가  
workflow.add_node("retrieve", retrieve)  
workflow.add_node("generate", generate)  
workflow.add_node("fallback", no_context_fallback)  

# 시작점 설정  
workflow.set_entry_point("retrieve")  

# 조건부 엣지 (검색 성공 여부)  
def should_generate(state: RAGState) -> str:  
    """검색 성공 시 generate, 실패 시 fallback"""  
    if state["retrieval_success"]:  
        return "generate"  
    else:  
        return "fallback"  

workflow.add_conditional_edges(  
    "retrieve",  
    should_generate,  
    {  
        "generate": "generate",  
        "fallback": "fallback"  
    }  
)  

# 종료 엣지  
workflow.add_edge("generate", END)  
workflow.add_edge("fallback", END)  

# 컴파일  
app = workflow.compile()  

4.4 실행

# 그래프 실행  
result = app.invoke({  
    "question": "Azure AI Search란 무엇인가?"  
})  

print(f"질문: {result['question']}")  
print(f"검색 성공: {result['retrieval_success']}")  
print(f"답변: {result['answer']}")  

5 대화 히스토리 추가

5.1 상태 확장

from langchain_core.messages import BaseMessage  

class ConversationState(TypedDict):  
    """대화 상태"""  
    question: str  
    chat_history: List[BaseMessage]  # 대화 히스토리  
    context: List[Document]  
    answer: str  
    retrieval_success: bool  

5.2 히스토리 기반 검색

from langchain_core.messages import HumanMessage, AIMessage  

def retrieve_with_history(state: ConversationState) -> ConversationState:  
    """대화 히스토리를 고려한 검색"""  
    question = state["question"]  
    chat_history = state.get("chat_history", [])  
    
    # 히스토리를 컨텍스트로 변환  
    history_text = "\n".join([  
        f"{'User' if isinstance(msg, HumanMessage) else 'AI'}: {msg.content}"  
        for msg in chat_history[-3:]  # 최근 3개만  
    ])  
    
    # 질문 재구성 (히스토리 포함)  
    if history_text:  
        enhanced_question = f"대화 히스토리:\n{history_text}\n\n현재 질문: {question}"  
    else:  
        enhanced_question = question  
    
    # 검색  
    docs = retriever.invoke(enhanced_question)  
    
    return {  
        **state,  
        "context": docs,  
        "retrieval_success": len(docs) > 0  
    }  

def generate_with_history(state: ConversationState) -> ConversationState:  
    """히스토리를 고려한 답변 생성"""  
    question = state["question"]  
    context = state["context"]  
    chat_history = state.get("chat_history", [])  
    
    # 컨텍스트 포맷팅  
    context_text = "\n\n".join([doc.page_content for doc in context])  
    
    # 히스토리 포함 프롬프트  
    history_prompt = ChatPromptTemplate.from_template(  
        """이전 대화:  
{history}  

컨텍스트:  
{context}  

질문: {question}  

답변:"""  
    )  
    
    history_text = "\n".join([  
        f"User: {msg.content}" if isinstance(msg, HumanMessage) else f"AI: {msg.content}"  
        for msg in chat_history[-3:]  
    ])  
    
    messages = history_prompt.invoke({  
        "history": history_text or "없음",  
        "context": context_text,  
        "question": question  
    })  
    
    response = llm.invoke(messages)  
    
    # 히스토리 업데이트  
    updated_history = chat_history + [  
        HumanMessage(content=question),  
        AIMessage(content=response.content)  
    ]  
    
    return {  
        **state,  
        "answer": response.content,  
        "chat_history": updated_history  
    }  

5.3 대화형 그래프

# 대화형 워크플로우  
conversation_workflow = StateGraph(ConversationState)  

conversation_workflow.add_node("retrieve", retrieve_with_history)  
conversation_workflow.add_node("generate", generate_with_history)  
conversation_workflow.add_node("fallback", no_context_fallback)  

conversation_workflow.set_entry_point("retrieve")  

conversation_workflow.add_conditional_edges(  
    "retrieve",  
    should_generate,  
    {  
        "generate": "generate",  
        "fallback": "fallback"  
    }  
)  

conversation_workflow.add_edge("generate", END)  
conversation_workflow.add_edge("fallback", END)  

conversation_app = conversation_workflow.compile()  

# 멀티턴 대화 실행  
state = {  
    "question": "Azure AI Search란 무엇인가?",  
    "chat_history": []  
}  

result1 = conversation_app.invoke(state)  
print(f"답변 1: {result1['answer']}\n")  

# 후속 질문  
state2 = {  
    "question": "그것의 주요 기능은?",  
    "chat_history": result1["chat_history"]  
}  

result2 = conversation_app.invoke(state2)  
print(f"답변 2: {result2['answer']}")  

6 재시도 로직 추가

6.1 재시도 상태

class RetryState(TypedDict):  
    """재시도 가능한 RAG 상태"""  
    question: str  
    context: List[Document]  
    answer: str  
    retrieval_success: bool  
    retry_count: int  # 재시도 횟수  
    max_retries: int  # 최대 재시도  

6.2 재시도 노드

def retrieve_with_retry(state: RetryState) -> RetryState:  
    """재시도 가능한 검색"""  
    question = state["question"]  
    retry_count = state.get("retry_count", 0)  
    
    # 재시도 시 쿼리 확장  
    if retry_count > 0:  
        expanded_question = f"{question} (관련 정보, 설명, 개요 포함)"  
    else:  
        expanded_question = question  
    
    docs = retriever.invoke(expanded_question)  
    
    return {  
        **state,  
        "context": docs,  
        "retrieval_success": len(docs) > 0,  
        "retry_count": retry_count + 1  
    }  

def should_retry(state: RetryState) -> str:  
    """재시도 여부 결정"""  
    if state["retrieval_success"]:  
        return "generate"  
    elif state["retry_count"] < state.get("max_retries", 2):  
        return "retry"  
    else:  
        return "fallback"  

6.3 재시도 그래프

retry_workflow = StateGraph(RetryState)  

retry_workflow.add_node("retrieve", retrieve_with_retry)  
retry_workflow.add_node("generate", generate)  
retry_workflow.add_node("fallback", no_context_fallback)  

retry_workflow.set_entry_point("retrieve")  

retry_workflow.add_conditional_edges(  
    "retrieve",  
    should_retry,  
    {  
        "generate": "generate",  
        "retry": "retrieve",  # 루프백  
        "fallback": "fallback"  
    }  
)  

retry_workflow.add_edge("generate", END)  
retry_workflow.add_edge("fallback", END)  

retry_app = retry_workflow.compile()  

# 실행  
result = retry_app.invoke({  
    "question": "희귀한 주제 검색",  
    "max_retries": 2  
})  

print(f"재시도 횟수: {result['retry_count']}")  
print(f"답변: {result['answer']}")  

7 답변 품질 검증

7.1 검증 상태

class QualityState(TypedDict):  
    """품질 검증 상태"""  
    question: str  
    context: List[Document]  
    answer: str  
    retrieval_success: bool  
    answer_quality: str  # "good", "bad", "unknown"  

7.2 품질 평가 노드

def evaluate_answer(state: QualityState) -> QualityState:  
    """답변 품질 평가"""  
    answer = state["answer"]  
    question = state["question"]  
    
    # 평가 프롬프트  
    eval_prompt = ChatPromptTemplate.from_template(  
        """다음 질문과 답변의 품질을 평가하세요.  

질문: {question}  
답변: {answer}  

답변이 질문에 적절히 대답하고 있나요?  
- "good": 적절한 답변  
- "bad": 부적절하거나 관련 없는 답변  
- "unknown": 판단 불가  

평가 결과 (good/bad/unknown만 출력):"""  
    )  
    
    messages = eval_prompt.invoke({  
        "question": question,  
        "answer": answer  
    })  
    
    evaluation = llm.invoke(messages).content.strip().lower()  
    
    return {  
        **state,  
        "answer_quality": evaluation  
    }  

def regenerate_answer(state: QualityState) -> QualityState:  
    """답변 재생성"""  
    question = state["question"]  
    context = state["context"]  
    
    # 더 상세한 프롬프트  
    detailed_prompt = ChatPromptTemplate.from_template(  
        """다음 컨텍스트를 참고하여 질문에 상세히 답변하세요.  
반드시 컨텍스트의 정보를 활용하고, 구체적으로 설명하세요.  

컨텍스트:  
{context}  

질문: {question}  

상세 답변:"""  
    )  
    
    context_text = "\n\n".join([doc.page_content for doc in context])  
    messages = detailed_prompt.invoke({  
        "context": context_text,  
        "question": question  
    })  
    
    response = llm.invoke(messages)  
    
    return {  
        **state,  
        "answer": response.content  
    }  

def route_by_quality(state: QualityState) -> str:  
    """품질에 따라 라우팅"""  
    quality = state["answer_quality"]  
    
    if quality == "good":  
        return "end"  
    elif quality == "bad":  
        return "regenerate"  
    else:  
        return "end"  

7.3 품질 검증 그래프

quality_workflow = StateGraph(QualityState)  

quality_workflow.add_node("retrieve", retrieve)  
quality_workflow.add_node("generate", generate)  
quality_workflow.add_node("evaluate", evaluate_answer)  
quality_workflow.add_node("regenerate", regenerate_answer)  
quality_workflow.add_node("fallback", no_context_fallback)  

quality_workflow.set_entry_point("retrieve")  

quality_workflow.add_conditional_edges(  
    "retrieve",  
    should_generate,  
    {  
        "generate": "generate",  
        "fallback": "fallback"  
    }  
)  

quality_workflow.add_edge("generate", "evaluate")  

quality_workflow.add_conditional_edges(  
    "evaluate",  
    route_by_quality,  
    {  
        "end": END,  
        "regenerate": "regenerate"  
    }  
)  

quality_workflow.add_edge("regenerate", END)  
quality_workflow.add_edge("fallback", END)  

quality_app = quality_workflow.compile()  

# 실행  
result = quality_app.invoke({  
    "question": "Azure AI Search의 장점은?"  
})  

print(f"답변 품질: {result['answer_quality']}")  
print(f"최종 답변: {result['answer']}")  

8 시각화

8.1 그래프 구조 확인

from IPython.display import Image, display  

# 그래프 시각화  
try:  
    display(Image(app.get_graph().draw_mermaid_png()))  
except Exception:  
    print(app.get_graph().draw_ascii())  

9 스트리밍

9.1 스트리밍 실행

# 스트리밍으로 중간 단계 확인  
for event in app.stream({  
    "question": "Azure AI Search란?"  
}):  
    print(f"이벤트: {event}")  
    print("---")  

10 참고 자료

10.1 공식 문서

10.2 예제

11 다음 단계

LangGraph로 RAG 로직을 구현했다면, 이제 Azure OpenAI LLM을 최적화하자:

👉 06-Azure-OpenAI-LLM.qmd - Azure OpenAI LLM 최적화 및 프롬프트 엔지니어링

Subscribe

Enjoy this blog? Get notified of new posts by email: