AI Agent 플랫폼 데이터 표준화 계층

프롬프트, 벡터 데이터, 메타데이터 관리

AI Agent 플랫폼의 데이터 표준화 계층을 설계한다. 프롬프트 버전 관리와 A/B 테스트, 벡터 데이터 자동 업데이트, 표준 메타데이터 스키마, 설정 관리 방법을 구체적으로 다룬다. 실제 PromptRegistry, VectorStoreManager 구현과 함께 Agent 간 데이터 일관성을 유지하고, 변경 추적 및 롤백이 가능한 체계적인 데이터 관리 전략을 제시한다.

Engineering
System
Architecture Design
Agent
Platform
저자

Kwangmin Kim

공개

2026년 01월 30일

1 들어가며

1.1 왜 데이터 표준화 계층이 필요한가?

Phase 4 플랫폼을 운영하다 보면 발생하는 문제들:

질문들: - “프롬프트를 수정했는데, 어떤 Agent가 영향받는지 모르겠어요” - “RAG 데이터를 업데이트하면 Agent를 재배포해야 하나요?” - “Agent 실행 결과를 어떻게 일관되게 기록하나요?” - “프롬프트 A/B 테스트는 어떻게 하나요?”

이 글의 목표: - 프롬프트 버전 관리 시스템 구축 - 벡터 데이터 자동 업데이트 전략 - 표준 메타데이터 스키마 설계 - 설정 관리 모범 사례

1.2 앞선 글 요약

4번 글 (인터페이스 설계): - BaseAgent 추상 클래스 - Template Method Pattern - AgentRegistry + Orchestrator - Agent 구현 예시

핵심 질문: “Agent들이 사용하는 데이터를 어떻게 통일하고 관리할 것인가?”

2 프롬프트 관리 시스템

2.1 문제: 프롬프트 변경의 어려움

2.1.1 초기 방식 (Phase 1-2)

# ❌ 나쁜 예: 프롬프트가 코드에 하드코딩
class DataAgent:
    def process(self, input):
        prompt = """다음 스키마를 표준화하세요:
{schema}

규칙:
- 컬럼명: snake_case
- 날짜: ISO 8601"""
        
        result = llm.generate(prompt.format(schema=input['data']))
        return result

문제점: 1. 변경 추적 불가: 누가, 언제, 왜 수정했는지 모름 2. A/B 테스트 불가: 새 프롬프트 성능 비교 어려움 3. 롤백 불가: 문제 발생 시 이전 버전으로 돌아갈 수 없음 4. 재사용 불가: 같은 프롬프트를 여러 Agent가 복사-붙여넣기

2.1.2 개선 방식: PromptRegistry

# ✅ 좋은 예: 프롬프트를 외부에서 관리
class DataAgent:
    def __init__(self):
        self.prompt_registry = PromptRegistry()
    
    def process(self, input):
        # 버전 지정 가능
        prompt_template = self.prompt_registry.get(
            name="data_standardization",
            version="1.2.0"
        )
        
        prompt = prompt_template.format(schema=input['data'])
        result = llm.generate(prompt)
        return result

2.2 PromptRegistry 설계

2.2.1 YAML 기반 프롬프트 저장

# shared/prompts/data_standardization.yaml
name: data_standardization
domain: data
description: "데이터 스키마 표준화 프롬프트"
tags: ["schema", "normalization"]

# 변수 정의
variables:
  - name: schema
    type: dict
    required: true
    description: "입력 스키마"
  
  - name: target_format
    type: string
    required: false
    default: "snake_case"
    description: "컬럼명 포맷"

# 버전 이력
versions:
  - version: "1.0.0"
    created_at: "2026-01-28"
    author: "Platform Team"
    performance:
      accuracy: 0.85
      avg_time: 2.3
    content: |
      다음 데이터 스키마를 표준화하세요:
      
      입력 스키마:
      {schema}
      
      표준 형식:
      - 컬럼명: {target_format}
      - 날짜: ISO 8601
      - 문자열: UTF-8
      
      출력 형식: JSON
  
  - version: "1.1.0"
    created_at: "2026-02-01"
    author: "Platform Team"
    changes: "타입 검증 규칙 추가"
    performance:
      accuracy: 0.89
      avg_time: 2.5
    content: |
      다음 데이터 스키마를 표준화하세요:
      
      입력 스키마:
      {schema}
      
      표준 형식:
      - 컬럼명: {target_format} (영문 소문자만)
      - 날짜: ISO 8601 (YYYY-MM-DD)
      - 문자열: UTF-8 (특수문자 제거)
      - 타입: string, int, float, date만 허용
      
      출력 형식: JSON (필드명: name, type, description)
  
  - version: "1.2.0"
    created_at: "2026-02-02"
    author: "Data Team"
    changes: "예시 추가로 성능 개선"
    performance:
      accuracy: 0.93
      avg_time: 2.4
    content: |
      다음 데이터 스키마를 표준화하세요:
      
      입력 스키마:
      {schema}
      
      표준 형식:
      - 컬럼명: {target_format} (예: '환자번호' → 'patient_id')
      - 날짜: ISO 8601 (예: '2026-02-02')
      - 문자열: UTF-8
      - 타입: string, int, float, date만 허용
      
      예시:
      입력: {{"환자번호": "12345", "진료일": "2026/02/02"}}
      출력: {{"patient_id": {{"type": "string"}}, "visit_date": {{"type": "date"}}}}
      
      출력 형식: JSON

2.2.2 PromptRegistry 구현

# shared/prompt_management/registry.py
from typing import Dict, Optional, List
from pathlib import Path
import yaml
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class PromptVersion:
    """프롬프트 버전 정보"""
    version: str
    content: str
    created_at: str
    author: str
    changes: Optional[str] = None
    performance: Optional[Dict] = None

@dataclass
class PromptMetadata:
    """프롬프트 메타데이터"""
    name: str
    domain: str
    description: str
    tags: List[str]
    variables: List[Dict]
    versions: List[PromptVersion]

