1 Overall Architecture
This is an enterprise architecture that combines the Azure RAG components covered so far.
1.1 System Diagram
```mermaid
graph TB
    subgraph "Data Ingestion"
        A[Document Upload] --> B[Azure Blob Storage]
        B --> C[Document Intelligence]
        C --> D[Text Extraction]
    end
    subgraph "Embedding & Indexing"
        D --> E[Azure OpenAI Embeddings]
        E --> F[Azure AI Search]
        F --> G[Vector Store]
    end
    subgraph "RAG Engine"
        H[User Query] --> I[Query Embeddings]
        I --> J[Retriever]
        J --> K[LangGraph Workflow]
        K --> L[Azure OpenAI GPT-4o]
        L --> M[Response]
    end
    subgraph "Deployment"
        N[Azure Functions] --> O[API Gateway]
        P[Azure Container Apps] --> O
        O --> Q[Application Gateway]
    end
    subgraph "Monitoring"
        R[Application Insights]
        S[Log Analytics]
        T[Azure Monitor]
    end
    G --> J
    M --> O
    O --> R
    O --> S
    N --> T
    P --> T
```
1.2 Core Components
| Layer | Service | Role |
|---|---|---|
| Storage | Blob Storage | Stores source documents |
| Preprocessing | Document Intelligence | OCR, layout analysis |
| Embedding | Azure OpenAI (Embeddings) | Text → vector conversion |
| Search | Azure AI Search | Vector storage and search |
| RAG | LangGraph | Retrieve-then-generate workflow |
| LLM | Azure OpenAI (GPT-4o) | Answer generation |
| Deployment | Functions/Container Apps | API serving |
| Monitoring | Application Insights | Logs, metrics, tracing |
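To make the data flow through these layers concrete, here is a deliberately tiny in-memory sketch in plain Python. Every component is a toy stand-in and an assumption of this example, not an Azure API: chunking splits on sentences, the "embedding" is a bag-of-words count vector, the "index" is a list ranked by cosine similarity, and "generation" just returns the best-matching chunk. It only mirrors the ingest, embed, index, retrieve, generate sequence from the table.

```python
import math
import re
from collections import Counter


def chunk(text: str) -> list[str]:
    """Toy chunker (stand-in for the text splitter): one chunk per sentence."""
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]


def embed(text: str) -> Counter:
    """Toy embedding (stand-in for Azure OpenAI Embeddings): word counts."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyRAG:
    """Hypothetical end-to-end pipeline; the list plays the role of AI Search."""

    def __init__(self):
        self.index: list[tuple[str, Counter]] = []  # (chunk, vector) pairs

    def ingest(self, document: str) -> None:
        # Preprocess -> embed -> index
        for c in chunk(document):
            self.index.append((c, embed(c)))

    def retrieve(self, question: str, k: int = 1) -> list[str]:
        q = embed(question)
        ranked = sorted(self.index, key=lambda item: cosine(q, item[1]), reverse=True)
        return [c for c, _ in ranked[:k]]

    def answer(self, question: str) -> str:
        # "Generation" stand-in: return the top retrieved chunk
        hits = self.retrieve(question)
        return hits[0] if hits else "No relevant information found."


rag = ToyRAG()
rag.ingest("Blob Storage keeps the original files. AI Search stores the vectors. GPT-4o writes the answer.")
print(rag.answer("Where are the vectors stored?"))  # AI Search stores the vectors.
```

In the real system each toy function is replaced by the corresponding Azure service, but the order of calls and the split between an offline indexing path and an online query path stay the same.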
2 Infrastructure Setup
2.1 Deploying All Resources
deploy.bicep:
```bicep
param location string = resourceGroup().location
// Used in storage account and registry names, which allow only lowercase
// letters and digits, so the project name must not contain hyphens
param projectName string = 'ragprod'

// Storage Account (the name must also be globally unique)
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {
  name: 'st${projectName}'
  location: location
  sku: {
    name: 'Standard_LRS'
  }
  kind: 'StorageV2'
  properties: {
    accessTier: 'Hot'
    supportsHttpsTrafficOnly: true
    minimumTlsVersion: 'TLS1_2'
  }
}

// Document Intelligence
resource documentIntelligence 'Microsoft.CognitiveServices/accounts@2023-05-01' = {
  name: 'di-${projectName}'
  location: location
  sku: {
    name: 'S0'
  }
  kind: 'FormRecognizer'
  properties: {
    customSubDomainName: 'di-${projectName}'
    publicNetworkAccess: 'Enabled'
  }
}

// Azure OpenAI
// This creates the account only. The model deployments used later
// (text-embedding-3-small, gpt-4o) must be created separately, for example
// with `az cognitiveservices account deployment create`.
resource openai 'Microsoft.CognitiveServices/accounts@2023-05-01' = {
  name: 'openai-${projectName}'
  location: location
  sku: {
    name: 'S0'
  }
  kind: 'OpenAI'
  properties: {
    customSubDomainName: 'openai-${projectName}'
    publicNetworkAccess: 'Enabled'
  }
}

// Azure AI Search
resource search 'Microsoft.Search/searchServices@2023-11-01' = {
  name: 'search-${projectName}'
  location: location
  sku: {
    name: 'standard'
  }
  properties: {
    replicaCount: 1
    partitionCount: 1
    hostingMode: 'default'
    // Accept both API keys and Microsoft Entra ID tokens (needed for managed identity)
    authOptions: {
      aadOrApiKey: {
        aadAuthFailureMode: 'http401WithBearerChallenge'
      }
    }
  }
}

// Log Analytics
resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2022-10-01' = {
  name: 'law-${projectName}'
  location: location
  properties: {
    sku: {
      name: 'PerGB2018'
    }
    retentionInDays: 30
  }
}

// Application Insights (workspace-based)
resource appInsights 'Microsoft.Insights/components@2020-02-02' = {
  name: 'appi-${projectName}'
  location: location
  kind: 'web'
  properties: {
    Application_Type: 'web'
    WorkspaceResourceId: logAnalytics.id
  }
}

// Container Apps Environment (container logs go to Log Analytics)
resource containerAppEnv 'Microsoft.App/managedEnvironments@2023-05-01' = {
  name: 'cae-${projectName}'
  location: location
  properties: {
    appLogsConfiguration: {
      destination: 'log-analytics'
      logAnalyticsConfiguration: {
        customerId: logAnalytics.properties.customerId
        sharedKey: logAnalytics.listKeys().primarySharedKey
      }
    }
  }
}

// Container Registry
resource containerRegistry 'Microsoft.ContainerRegistry/registries@2023-07-01' = {
  name: 'acr${projectName}'
  location: location
  sku: {
    name: 'Basic'
  }
  properties: {
    // Lets the Container App pull images with admin credentials;
    // prefer a managed identity with the AcrPull role in production
    adminUserEnabled: true
  }
}

output storageAccountName string = storageAccount.name
output documentIntelligenceEndpoint string = documentIntelligence.properties.endpoint
output openaiEndpoint string = openai.properties.endpoint
// Build the search endpoint from the service name
output searchEndpoint string = 'https://${search.name}.search.windows.net'
output containerRegistryName string = containerRegistry.name
output appInsightsConnectionString string = appInsights.properties.ConnectionString
```
2.2 Running the Deployment
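Storage account names must be 3 to 24 lowercase letters and digits, and container registry names 5 to 50 alphanumeric characters, so the template's `st${projectName}` and `acr${projectName}` only work for a hyphen-free project name such as `ragprod`. A pre-flight check before deploying might look like the sketch below; `check_resource_names` is a hypothetical helper, and it checks only length and characters, not global uniqueness, which only Azure can confirm.

```python
import re

# Hypothetical pre-flight rules: length and allowed characters only
RULES = {
    "storage account": (re.compile(r"[a-z0-9]{3,24}"), "st{}"),
    "container registry": (re.compile(r"[a-zA-Z0-9]{5,50}"), "acr{}"),
}


def check_resource_names(project_name: str) -> list[str]:
    """Return one message per derived name that breaks its naming rule."""
    problems = []
    for kind, (pattern, template) in RULES.items():
        name = template.format(project_name)
        if not pattern.fullmatch(name):
            problems.append(f"{kind} name '{name}' is invalid")
    return problems


print(check_resource_names("ragprod"))   # []
print(check_resource_names("rag-prod"))  # both names rejected because of the hyphen
```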
```bash
# Create the resource group
az group create --name rg-rag-prod --location koreacentral

# Deploy the Bicep template
az deployment group create \
  --resource-group rg-rag-prod \
  --template-file deploy.bicep \
  --parameters projectName=ragprod

# Show the deployment outputs (the deployment name defaults to the template file name)
az deployment group show \
  --resource-group rg-rag-prod \
  --name deploy \
  --query properties.outputs
```
2.3 Centralized Environment Variables
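Every component in this guide reads its settings from the environment variables in the .env file for this section. Failing fast at startup when one is missing is easier to debug than a `None` endpoint surfacing deep inside an SDK call. A minimal sketch, assuming the variable names from the .env file and a hypothetical `load_settings` helper:

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Names taken from the .env file; the selection of "required" ones is an assumption
REQUIRED = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS",
    "AZURE_OPENAI_DEPLOYMENT_LLM",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_API_KEY",
    "AZURE_SEARCH_INDEX_NAME",
)


@dataclass(frozen=True)
class Settings:
    values: dict
    max_tokens: int
    api_timeout: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Hypothetical loader: raise one error that lists every missing variable."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return Settings(
        values={name: env[name] for name in REQUIRED},
        max_tokens=int(env.get("MAX_TOKENS", "1000")),
        api_timeout=int(env.get("API_TIMEOUT", "300")),
    )
```

Calling `load_settings()` once when the app or function host starts surfaces configuration mistakes before the first request instead of during it.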
.env:
```bash
# Storage
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=stragprod;..."
AZURE_STORAGE_CONTAINER="documents"

# Document Intelligence
AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT="https://di-ragprod.cognitiveservices.azure.com/"
AZURE_DOCUMENT_INTELLIGENCE_KEY="your-key"

# Azure OpenAI (deployment names must match the model deployments in the resource)
AZURE_OPENAI_ENDPOINT="https://openai-ragprod.openai.azure.com/"
AZURE_OPENAI_API_KEY="your-key"
AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS="text-embedding-3-small"
AZURE_OPENAI_DEPLOYMENT_LLM="gpt-4o"
AZURE_OPENAI_API_VERSION="2024-02-01"

# Azure AI Search
AZURE_SEARCH_ENDPOINT="https://search-ragprod.search.windows.net"
AZURE_SEARCH_API_KEY="your-key"
AZURE_SEARCH_INDEX_NAME="rag-documents"

# Application Insights
APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=..."

# API configuration
API_RATE_LIMIT=100
API_TIMEOUT=300
MAX_TOKENS=1000

# Cache (section 7.1) and API authentication (section 8.2)
REDIS_HOST="localhost"
REDIS_PORT=6379
JWT_SECRET="your-secret"
```
3 Data Pipeline
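The pipeline in this section splits extracted text into chunks of at most 1,000 characters with 200 characters of overlap, so text cut at one boundary still appears whole in the neighbouring chunk. `RecursiveCharacterTextSplitter` additionally prefers to break at paragraph, line, and word boundaries; the sketch below deliberately ignores that and uses a plain fixed-size sliding window (a simplification, with a hypothetical `sliding_window_chunks` name) just to show how `chunk_size` and `chunk_overlap` interact.

```python
def sliding_window_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Hypothetical fixed-size splitter: consecutive windows share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


text = "".join(str(i % 10) for i in range(2500))
print([len(c) for c in sliding_window_chunks(text)])  # [1000, 1000, 900]
```

With these settings every 800 characters of new text adds roughly one more chunk (and one more embedding), so a larger overlap buys context continuity at the price of a bigger index.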
3.1 Document Processing Workflow
```python
import logging
import os

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from azure.storage.blob import BlobServiceClient
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_core.documents import Document
from langchain_openai import AzureOpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter


class DocumentProcessor:
    """End-to-end document pipeline: Blob -> OCR -> chunks -> vectors."""

    def __init__(self):
        # Blob Storage: source documents
        self.blob_client = BlobServiceClient.from_connection_string(
            os.getenv("AZURE_STORAGE_CONNECTION_STRING")
        )
        self.container_name = os.getenv("AZURE_STORAGE_CONTAINER")

        # Document Intelligence: OCR and layout analysis
        self.doc_intelligence = DocumentAnalysisClient(
            endpoint=os.getenv("AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT"),
            credential=AzureKeyCredential(os.getenv("AZURE_DOCUMENT_INTELLIGENCE_KEY")),
        )

        # Embeddings
        self.embeddings = AzureOpenAIEmbeddings(
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS"),
            openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        )

        # Vector store (Azure AI Search). Passing the Embeddings object instead of
        # embed_query lets add_documents embed chunks in batches.
        self.vector_store = AzureSearch(
            azure_search_endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),
            azure_search_key=os.getenv("AZURE_SEARCH_API_KEY"),
            index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"),
            embedding_function=self.embeddings,
        )

        # Text splitter: chunks of up to 1,000 characters, 200 characters of overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        logging.info("Document processing pipeline initialized")

    def process_document(self, blob_name: str) -> dict:
        """Process one document; blob_name is relative to the container."""
        logging.info(f"Processing document: {blob_name}")

        # 1. Download the document from Blob Storage
        blob_client = self.blob_client.get_blob_client(
            container=self.container_name, blob=blob_name
        )
        blob_data = blob_client.download_blob().readall()

        # 2. Extract text with Document Intelligence
        poller = self.doc_intelligence.begin_analyze_document(
            "prebuilt-document", document=blob_data
        )
        result = poller.result()
        extracted_text = "\n".join(
            line.content for page in result.pages for line in page.lines
        )
        logging.info(f"Text extracted: {len(extracted_text)} characters")

        # 3. Split the text into chunks
        chunks = self.text_splitter.split_text(extracted_text)
        logging.info(f"Chunks created: {len(chunks)}")

        # 4. Attach metadata to each chunk
        documents = [
            Document(
                page_content=chunk,
                metadata={
                    "source": blob_name,
                    "chunk_id": i,
                    "total_chunks": len(chunks),
                    "blob_url": blob_client.url,
                },
            )
            for i, chunk in enumerate(chunks)
        ]

        # 5. Embed and upload to the vector store
        ids = self.vector_store.add_documents(documents)
        logging.info(f"Vectors stored: {len(ids)} documents")

        return {
            "blob_name": blob_name,
            "chunks": len(chunks),
            "vector_ids": ids,
            "status": "success",
        }

    def process_all_documents(self) -> list:
        """Process every document in the container; failures are recorded, not raised."""
        container_client = self.blob_client.get_container_client(self.container_name)
        results = []
        for blob in container_client.list_blobs():
            try:
                results.append(self.process_document(blob.name))
            except Exception as e:
                logging.error(f"Document processing failed ({blob.name}): {e}")
                results.append({"blob_name": blob.name, "status": "failed", "error": str(e)})
        return results
```
3.2 Batch Processing (Azure Functions)
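The timer trigger in this section calls `process_all_documents` every night. Because `add_documents` is called without explicit keys, each run stores another copy of every chunk under new keys. One way to make re-indexing idempotent is to derive each chunk's key from its source and position, so a re-run overwrites instead of appending. The sketch below shows only the key derivation; `chunk_key` is a hypothetical helper, and whether your installed `AzureSearch` version accepts caller-supplied keys (for example through `add_texts(..., keys=...)`) should be checked against its documentation. Azure AI Search keys may contain only letters, digits, underscores, dashes, and equal signs, so a hex digest is a safe choice.

```python
import hashlib


def chunk_key(source: str, chunk_id: int) -> str:
    """Hypothetical stable key: same blob name and chunk position -> same key."""
    raw = f"{source}#{chunk_id}".encode("utf-8")
    # A hex digest uses only [0-9a-f], all valid Azure AI Search key characters
    return hashlib.sha256(raw).hexdigest()


print(chunk_key("manual.pdf", 0) == chunk_key("manual.pdf", 0),
      chunk_key("manual.pdf", 0) == chunk_key("manual.pdf", 1))  # True False
```

Stable keys make re-runs overwrite in place, but if an edited document now yields fewer chunks, the keys of its old trailing chunks still have to be deleted, for example by filtering on `source`.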
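function_app.py:
```python
import logging

import azure.functions as func

from document_processor import DocumentProcessor

app = func.FunctionApp()


# The trigger's connection must point at the same storage account
# as AZURE_STORAGE_CONNECTION_STRING, which DocumentProcessor reads from.
@app.blob_trigger(
    arg_name="myblob",
    path="documents/{name}",
    connection="AzureWebJobsStorage",
)
def blob_trigger_process(myblob: func.InputStream):
    """Process a document automatically when it is uploaded."""
    # myblob.name includes the container ("documents/<name>"), while
    # process_document expects a name relative to the container
    blob_name = myblob.name.split("/", 1)[1]
    logging.info(f"New document detected: {blob_name}")
    processor = DocumentProcessor()
    try:
        result = processor.process_document(blob_name)
        logging.info(f"Processing complete: {result}")
    except Exception as e:
        logging.error(f"Processing failed: {e}")
        raise


@app.timer_trigger(
    arg_name="timer",
    # NCRONTAB (second minute hour day month weekday): daily at 02:00,
    # in UTC unless WEBSITE_TIME_ZONE is set (not supported on every hosting plan)
    schedule="0 0 2 * * *",
)
def scheduled_reindex(timer: func.TimerRequest):
    """Periodic full re-index.

    Note: add_documents assigns new keys, so each run stores every chunk again
    next to the copies from earlier runs.
    """
    logging.info("Scheduled re-index started")
    processor = DocumentProcessor()
    results = processor.process_all_documents()
    success = sum(1 for r in results if r["status"] == "success")
    failed = len(results) - success
    logging.info(f"Re-index complete: {success} succeeded, {failed} failed")
```
4 RAG System Integration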
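Before the LangGraph version, it helps to see the routing on its own. The workflow in this section is a small state machine: retrieve; if nothing was found, fall back; otherwise generate, score the answer, and accept it only if the score reaches 0.6. The sketch below reproduces that control flow and the length/phrase heuristic in plain Python, with retrieval and generation passed in as plain functions (hypothetical stand-ins for the retriever and the LLM chain) so the routing can be exercised without Azure.

```python
from typing import Callable

FALLBACK = "Sorry, I couldn't find relevant information."  # stand-in fallback text
UNSURE_PHRASES = ("잘 모르겠습니다", "확실하지 않습니다")  # Korean: "I'm not sure" / "I'm not certain"


def score(answer: str) -> float:
    """Same heuristic as the evaluate node: short -> 0.3, unsure -> 0.5, otherwise 0.8."""
    if len(answer) < 10:
        return 0.3
    if any(p in answer for p in UNSURE_PHRASES):
        return 0.5
    return 0.8


def run_workflow(
    question: str,
    retrieve: Callable[[str], list[str]],
    generate: Callable[[str, list[str]], str],
    threshold: float = 0.6,
) -> dict:
    docs = retrieve(question)              # node: retrieve
    if not docs:                           # edge: _should_generate
        return {"answer": FALLBACK, "confidence": 0.0, "sources": []}
    answer = generate(question, docs)      # node: generate
    confidence = score(answer)             # node: evaluate
    if confidence < threshold:             # edge: _should_retry
        return {"answer": FALLBACK, "confidence": 0.0, "sources": []}
    return {"answer": answer, "confidence": confidence, "sources": docs}


docs = ["Refunds are processed within 7 days."]
print(run_workflow("refund?", lambda q: docs, lambda q, d: "Refunds take up to 7 days.")["confidence"])  # 0.8
print(run_workflow("refund?", lambda q: [], lambda q, d: "unused")["confidence"])  # 0.0
```

Note that an "I'm not sure" answer scores 0.5 and is therefore replaced by the fallback, so the 0.6 threshold effectively turns explicit uncertainty into a "not found" response.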
4.1 LangGraph Workflow
```python
import logging
import os
from typing import List, TypedDict

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import AzureChatOpenAI
from langgraph.graph import END, StateGraph

from document_processor import DocumentProcessor


class RAGState(TypedDict):
    """State passed between workflow nodes."""
    question: str
    context: List[Document]
    answer: str
    retrieval_success: bool
    confidence: float


class ProductionRAGSystem:
    """Production-grade RAG system."""

    def __init__(self):
        # Reuse the vector store configured in DocumentProcessor
        processor = DocumentProcessor()
        self.retriever = processor.vector_store.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 5},
        )

        # LLM (temperature 0 for reproducible answers)
        self.llm = AzureChatOpenAI(
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_LLM"),
            openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            temperature=0,
            max_tokens=int(os.getenv("MAX_TOKENS", 1000)),
        )

        # Prompt. The Korean phrase in rule 2 is what _evaluate looks for.
        self.prompt = ChatPromptTemplate.from_template(
            """Answer the question using the context below.

## Instructions:
1. Use only information found in the context.
2. If you are not sure, answer "잘 모르겠습니다" ("I'm not sure").
3. Answer in Korean.
4. Cite your sources.

## Context:
{context}

## Question:
{question}

## Answer:"""
        )

        # Workflow
        self.workflow = self._build_workflow()
        self.app = self.workflow.compile()
        logging.info("RAG system initialized")

    def _build_workflow(self) -> StateGraph:
        """Build the LangGraph workflow."""
        workflow = StateGraph(RAGState)

        # Nodes
        workflow.add_node("retrieve", self._retrieve)
        workflow.add_node("generate", self._generate)
        workflow.add_node("evaluate", self._evaluate)
        workflow.add_node("fallback", self._fallback)

        # Edges: retrieve -> (generate | fallback); generate -> evaluate -> (END | fallback)
        workflow.set_entry_point("retrieve")
        workflow.add_conditional_edges(
            "retrieve",
            self._should_generate,
            {"generate": "generate", "fallback": "fallback"},
        )
        workflow.add_edge("generate", "evaluate")
        workflow.add_conditional_edges(
            "evaluate",
            self._should_retry,
            {"end": END, "fallback": "fallback"},
        )
        workflow.add_edge("fallback", END)
        return workflow

    def _retrieve(self, state: RAGState) -> RAGState:
        """Retrieve documents."""
        logging.info(f"Retrieving: {state['question']}")
        docs = self.retriever.invoke(state["question"])
        return {**state, "context": docs, "retrieval_success": len(docs) > 0}

    def _generate(self, state: RAGState) -> RAGState:
        """Generate an answer from the retrieved context."""
        logging.info("Generating answer")
        context_text = "\n\n".join(
            f"[Source: {doc.metadata.get('source', 'Unknown')}]\n{doc.page_content}"
            for doc in state["context"]
        )
        chain = self.prompt | self.llm | StrOutputParser()
        answer = chain.invoke({"context": context_text, "question": state["question"]})
        return {**state, "answer": answer}

    def _evaluate(self, state: RAGState) -> RAGState:
        """Score answer quality with a simple heuristic."""
        answer = state["answer"]
        if len(answer) < 10:  # too short to be useful
            confidence = 0.3
        # Korean for "I'm not sure" / "I'm not certain"
        elif "잘 모르겠습니다" in answer or "확실하지 않습니다" in answer:
            confidence = 0.5
        else:
            confidence = 0.8
        logging.info(f"Answer confidence: {confidence}")
        return {**state, "confidence": confidence}

    def _fallback(self, state: RAGState) -> RAGState:
        """Fallback response; clears the context so no sources are reported."""
        logging.warning("Using fallback response")
        return {
            **state,
            "context": [],
            # "Sorry, I couldn't find relevant information. Could you ask another question?"
            "answer": "죄송합니다. 관련 정보를 찾지 못했습니다. 다른 질문을 해주시겠어요?",
            "confidence": 0.0,
        }

    def _should_generate(self, state: RAGState) -> str:
        """Generate only if retrieval returned documents."""
        return "generate" if state["retrieval_success"] else "fallback"

    def _should_retry(self, state: RAGState) -> str:
        """Accept the answer at confidence >= 0.6, otherwise fall back."""
        return "end" if state["confidence"] >= 0.6 else "fallback"

    def query(self, question: str) -> dict:
        """Run a RAG query."""
        initial_state = RAGState(
            question=question,
            context=[],
            answer="",
            retrieval_success=False,
            confidence=0.0,
        )
        final_state = self.app.invoke(initial_state)
        return {
            "question": question,
            "answer": final_state["answer"],
            "confidence": final_state["confidence"],
            "sources": [
                doc.metadata.get("source", "Unknown") for doc in final_state["context"]
            ],
        }
```
5 API Server Deployment
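The RAG query from section 4 is synchronous: it blocks on network calls to AI Search and Azure OpenAI. Called directly inside an `async def` endpoint, it stalls the server's event loop and serializes all requests. The stdlib sketch below shows the effect and the usual remedy, `asyncio.to_thread` (Python 3.9+), using a hypothetical `slow_rag_query` that sleeps in place of the real call.

```python
import asyncio
import time


def slow_rag_query(question: str) -> str:
    """Hypothetical stand-in for a blocking RAG call (network I/O simulated with sleep)."""
    time.sleep(0.2)
    return f"answer to {question!r}"


async def handle_blocking(question: str) -> str:
    return slow_rag_query(question)  # blocks the event loop for the whole call


async def handle_offloaded(question: str) -> str:
    return await asyncio.to_thread(slow_rag_query, question)  # runs in a worker thread


async def serve(handler, n: int = 5) -> float:
    """Handle n concurrent requests and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(f"q{i}") for i in range(n)))
    return time.perf_counter() - start


blocking = asyncio.run(serve(handle_blocking))
offloaded = asyncio.run(serve(handle_offloaded))
print(f"blocking: {blocking:.2f}s, offloaded: {offloaded:.2f}s")
```

Five blocking calls take about five times as long as one, while the offloaded version overlaps them. In FastAPI the same effect comes from `fastapi.concurrency.run_in_threadpool` or from declaring the endpoint with plain `def`, which FastAPI runs in its thread pool.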
5.1 FastAPI Integration
main.py:
```python
import logging
import time
from contextlib import asynccontextmanager
from typing import List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from pydantic import BaseModel

from production_rag import ProductionRAGSystem  # module holding the 4.1 class (name assumed)

logging.basicConfig(level=logging.INFO)

# Prometheus metrics
REQUEST_COUNT = Counter("rag_requests_total", "Total RAG requests")
REQUEST_DURATION = Histogram("rag_request_duration_seconds", "RAG request duration")
ERROR_COUNT = Counter("rag_errors_total", "Total RAG errors")

# RAG system (one instance per process, created at startup)
rag_system: Optional[ProductionRAGSystem] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the RAG system once when the application starts."""
    global rag_system
    logging.info("Initializing RAG system...")
    rag_system = ProductionRAGSystem()
    logging.info("RAG system initialized")
    yield


app = FastAPI(
    title="Production RAG API",
    version="1.0.0",
    description="Production-grade RAG API on Azure",
    lifespan=lifespan,
)

# CORS: "*" is convenient during development; list explicit origins in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class QueryRequest(BaseModel):
    question: str
    include_sources: bool = True


class QueryResponse(BaseModel):
    question: str
    answer: str
    confidence: float
    sources: Optional[List[str]] = None
    duration_ms: int


@app.get("/")
async def root():
    return {"service": "Production RAG API", "version": "1.0.0", "status": "running"}


@app.get("/health")
async def health():
    """Health check."""
    return {"status": "healthy"}


@app.get("/metrics")
async def metrics():
    """Prometheus metrics."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """RAG query."""
    REQUEST_COUNT.inc()
    start_time = time.perf_counter()
    try:
        # rag_system.query is blocking; run it in a worker thread so the
        # event loop keeps serving other requests
        result = await run_in_threadpool(rag_system.query, request.question)
        elapsed = time.perf_counter() - start_time
        REQUEST_DURATION.observe(elapsed)
        duration_ms = int(elapsed * 1000)
        logging.info(f"Query complete: {duration_ms}ms, confidence: {result['confidence']}")
        return QueryResponse(
            question=request.question,
            answer=result["answer"],
            confidence=result["confidence"],
            sources=result["sources"] if request.include_sources else None,
            duration_ms=duration_ms,
        )
    except Exception as e:
        ERROR_COUNT.inc()
        logging.error(f"Query failed: {e}")
        # Do not leak internal error details to clients
        raise HTTPException(status_code=500, detail="Internal server error") from e


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```
5.2 Container Deployment
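```bash
# Build the image
docker build -t rag-api-prod:latest .

# Push to ACR
az acr login --name acrragprod
docker tag rag-api-prod:latest acrragprod.azurecr.io/rag-api-prod:latest
docker push acrragprod.azurecr.io/rag-api-prod:latest

# Deploy the Container App.
# --registry-server lets the CLI use the registry's admin credentials for image pulls.
# Keys are stored as Container Apps secrets and referenced with secretref:;
# the remaining variables from .env are passed with --env-vars in the same way.
az containerapp create \
  --name ca-rag-prod \
  --resource-group rg-rag-prod \
  --environment cae-ragprod \
  --image acrragprod.azurecr.io/rag-api-prod:latest \
  --registry-server acrragprod.azurecr.io \
  --target-port 8000 \
  --ingress external \
  --min-replicas 1 \
  --max-replicas 10 \
  --cpu 2.0 \
  --memory 4.0Gi \
  --secrets openai-key=your-key search-key=your-key \
  --env-vars \
    AZURE_OPENAI_ENDPOINT=https://openai-ragprod.openai.azure.com/ \
    AZURE_OPENAI_API_KEY=secretref:openai-key \
    AZURE_SEARCH_ENDPOINT=https://search-ragprod.search.windows.net \
    AZURE_SEARCH_API_KEY=secretref:search-key
```
6 Monitoring and Observability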
6.1 Application Insights Integration
```python
import logging
import os
import time

from opencensus.ext.azure import metrics_exporter
from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_map as tag_map_module

# Note: the OpenCensus-based Azure exporters are retired; for new projects
# Microsoft recommends the Azure Monitor OpenTelemetry Distro.

# Application Insights logging (records land in the `traces` table)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(AzureLogHandler(
    connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
))

# Custom metrics
stats = stats_module.stats
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder

# Metric definition: RAG latency in milliseconds.
# The opencensus-ext-azure metrics exporter may skip distribution (histogram)
# aggregations, so latency_ms is also logged below for percentile queries.
rag_latency_measure = measure_module.MeasureFloat("rag_latency", "RAG query latency", "ms")
rag_latency_view = view_module.View(
    "rag_latency_view",
    "distribution of RAG latencies",
    [],
    rag_latency_measure,
    aggregation_module.DistributionAggregation([10, 50, 100, 500, 1000, 5000]),
)
view_manager.register_view(rag_latency_view)

# Metrics exporter
exporter = metrics_exporter.new_metrics_exporter(
    connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
)
view_manager.register_exporter(exporter)


def query_with_metrics(question: str) -> dict:
    """Run a RAG query and record its latency as a metric and a log entry.

    rag_system is the ProductionRAGSystem instance created in main.py.
    """
    start = time.perf_counter()
    result = rag_system.query(question)

    # Record the metric
    latency_ms = (time.perf_counter() - start) * 1000
    mmap = stats_recorder.new_measurement_map()
    mmap.measure_float_put(rag_latency_measure, latency_ms)
    mmap.record(tag_map_module.TagMap())

    # Log with custom dimensions (queryable as customDimensions in `traces`)
    logger.info(
        "RAG Query",
        extra={
            "custom_dimensions": {
                "question_length": len(question),
                "answer_length": len(result["answer"]),
                "confidence": result["confidence"],
                "latency_ms": latency_ms,
            }
        },
    )
    return result
```
6.2 Kusto Queries
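The latency query in this section summarizes each 5-minute bin as P50, P95, P99, and an average. As a reference for reading those numbers, here is the same kind of summary in plain Python using the nearest-rank method; `latency_summary` is a hypothetical helper, and Kusto's `percentile()` uses its own estimation, so values can differ slightly on small samples.

```python
import math
from statistics import mean


def percentile(values: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of samples <= it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]


def latency_summary(latencies_ms: list[float]) -> dict:
    return {
        "P50": percentile(latencies_ms, 50),
        "P95": percentile(latencies_ms, 95),
        "P99": percentile(latencies_ms, 99),
        "Avg": mean(latencies_ms),
    }


samples = [120, 135, 150, 160, 180, 200, 220, 260, 900, 4200]
print(latency_summary(samples))  # {'P50': 180, 'P95': 4200, 'P99': 4200, 'Avg': 652.5}
```

A single slow outlier (4,200 ms here) dominates P95, P99, and the average while barely moving the median, which is why latency dashboards track percentiles rather than the average alone.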
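Response time analysis (based on the latency_ms custom dimension that query_with_metrics logs to the traces table; the Prometheus histogram from 5.1 is only exposed at /metrics and does not reach Application Insights on its own):
```kusto
traces
| where message == "RAG Query"
| extend latency_ms = todouble(customDimensions.latency_ms)
| summarize
    P50 = percentile(latency_ms, 50),
    P95 = percentile(latency_ms, 95),
    P99 = percentile(latency_ms, 99),
    Avg = avg(latency_ms)
    by bin(timestamp, 5m)
| render timechart
```
Error rate monitoring (assumes the request and error counters are exported to Application Insights as custom metrics under these names):
```kusto
customMetrics
| where name in ("rag_requests_total", "rag_errors_total")
| summarize
    Total = sumif(value, name == "rag_requests_total"),
    Errors = sumif(value, name == "rag_errors_total")
    by bin(timestamp, 5m)
| extend ErrorRate = iff(Total == 0, 0.0, Errors * 100.0 / Total)
| project timestamp, ErrorRate
| render timechart
```
User query analysis: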
7 Performance Optimization
7.1 Caching Strategy
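The Redis cache in this section follows the cache-aside pattern: look up a key derived from the question, and only on a miss run the full RAG workflow and store the result with a TTL (one hour here). The stdlib sketch below shows the same pattern with an in-memory dictionary standing in for Redis. The `TTLCache` class, its injectable clock, and the whitespace/case normalization of questions are assumptions of the sketch, the last one so that trivially different phrasings share an entry.

```python
import hashlib
import time
from typing import Callable, Optional


def cache_key(question: str) -> str:
    normalized = " ".join(question.split()).lower()  # collapse whitespace, ignore case
    return "rag:" + hashlib.md5(normalized.encode("utf-8")).hexdigest()


class TTLCache:
    """Hypothetical in-memory stand-in for Redis GET / SETEX."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._data: dict[str, tuple[float, dict]] = {}

    def get(self, key: str) -> Optional[dict]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._data[key]  # expired entry behaves like a miss
            return None
        return value

    def put(self, key: str, value: dict) -> None:
        self._data[key] = (self.clock() + self.ttl, value)


def cached_query(cache: TTLCache, question: str, run_rag: Callable[[str], dict]) -> dict:
    key = cache_key(question)
    hit = cache.get(key)
    if hit is not None:
        return hit
    result = run_rag(question)  # cache miss: run the full workflow
    cache.put(key, result)
    return result


# Usage with a fake clock so expiry can be shown without waiting an hour
now = [0.0]
cache = TTLCache(3600, clock=lambda: now[0])
calls = []


def fake_rag(question: str) -> dict:
    calls.append(question)
    return {"answer": "A"}


cached_query(cache, "What is RAG?", fake_rag)       # miss: runs fake_rag
cached_query(cache, "  what is   rag? ", fake_rag)  # same normalized key: hit
now[0] = 3601.0
cached_query(cache, "What is RAG?", fake_rag)       # entry expired: miss
print(len(calls))  # 2
```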
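```python
import hashlib
import json
import logging
import os

import redis

from production_rag import ProductionRAGSystem  # module holding the 4.1 class (name assumed)


class CachedRAGSystem(ProductionRAGSystem):
    """RAG system with a Redis cache-aside layer."""

    def __init__(self):
        super().__init__()
        # Azure Cache for Redis typically also needs ssl=True, port 6380 and a password
        self.redis_client = redis.Redis(
            host=os.getenv("REDIS_HOST", "localhost"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            decode_responses=True,
        )
        self.cache_ttl = 3600  # 1 hour

    def _get_cache_key(self, question: str) -> str:
        """Map a question to a cache key (whitespace- and case-normalized).

        MD5 is used only for compact keys, not for security.
        """
        normalized = " ".join(question.split()).lower()
        return f"rag:{hashlib.md5(normalized.encode('utf-8')).hexdigest()}"

    def query(self, question: str) -> dict:
        """Cache-first query."""
        cache_key = self._get_cache_key(question)

        # Check the cache; a Redis outage should not take the API down
        try:
            cached = self.redis_client.get(cache_key)
        except redis.RedisError as e:
            logging.warning(f"Cache read failed: {e}")
            cached = None
        if cached:
            logging.info("Cache hit")
            return json.loads(cached)

        # Cache miss: run the RAG workflow
        result = super().query(question)

        # Store the result, skipping fallback answers (confidence 0.0)
        if result["confidence"] > 0:
            try:
                self.redis_client.setex(
                    cache_key, self.cache_ttl, json.dumps(result, ensure_ascii=False)
                )
            except redis.RedisError as e:
                logging.warning(f"Cache write failed: {e}")
        return result
```
7.2 Batch Processing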
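The batch function in this section walks the question list in fixed-size slices and lets each slice run concurrently. Stripped of LangChain, the pattern is a slicing loop plus a bounded thread pool; `answer_one` below is a hypothetical stand-in for one full RAG call. Results come back in input order because `ThreadPoolExecutor.map` preserves ordering.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List


def batches(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def batch_answer(questions: List[str], answer_one: Callable[[str], str],
                 batch_size: int = 10) -> List[dict]:
    results = []
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for batch in batches(questions, batch_size):
            # At most batch_size calls in flight; map() keeps input order
            answers = pool.map(answer_one, batch)
            results.extend({"question": q, "answer": a} for q, a in zip(batch, answers))
    return results


out = batch_answer([f"q{i}" for i in range(25)], lambda q: q.upper(), batch_size=10)
print(len(out), out[0], out[-1])  # 25 {'question': 'q0', 'answer': 'Q0'} {'question': 'q24', 'answer': 'Q24'}
```

With Azure OpenAI, the batch size is bounded in practice by the deployment's rate limits rather than by local threads, so it is worth tuning against throttling (HTTP 429) responses.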
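```python
from typing import List

from langchain_core.output_parsers import StrOutputParser


def batch_query(rag_system, questions: List[str], batch_size: int = 10) -> List[dict]:
    """Answer many questions, running up to batch_size requests concurrently.

    rag_system is a ProductionRAGSystem (4.1); its retriever, prompt and LLM are
    reused. Unlike rag_system.query, this skips the evaluate/fallback steps.
    """
    chain = rag_system.prompt | rag_system.llm | StrOutputParser()
    config = {"max_concurrency": batch_size}
    results = []
    for i in range(0, len(questions), batch_size):
        batch = questions[i:i + batch_size]
        # Retrieval for the whole batch (Runnable.batch runs calls in a thread pool)
        docs_per_question = rag_system.retriever.batch(batch, config=config)
        # Answer generation for the whole batch
        inputs = [
            {
                "context": "\n\n".join(
                    f"[Source: {doc.metadata.get('source', 'Unknown')}]\n{doc.page_content}"
                    for doc in docs
                ),
                "question": question,
            }
            for question, docs in zip(batch, docs_per_question)
        ]
        answers = chain.batch(inputs, config=config)
        results.extend({"question": q, "answer": a} for q, a in zip(batch, answers))
    return results
```
8 Security Hardening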
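8.1 Switching to Managed Identity
```python
import os

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from langchain_openai import AzureChatOpenAI

# Uses the Container App's managed identity in Azure and developer credentials locally.
# The identity needs RBAC roles such as "Cognitive Services OpenAI User" (Azure OpenAI)
# and "Search Index Data Reader" (AI Search), and the search service must allow
# Microsoft Entra ID authentication.
credential = DefaultAzureCredential()

# Azure OpenAI: the token provider fetches and refreshes tokens as needed
token_provider = get_bearer_token_provider(
    credential, "https://cognitiveservices.azure.com/.default"
)
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    azure_ad_token_provider=token_provider,
)

# Azure AI Search
search_client = SearchClient(
    endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),
    index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"),
    credential=credential,
)
```
8.2 API Authentication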
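For an HS256 token, `jwt.decode` in the code for this section does two things: it recomputes the HMAC-SHA256 signature over `header.payload` with the shared secret and compares it with the token's signature, then checks registered claims such as `exp`. The stdlib sketch below spells out those two checks to show what is being verified. It is illustrative only: `sign_hs256` and `verify_hs256` are hypothetical helpers that skip header validation and other claims, and PyJWT should remain the implementation in the API.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Hypothetical signer: base64url(header).base64url(payload).base64url(signature)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        f"{b64url_encode(json.dumps(header).encode())}."
        f"{b64url_encode(json.dumps(payload).encode())}"
    )
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(sig)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Hypothetical verifier: signature check first, then the exp claim."""
    header_b64, payload_b64, sig_b64 = token.split(".")
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    # Constant-time comparison avoids leaking how many signature bytes matched
    if not hmac.compare_digest(expected, b64url_decode(sig_b64)):
        raise ValueError("invalid signature")
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "user-123", "exp": 2_000_000_000}, "dev-secret")
print(verify_hs256(token, "dev-secret", now=1_700_000_000)["sub"])  # user-123
```

Because HS256 uses one shared secret for signing and verifying, every service that can verify tokens can also mint them; tokens issued by an identity provider are usually RS256-signed and verified with the provider's public keys instead.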
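```python
import logging
import os

import jwt  # PyJWT
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

# app, QueryRequest and rag_system are defined in main.py (5.1)

security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
    """Verify an HS256 JWT signed with JWT_SECRET and return its claims.

    Tokens issued by Microsoft Entra ID are RS256-signed and would instead be
    verified with the issuer's public keys (for example via jwt.PyJWKClient).
    """
    token = credentials.credentials
    try:
        # Checks the signature and, if present, the exp claim
        return jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")


# Replace the unauthenticated /query handler from 5.1 with this one: if both are
# registered on the same path, requests are routed to the first one.
@app.post("/query")
def query(request: QueryRequest, user: dict = Depends(verify_token)):
    """Authenticated query (a plain def endpoint runs in FastAPI's thread pool)."""
    logging.info(f"Query from user {user.get('sub')}")
    return rag_system.query(request.question)
```
9 Cost Optimization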
9.1 Monthly Cost Estimate
| Service | Tier / model | Usage | Monthly cost |
|---|---|---|---|
| Blob Storage | Standard LRS | 100 GB | $2.00 |
| Document Intelligence | S0 | 1,000 pages | $10.00 |
| Azure OpenAI (Embeddings) | text-embedding-3-small | 10M tokens | $1.00 |
| Azure OpenAI (LLM) | GPT-4o | 1M input / 500K output tokens | $12.50 |
| Azure AI Search | Standard S1 | 1 search unit | $250.00 |
| Container Apps | 1 vCPU, 2 GiB | 1 instance, always on | $77.00 |
| Application Insights | - | 5 GB of logs | $12.50 |
| Log Analytics | - | 5 GB stored | $12.50 |
| Total | | | $377.50/month |

The Container Apps line assumes one 1 vCPU / 2 GiB replica; the deployment in 5.2 uses 2 vCPU / 4 GiB per replica, which roughly doubles that line.
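The total is simply the sum of the line items, and the token-billed lines scale linearly with usage, so it is worth recomputing them for your own traffic. The sketch below does that arithmetic. The per-million-token rates are assumptions chosen to reproduce the table (for example, $12.50 for 1M input plus 500K output GPT-4o tokens is consistent with $5 and $15 per million), not current list prices; substitute the rates from the Azure pricing page for your region and model version.

```python
def token_cost(tokens: int, usd_per_million: float) -> float:
    return tokens / 1_000_000 * usd_per_million


# Assumed rates that reproduce the table; check current Azure pricing
llm = token_cost(1_000_000, 5.00) + token_cost(500_000, 15.00)  # GPT-4o input + output
embeddings = token_cost(10_000_000, 0.10)                       # text-embedding-3-small

monthly = {
    "Blob Storage": 2.00,
    "Document Intelligence": 10.00,
    "Azure OpenAI (Embeddings)": embeddings,
    "Azure OpenAI (LLM)": llm,
    "Azure AI Search": 250.00,
    "Container Apps": 77.00,
    "Application Insights": 12.50,
    "Log Analytics": 12.50,
}
total = sum(monthly.values())
print(f"LLM ${llm:.2f}, total ${total:.2f}/month")  # LLM $12.50, total $377.50/month
```

Only the token-billed lines move with traffic here; the $250 Azure AI Search tier is a fixed charge that dominates the total, which is why the first tip below targets it.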
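9.2 Cost-Saving Tips
- Azure AI Search: use the Basic tier (about $75/month) for development. The tier is chosen when a search service is created, so create the dev service as Basic:
```bash
az search service create \
  --name search-rag-dev \
  --resource-group rg-rag-dev \
  --sku basic
```
- Container Apps scaling: allow scaling to zero replicas when there is no traffic. The first request after an idle period then pays a cold start; a cron-based scale rule can be used instead if scaling should follow off-peak hours.
```bash
az containerapp update \
  --name ca-rag-prod \
  --resource-group rg-rag-prod \
  --min-replicas 0
```
- Fewer OpenAI calls through caching
  - The Redis cache avoids re-running duplicate queries
  - Expected saving: $5-10/month, to be weighed against the cost of running the Redis instance itself
- Blob Storage lifecycle policies, for example moving rarely changed source documents to the Cool tier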
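10 Deployment Checklist
10.1 Before Production Deployment
11 References
11.1 Official Documentation
11.2 GitHub Samples
12 Next Steps
You have now built a complete Azure RAG system. For continuous improvement, consider the following:
- Stronger monitoring: collect user feedback, run A/B tests
- Advanced RAG patterns: Self-RAG, Corrective RAG, Adaptive RAG
- Multilingual support: language-specific embedding models
- Multimodal: image and table processing
- Agent integration: complex workflows with LangGraph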