1 Lazy GraphRAG: 커뮤니티 기반 계층적 요약
1.1 LazyGraphRAG란
Microsoft Research(2024.11)가 발표한 GraphRAG 개선 방식이다.
기존 GraphRAG의 문제: - 전체 문서를 미리 LLM으로 분석하여 지식 그래프 구축 - Wikipedia 6M 문서 기준 $70,000 상당의 비용 추정 - 질의와 무관한 문서까지 모두 처리
LazyGraphRAG의 핵심: - 그래프를 사전에 완전히 구축하지 않음 - 질의가 들어온 후, 관련 문서만 on-demand로 탐색 - 탐색된 문서들에서 커뮤니티를 감지하고 각 커뮤니티에서 클레임 추출 - 기존 대비 1/100 수준의 비용으로 유사한 품질
1.2 전체 파이프라인
질의 입력
│
▼
1. GraphRetriever로 관련 문서 탐색 (mentions + entities 엣지 활용)
│
▼
2. 탐색된 문서들로 서브그래프 생성
│
▼
3. 커뮤니티 감지 (NetworkX, Louvain 알고리즘)
│
▼
4. 각 커뮤니티에서 클레임 추출 (LLM 병렬 호출)
│
▼
5. 클레임 관련성 랭킹 (RankRAG 방식)
│
▼
6. 상위 클레임으로 최종 답변 생성
1.3 데이터 준비: Wikipedia + SpacyNER
Wikipedia 문서를 로드하면서 mentions(하이퍼링크된 다른 문서 ID)와 SpacyNER로 추출한 entities를 메타데이터에 저장한다.
import json
from langchain_core.documents import Document
from langchain_graph_retriever.transformers.spacy import SpacyNERTransformer
def parse_document(line: bytes) -> Document:
"""2wikimultihop 데이터셋의 JSON 한 줄을 Document로 변환."""
para = json.loads(line)
mentioned_ids = [
ref_id
for mention in para["mentions"]
for ref_id in (mention["ref_ids"] or [])
]
return Document(
id=para["id"],
page_content=" ".join(para["sentences"]),
metadata={
"mentions": mentioned_ids, # 이 문서가 언급하는 다른 문서 ID
"title": para["title"],
},
)
# SpacyNER로 엔티티 추출
NER_TRANSFORMER = SpacyNERTransformer(
limit=1000,
exclude_labels={"CARDINAL", "MONEY", "QUANTITY", "TIME", "PERCENT", "ORDINAL"},
)
# 배치 처리
def prepare_batch(lines):
docs = [parse_document(line) for line in lines]
docs = NER_TRANSFORMER.transform_documents(docs)
return docs
# 결과 문서 구조
# doc.metadata = {
# "mentions": ["article_id_1", "article_id_2", ...], # 언급된 문서 ID
# "entities": [{"label": "PERSON", "text": "Elon Musk"}, ...], # 추출된 엔티티
# "title": "Tesla, Inc.",
# }1.4 GraphRetriever: mentions + entities 엣지
from langchain_astradb import AstraDBVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_graph_retriever import GraphRetriever
store = AstraDBVectorStore(
embedding=OpenAIEmbeddings(),
collection_name="lazy_graph_rag",
)
RETRIEVER = GraphRetriever(
store=store,
edges=[
("mentions", "$id"), # mentions 리스트의 값 → 다른 문서의 ID
("entities", "entities"), # 같은 엔티티를 공유하는 문서 연결
],
select_k=100,
start_k=30,
adjacent_k=20,
max_depth=3,
)엣지 설명: - ("mentions", "$id"): 문서의 mentions 메타데이터가 다른 문서의 ID와 일치하면 연결 - ("entities", "entities"): 같은 엔티티를 포함하는 문서끼리 연결
1.5 클레임 추출 체인
각 커뮤니티에서 질의와 관련된 클레임을 추출한다.
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnableLambda, chain
from langchain_openai import ChatOpenAI
class Claim(BaseModel):
claim: str = Field(description="문서에서 추출한 클레임.")
source_id: str = Field(description="클레임이 있는 문서 ID.")
class Claims(BaseModel):
claims: list[Claim] = Field(description="추출된 클레임 목록.")
MODEL = ChatOpenAI(model="gpt-4o", temperature=0)
CLAIMS_MODEL = MODEL.with_structured_output(Claims)
CLAIMS_PROMPT = ChatPromptTemplate.from_template("""
다음 관련 문서들에서 클레임을 추출하세요.
클레임은 질문과 직접 또는 간접적으로 관련된 것만 포함하세요.
관련 없는 클레임은 제외하세요.
질문: {question}
{formatted_documents}
""")
def format_documents_with_ids(documents):
return "\n\n".join(
f"문서 ID: {doc.id}\n내용: {doc.page_content}"
for doc in documents
)
# 커뮤니티별 클레임 추출 (병렬 실행)
CLAIM_CHAIN = (
RunnableParallel({
"question": lambda x: x["question"],
"formatted_documents": lambda x: format_documents_with_ids(x["documents"]),
})
| CLAIMS_PROMPT
| CLAIMS_MODEL
)
@chain
async def claims_chain(input):
question = input["question"]
communities = input["communities"]
# 모든 커뮤니티에 대해 병렬로 클레임 추출
community_claims = await CLAIM_CHAIN.abatch([
{"question": question, "documents": community}
for community in communities
])
return [
claim
for community in community_claims
for claim in community.claims
]1.6 클레임 랭킹 체인
RankRAG 방식으로 각 클레임의 관련성을 점수화한다.
import math
from langchain_core.runnables import chain
RANK_PROMPT = ChatPromptTemplate.from_template("""
다음 클레임이 질문과 관련이 있으면 "True", 없으면 "False"만 출력하세요.
질문: {question}
클레임: {claim}
관련성:
""")
def compute_rank(msg):
"""로그 확률로 관련성 점수 계산."""
logprob = msg.response_metadata["logprobs"]["content"][0]
prob = math.exp(logprob["logprob"])
token = logprob["token"]
if token == "True":
return prob
elif token == "False":
return 1.0 - prob
else:
raise ValueError(f"예상치 못한 토큰: {token}")
RANK_CHAIN = RANK_PROMPT | MODEL.bind(logprobs=True) | RunnableLambda(compute_rank)
@chain
async def rank_chain(input):
claims = input["claims"]
# 모든 클레임을 병렬로 랭킹
ranks = await RANK_CHAIN.abatch([
{"question": input["question"], "claim": claim.claim}
for claim in claims
])
# 점수 높은 순으로 정렬
ranked = sorted(
zip(ranks, claims),
key=lambda x: x[0],
reverse=True # 높은 점수 먼저
)
return [claim for _, claim in ranked]1.7 LazyGraphRAG 통합 체인
from langchain_graph_retriever.document_graph import create_graph, group_by_community
@chain
async def lazy_graph_rag(
question: str,
*,
retriever: GraphRetriever,
model,
max_tokens: int = 1000,
) -> str:
"""LazyGraphRAG 전체 파이프라인."""
# Step 1: 그래프 탐색으로 관련 문서 수집
documents = await retriever.ainvoke(question)
# Step 2: 탐색된 문서로 서브그래프 생성
document_graph = create_graph(documents, edges=retriever.edges)
# Step 3: 커뮤니티 감지 (NetworkX 기반)
communities = group_by_community(document_graph)
# Step 4: 각 커뮤니티에서 클레임 추출 (LLM 병렬 호출)
claims = await claims_chain.ainvoke({
"question": question,
"communities": communities,
})
# Step 5: 클레임 랭킹 및 토큰 제한 내에서 선택
result_claims = []
tokens = 0
for claim in await rank_chain.ainvoke({"question": question, "claims": claims}):
claim_str = f"- {claim.claim} (출처: {claim.source_id})"
tokens += model.get_num_tokens(claim_str)
if tokens > max_tokens:
break
result_claims.append(claim_str)
return "\n".join(result_claims)1.8 전체 RAG 체인 구성
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
ANSWER_PROMPT = PromptTemplate.from_template("""
지원 클레임을 바탕으로 질문에 답하세요.
클레임에 있는 정보만 사용하세요. 추측하거나 정보를 만들지 마세요.
가능한 경우 지원 클레임을 인용하세요.
질문: {question}
클레임:
{claims}
""")
LAZY_GRAPH_RAG_CHAIN = (
{
"question": RunnablePassthrough(),
"claims": RunnablePassthrough() | lazy_graph_rag.bind(
retriever=RETRIEVER,
model=MODEL,
max_tokens=1000,
),
}
| ANSWER_PROMPT
| MODEL
)
# 실행
question = "버뮤다 슬루프 선박이 다른 선박에 비해 높이 평가되는 이유는?"
result = await LAZY_GRAPH_RAG_CHAIN.ainvoke(question)
print(result.content)예상 출력:
버뮤다 슬루프 선박이 높이 평가되는 이유는 여러 가지입니다.
첫째, 버뮤다 리그를 사용하여 적은 선원으로도 항해 가능하고 비용이 저렴합니다
(출처: 48520).
둘째, 버뮤다 삼나무로 제작되어 내구성이 뛰어나고 부식에 강합니다
(출처: 17186373).
1.9 기존 RAG와 비교
# 일반 Vector RAG
VECTOR_CHAIN = (
{
"question": RunnablePassthrough(),
"documents": store.as_retriever() | (lambda docs: "\n\n".join(d.page_content for d in docs)),
}
| VECTOR_ANSWER_PROMPT
| MODEL
)
result_vector = VECTOR_CHAIN.invoke(question)
# → 단순 유사도 검색만으로는 multi-hop 추론 불가| 항목 | Vector RAG | LazyGraphRAG |
|---|---|---|
| 검색 방식 | 코사인 유사도 | 그래프 탐색 + 커뮤니티 |
| 커버리지 | 유사한 문서만 | 연결된 모든 관련 문서 |
| Multi-hop | 불가 | 가능 |
| 비용 | 낮음 | 중간 (클레임 추출에 LLM 사용) |
| 답변 품질 | 단순 질문에 적합 | 복잡한 추론 질문에 우수 |
1.10 정리
LazyGraphRAG는 다음 5단계로 작동한다:
1. GraphRetriever.invoke() → 관련 문서 수집 (그래프 탐색)
2. create_graph() → 서브그래프 생성
3. group_by_community() → 커뮤니티 감지 (NetworkX)
4. claims_chain() → 커뮤니티별 클레임 추출 (LLM 병렬)
5. rank_chain() + 답변 생성 → 관련성 높은 클레임으로 답변
장점: 사전 그래프 구축 불필요, 질의 시점 on-demand 처리 단점: 클레임 추출에 LLM 호출이 많아 응답 지연 가능
다음 파일에서는 Wikipedia Multi-hop QA에서 다단계 추론을 살펴본다.