End-to-End Azure RAG

Full Pipeline Integration

Covers how to build and operate a complete production-grade RAG system on the Azure ecosystem.

AI
RAG
Azure
MLOps
Author

Kwangmin Kim

Published

November 10, 2025

1 Overall Architecture

This is an enterprise architecture that integrates the Azure RAG components covered so far.

1.1 System Diagram

graph TB  
    subgraph "Data Ingestion"  
        A[Document Upload] --> B[Azure Blob Storage]  
        B --> C[Document Intelligence]  
        C --> D[Text Extraction]  
    end  
    
    subgraph "Embedding & Indexing"  
        D --> E[Azure OpenAI Embeddings]  
        E --> F[Azure AI Search]  
        F --> G[Vector Store]  
    end  
    
    subgraph "RAG Engine"  
        H[User Query] --> I[Query Embeddings]  
        I --> J[Retriever]  
        J --> K[LangGraph Workflow]  
        K --> L[Azure OpenAI GPT-4o]  
        L --> M[Response]  
    end  
    
    subgraph "Deployment"  
        N[Azure Functions] --> O[API Gateway]  
        P[Azure Container Apps] --> O  
        O --> Q[Application Gateway]  
    end  
    
    subgraph "Monitoring"  
        R[Application Insights]  
        S[Log Analytics]  
        T[Azure Monitor]  
    end  
    
    G --> J  
    M --> O  
    O --> R  
    O --> S  
    N --> T  
    P --> T  

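1.2 Core Components

| Layer | Azure service | Role |
|---|---|---|
| Storage | Blob Storage | Stores source documents |
| Preprocessing | Document Intelligence | OCR, layout analysis |
| Embedding | Azure OpenAI (Embeddings) | Text-to-vector conversion |
| Retrieval | Azure AI Search | Vector storage and search |
| RAG | LangGraph | Retrieve-and-generate workflow |
| LLM | Azure OpenAI (GPT-4o) | Answer generation |
| Deployment | Functions/Container Apps | API serving |
| Monitoring | Application Insights | Logs, metrics, traces |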

2 Infrastructure Setup

2.1 Deploying All Resources

deploy.bicep:

param location string = resourceGroup().location  
param projectName string = 'ragprod'  // letters and digits only: storage account and registry names reject hyphens

// Storage Account  
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {  
  name: 'st${projectName}'  
  location: location  
  sku: {  
    name: 'Standard_LRS'  
  }  
  kind: 'StorageV2'  
  properties: {  
    accessTier: 'Hot'  
    supportsHttpsTrafficOnly: true  
    minimumTlsVersion: 'TLS1_2'  
  }  
}  

// Document Intelligence  
resource documentIntelligence 'Microsoft.CognitiveServices/accounts@2023-05-01' = {  
  name: 'di-${projectName}'  
  location: location  
  sku: {  
    name: 'S0'  
  }  
  kind: 'FormRecognizer'  
  properties: {  
    customSubDomainName: 'di-${projectName}'  
    publicNetworkAccess: 'Enabled'  
  }  
}  

// Azure OpenAI  
resource openai 'Microsoft.CognitiveServices/accounts@2023-05-01' = {  
  name: 'openai-${projectName}'  
  location: location  
  sku: {  
    name: 'S0'  
  }  
  kind: 'OpenAI'  
  properties: {  
    customSubDomainName: 'openai-${projectName}'  
    publicNetworkAccess: 'Enabled'  
  }  
}  

// Azure AI Search  
resource search 'Microsoft.Search/searchServices@2023-11-01' = {  
  name: 'search-${projectName}'  
  location: location  
  sku: {  
    name: 'standard'  
  }  
  properties: {  
    replicaCount: 1  
    partitionCount: 1  
    hostingMode: 'default'  
  }  
}  

// Container Apps Environment  
resource containerAppEnv 'Microsoft.App/managedEnvironments@2023-05-01' = {  
  name: 'cae-${projectName}'  
  location: location  
  properties: {  
    appLogsConfiguration: {  
      destination: 'log-analytics'  
      logAnalyticsConfiguration: {  
        customerId: logAnalytics.properties.customerId  
        sharedKey: logAnalytics.listKeys().primarySharedKey  
      }  
    }  
  }  
}  

// Log Analytics  
resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2022-10-01' = {  
  name: 'law-${projectName}'  
  location: location  
  properties: {  
    sku: {  
      name: 'PerGB2018'  
    }  
    retentionInDays: 30  
  }  
}  

// Application Insights  
resource appInsights 'Microsoft.Insights/components@2020-02-02' = {  
  name: 'appi-${projectName}'  
  location: location  
  kind: 'web'  
  properties: {  
    Application_Type: 'web'  
    WorkspaceResourceId: logAnalytics.id  
  }  
}  

// Container Registry  
resource containerRegistry 'Microsoft.ContainerRegistry/registries@2023-07-01' = {  
  name: 'acr${projectName}'  
  location: location  
  sku: {  
    name: 'Basic'  
  }  
  properties: {  
    adminUserEnabled: true  
  }  
}  

output storageAccountName string = storageAccount.name  
output documentIntelligenceEndpoint string = documentIntelligence.properties.endpoint  
output openaiEndpoint string = openai.properties.endpoint  
output searchEndpoint string = 'https://${search.name}.search.windows.net'
output containerRegistryName string = containerRegistry.name  
output appInsightsConnectionString string = appInsights.properties.ConnectionString  

2.2 Running the Deployment

# Create the resource group
az group create --name rg-rag-prod --location koreacentral

# Deploy the Bicep template (named explicitly so it can be looked up below)
az deployment group create \
  --resource-group rg-rag-prod \
  --name deploy \
  --template-file deploy.bicep \
  --parameters projectName=ragprod

# Inspect the deployment outputs
az deployment group show \
  --resource-group rg-rag-prod \
  --name deploy \
  --query properties.outputs

2.3 Centralized Environment Variable Management

.env:

# Storage  
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=stragprod;..."  
AZURE_STORAGE_CONTAINER="documents"  

# Document Intelligence  
AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT="https://di-ragprod.cognitiveservices.azure.com/"  
AZURE_DOCUMENT_INTELLIGENCE_KEY="your-key"  

# Azure OpenAI  
AZURE_OPENAI_ENDPOINT="https://openai-ragprod.openai.azure.com/"  
AZURE_OPENAI_API_KEY="your-key"  
AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS="text-embedding-3-small"  
AZURE_OPENAI_DEPLOYMENT_LLM="gpt-4o"  
AZURE_OPENAI_API_VERSION="2024-02-01"  

# Azure AI Search  
AZURE_SEARCH_ENDPOINT="https://search-ragprod.search.windows.net"  
AZURE_SEARCH_API_KEY="your-key"  
AZURE_SEARCH_INDEX_NAME="rag-documents"  

# Application Insights  
APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=..."  

# API Configuration  
API_RATE_LIMIT=100  
API_TIMEOUT=300  
MAX_TOKENS=1000  
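
Every component below reads its configuration with `os.getenv`, so a missing variable tends to surface only as an obscure client error at first use. A minimal sketch of a fail-fast check at startup follows. The helper name `load_settings` and the decision about which variables count as required are assumptions made for illustration, not part of the original project.

```python
import os
from typing import Mapping

# Variables the pipeline reads without a default (names taken from the .env above).
REQUIRED_VARS = (
    "AZURE_STORAGE_CONNECTION_STRING",
    "AZURE_STORAGE_CONTAINER",
    "AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT",
    "AZURE_DOCUMENT_INTELLIGENCE_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS",
    "AZURE_OPENAI_DEPLOYMENT_LLM",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_API_KEY",
    "AZURE_SEARCH_INDEX_NAME",
)


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical helper: return the settings or raise, naming every missing variable."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    settings = {name: env[name] for name in REQUIRED_VARS}
    # Numeric settings fall back to the defaults shown in the .env file.
    settings["MAX_TOKENS"] = int(env.get("MAX_TOKENS", "1000"))
    settings["API_TIMEOUT"] = int(env.get("API_TIMEOUT", "300"))
    return settings
```

Calling `load_settings()` once at process start reports all missing names in a single error instead of failing one client at a time.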

3 Data Pipeline

3.1 Document Processing Workflow

import os  
from azure.storage.blob import BlobServiceClient  
from azure.ai.formrecognizer import DocumentAnalysisClient  
from azure.core.credentials import AzureKeyCredential  
from langchain_openai import AzureOpenAIEmbeddings  
from langchain_community.vectorstores.azuresearch import AzureSearch  
from langchain_text_splitters import RecursiveCharacterTextSplitter  
import logging  

class DocumentProcessor:
    """End-to-end document processing pipeline"""
    
    def __init__(self):  
        # Blob Storage  
        self.blob_client = BlobServiceClient.from_connection_string(  
            os.getenv("AZURE_STORAGE_CONNECTION_STRING")  
        )  
        self.container_name = os.getenv("AZURE_STORAGE_CONTAINER")  
        
        # Document Intelligence  
        self.doc_intelligence = DocumentAnalysisClient(  
            endpoint=os.getenv("AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT"),  
            credential=AzureKeyCredential(os.getenv("AZURE_DOCUMENT_INTELLIGENCE_KEY"))  
        )  
        
        # Embeddings  
        self.embeddings = AzureOpenAIEmbeddings(  
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS"),  
            openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),  
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),  
            api_key=os.getenv("AZURE_OPENAI_API_KEY")  
        )  
        
        # Vector Store  
        self.vector_store = AzureSearch(  
            azure_search_endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),  
            azure_search_key=os.getenv("AZURE_SEARCH_API_KEY"),  
            index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"),  
            embedding_function=self.embeddings.embed_query  
        )  
        
        # Text Splitter  
        self.text_splitter = RecursiveCharacterTextSplitter(  
            chunk_size=1000,  
            chunk_overlap=200,  
            length_function=len  
        )  
        
        logging.info("Document processing pipeline initialized")

    def process_document(self, blob_name: str) -> dict:
        """Process a single document"""
        logging.info(f"Document processing started: {blob_name}")

        # 1. Download the document from Blob Storage
        blob_client = self.blob_client.get_blob_client(  
            container=self.container_name,  
            blob=blob_name  
        )  
        blob_data = blob_client.download_blob().readall()  
        
        # 2. Extract text with Document Intelligence
        poller = self.doc_intelligence.begin_analyze_document(  
            "prebuilt-document",  
            document=blob_data  
        )  
        result = poller.result()  
        
        extracted_text = ""  
        for page in result.pages:  
            for line in page.lines:  
                extracted_text += line.content + "\n"  
        
        logging.info(f"Text extraction complete: {len(extracted_text)} characters")

        # 3. Split the text into chunks
        chunks = self.text_splitter.split_text(extracted_text)
        logging.info(f"Chunks created: {len(chunks)}")

        # 4. Build metadata for each chunk
        from langchain_core.documents import Document  
        documents = []  
        for i, chunk in enumerate(chunks):  
            doc = Document(  
                page_content=chunk,  
                metadata={  
                    "source": blob_name,  
                    "chunk_id": i,  
                    "total_chunks": len(chunks),  
                    "blob_url": blob_client.url  
                }  
            )  
            documents.append(doc)  
        
        # 5. Upload to the vector store
        ids = self.vector_store.add_documents(documents)
        logging.info(f"Vector store upload complete: {len(ids)} documents")
        
        return {  
            "blob_name": blob_name,  
            "chunks": len(chunks),  
            "vector_ids": ids,  
            "status": "success"  
        }  
    
    def process_all_documents(self):
        """Process every document in the container"""
        container_client = self.blob_client.get_container_client(  
            self.container_name  
        )  
        
        results = []  
        for blob in container_client.list_blobs():  
            try:  
                result = self.process_document(blob.name)  
                results.append(result)  
            except Exception as e:  
                logging.error(f"Document processing failed ({blob.name}): {str(e)}")
                results.append({  
                    "blob_name": blob.name,  
                    "status": "failed",  
                    "error": str(e)  
                })  
        
        return results  
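
Because `process_all_documents` reprocesses every blob, each run would typically add the same chunks again unless their ids are stable. One way to make reindexing idempotent is to derive the id from the blob name and chunk index. The sketch below is only an illustration: `make_chunk_id` and `chunk_ids_for` are hypothetical helpers, and how the ids are handed to the vector store depends on the client version in use.

```python
import hashlib
from typing import List


def make_chunk_id(blob_name: str, chunk_index: int) -> str:
    """Hypothetical helper: stable id for one chunk of one blob.

    A SHA-256 hex digest contains only the characters 0-9 and a-f, which
    stays within what Azure AI Search accepts for document keys.
    """
    raw = f"{blob_name}#{chunk_index}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


def chunk_ids_for(blob_name: str, chunk_count: int) -> List[str]:
    """Ids for all chunks of a document, in chunk order."""
    return [make_chunk_id(blob_name, i) for i in range(chunk_count)]
```

With stable ids, uploading a chunk a second time can overwrite the existing entry rather than create a duplicate, provided the store is asked to use these ids as keys.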

3.2 Batch Processing (Azure Functions)

function_app.py:

import azure.functions as func  
import logging  
from document_processor import DocumentProcessor  

app = func.FunctionApp()  

@app.blob_trigger(  
    arg_name="myblob",  
    path="documents/{name}",  
    connection="AzureWebJobsStorage"  
)  
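def blob_trigger_process(myblob: func.InputStream):
    """Process a blob automatically when it is uploaded"""
    logging.info(f"New document detected: {myblob.name}")

    # myblob.name includes the container ("documents/<blob>"); strip it
    # because process_document expects the blob name only.
    blob_name = myblob.name.split("/", 1)[-1]

    processor = DocumentProcessor()
    try:
        result = processor.process_document(blob_name)
        logging.info(f"Processing complete: {result}")
    except Exception as e:
        logging.error(f"Processing failed: {str(e)}")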
        raise  

@app.timer_trigger(  
    arg_name="timer",  
    schedule="0 0 2 * * *"  # every day at 02:00 (UTC unless configured otherwise)
)
def scheduled_reindex(timer: func.TimerRequest):
    """Scheduled reindexing"""
    logging.info("Scheduled reindex started")

    processor = DocumentProcessor()
    results = processor.process_all_documents()

    success = sum(1 for r in results if r["status"] == "success")
    failed = len(results) - success

    logging.info(f"Reindex complete: {success} succeeded, {failed} failed")

4 RAG System Integration

4.1 LangGraph Workflow

import logging
import os
from typing import TypedDict, List

from langgraph.graph import StateGraph, END
from langchain_core.documents import Document
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

from document_processor import DocumentProcessor  # class from section 3.1

class RAGState(TypedDict):
    """RAG state"""
    question: str  
    context: List[Document]  
    answer: str  
    retrieval_success: bool  
    confidence: float  

class ProductionRAGSystem:
    """Production-grade RAG system"""

    def __init__(self):
        # Vector Store (reuse the one configured by DocumentProcessor)
        processor = DocumentProcessor()  
        self.retriever = processor.vector_store.as_retriever(  
            search_type="similarity",  
            search_kwargs={"k": 5}  
        )  
        
        # LLM  
        self.llm = AzureChatOpenAI(  
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_LLM"),  
            openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),  
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),  
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
            temperature=0,  
            max_tokens=int(os.getenv("MAX_TOKENS", 1000))  
        )  
        
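        # Prompt (kept in Korean: use only the context, reply "잘 모르겠습니다"
        # when unsure, answer in Korean, and cite sources)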
        self.prompt = ChatPromptTemplate.from_template(  
            """다음 컨텍스트를 참고하여 질문에 답변하세요.  

## 지침:  
1. 컨텍스트에 있는 정보만 사용하세요  
2. 확실하지 않으면 "잘 모르겠습니다"라고 답하세요  
3. 한국어로 답변하세요  
4. 출처를 명시하세요  

## 컨텍스트:  
{context}  

## 질문:  
{question}  

## 답변:"""  
        )  
        
        # Workflow  
        self.workflow = self._build_workflow()  
        self.app = self.workflow.compile()  
        
        logging.info("RAG system initialized")

    def _build_workflow(self):
        """Build the LangGraph workflow"""
        workflow = StateGraph(RAGState)  
        
        # Nodes  
        workflow.add_node("retrieve", self._retrieve)  
        workflow.add_node("generate", self._generate)  
        workflow.add_node("evaluate", self._evaluate)  
        workflow.add_node("fallback", self._fallback)  
        
        # Edges  
        workflow.set_entry_point("retrieve")  
        workflow.add_conditional_edges(  
            "retrieve",  
            self._should_generate,  
            {  
                "generate": "generate",  
                "fallback": "fallback"  
            }  
        )  
        workflow.add_edge("generate", "evaluate")  
        workflow.add_conditional_edges(  
            "evaluate",  
            self._should_retry,  
            {  
                "end": END,  
                "fallback": "fallback"  
            }  
        )  
        workflow.add_edge("fallback", END)  
        
        return workflow  
    
    def _retrieve(self, state: RAGState) -> RAGState:
        """Retrieve documents"""
        logging.info(f"Retrieving: {state['question']}")
        
        docs = self.retriever.invoke(state["question"])  
        
        return {  
            **state,  
            "context": docs,  
            "retrieval_success": len(docs) > 0  
        }  
    
    def _generate(self, state: RAGState) -> RAGState:
        """Generate the answer"""
        logging.info("Generating answer")
        
        context_text = "\n\n".join([  
            f"[출처: {doc.metadata.get('source', 'Unknown')}]\n{doc.page_content}"  
            for doc in state["context"]  
        ])  
        
        chain = self.prompt | self.llm | StrOutputParser()  
        answer = chain.invoke({  
            "context": context_text,  
            "question": state["question"]  
        })  
        
        return {  
            **state,  
            "answer": answer  
        }  
    
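    def _evaluate(self, state: RAGState) -> RAGState:
        """Evaluate answer quality"""
        # Simple heuristic evaluation
        answer = state["answer"]

        # Length check, then the Korean hedging phrases the prompt asks for
        # ("잘 모르겠습니다" = "I'm not sure", "확실하지 않습니다" = "I'm not certain")
        if len(answer) < 10:
            confidence = 0.3
        elif "잘 모르겠습니다" in answer or "확실하지 않습니다" in answer:
            confidence = 0.5
        else:
            confidence = 0.8

        logging.info(f"Answer confidence: {confidence}")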
        
        return {  
            **state,  
            "confidence": confidence  
        }  
    
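    def _fallback(self, state: RAGState) -> RAGState:
        """Fallback response"""
        logging.warning("Using fallback response")

        # The user-facing message stays in Korean ("Sorry, no relevant
        # information was found. Could you ask another question?")
        return {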
            **state,  
            "answer": "죄송합니다. 관련 정보를 찾지 못했습니다. 다른 질문을 해주시겠어요?",  
            "confidence": 0.0  
        }  
    
    def _should_generate(self, state: RAGState) -> str:
        """Decide whether to generate an answer"""
        return "generate" if state["retrieval_success"] else "fallback"

    def _should_retry(self, state: RAGState) -> str:
        """Decide whether to accept the answer or fall back"""
        return "end" if state["confidence"] >= 0.6 else "fallback"

    def query(self, question: str) -> dict:
        """Run a RAG query"""
        initial_state = RAGState(  
            question=question,  
            context=[],  
            answer="",  
            retrieval_success=False,  
            confidence=0.0  
        )  
        
        final_state = self.app.invoke(initial_state)  
        
        return {  
            "question": question,  
            "answer": final_state["answer"],  
            "confidence": final_state["confidence"],  
            "sources": [  
                doc.metadata.get("source", "Unknown")  
                for doc in final_state["context"]  
            ]  
        }  
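
The control flow of the graph is easier to check in isolation. The stand-alone sketch below mirrors the routing and scoring rules above with plain functions and no Azure or LangGraph dependency. `score_answer` and `route` are hypothetical names introduced only for this illustration.

```python
from typing import List


def score_answer(answer: str) -> float:
    """Same heuristic as _evaluate: very short or hedged answers score low."""
    if len(answer) < 10:
        return 0.3
    if "잘 모르겠습니다" in answer or "확실하지 않습니다" in answer:
        return 0.5
    return 0.8


def route(retrieved_docs: List[object], answer: str) -> List[str]:
    """Return the sequence of nodes the workflow would visit."""
    path = ["retrieve"]
    if not retrieved_docs:            # _should_generate returns "fallback"
        return path + ["fallback"]
    path += ["generate", "evaluate"]
    if score_answer(answer) < 0.6:    # _should_retry returns "fallback"
        path.append("fallback")
    return path


if __name__ == "__main__":
    print(route([], ""))
    print(route(["doc"], "A sufficiently long, grounded answer."))
```

One consequence worth noticing: a hedged answer scores 0.5, which is below the 0.6 threshold, so any "I'm not sure" reply from the model is replaced by the fallback message.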

5 API Server Deployment

5.1 FastAPI Integration

main.py:

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import os
import logging
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import time

# Assumption: the ProductionRAGSystem class from section 4.1 is saved as rag_pipeline.py
from rag_pipeline import ProductionRAGSystem

# Metrics
REQUEST_COUNT = Counter('rag_requests_total', 'Total RAG requests')  
REQUEST_DURATION = Histogram('rag_request_duration_seconds', 'RAG request duration')  
ERROR_COUNT = Counter('rag_errors_total', 'Total RAG errors')  

app = FastAPI(  
    title="Production RAG API",  
    version="1.0.0",  
    description="Production-grade RAG API on Azure"
)  

# CORS (wildcard origins suit a demo; list the real front-end origins in production)
app.add_middleware(  
    CORSMiddleware,  
    allow_origins=["*"],  
    allow_credentials=True,  
    allow_methods=["*"],  
    allow_headers=["*"],  
)  

# RAG system (global)
rag_system = None  

@app.on_event("startup")  
async def startup_event():
    """Initialize on application startup"""
    global rag_system
    logging.info("Initializing RAG system...")
    rag_system = ProductionRAGSystem()
    logging.info("RAG system initialized")

class QueryRequest(BaseModel):  
    question: str  
    include_sources: bool = True  

class QueryResponse(BaseModel):  
    question: str  
    answer: str  
    confidence: float  
    sources: Optional[list] = None  
    duration_ms: int  

@app.get("/")  
async def root():  
    return {  
        "service": "Production RAG API",  
        "version": "1.0.0",  
        "status": "running"  
    }  

@app.get("/health")  
async def health():  
    """Health check"""  
    return {"status": "healthy"}  

@app.get("/metrics")  
async def metrics():
    """Prometheus metrics"""
    return Response(generate_latest(), media_type="text/plain")  

@app.post("/query", response_model=QueryResponse)  
def query(request: QueryRequest):
    """RAG query"""
    # Declared with plain `def` so FastAPI runs the blocking RAG call in its
    # thread pool instead of stalling the event loop.
    REQUEST_COUNT.inc()
    start_time = time.time()

    try:
        result = rag_system.query(request.question)

        elapsed = time.time() - start_time
        duration_ms = int(elapsed * 1000)
        REQUEST_DURATION.observe(elapsed)

        response = QueryResponse(
            question=request.question,
            answer=result["answer"],
            confidence=result["confidence"],
            sources=result["sources"] if request.include_sources else None,
            duration_ms=duration_ms
        )

        logging.info(f"Query complete: {duration_ms}ms, confidence: {result['confidence']}")

        return response

    except Exception as e:
        ERROR_COUNT.inc()
        logging.error(f"Query failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":  
    import uvicorn  
    uvicorn.run(app, host="0.0.0.0", port=8000)  
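
The `.env` file defines `API_RATE_LIMIT=100`, but the server above never reads it. A minimal sketch of how such a limit could be enforced is shown below. It assumes the value means requests per client per 60-second window, which the configuration does not state, and `SlidingWindowLimiter` is a hypothetical class that keeps its state in process memory only.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Hypothetical per-client limiter: at most `limit` calls per window."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock  # injectable so the behavior can be tested
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        """Record the call and return True if the client is under its limit."""
        now = self.clock()
        hits = self._hits[client_id]
        # Drop timestamps that have left the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

In the `/query` handler, a rejected call would map to an HTTP 429 response. With several replicas, each instance counts separately, so a shared store would be needed for a global limit.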

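5.2 Container Deployment

# Build the image
docker build -t rag-api-prod:latest .

# Push to ACR
az acr login --name acrragprod
docker tag rag-api-prod:latest acrragprod.azurecr.io/rag-api-prod:latest
docker push acrragprod.azurecr.io/rag-api-prod:latest

# Deploy the Container App
# (values come from the current shell; pass the remaining .env settings the
#  same way and keep keys in --secrets)
az containerapp create \
  --name ca-rag-prod \
  --resource-group rg-rag-prod \
  --environment cae-ragprod \
  --image acrragprod.azurecr.io/rag-api-prod:latest \
  --registry-server acrragprod.azurecr.io \
  --target-port 8000 \
  --ingress external \
  --min-replicas 1 \
  --max-replicas 10 \
  --cpu 2.0 \
  --memory 4.0Gi \
  --secrets openai-key="$AZURE_OPENAI_API_KEY" \
  --env-vars AZURE_OPENAI_ENDPOINT="$AZURE_OPENAI_ENDPOINT" AZURE_OPENAI_API_KEY=secretref:openai-key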

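6 Monitoring and Observability

6.1 Application Insights Integration

import logging
import os
import time

# Note: the OpenCensus Azure exporters are deprecated; Microsoft recommends the
# Azure Monitor OpenTelemetry distro for new projects. This example keeps OpenCensus.
from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_map as tag_map_module

# Application Insights setup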
logger = logging.getLogger(__name__)  
logger.addHandler(AzureLogHandler(  
    connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")  
))  

# Custom metrics
stats = stats_module.stats
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder

# Metric definition
rag_latency_measure = measure_module.MeasureFloat(  
    "rag_latency",  
    "RAG query latency",  
    "ms"  
)  

rag_latency_view = view_module.View(  
    "rag_latency_view",  
    "distribution of RAG latencies",  
    [],  
    rag_latency_measure,  
    aggregation_module.DistributionAggregation([10, 50, 100, 500, 1000, 5000])  
)  

view_manager.register_view(rag_latency_view)  

# Metrics exporter
exporter = metrics_exporter.new_metrics_exporter(  
    connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")  
)  
view_manager.register_exporter(exporter)  

# Usage
def query_with_metrics(question: str):
    start = time.time()

    # Run RAG
    result = rag_system.query(question)

    # Record the metric
    latency_ms = (time.time() - start) * 1000
    mmap = stats_recorder.new_measurement_map()
    tmap = tag_map_module.TagMap()

    mmap.measure_float_put(rag_latency_measure, latency_ms)
    mmap.record(tmap)

    # Log
    logger.info(
        "RAG Query",
        extra={  
            "custom_dimensions": {  
                "question_length": len(question),  
                "answer_length": len(result["answer"]),  
                "confidence": result["confidence"],  
                "latency_ms": latency_ms  
            }  
        }  
    )  
    
    return result  

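6.2 Kusto Queries

Response time analysis (assumes the `rag_request_duration_seconds` metric from section 5.1 is also exported to Application Insights as a custom metric):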

customMetrics  
| where name == "rag_request_duration_seconds"  
| summarize   
    P50=percentile(value, 50),  
    P95=percentile(value, 95),  
    P99=percentile(value, 99),  
    Avg=avg(value)  
  by bin(timestamp, 5m)  
| render timechart  

Error rate monitoring:

customMetrics
| where name in ("rag_requests_total", "rag_errors_total")
| summarize
    Total=sumif(value, name == "rag_requests_total"),
    Errors=sumif(value, name == "rag_errors_total")
  by bin(timestamp, 5m)
| extend ErrorRate = iff(Total > 0, Errors * 100.0 / Total, 0.0)
| project timestamp, ErrorRate
| render timechart

User query analysis:

traces
| where toint(customDimensions.question_length) > 0
| summarize   
    Count=count(),  
    AvgLength=avg(toint(customDimensions.question_length)),  
    AvgConfidence=avg(todouble(customDimensions.confidence))  
  by bin(timestamp, 1h)  
| render timechart  

7 Performance Optimization

7.1 Caching Strategy

import hashlib
import json
import logging
import os

import redis

class CachedRAGSystem(ProductionRAGSystem):
    """RAG system with an integrated cache"""
    
    def __init__(self):  
        super().__init__()  
        self.redis_client = redis.Redis(  
            host=os.getenv("REDIS_HOST", "localhost"),  
            port=int(os.getenv("REDIS_PORT", 6379)),  
            decode_responses=True  
        )  
        self.cache_ttl = 3600  # 1 hour

    def _get_cache_key(self, question: str) -> str:
        """Convert the question into a cache key"""
        return f"rag:{hashlib.md5(question.encode()).hexdigest()}"

    def query(self, question: str) -> dict:
        """Cache-first query"""
        cache_key = self._get_cache_key(question)  
        
        # Check the cache
        cached = self.redis_client.get(cache_key)
        if cached:
            logging.info("Cache hit")
            return json.loads(cached)

        # Run RAG
        result = super().query(question)

        # Store the result in the cache
        self.redis_client.setex(  
            cache_key,  
            self.cache_ttl,  
            json.dumps(result)  
        )  
        
        return result  
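
The key above hashes the raw question, so "What is RAG?" and " what is  rag? " land in different cache entries. A small normalization step before hashing can raise the hit rate. The sketch below is one possible approach under stated assumptions: `normalize_question` and `cache_key` are hypothetical helpers, and SHA-256 is swapped in for MD5 purely as a design choice, not because the original is incorrect.

```python
import hashlib
import unicodedata


def normalize_question(question: str) -> str:
    """Fold case, unify Unicode forms, and collapse whitespace."""
    text = unicodedata.normalize("NFKC", question)
    return " ".join(text.casefold().split())


def cache_key(question: str) -> str:
    """Hypothetical replacement for _get_cache_key using the normalized text."""
    digest = hashlib.sha256(normalize_question(question).encode("utf-8")).hexdigest()
    return f"rag:{digest}"
```

Normalization trades a little precision for more hits. Questions that differ only in casing or spacing share an answer, which is usually the desired behavior for a RAG cache.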

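7.2 Batch Processing

from typing import List

# `processor`, `vector_store`, and `llm` refer to the objects built in
# sections 3.1 and 4.1 (DocumentProcessor, its vector_store, AzureChatOpenAI).
def batch_query(questions: List[str], batch_size: int = 10) -> List[dict]:
    """Process queries in batches"""
    results = []

    for i in range(0, len(questions), batch_size):
        batch = questions[i:i+batch_size]

        # Embed the whole batch in a single API call
        embeddings = processor.embeddings.embed_documents(batch)

        # Search per question with its precomputed vector
        # (assumes the vector store implements similarity_search_by_vector)
        for question, embedding in zip(batch, embeddings):
            docs = vector_store.similarity_search_by_vector(
                embedding,
                k=5
            )

            # Generate the answer (prompt construction is elided here)
            answer = llm.invoke(...)

            results.append({
                "question": question,
                "answer": answer
            })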
    
    return results  
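
The loop above still issues one LLM call at a time. Because those calls are I/O-bound, a thread pool can overlap them while keeping results in input order. The following sketch assumes the query function is safe to call from several threads. `query_concurrently` is a hypothetical helper, and `query_fn` stands in for something like `rag_system.query`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def query_concurrently(
    query_fn: Callable[[str], dict],
    questions: List[str],
    max_workers: int = 4,
) -> List[dict]:
    """Run query_fn over all questions in parallel threads, preserving order."""
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of the inputs.
        return list(pool.map(query_fn, questions))


if __name__ == "__main__":
    def fake_query(q: str) -> dict:
        # Stand-in for a real RAG call.
        return {"question": q, "answer": q.upper()}

    print(query_concurrently(fake_query, ["a", "b", "c"]))
```

`max_workers` should stay below the rate limits of the Azure OpenAI deployment, since more threads only help until the service starts throttling.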

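8 Security Hardening

8.1 Switching to Managed Identity

import os

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from langchain_openai import AzureChatOpenAI

credential = DefaultAzureCredential()

# Azure OpenAI (the identity needs the "Cognitive Services OpenAI User" role)
token_provider = get_bearer_token_provider(
    credential, "https://cognitiveservices.azure.com/.default"
)
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_ad_token_provider=token_provider
)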

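# Azure Search (requires role-based access on the service and the
# "Search Index Data Reader" role for the identity)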
from azure.search.documents import SearchClient  

search_client = SearchClient(  
    endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),  
    index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"),  
    credential=credential  
)  

8.2 API Authentication

import logging
import os

from fastapi import Depends, Security, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()  

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the JWT"""
    token = credentials.credentials  
    
    try:  
        payload = jwt.decode(  
            token,  
            os.getenv("JWT_SECRET"),  
            algorithms=["HS256"]  
        )  
        return payload  
    except jwt.ExpiredSignatureError:  
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Replaces the unauthenticated /query route from section 5.1
@app.post("/query")
async def query(request: QueryRequest, user=Depends(verify_token)):
    """Authenticated query"""
    logging.info(f"Query from user {user['sub']}")
    return rag_system.query(request.question)  
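
To make clear what `jwt.decode` checks for an HS256 token, the sketch below reimplements signing and verification with the standard library only. It is for illustration and testing, not a replacement for PyJWT. `sign_hs256` and `verify_hs256` are hypothetical helpers, and the only claim validated is `exp`.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url(json.dumps(header, separators=(",", ":")).encode("utf-8")),
        _b64url(json.dumps(payload, separators=(",", ":")).encode("utf-8")),
    ]
    signing_input = ".".join(parts)
    return signing_input + "." + _b64url(_signature(signing_input, secret))


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims, or raise ValueError on a bad signature or expiry."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

A token produced by `sign_hs256({"sub": "alice", "exp": ...}, secret)` is handy in tests of the authenticated route, since it removes the need for an identity provider.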

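9 Cost Optimization

9.1 Monthly Cost Estimate

| Service | Tier / spec | Usage | Monthly cost |
|---|---|---|---|
| Blob Storage | Standard LRS | 100 GB | $2.00 |
| Document Intelligence | S0 | 1,000 pages | $10.00 |
| Azure OpenAI (Embeddings) | text-embedding-3-small | 10M tokens | $1.00 |
| Azure OpenAI (LLM) | GPT-4o | 1M input, 500K output tokens | $12.50 |
| Azure AI Search | Standard S1 | 1 SU | $250.00 |
| Container Apps | 1 vCPU, 2 GB | 1 instance, always on | $77.00 |
| Application Insights | - | 5 GB logs | $12.50 |
| Log Analytics | - | 5 GB stored | $12.50 |
| Total | | | $377.50/month |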
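
The total can be checked, and what-if scenarios explored, with a few lines of arithmetic. The figures below are copied from the estimate above. `total_cost` and `cost_shares` are hypothetical helper names, and the $75 Basic-tier price is the one quoted in the tips that follow.

```python
# Monthly estimates in USD, copied from the cost table.
MONTHLY_COSTS_USD = {
    "Blob Storage": 2.00,
    "Document Intelligence": 10.00,
    "Azure OpenAI (Embeddings)": 1.00,
    "Azure OpenAI (LLM)": 12.50,
    "Azure AI Search": 250.00,
    "Container Apps": 77.00,
    "Application Insights": 12.50,
    "Log Analytics": 12.50,
}


def total_cost(costs: dict) -> float:
    """Sum of all line items, rounded to cents."""
    return round(sum(costs.values()), 2)


def cost_shares(costs: dict) -> dict:
    """Each line item as a percentage of the total, to one decimal place."""
    total = sum(costs.values())
    return {name: round(100 * value / total, 1) for name, value in costs.items()}


if __name__ == "__main__":
    print(total_cost(MONTHLY_COSTS_USD))
    dev = dict(MONTHLY_COSTS_USD, **{"Azure AI Search": 75.00})
    print(total_cost(dev))
```

Azure AI Search accounts for roughly two thirds of the estimate, which is why the search tier is the first place to look for savings.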

9.2 Cost-Saving Tips

  1. Right-size Azure AI Search
# Development environment: create the service on the Basic tier (about $75/month)
az search service create \
  --name search-rag-dev \
  --resource-group rg-rag-dev \
  --sku basic
  2. Scale Container Apps
# Allow scale-to-zero when idle (off-peak hours)
az containerapp update \
  --name ca-rag-prod \
  --resource-group rg-rag-prod \
  --min-replicas 0
  3. Reduce OpenAI calls with caching
  • Use the Redis cache to avoid repeating identical queries
  • Estimated savings: $5-10 per month
  4. Blob Storage lifecycle policy
# Move blobs to the Cool tier after 90 days
az storage account management-policy create \
  --account-name stragprod \
  --resource-group rg-rag-prod \
  --policy @lifecycle-policy.json

10 Deployment Checklist

10.1 Before Production Deployment

11 References

11.1 Official Documentation

11.2 GitHub Samples

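12 Next Steps

You now have a complete Azure RAG system. For continued improvement, consider the following:

  1. Stronger monitoring: collect user feedback, run A/B tests
  2. Advanced RAG patterns: Self-RAG, Corrective RAG, Adaptive RAG
  3. Multilingual support: language-specific embedding models
  4. Multimodal: image and table processing
  5. Agent integration: more complex workflows with LangGraph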