class PromptRegistry:
    """프롬프트 저장소
    
    기능:
    1. YAML에서 프롬프트 로드
    2. 버전별 조회
    3. 변경 이력 추적
    4. 성능 메트릭 기록
    """
    
    def __init__(self, prompts_dir: str = "shared/prompts"):
        self.prompts_dir = Path(prompts_dir)
        self._prompts: Dict[str, PromptMetadata] = {}
        self._load_prompts()
    
    def _load_prompts(self):
        """YAML 파일에서 모든 프롬프트 로드"""
        if not self.prompts_dir.exists():
            logger.warning(f"Prompts directory not found: {self.prompts_dir}")
            return
        
        for yaml_file in self.prompts_dir.glob("*.yaml"):
            try:
                with open(yaml_file) as f:
                    data = yaml.safe_load(f)
                
                # PromptVersion 객체 생성
                versions = [
                    PromptVersion(**v) for v in data['versions']
                ]
                
                # PromptMetadata 생성
                metadata = PromptMetadata(
                    name=data['name'],
                    domain=data['domain'],
                    description=data['description'],
                    tags=data.get('tags', []),
                    variables=data.get('variables', []),
                    versions=versions
                )
                
                self._prompts[data['name']] = metadata
                logger.info(f"Loaded prompt: {data['name']} ({len(versions)} versions)")
                
            except Exception as e:
                logger.error(f"Failed to load {yaml_file}: {e}")
    
    def get(
        self,
        name: str,
        version: str = "latest"
    ) -> str:
        """프롬프트 조회
        
        Args:
            name: 프롬프트 이름
            version: 버전 (latest, 1.0.0, 1.1.0 등)
        
        Returns:
            프롬프트 템플릿 문자열
        """
        if name not in self._prompts:
            available = ', '.join(self._prompts.keys())
            raise ValueError(f"Prompt {name} not found. Available: {available}")
        
        prompt_metadata = self._prompts[name]
        
        if version == "latest":
            return prompt_metadata.versions[-1].content
        
        for v in prompt_metadata.versions:
            if v.version == version:
                return v.content
        
        raise ValueError(f"Prompt {name} version {version} not found")
    
    def get_metadata(self, name: str) -> PromptMetadata:
        """프롬프트 메타데이터 조회"""
        if name not in self._prompts:
            raise ValueError(f"Prompt {name} not found")
        return self._prompts[name]
    
    def list_versions(self, name: str) -> List[PromptVersion]:
        """프롬프트의 모든 버전 조회"""
        metadata = self.get_metadata(name)
        return metadata.versions
    
    def get_performance(self, name: str, version: str) -> Optional[Dict]:
        """특정 버전의 성능 메트릭"""
        versions = self.list_versions(name)
        
        for v in versions:
            if v.version == version:
                return v.performance
        
        return None
    
    def list_all(self) -> List[str]:
        """모든 프롬프트 이름 목록"""
        return list(self._prompts.keys())
    
    def search_by_tag(self, tag: str) -> List[str]:
        """태그로 프롬프트 검색"""
        return [
            name
            for name, metadata in self._prompts.items()
            if tag in metadata.tags
        ]

2.3 프롬프트 A/B 테스트

# shared/prompt_management/ab_testing.py
from typing import Dict, Any
import random
from .registry import PromptRegistry

class PromptABTester:
    """프롬프트 A/B 테스트
    
    기능:
    1. 두 버전 랜덤 배정
    2. 성능 메트릭 수집
    3. 통계적 유의성 검증
    """
    
    def __init__(self, registry: PromptRegistry):
        self.registry = registry
        self.results: Dict[str, List[float]] = {}
    
    def get_test_prompt(
        self,
        name: str,
        version_a: str,
        version_b: str,
        traffic_split: float = 0.5
    ) -> tuple[str, str]:
        """A/B 테스트용 프롬프트 반환
        
        Args:
            name: 프롬프트 이름
            version_a: 버전 A
            version_b: 버전 B
            traffic_split: A에 할당할 트래픽 비율 (0-1)
        
        Returns:
            (선택된 프롬프트, 선택된 버전)
        """
        # 랜덤 배정
        if random.random() < traffic_split:
            selected_version = version_a
        else:
            selected_version = version_b
        
        prompt = self.registry.get(name, selected_version)
        return prompt, selected_version
    
    def record_result(
        self,
        name: str,
        version: str,
        score: float
    ):
        """테스트 결과 기록"""
        key = f"{name}:{version}"
        
        if key not in self.results:
            self.results[key] = []
        
        self.results[key].append(score)
    
    def compare(
        self,
        name: str,
        version_a: str,
        version_b: str
    ) -> Dict[str, Any]:
        """두 버전 비교
        
        Returns:
            {
                'version_a': {'mean': 0.85, 'std': 0.05, 'n': 100},
                'version_b': {'mean': 0.89, 'std': 0.04, 'n': 100},
                'winner': 'version_b',
                'p_value': 0.001
            }
        """
        import numpy as np
        from scipy import stats
        
        key_a = f"{name}:{version_a}"
        key_b = f"{name}:{version_b}"
        
        results_a = self.results.get(key_a, [])
        results_b = self.results.get(key_b, [])
        
        if len(results_a) < 30 or len(results_b) < 30:
            return {
                'error': 'Not enough samples (minimum 30 per version)',
                'samples_a': len(results_a),
                'samples_b': len(results_b)
            }
        
        # t-test
        t_stat, p_value = stats.ttest_ind(results_a, results_b)
        
        mean_a = np.mean(results_a)
        mean_b = np.mean(results_b)
        
        winner = version_a if mean_a > mean_b else version_b
        
        return {
            'version_a': {
                'mean': mean_a,
                'std': np.std(results_a),
                'n': len(results_a)
            },
            'version_b': {
                'mean': mean_b,
                'std': np.std(results_b),
                'n': len(results_b)
            },
            'winner': winner if p_value < 0.05 else 'no significant difference',
            'p_value': p_value,
            'significant': p_value < 0.05
        }

2.4 실전 예시: Agent에서 프롬프트 사용

# agents/data_standardization/agent.py
from core.base_agent import BaseAgent, AgentMetadata
from shared.llm.client import LLMClient
from shared.prompt_management.registry import PromptRegistry
from typing import Dict, Any

class DataStandardizationAgent(BaseAgent):
    """프롬프트 버전 관리를 사용하는 Agent"""
    
    def __init__(self, metadata: AgentMetadata):
        super().__init__(metadata)
        
        self.llm = LLMClient()
        self.prompt_registry = PromptRegistry()
        
        # 사용할 프롬프트 버전 지정
        self.prompt_name = "data_standardization"
        self.prompt_version = "1.2.0"  # 또는 "latest"
    
    def process(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """표준화 로직"""
        schema = input['data']
        target_format = input.get('config', {}).get('target_format', 'snake_case')
        
        # 1. 프롬프트 템플릿 가져오기
        prompt_template = self.prompt_registry.get(
            name=self.prompt_name,
            version=self.prompt_version
        )
        
        # 2. 변수 치환
        prompt = prompt_template.format(
            schema=schema,
            target_format=target_format
        )
        
        # 3. LLM 호출
        result = self.llm.generate(prompt, temperature=0.0)
        
        # 4. 결과 파싱
        import json
        standardized = json.loads(result)
        
        return {
            'result': standardized,
            'confidence': self._calculate_confidence(standardized),
            'prompt_version': self.prompt_version  # 사용한 프롬프트 버전 기록
        }
    
    def _calculate_confidence(self, result: Dict) -> float:
        """신뢰도 계산"""
        if 'fields' not in result:
            return 0.0
        
        fields = result['fields']
        complete_fields = sum(
            1 for f in fields
            if 'type' in f and 'description' in f
        )
        
        return complete_fields / len(fields) if fields else 0.0
    
    def evaluate(self, ground_truth: Dict, prediction: Dict) -> float:
        """평가"""
        truth_fields = {f['name'] for f in ground_truth['fields']}
        pred_fields = {f['name'] for f in prediction['fields']}
        
        intersection = len(truth_fields & pred_fields)
        union = len(truth_fields | pred_fields)
        
        return intersection / union if union > 0 else 0.0

사용 예시:

# 프롬프트 A/B 테스트 실행
from shared.prompt_management.ab_testing import PromptABTester

# Tester 생성
tester = PromptABTester(PromptRegistry())

# 100번 테스트
for i in range(100):
    # A/B 버전 랜덤 선택
    prompt, version = tester.get_test_prompt(
        name="data_standardization",
        version_a="1.1.0",
        version_b="1.2.0",
        traffic_split=0.5
    )
    
    # Agent 실행
    result = agent.process(test_data)
    score = agent.evaluate(ground_truth, result['result'])
    
    # 결과 기록
    tester.record_result("data_standardization", version, score)

# 결과 비교
comparison = tester.compare("data_standardization", "1.1.0", "1.2.0")
print(comparison)
# {
#     'version_a': {'mean': 0.89, 'std': 0.04, 'n': 50},
#     'version_b': {'mean': 0.93, 'std': 0.03, 'n': 50},
#     'winner': '1.2.0',
#     'p_value': 0.001,
#     'significant': True
# }

3 벡터 데이터 관리

3.1 문제: RAG 데이터 업데이트

3.1.1 초기 방식 (Phase 1-2)

# ❌ 나쁜 예: 벡터 데이터가 Agent 코드에 포함
class KnowledgeAgent:
    def __init__(self):
        # Agent 초기화 시 벡터 DB 생성
        self.vector_db = ChromaDB()
        self.vector_db.add_documents([
            "문서 1 내용...",
            "문서 2 내용...",
        ])
    
    def process(self, input):
        # RAG 검색
        docs = self.vector_db.search(input['query'])
        return generate_answer(docs)

문제점: 1. 업데이트 어려움: 문서 추가 시 Agent 재배포 필요 2. 중복: 여러 Agent가 같은 문서 저장 3. 동기화 불가: Agent A와 Agent B의 벡터 DB 버전 다름

3.1.2 개선 방식: VectorStoreManager

# ✅ 좋은 예: 중앙집중식 벡터 저장소
class KnowledgeAgent:
    def __init__(self):
        # 공통 벡터 저장소 사용
        self.vector_manager = VectorStoreManager(
            collection_name="knowledge_base"
        )
    
    def process(self, input):
        # 검색만 수행 (업데이트는 별도 프로세스)
        docs = self.vector_manager.search(input['query'])
        return generate_answer(docs)

# 별도 업데이트 스크립트
def update_vector_store():
    manager = VectorStoreManager("knowledge_base")
    
    # 새 문서 추가 (Agent 재배포 불필요)
    manager.add_documents([
        {"id": "doc_123", "text": "새 문서...", "metadata": {...}}
    ])

3.2 VectorStoreManager 구현

# shared/vector_store/manager.py
from typing import List, Dict, Any, Optional
from chromadb import Client, Collection
import logging

logger = logging.getLogger(__name__)

class VectorStoreManager:
    """벡터 저장소 관리자
    
    기능:
    1. 문서 추가/업데이트/삭제
    2. 검색 (유사도 기반)
    3. 메타데이터 필터링
    4. 버전 관리
    """
    
    def __init__(
        self,
        collection_name: str,
        embedding_model: str = "text-embedding-ada-002"
    ):
        self.collection_name = collection_name
        self.embedding_model = embedding_model
        
        # ChromaDB 클라이언트
        self.client = Client()
        self.collection: Collection = self.client.get_or_create_collection(
            name=collection_name
        )
        
        logger.info(f"Initialized VectorStore: {collection_name}")
    
    def add_documents(
        self,
        documents: List[Dict[str, Any]],
        batch_size: int = 100
    ):
        """문서 추가 (배치 처리)
        
        Args:
            documents: 문서 목록
                [
                    {
                        'id': 'doc_1',
                        'text': '문서 내용',
                        'metadata': {'source': 'wiki', 'date': '2026-02-02'}
                    }
                ]
            batch_size: 배치 크기
        """
        total = len(documents)
        logger.info(f"Adding {total} documents to {self.collection_name}")
        
        for i in range(0, total, batch_size):
            batch = documents[i:i+batch_size]
            
            ids = [doc['id'] for doc in batch]
            texts = [doc['text'] for doc in batch]
            metadatas = [doc.get('metadata', {}) for doc in batch]
            
            self.collection.add(
                ids=ids,
                documents=texts,
                metadatas=metadatas
            )
            
            logger.info(f"Added batch {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}")
        
        logger.info(f"Successfully added {total} documents")
    
    def update_document(
        self,
        doc_id: str,
        new_text: Optional[str] = None,
        new_metadata: Optional[Dict] = None
    ):
        """문서 업데이트
        
        Args:
            doc_id: 문서 ID
            new_text: 새 텍스트 (선택)
            new_metadata: 새 메타데이터 (선택)
        """
        update_kwargs = {'ids': [doc_id]}
        
        if new_text:
            update_kwargs['documents'] = [new_text]
        
        if new_metadata:
            update_kwargs['metadatas'] = [new_metadata]
        
        self.collection.update(**update_kwargs)
        logger.info(f"Updated document: {doc_id}")
    
    def delete_documents(self, doc_ids: List[str]):
        """문서 삭제"""
        self.collection.delete(ids=doc_ids)
        logger.info(f"Deleted {len(doc_ids)} documents")
    
    def search(
        self,
        query: str,
        n_results: int = 5,
        metadata_filter: Optional[Dict] = None
    ) -> List[Dict[str, Any]]:
        """유사도 검색
        
        Args:
            query: 검색 쿼리
            n_results: 반환할 문서 수
            metadata_filter: 메타데이터 필터
                예: {'source': 'wiki', 'date': {'$gte': '2026-01-01'}}
        
        Returns:
            검색 결과 목록
                [
                    {
                        'id': 'doc_1',
                        'text': '문서 내용',
                        'metadata': {...},
                        'distance': 0.15
                    }
                ]
        """
        query_kwargs = {
            'query_texts': [query],
            'n_results': n_results
        }
        
        if metadata_filter:
            query_kwargs['where'] = metadata_filter
        
        results = self.collection.query(**query_kwargs)
        
        # 결과 포맷팅
        formatted_results = []
        for i in range(len(results['ids'][0])):
            formatted_results.append({
                'id': results['ids'][0][i],
                'text': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i]
            })
        
        return formatted_results
    
    def get_stats(self) -> Dict[str, Any]:
        """저장소 통계"""
        count = self.collection.count()
        
        return {
            'collection_name': self.collection_name,
            'document_count': count,
            'embedding_model': self.embedding_model
        }
    
    def export_metadata(self) -> List[Dict]:
        """모든 문서의 메타데이터 추출"""
        results = self.collection.get()
        
        metadata_list = []
        for i in range(len(results['ids'])):
            metadata_list.append({
                'id': results['ids'][i],
                'metadata': results['metadatas'][i]
            })
        
        return metadata_list

3.3 자동 업데이트 스크립트

# shared/vector_store/updater.py
from pathlib import Path
from typing import List, Dict, Any
import hashlib
import json
import logging

logger = logging.getLogger(__name__)

class VectorStoreUpdater:
    """벡터 저장소 자동 업데이트
    
    기능:
    1. 파일 변경 감지 (해시 기반)
    2. 증분 업데이트
    3. 변경 이력 추적
    """
    
    def __init__(
        self,
        manager: VectorStoreManager,
        data_dir: str,
        cache_file: str = ".vector_cache.json"
    ):
        self.manager = manager
        self.data_dir = Path(data_dir)
        self.cache_file = cache_file
        
        # 이전 해시 로드
        self.previous_hashes = self._load_cache()
    
    def _load_cache(self) -> Dict[str, str]:
        """캐시 파일 로드 (파일 → 해시)"""
        if not Path(self.cache_file).exists():
            return {}
        
        with open(self.cache_file) as f:
            return json.load(f)
    
    def _save_cache(self):
        """캐시 파일 저장"""
        with open(self.cache_file, 'w') as f:
            json.dump(self.previous_hashes, f, indent=2)
    
    def _calculate_hash(self, file_path: Path) -> str:
        """파일 해시 계산"""
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    
    def detect_changes(self) -> tuple[List[Path], List[Path], List[str]]:
        """변경 감지
        
        Returns:
            (추가된 파일, 수정된 파일, 삭제된 파일 ID)
        """
        added = []
        modified = []
        
        current_files = {}
        
        # 현재 파일 스캔
        for file_path in self.data_dir.rglob("*.txt"):
            file_hash = self._calculate_hash(file_path)
            current_files[str(file_path)] = file_hash
            
            if str(file_path) not in self.previous_hashes:
                # 신규 파일
                added.append(file_path)
            elif self.previous_hashes[str(file_path)] != file_hash:
                # 수정된 파일
                modified.append(file_path)
        
        # 삭제된 파일 찾기
        deleted_files = set(self.previous_hashes.keys()) - set(current_files.keys())
        deleted_ids = [self._file_to_id(Path(f)) for f in deleted_files]
        
        return added, modified, deleted_ids
    
    def _file_to_id(self, file_path: Path) -> str:
        """파일 경로 → 문서 ID"""
        return f"doc_{file_path.stem}"
    
    def _file_to_document(self, file_path: Path) -> Dict[str, Any]:
        """파일 → 문서 딕셔너리"""
        with open(file_path) as f:
            text = f.read()
        
        return {
            'id': self._file_to_id(file_path),
            'text': text,
            'metadata': {
                'source': str(file_path),
                'updated_at': file_path.stat().st_mtime
            }
        }
    
    def update(self) -> Dict[str, int]:
        """증분 업데이트 실행
        
        Returns:
            {'added': 5, 'modified': 2, 'deleted': 1}
        """
        logger.info("Detecting changes...")
        added_files, modified_files, deleted_ids = self.detect_changes()
        
        # 추가
        if added_files:
            docs = [self._file_to_document(f) for f in added_files]
            self.manager.add_documents(docs)
            logger.info(f"Added {len(added_files)} documents")
        
        # 수정
        if modified_files:
            for file_path in modified_files:
                doc = self._file_to_document(file_path)
                self.manager.update_document(
                    doc_id=doc['id'],
                    new_text=doc['text'],
                    new_metadata=doc['metadata']
                )
            logger.info(f"Modified {len(modified_files)} documents")
        
        # 삭제
        if deleted_ids:
            self.manager.delete_documents(deleted_ids)
            logger.info(f"Deleted {len(deleted_ids)} documents")
        
        # 캐시 업데이트
        for file_path in added_files + modified_files:
            file_hash = self._calculate_hash(file_path)
            self.previous_hashes[str(file_path)] = file_hash
        
        for deleted_file in deleted_ids:
            if deleted_file in self.previous_hashes:
                del self.previous_hashes[deleted_file]
        
        self._save_cache()
        
        return {
            'added': len(added_files),
            'modified': len(modified_files),
            'deleted': len(deleted_ids)
        }

3.4 사용 예시

# 1. VectorStore 초기화
manager = VectorStoreManager(collection_name="knowledge_base")

# 2. Updater 생성
updater = VectorStoreUpdater(
    manager=manager,
    data_dir="data/documents"
)

# 3. 초기 로드
initial_docs = [
    {"id": "doc_1", "text": "문서 1", "metadata": {"source": "wiki"}},
    {"id": "doc_2", "text": "문서 2", "metadata": {"source": "manual"}}
]
manager.add_documents(initial_docs)

# 4. 자동 업데이트 (크론잡 등으로 정기 실행)
stats = updater.update()
print(f"Updated: {stats}")
# {'added': 3, 'modified': 1, 'deleted': 0}

# 5. Agent에서 검색
results = manager.search(
    query="벡터 데이터 관리 방법",
    n_results=3,
    metadata_filter={'source': 'wiki'}
)

for result in results:
    print(f"[{result['distance']:.2f}] {result['id']}: {result['text'][:50]}...")

4 표준 메타데이터 스키마

4.1 설계 원칙

모든 Agent 실행 결과는 일관된 메타데이터 구조를 가져야 한다:

# shared/metadata/schema.py
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from datetime import datetime

@dataclass
class AgentInfo:
    """Agent 정보"""
    name: str
    version: str
    domain: str

@dataclass
class ExecutionInfo:
    """실행 정보"""
    timestamp: str  # ISO 8601
    duration_ms: float
    status: str  # success, failure, timeout
    error_message: Optional[str] = None

@dataclass
class InputInfo:
    """입력 정보"""
    task: str
    source: str  # user_upload, api_call, scheduled, chained
    parent_agent: Optional[str] = None  # 체이닝 시 이전 Agent

@dataclass
class OutputInfo:
    """출력 정보"""
    confidence: float  # 0-1
    warnings: List[str]
    errors: List[str]

@dataclass
class ResourceUsage:
    """리소스 사용량"""
    llm_provider: Optional[str] = None  # openai, anthropic, etc.
    llm_model: Optional[str] = None
    llm_tokens: Optional[int] = None
    llm_cost: Optional[float] = None
    cache_hit: bool = False
    memory_mb: Optional[float] = None

@dataclass
class StandardMetadata:
    """표준 메타데이터 스키마"""
    agent: AgentInfo
    execution: ExecutionInfo
    input: InputInfo
    output: OutputInfo
    resources: ResourceUsage
    custom: Dict[str, Any] = None  # Agent별 커스텀 필드
    
    def to_dict(self) -> Dict[str, Any]:
        """딕셔너리로 변환"""
        return asdict(self)
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'StandardMetadata':
        """딕셔너리에서 생성"""
        return cls(
            agent=AgentInfo(**data['agent']),
            execution=ExecutionInfo(**data['execution']),
            input=InputInfo(**data['input']),
            output=OutputInfo(**data['output']),
            resources=ResourceUsage(**data['resources']),
            custom=data.get('custom')
        )

4.2 BaseAgent에 메타데이터 통합

# core/base_agent.py 수정
import time
from datetime import datetime
from shared.metadata.schema import (
    StandardMetadata, AgentInfo, ExecutionInfo,
    InputInfo, OutputInfo, ResourceUsage
)

class BaseAgent(ABC):
    """메타데이터 자동 생성"""
    
    def execute(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """실행 + 메타데이터 자동 추가"""
        start_time = time.time()
        status = "success"
        error_message = None
        
        try:
            # 검증
            self._validate_input(input)
            
            # 전처리
            processed_input = self._pre_process(input)
            
            # 핵심 로직
            result = self.process(processed_input)
            
            # 후처리
            processed_result = self._post_process(result)
            
        except Exception as e:
            status = "failure"
            error_message = str(e)
            self.metrics.record_error(error_message)
            raise
        
        finally:
            # 메타데이터 생성
            duration_ms = (time.time() - start_time) * 1000
            
            metadata = StandardMetadata(
                agent=AgentInfo(
                    name=self.metadata.name,
                    version=self.metadata.version,
                    domain=self.metadata.domain
                ),
                execution=ExecutionInfo(
                    timestamp=datetime.utcnow().isoformat(),
                    duration_ms=duration_ms,
                    status=status,
                    error_message=error_message
                ),
                input=InputInfo(
                    task=input.get('task', 'unknown'),
                    source=input.get('source', 'unknown'),
                    parent_agent=input.get('metadata', {}).get('agent')
                ),
                output=OutputInfo(
                    confidence=processed_result.get('confidence', 0.0),
                    warnings=processed_result.get('warnings', []),
                    errors=processed_result.get('errors', [])
                ),
                resources=self._get_resource_usage(),
                custom=self._get_custom_metadata()
            )
            
            processed_result['metadata'] = metadata.to_dict()
        
        return processed_result
    
    def _get_resource_usage(self) -> ResourceUsage:
        """리소스 사용량 (하위 클래스에서 override 가능)"""
        return ResourceUsage()
    
    def _get_custom_metadata(self) -> Dict[str, Any]:
        """커스텀 메타데이터 (하위 클래스에서 override 가능)"""
        return {}

4.3 메타데이터 활용 예시

4.3.1 1. 로깅

# shared/logging/structured_logger.py
import logging
import json

class StructuredLogger:
    """구조화된 로깅"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def log_execution(self, result: Dict[str, Any]):
        """Agent 실행 결과 로깅"""
        metadata = result.get('metadata', {})
        
        log_data = {
            'agent': metadata['agent']['name'],
            'status': metadata['execution']['status'],
            'duration_ms': metadata['execution']['duration_ms'],
            'confidence': metadata['output']['confidence'],
            'llm_cost': metadata['resources'].get('llm_cost', 0)
        }
        
        self.logger.info(json.dumps(log_data))

4.3.2 2. 비용 추적

# shared/monitoring/cost_tracker.py
from typing import List, Dict, Any

class CostTracker:
    """LLM 비용 추적"""
    
    def __init__(self):
        self.costs: List[Dict] = []
    
    def record(self, metadata: Dict[str, Any]):
        """메타데이터에서 비용 추출"""
        resources = metadata.get('resources', {})
        
        if resources.get('llm_cost'):
            self.costs.append({
                'agent': metadata['agent']['name'],
                'timestamp': metadata['execution']['timestamp'],
                'cost': resources['llm_cost'],
                'tokens': resources.get('llm_tokens', 0)
            })
    
    def get_total_cost(self, agent_name: Optional[str] = None) -> float:
        """총 비용 계산"""
        filtered = self.costs
        
        if agent_name:
            filtered = [c for c in self.costs if c['agent'] == agent_name]
        
        return sum(c['cost'] for c in filtered)
    
    def get_summary(self) -> Dict[str, Any]:
        """비용 요약"""
        by_agent = {}
        
        for cost in self.costs:
            agent = cost['agent']
            if agent not in by_agent:
                by_agent[agent] = {'cost': 0, 'tokens': 0, 'calls': 0}
            
            by_agent[agent]['cost'] += cost['cost']
            by_agent[agent]['tokens'] += cost['tokens']
            by_agent[agent]['calls'] += 1
        
        return {
            'total_cost': self.get_total_cost(),
            'by_agent': by_agent
        }

4.3.3 3. 성능 분석

# shared/monitoring/performance_analyzer.py
from typing import List, Dict, Any
import numpy as np

class PerformanceAnalyzer:
    """성능 분석"""
    
    def __init__(self):
        self.executions: List[Dict] = []
    
    def record(self, metadata: Dict[str, Any]):
        """메타데이터 기록"""
        self.executions.append({
            'agent': metadata['agent']['name'],
            'duration_ms': metadata['execution']['duration_ms'],
            'confidence': metadata['output']['confidence'],
            'status': metadata['execution']['status']
        })
    
    def get_agent_stats(self, agent_name: str) -> Dict[str, Any]:
        """Agent별 통계"""
        agent_execs = [
            e for e in self.executions
            if e['agent'] == agent_name
        ]
        
        if not agent_execs:
            return {}
        
        durations = [e['duration_ms'] for e in agent_execs]
        confidences = [e['confidence'] for e in agent_execs]
        successes = sum(1 for e in agent_execs if e['status'] == 'success')
        
        return {
            'total_executions': len(agent_execs),
            'success_rate': successes / len(agent_execs),
            'avg_duration_ms': np.mean(durations),
            'p95_duration_ms': np.percentile(durations, 95),
            'avg_confidence': np.mean(confidences)
        }

5 설정 관리

5.1 환경별 설정 분리

# config/
├── base.yaml              # 공통 설정
├── development.yaml       # 개발 환경
├── staging.yaml           # 스테이징 환경
└── production.yaml        # 프로덕션 환경

5.1.1 base.yaml

# config/base.yaml
platform:
  name: "ai-agent-platform"
  version: "1.0.0"

agents:
  default_timeout: 30
  max_retries: 3

llm:
  default_provider: "openai"
  default_model: "gpt-4"
  temperature: 0.0
  max_tokens: 2000

vector_store:
  default_collection: "knowledge_base"
  embedding_model: "text-embedding-ada-002"

monitoring:
  enabled: true
  log_level: "INFO"
  metrics_port: 9090

prompts:
  directory: "shared/prompts"
  default_version: "latest"

5.1.2 production.yaml

# config/production.yaml
# base.yaml 상속 + 오버라이드

llm:
  cache_enabled: true
  retry_backoff: 2.0

monitoring:
  log_level: "WARNING"
  log_format: "json"
  
security:
  api_key_required: true
  rate_limit: 1000  # per hour

5.2 Config 로더

# shared/config/loader.py
from typing import Dict, Any
from pathlib import Path
import yaml
import os

class ConfigLoader:
    """설정 로더
    
    환경 변수 ENV에 따라 설정 로드:
    - development: base + development
    - production: base + production
    """
    
    def __init__(self, config_dir: str = "config"):
        self.config_dir = Path(config_dir)
        self.env = os.getenv("ENV", "development")
        self.config = self._load_config()
    
    def _load_yaml(self, file_name: str) -> Dict[str, Any]:
        """YAML 파일 로드"""
        file_path = self.config_dir / file_name
        
        if not file_path.exists():
            return {}
        
        with open(file_path) as f:
            return yaml.safe_load(f) or {}
    
    def _merge_configs(
        self,
        base: Dict[str, Any],
        override: Dict[str, Any]
    ) -> Dict[str, Any]:
        """설정 병합 (재귀적)"""
        result = base.copy()
        
        for key, value in override.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self._merge_configs(result[key], value)
            else:
                result[key] = value
        
        return result
    
    def _load_config(self) -> Dict[str, Any]:
        """설정 로드 (base + env)"""
        base = self._load_yaml("base.yaml")
        env_config = self._load_yaml(f"{self.env}.yaml")
        
        return self._merge_configs(base, env_config)
    
    def get(self, key: str, default: Any = None) -> Any:
        """설정 값 조회 (dot notation)
        
        Example:
            config.get("llm.default_model")  # "gpt-4"
        """
        keys = key.split('.')
        value = self.config
        
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        
        return value
    
    def get_all(self) -> Dict[str, Any]:
        """전체 설정 반환"""
        return self.config.copy()

# 전역 설정 인스턴스
_config = None

def get_config() -> ConfigLoader:
    """전역 설정 조회 (Singleton)"""
    global _config
    if _config is None:
        _config = ConfigLoader()
    return _config

5.3 사용 예시

# Agent에서 설정 사용
from shared.config.loader import get_config

class DataAgent(BaseAgent):
    def __init__(self, metadata):
        super().__init__(metadata)
        
        config = get_config()
        
        # LLM 설정
        self.llm = LLMClient(
            provider=config.get("llm.default_provider"),
            model=config.get("llm.default_model"),
            temperature=config.get("llm.temperature"),
            cache_enabled=config.get("llm.cache_enabled", False)
        )
        
        # Prompt 설정
        self.prompt_registry = PromptRegistry(
            prompts_dir=config.get("prompts.directory")
        )
        
        # 타임아웃 설정
        self.timeout = config.get("agents.default_timeout")

6 핵심 설계 결정 요약

6.1 프롬프트 관리

  1. YAML 기반 저장: 버전 이력, 성능 메트릭 포함
  2. PromptRegistry: 중앙집중식 관리
  3. A/B 테스트: 통계적 유의성 검증
  4. Agent 독립성: 프롬프트 변경 시 Agent 재배포 불필요

6.2 벡터 데이터 관리

  1. VectorStoreManager: 공통 벡터 저장소
  2. 증분 업데이트: 변경된 문서만 업데이트
  3. 해시 기반 감지: 파일 변경 자동 감지
  4. 메타데이터 필터링: 효율적인 검색

6.3 메타데이터 표준

  1. StandardMetadata: 모든 Agent 공통 스키마
  2. 자동 생성: BaseAgent가 메타데이터 추가
  3. 구조화 로깅: JSON 형식
  4. 비용/성능 추적: 메타데이터 활용

6.4 설정 관리

  1. 환경별 분리: base + development/production
  2. YAML 형식: 가독성
  3. Dot notation: 계층 구조 접근
  4. Singleton 패턴: 전역 설정

6.5 다음 단계

이 글에서 데이터 표준화 계층을 완성했다. 다음 글에서는:

  • 6번 글: 플랫폼 운영 (CI/CD, 모니터링, 배포 전략, 보안)

6.6 참고문헌

Data Management: - Kleppmann, M. (2017). “Designing Data-Intensive Applications.” O’Reilly. - Reis, J., & Housley, M. (2022). “Fundamentals of Data Engineering.” O’Reilly.

Configuration Management: - Humble, J., & Farley, D. (2010). “Continuous Delivery.” Addison-Wesley. - Newman, S. (2021). “Building Microservices.” O’Reilly.

Vector Databases: - ChromaDB Documentation. https://docs.trychroma.com/ - Pinecone Documentation. https://docs.pinecone.io/

Prompt Engineering: - OpenAI Cookbook. https://cookbook.openai.com/ - Anthropic Claude Documentation. https://docs.anthropic.com/

shared/prompt_management/
├── registry.py                # 프롬프트 저장소
├── version_control.py         # 버전 관리
└── ab_testing.py             # A/B 테스트 도구

6.6.1 프롬프트 레지스트리

# shared/prompt_management/registry.py
from typing import Dict
import yaml

class PromptRegistry:
    """프롬프트 버전 관리"""
    
    def __init__(self, prompts_dir: str):
        self.prompts_dir = prompts_dir
        self._prompts: Dict[str, Dict] = {}
        self._load_prompts()
    
    def _load_prompts(self):
        """YAML에서 프롬프트 로드"""
        for file in Path(self.prompts_dir).glob("*.yaml"):
            with open(file) as f:
                data = yaml.safe_load(f)
                self._prompts[data['name']] = data
    
    def get(self, name: str, version: str = "latest") -> str:
        """프롬프트 조회"""
        prompt_data = self._prompts[name]
        
        if version == "latest":
            return prompt_data['versions'][-1]['content']
        else:
            for v in prompt_data['versions']:
                if v['version'] == version:
                    return v['content']
        
        raise ValueError(f"Prompt {name} version {version} not found")

6.6.2 프롬프트 YAML 형식

# shared/prompt_management/prompts/data_standardization.yaml
name: data_standardization
domain: data
description: "데이터 스키마 표준화 프롬프트"

versions:
  - version: "1.0.0"
    created_at: "2026-01-28"
    author: "Platform Team"
    content: |
      다음 데이터 스키마를 표준화하세요:
      {schema}
      
      표준 형식:
      - 컬럼명: snake_case
      - 날짜: ISO 8601
      - 문자열: UTF-8
  
  - version: "1.1.0"
    created_at: "2026-02-01"
    author: "Platform Team"
    changes: "타입 검증 규칙 추가"
    content: |
      다음 데이터 스키마를 표준화하세요:
      {schema}
      
      표준 형식:
      - 컬럼명: snake_case
      - 날짜: ISO 8601 (YYYY-MM-DD)
      - 문자열: UTF-8
      - 타입: string, int, float, date만 허용

6.7 벡터 데이터 관리

문제: RAG 기반 Agent의 벡터 데이터 업데이트 자동화

shared/vector_store/
├── manager.py                # 벡터 저장소 관리
├── updater.py                # 자동 업데이트
└── index.py                  # 검색 인덱스

6.7.1 벡터 저장소 관리자

# shared/vector_store/manager.py
from typing import List, Dict, Any
from chromadb import Client

class VectorStoreManager:
    """벡터 데이터 관리"""
    
    def __init__(self, collection_name: str):
        self.client = Client()
        self.collection = self.client.get_or_create_collection(collection_name)
    
    def add_documents(self, documents: List[Dict[str, Any]]):
        """문서 추가 (증분 업데이트)"""
        ids = [doc['id'] for doc in documents]
        texts = [doc['text'] for doc in documents]
        metadatas = [doc['metadata'] for doc in documents]
        
        self.collection.add(
            ids=ids,
            documents=texts,
            metadatas=metadatas
        )
    
    def update_document(self, doc_id: str, new_text: str):
        """문서 업데이트"""
        self.collection.update(
            ids=[doc_id],
            documents=[new_text]
        )
    
    def search(self, query: str, n_results: int = 5) -> List[Dict]:
        """검색"""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        return results

6.8 메타데이터 표준

모든 Agent 실행 결과에 포함되는 메타데이터 스키마:

{
    "agent": {
        "name": "data_standardization",
        "version": "1.0.0",
        "domain": "data"
    },
    "execution": {
        "timestamp": "2026-02-02T10:30:00Z",
        "duration_ms": 1250,
        "status": "success"
    },
    "input": {
        "task": "standardize",
        "source": "user_upload"
    },
    "output": {
        "confidence": 0.95,
        "warnings": [],
        "errors": []
    },
    "resources": {
        "llm_tokens": 1500,
        "llm_cost": 0.045,
        "cache_hit": false
    }
}

7 최종 디렉토리 구조

2번 글의 Phase 4 결과를 구체화:

ai-agent-platform/                    # Monorepo 루트
├── pyproject.toml                   # 전체 의존성 관리
├── README.md
│
├── core/                            # 플랫폼 핵심
│   ├── agent/
│   │   ├── base_agent.py           # BaseAgent 추상 클래스
│   │   ├── lifecycle.py
│   │   ├── registry.py             # AgentRegistry (Singleton)
│   │   └── metadata.py
│   ├── orchestrator.py             # AgentOrchestrator
│   ├── monitoring.py
│   └── evaluation.py
│
├── shared/                          # 공통 라이브러리
│   ├── llm/
│   │   ├── client.py               # LLMClient (캐싱, 재시도)
│   │   ├── prompt_engine.py
│   │   └── context_manager.py
│   ├── validation/
│   │   ├── json_parser.py
│   │   └── schema_validator.py
│   ├── logging/
│   │   └── structured_logger.py
│   ├── monitoring/
│   │   ├── metrics.py
│   │   └── dashboard.py
│   ├── document_processing/
│   │   ├── pdf_parser.py
│   │   ├── code_parser.py
│   │   └── text_extractor.py
│   ├── vector_store/
│   │   ├── manager.py              # VectorStoreManager
│   │   ├── updater.py
│   │   └── index.py
│   └── prompt_management/
│       ├── registry.py             # PromptRegistry
│       ├── version_control.py
│       ├── ab_testing.py
│       └── prompts/                # YAML 프롬프트 모음
│           ├── data_standardization.yaml
│           ├── code_analysis.yaml
│           └── knowledge_qna.yaml
│
├── agents/                          # 도메인 Agent들
│   ├── data_standardization/
│   │   ├── __init__.py             # Agent 자동 등록
│   │   ├── agent.py                # DataStandardizationAgent
│   │   ├── manifest.yaml           # Agent 메타데이터
│   │   ├── rules/                  # 도메인 특화 규칙
│   │   │   └── schema_rules.py
│   │   └── tests/
│   │       └── test_agent.py
│   ├── code_analysis/
│   │   ├── __init__.py
│   │   ├── agent.py
│   │   ├── manifest.yaml
│   │   ├── parsers/                # 도메인 특화 파서
│   │   │   ├── python_parser.py
│   │   │   └── java_parser.py
│   │   └── tests/
│   └── knowledge_qna/
│       ├── __init__.py
│       ├── agent.py
│       ├── manifest.yaml
│       ├── rag_pipeline.py         # 도메인 특화 RAG
│       └── tests/
│
├── platform-api/                    # API Gateway
│   ├── main.py                     # FastAPI 앱
│   ├── routes/
│   │   ├── agents.py               # /api/agents/*
│   │   ├── monitoring.py           # /api/metrics/*
│   │   └── management.py           # /api/manage/*
│   └── middleware/
│       ├── auth.py
│       └── logging.py
│
├── infra/                           # 인프라 코드
│   ├── docker/
│   │   ├── Dockerfile.base
│   │   ├── Dockerfile.agent
│   │   └── docker-compose.yml
│   ├── k8s/                        # Kubernetes 매니페스트
│   │   ├── core-deployment.yaml
│   │   ├── agent-deployment.yaml
│   │   └── service.yaml
│   └── ci_cd/
│       └── .github/
│           └── workflows/
│               ├── test.yml        # 변경 감지 + 테스트
│               └── deploy.yml
│
└── tests/                           # 통합 테스트
    ├── integration/
    │   ├── test_agent_chain.py     # Agent 체이닝 테스트
    │   └── test_orchestrator.py
    └── performance/
        └── test_load.py

8 CI/CD 파이프라인

8.1 변경 감지 및 선택적 테스트

# .github/workflows/test.yml
name: Test Affected Components
on: [push, pull_request]

jobs:
  detect-changes:
    runs-on: ubuntu-latest
    outputs:
      affected: ${{ steps.detect.outputs.affected }}
    steps:
      - uses: actions/checkout@v2
      - name: Detect changed modules
        id: detect
        run: |
          CHANGED=$(git diff --name-only HEAD~1)
          
          if echo "$CHANGED" | grep -E '^(core|shared)/'; then
            echo "affected=all" >> $GITHUB_OUTPUT
          else
            AGENTS=$(echo "$CHANGED" | grep 'agents/' | cut -d'/' -f2 | sort -u)
            echo "affected=$AGENTS" >> $GITHUB_OUTPUT
          fi
  
  test:
    needs: detect-changes
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - name: Install dependencies
        run: |
          pip install poetry
          poetry install
      
      - name: Run tests
        run: |
          if [ "${{ needs.detect-changes.outputs.affected }}" = "all" ]; then
            poetry run pytest
          else
            for agent in ${{ needs.detect-changes.outputs.affected }}; do
              poetry run pytest agents/$agent/tests/
            done
          fi

9 다음 단계

이 글에서 “최종 플랫폼 구조”를 구체화했다면, 다음 질문은:

“실제로 어떻게 운영하고 자동화하는가?”

다음 글 “AI Agent 플랫폼 운영 자동화와 DevOps”에서는: - 배포 전략: Blue-Green, Canary, A/B 테스트 - 모니터링: Agent별 메트릭, 대시보드, 알림 - 로깅: 구조화된 로그, 분석 - 보안: 인증, 권한, API 키 관리

를 다룬다.

Subscribe

Enjoy this blog? Get notified of new posts by email: