1 들어가며
1.1 왜 데이터 표준화 계층이 필요한가?
Phase 4 플랫폼을 운영하다 보면 발생하는 문제들:
질문들: - “프롬프트를 수정했는데, 어떤 Agent가 영향받는지 모르겠어요” - “RAG 데이터를 업데이트하면 Agent를 재배포해야 하나요?” - “Agent 실행 결과를 어떻게 일관되게 기록하나요?” - “프롬프트 A/B 테스트는 어떻게 하나요?”
이 글의 목표: - 프롬프트 버전 관리 시스템 구축 - 벡터 데이터 자동 업데이트 전략 - 표준 메타데이터 스키마 설계 - 설정 관리 모범 사례
1.2 앞선 글 요약
4번 글 (인터페이스 설계): - BaseAgent 추상 클래스 - Template Method Pattern - AgentRegistry + Orchestrator - Agent 구현 예시
핵심 질문: “Agent들이 사용하는 데이터를 어떻게 통일하고 관리할 것인가?”
2 프롬프트 관리 시스템
2.1 문제: 프롬프트 변경의 어려움
2.1.1 초기 방식 (Phase 1-2)
# ❌ 나쁜 예: 프롬프트가 코드에 하드코딩
class DataAgent:
def process(self, input):
prompt = """다음 스키마를 표준화하세요:
{schema}
규칙:
- 컬럼명: snake_case
- 날짜: ISO 8601"""
result = llm.generate(prompt.format(schema=input['data']))
return result문제점: 1. 변경 추적 불가: 누가, 언제, 왜 수정했는지 모름 2. A/B 테스트 불가: 새 프롬프트 성능 비교 어려움 3. 롤백 불가: 문제 발생 시 이전 버전으로 돌아갈 수 없음 4. 재사용 불가: 같은 프롬프트를 여러 Agent가 복사-붙여넣기
2.1.2 개선 방식: PromptRegistry
# ✅ 좋은 예: 프롬프트를 외부에서 관리
class DataAgent:
def __init__(self):
self.prompt_registry = PromptRegistry()
def process(self, input):
# 버전 지정 가능
prompt_template = self.prompt_registry.get(
name="data_standardization",
version="1.2.0"
)
prompt = prompt_template.format(schema=input['data'])
result = llm.generate(prompt)
return result2.2 PromptRegistry 설계
2.2.1 YAML 기반 프롬프트 저장
# shared/prompts/data_standardization.yaml
name: data_standardization
domain: data
description: "데이터 스키마 표준화 프롬프트"
tags: ["schema", "normalization"]
# 변수 정의
variables:
- name: schema
type: dict
required: true
description: "입력 스키마"
- name: target_format
type: string
required: false
default: "snake_case"
description: "컬럼명 포맷"
# 버전 이력
versions:
- version: "1.0.0"
created_at: "2026-01-28"
author: "Platform Team"
performance:
accuracy: 0.85
avg_time: 2.3
content: |
다음 데이터 스키마를 표준화하세요:
입력 스키마:
{schema}
표준 형식:
- 컬럼명: {target_format}
- 날짜: ISO 8601
- 문자열: UTF-8
출력 형식: JSON
- version: "1.1.0"
created_at: "2026-02-01"
author: "Platform Team"
changes: "타입 검증 규칙 추가"
performance:
accuracy: 0.89
avg_time: 2.5
content: |
다음 데이터 스키마를 표준화하세요:
입력 스키마:
{schema}
표준 형식:
- 컬럼명: {target_format} (영문 소문자만)
- 날짜: ISO 8601 (YYYY-MM-DD)
- 문자열: UTF-8 (특수문자 제거)
- 타입: string, int, float, date만 허용
출력 형식: JSON (필드명: name, type, description)
- version: "1.2.0"
created_at: "2026-02-02"
author: "Data Team"
changes: "예시 추가로 성능 개선"
performance:
accuracy: 0.93
avg_time: 2.4
content: |
다음 데이터 스키마를 표준화하세요:
입력 스키마:
{schema}
표준 형식:
- 컬럼명: {target_format} (예: '환자번호' → 'patient_id')
- 날짜: ISO 8601 (예: '2026-02-02')
- 문자열: UTF-8
- 타입: string, int, float, date만 허용
예시:
입력: {{"환자번호": "12345", "진료일": "2026/02/02"}}
출력: {{"patient_id": {{"type": "string"}}, "visit_date": {{"type": "date"}}}}
출력 형식: JSON2.2.2 PromptRegistry 구현
# shared/prompt_management/registry.py
from typing import Dict, Optional, List
from pathlib import Path
import yaml
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class PromptVersion:
"""프롬프트 버전 정보"""
version: str
content: str
created_at: str
author: str
changes: Optional[str] = None
performance: Optional[Dict] = None
@dataclass
class PromptMetadata:
"""프롬프트 메타데이터"""
name: str
domain: str
description: str
tags: List[str]
variables: List[Dict]
versions: List[PromptVersion]
class PromptRegistry:
"""프롬프트 저장소
기능:
1. YAML에서 프롬프트 로드
2. 버전별 조회
3. 변경 이력 추적
4. 성능 메트릭 기록
"""
def __init__(self, prompts_dir: str = "shared/prompts"):
self.prompts_dir = Path(prompts_dir)
self._prompts: Dict[str, PromptMetadata] = {}
self._load_prompts()
def _load_prompts(self):
"""YAML 파일에서 모든 프롬프트 로드"""
if not self.prompts_dir.exists():
logger.warning(f"Prompts directory not found: {self.prompts_dir}")
return
for yaml_file in self.prompts_dir.glob("*.yaml"):
try:
with open(yaml_file) as f:
data = yaml.safe_load(f)
# PromptVersion 객체 생성
versions = [
PromptVersion(**v) for v in data['versions']
]
# PromptMetadata 생성
metadata = PromptMetadata(
name=data['name'],
domain=data['domain'],
description=data['description'],
tags=data.get('tags', []),
variables=data.get('variables', []),
versions=versions
)
self._prompts[data['name']] = metadata
logger.info(f"Loaded prompt: {data['name']} ({len(versions)} versions)")
except Exception as e:
logger.error(f"Failed to load {yaml_file}: {e}")
def get(
self,
name: str,
version: str = "latest"
) -> str:
"""프롬프트 조회
Args:
name: 프롬프트 이름
version: 버전 (latest, 1.0.0, 1.1.0 등)
Returns:
프롬프트 템플릿 문자열
"""
if name not in self._prompts:
available = ', '.join(self._prompts.keys())
raise ValueError(f"Prompt {name} not found. Available: {available}")
prompt_metadata = self._prompts[name]
if version == "latest":
return prompt_metadata.versions[-1].content
for v in prompt_metadata.versions:
if v.version == version:
return v.content
raise ValueError(f"Prompt {name} version {version} not found")
def get_metadata(self, name: str) -> PromptMetadata:
"""프롬프트 메타데이터 조회"""
if name not in self._prompts:
raise ValueError(f"Prompt {name} not found")
return self._prompts[name]
def list_versions(self, name: str) -> List[PromptVersion]:
"""프롬프트의 모든 버전 조회"""
metadata = self.get_metadata(name)
return metadata.versions
def get_performance(self, name: str, version: str) -> Optional[Dict]:
"""특정 버전의 성능 메트릭"""
versions = self.list_versions(name)
for v in versions:
if v.version == version:
return v.performance
return None
def list_all(self) -> List[str]:
"""모든 프롬프트 이름 목록"""
return list(self._prompts.keys())
def search_by_tag(self, tag: str) -> List[str]:
"""태그로 프롬프트 검색"""
return [
name
for name, metadata in self._prompts.items()
if tag in metadata.tags
]2.3 프롬프트 A/B 테스트
# shared/prompt_management/ab_testing.py
from typing import Dict, Any
import random
from .registry import PromptRegistry
class PromptABTester:
"""프롬프트 A/B 테스트
기능:
1. 두 버전 랜덤 배정
2. 성능 메트릭 수집
3. 통계적 유의성 검증
"""
def __init__(self, registry: PromptRegistry):
self.registry = registry
self.results: Dict[str, List[float]] = {}
def get_test_prompt(
self,
name: str,
version_a: str,
version_b: str,
traffic_split: float = 0.5
) -> tuple[str, str]:
"""A/B 테스트용 프롬프트 반환
Args:
name: 프롬프트 이름
version_a: 버전 A
version_b: 버전 B
traffic_split: A에 할당할 트래픽 비율 (0-1)
Returns:
(선택된 프롬프트, 선택된 버전)
"""
# 랜덤 배정
if random.random() < traffic_split:
selected_version = version_a
else:
selected_version = version_b
prompt = self.registry.get(name, selected_version)
return prompt, selected_version
def record_result(
self,
name: str,
version: str,
score: float
):
"""테스트 결과 기록"""
key = f"{name}:{version}"
if key not in self.results:
self.results[key] = []
self.results[key].append(score)
def compare(
self,
name: str,
version_a: str,
version_b: str
) -> Dict[str, Any]:
"""두 버전 비교
Returns:
{
'version_a': {'mean': 0.85, 'std': 0.05, 'n': 100},
'version_b': {'mean': 0.89, 'std': 0.04, 'n': 100},
'winner': 'version_b',
'p_value': 0.001
}
"""
import numpy as np
from scipy import stats
key_a = f"{name}:{version_a}"
key_b = f"{name}:{version_b}"
results_a = self.results.get(key_a, [])
results_b = self.results.get(key_b, [])
if len(results_a) < 30 or len(results_b) < 30:
return {
'error': 'Not enough samples (minimum 30 per version)',
'samples_a': len(results_a),
'samples_b': len(results_b)
}
# t-test
t_stat, p_value = stats.ttest_ind(results_a, results_b)
mean_a = np.mean(results_a)
mean_b = np.mean(results_b)
winner = version_a if mean_a > mean_b else version_b
return {
'version_a': {
'mean': mean_a,
'std': np.std(results_a),
'n': len(results_a)
},
'version_b': {
'mean': mean_b,
'std': np.std(results_b),
'n': len(results_b)
},
'winner': winner if p_value < 0.05 else 'no significant difference',
'p_value': p_value,
'significant': p_value < 0.05
}2.4 실전 예시: Agent에서 프롬프트 사용
# agents/data_standardization/agent.py
from core.base_agent import BaseAgent, AgentMetadata
from shared.llm.client import LLMClient
from shared.prompt_management.registry import PromptRegistry
from typing import Dict, Any
class DataStandardizationAgent(BaseAgent):
"""프롬프트 버전 관리를 사용하는 Agent"""
def __init__(self, metadata: AgentMetadata):
super().__init__(metadata)
self.llm = LLMClient()
self.prompt_registry = PromptRegistry()
# 사용할 프롬프트 버전 지정
self.prompt_name = "data_standardization"
self.prompt_version = "1.2.0" # 또는 "latest"
def process(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""표준화 로직"""
schema = input['data']
target_format = input.get('config', {}).get('target_format', 'snake_case')
# 1. 프롬프트 템플릿 가져오기
prompt_template = self.prompt_registry.get(
name=self.prompt_name,
version=self.prompt_version
)
# 2. 변수 치환
prompt = prompt_template.format(
schema=schema,
target_format=target_format
)
# 3. LLM 호출
result = self.llm.generate(prompt, temperature=0.0)
# 4. 결과 파싱
import json
standardized = json.loads(result)
return {
'result': standardized,
'confidence': self._calculate_confidence(standardized),
'prompt_version': self.prompt_version # 사용한 프롬프트 버전 기록
}
def _calculate_confidence(self, result: Dict) -> float:
"""신뢰도 계산"""
if 'fields' not in result:
return 0.0
fields = result['fields']
complete_fields = sum(
1 for f in fields
if 'type' in f and 'description' in f
)
return complete_fields / len(fields) if fields else 0.0
def evaluate(self, ground_truth: Dict, prediction: Dict) -> float:
"""평가"""
truth_fields = {f['name'] for f in ground_truth['fields']}
pred_fields = {f['name'] for f in prediction['fields']}
intersection = len(truth_fields & pred_fields)
union = len(truth_fields | pred_fields)
return intersection / union if union > 0 else 0.0사용 예시:
# 프롬프트 A/B 테스트 실행
from shared.prompt_management.ab_testing import PromptABTester
# Tester 생성
tester = PromptABTester(PromptRegistry())
# 100번 테스트
for i in range(100):
# A/B 버전 랜덤 선택
prompt, version = tester.get_test_prompt(
name="data_standardization",
version_a="1.1.0",
version_b="1.2.0",
traffic_split=0.5
)
# Agent 실행
result = agent.process(test_data)
score = agent.evaluate(ground_truth, result['result'])
# 결과 기록
tester.record_result("data_standardization", version, score)
# 결과 비교
comparison = tester.compare("data_standardization", "1.1.0", "1.2.0")
print(comparison)
# {
# 'version_a': {'mean': 0.89, 'std': 0.04, 'n': 50},
# 'version_b': {'mean': 0.93, 'std': 0.03, 'n': 50},
# 'winner': '1.2.0',
# 'p_value': 0.001,
# 'significant': True
# }3 벡터 데이터 관리
3.1 문제: RAG 데이터 업데이트
3.1.1 초기 방식 (Phase 1-2)
# ❌ 나쁜 예: 벡터 데이터가 Agent 코드에 포함
class KnowledgeAgent:
def __init__(self):
# Agent 초기화 시 벡터 DB 생성
self.vector_db = ChromaDB()
self.vector_db.add_documents([
"문서 1 내용...",
"문서 2 내용...",
])
def process(self, input):
# RAG 검색
docs = self.vector_db.search(input['query'])
return generate_answer(docs)문제점: 1. 업데이트 어려움: 문서 추가 시 Agent 재배포 필요 2. 중복: 여러 Agent가 같은 문서 저장 3. 동기화 불가: Agent A와 Agent B의 벡터 DB 버전 다름
3.1.2 개선 방식: VectorStoreManager
# ✅ 좋은 예: 중앙집중식 벡터 저장소
class KnowledgeAgent:
def __init__(self):
# 공통 벡터 저장소 사용
self.vector_manager = VectorStoreManager(
collection_name="knowledge_base"
)
def process(self, input):
# 검색만 수행 (업데이트는 별도 프로세스)
docs = self.vector_manager.search(input['query'])
return generate_answer(docs)
# 별도 업데이트 스크립트
def update_vector_store():
manager = VectorStoreManager("knowledge_base")
# 새 문서 추가 (Agent 재배포 불필요)
manager.add_documents([
{"id": "doc_123", "text": "새 문서...", "metadata": {...}}
])3.2 VectorStoreManager 구현
# shared/vector_store/manager.py
from typing import List, Dict, Any, Optional
from chromadb import Client, Collection
import logging
logger = logging.getLogger(__name__)
class VectorStoreManager:
"""벡터 저장소 관리자
기능:
1. 문서 추가/업데이트/삭제
2. 검색 (유사도 기반)
3. 메타데이터 필터링
4. 버전 관리
"""
def __init__(
self,
collection_name: str,
embedding_model: str = "text-embedding-ada-002"
):
self.collection_name = collection_name
self.embedding_model = embedding_model
# ChromaDB 클라이언트
self.client = Client()
self.collection: Collection = self.client.get_or_create_collection(
name=collection_name
)
logger.info(f"Initialized VectorStore: {collection_name}")
def add_documents(
self,
documents: List[Dict[str, Any]],
batch_size: int = 100
):
"""문서 추가 (배치 처리)
Args:
documents: 문서 목록
[
{
'id': 'doc_1',
'text': '문서 내용',
'metadata': {'source': 'wiki', 'date': '2026-02-02'}
}
]
batch_size: 배치 크기
"""
total = len(documents)
logger.info(f"Adding {total} documents to {self.collection_name}")
for i in range(0, total, batch_size):
batch = documents[i:i+batch_size]
ids = [doc['id'] for doc in batch]
texts = [doc['text'] for doc in batch]
metadatas = [doc.get('metadata', {}) for doc in batch]
self.collection.add(
ids=ids,
documents=texts,
metadatas=metadatas
)
logger.info(f"Added batch {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}")
logger.info(f"Successfully added {total} documents")
def update_document(
self,
doc_id: str,
new_text: Optional[str] = None,
new_metadata: Optional[Dict] = None
):
"""문서 업데이트
Args:
doc_id: 문서 ID
new_text: 새 텍스트 (선택)
new_metadata: 새 메타데이터 (선택)
"""
update_kwargs = {'ids': [doc_id]}
if new_text:
update_kwargs['documents'] = [new_text]
if new_metadata:
update_kwargs['metadatas'] = [new_metadata]
self.collection.update(**update_kwargs)
logger.info(f"Updated document: {doc_id}")
def delete_documents(self, doc_ids: List[str]):
"""문서 삭제"""
self.collection.delete(ids=doc_ids)
logger.info(f"Deleted {len(doc_ids)} documents")
def search(
self,
query: str,
n_results: int = 5,
metadata_filter: Optional[Dict] = None
) -> List[Dict[str, Any]]:
"""유사도 검색
Args:
query: 검색 쿼리
n_results: 반환할 문서 수
metadata_filter: 메타데이터 필터
예: {'source': 'wiki', 'date': {'$gte': '2026-01-01'}}
Returns:
검색 결과 목록
[
{
'id': 'doc_1',
'text': '문서 내용',
'metadata': {...},
'distance': 0.15
}
]
"""
query_kwargs = {
'query_texts': [query],
'n_results': n_results
}
if metadata_filter:
query_kwargs['where'] = metadata_filter
results = self.collection.query(**query_kwargs)
# 결과 포맷팅
formatted_results = []
for i in range(len(results['ids'][0])):
formatted_results.append({
'id': results['ids'][0][i],
'text': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
'distance': results['distances'][0][i]
})
return formatted_results
def get_stats(self) -> Dict[str, Any]:
"""저장소 통계"""
count = self.collection.count()
return {
'collection_name': self.collection_name,
'document_count': count,
'embedding_model': self.embedding_model
}
def export_metadata(self) -> List[Dict]:
"""모든 문서의 메타데이터 추출"""
results = self.collection.get()
metadata_list = []
for i in range(len(results['ids'])):
metadata_list.append({
'id': results['ids'][i],
'metadata': results['metadatas'][i]
})
return metadata_list3.3 자동 업데이트 스크립트
# shared/vector_store/updater.py
from pathlib import Path
from typing import List, Dict, Any
import hashlib
import json
import logging
logger = logging.getLogger(__name__)
class VectorStoreUpdater:
"""벡터 저장소 자동 업데이트
기능:
1. 파일 변경 감지 (해시 기반)
2. 증분 업데이트
3. 변경 이력 추적
"""
def __init__(
self,
manager: VectorStoreManager,
data_dir: str,
cache_file: str = ".vector_cache.json"
):
self.manager = manager
self.data_dir = Path(data_dir)
self.cache_file = cache_file
# 이전 해시 로드
self.previous_hashes = self._load_cache()
def _load_cache(self) -> Dict[str, str]:
"""캐시 파일 로드 (파일 → 해시)"""
if not Path(self.cache_file).exists():
return {}
with open(self.cache_file) as f:
return json.load(f)
def _save_cache(self):
"""캐시 파일 저장"""
with open(self.cache_file, 'w') as f:
json.dump(self.previous_hashes, f, indent=2)
def _calculate_hash(self, file_path: Path) -> str:
"""파일 해시 계산"""
with open(file_path, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
def detect_changes(self) -> tuple[List[Path], List[Path], List[str]]:
"""변경 감지
Returns:
(추가된 파일, 수정된 파일, 삭제된 파일 ID)
"""
added = []
modified = []
current_files = {}
# 현재 파일 스캔
for file_path in self.data_dir.rglob("*.txt"):
file_hash = self._calculate_hash(file_path)
current_files[str(file_path)] = file_hash
if str(file_path) not in self.previous_hashes:
# 신규 파일
added.append(file_path)
elif self.previous_hashes[str(file_path)] != file_hash:
# 수정된 파일
modified.append(file_path)
# 삭제된 파일 찾기
deleted_files = set(self.previous_hashes.keys()) - set(current_files.keys())
deleted_ids = [self._file_to_id(Path(f)) for f in deleted_files]
return added, modified, deleted_ids
def _file_to_id(self, file_path: Path) -> str:
"""파일 경로 → 문서 ID"""
return f"doc_{file_path.stem}"
def _file_to_document(self, file_path: Path) -> Dict[str, Any]:
"""파일 → 문서 딕셔너리"""
with open(file_path) as f:
text = f.read()
return {
'id': self._file_to_id(file_path),
'text': text,
'metadata': {
'source': str(file_path),
'updated_at': file_path.stat().st_mtime
}
}
def update(self) -> Dict[str, int]:
"""증분 업데이트 실행
Returns:
{'added': 5, 'modified': 2, 'deleted': 1}
"""
logger.info("Detecting changes...")
added_files, modified_files, deleted_ids = self.detect_changes()
# 추가
if added_files:
docs = [self._file_to_document(f) for f in added_files]
self.manager.add_documents(docs)
logger.info(f"Added {len(added_files)} documents")
# 수정
if modified_files:
for file_path in modified_files:
doc = self._file_to_document(file_path)
self.manager.update_document(
doc_id=doc['id'],
new_text=doc['text'],
new_metadata=doc['metadata']
)
logger.info(f"Modified {len(modified_files)} documents")
# 삭제
if deleted_ids:
self.manager.delete_documents(deleted_ids)
logger.info(f"Deleted {len(deleted_ids)} documents")
# 캐시 업데이트
for file_path in added_files + modified_files:
file_hash = self._calculate_hash(file_path)
self.previous_hashes[str(file_path)] = file_hash
for deleted_file in deleted_ids:
if deleted_file in self.previous_hashes:
del self.previous_hashes[deleted_file]
self._save_cache()
return {
'added': len(added_files),
'modified': len(modified_files),
'deleted': len(deleted_ids)
}3.4 사용 예시
# 1. VectorStore 초기화
manager = VectorStoreManager(collection_name="knowledge_base")
# 2. Updater 생성
updater = VectorStoreUpdater(
manager=manager,
data_dir="data/documents"
)
# 3. 초기 로드
initial_docs = [
{"id": "doc_1", "text": "문서 1", "metadata": {"source": "wiki"}},
{"id": "doc_2", "text": "문서 2", "metadata": {"source": "manual"}}
]
manager.add_documents(initial_docs)
# 4. 자동 업데이트 (크론잡 등으로 정기 실행)
stats = updater.update()
print(f"Updated: {stats}")
# {'added': 3, 'modified': 1, 'deleted': 0}
# 5. Agent에서 검색
results = manager.search(
query="벡터 데이터 관리 방법",
n_results=3,
metadata_filter={'source': 'wiki'}
)
for result in results:
print(f"[{result['distance']:.2f}] {result['id']}: {result['text'][:50]}...")4 표준 메타데이터 스키마
4.1 설계 원칙
모든 Agent 실행 결과는 일관된 메타데이터 구조를 가져야 한다:
# shared/metadata/schema.py
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from datetime import datetime
@dataclass
class AgentInfo:
"""Agent 정보"""
name: str
version: str
domain: str
@dataclass
class ExecutionInfo:
"""실행 정보"""
timestamp: str # ISO 8601
duration_ms: float
status: str # success, failure, timeout
error_message: Optional[str] = None
@dataclass
class InputInfo:
"""입력 정보"""
task: str
source: str # user_upload, api_call, scheduled, chained
parent_agent: Optional[str] = None # 체이닝 시 이전 Agent
@dataclass
class OutputInfo:
"""출력 정보"""
confidence: float # 0-1
warnings: List[str]
errors: List[str]
@dataclass
class ResourceUsage:
"""리소스 사용량"""
llm_provider: Optional[str] = None # openai, anthropic, etc.
llm_model: Optional[str] = None
llm_tokens: Optional[int] = None
llm_cost: Optional[float] = None
cache_hit: bool = False
memory_mb: Optional[float] = None
@dataclass
class StandardMetadata:
"""표준 메타데이터 스키마"""
agent: AgentInfo
execution: ExecutionInfo
input: InputInfo
output: OutputInfo
resources: ResourceUsage
custom: Dict[str, Any] = None # Agent별 커스텀 필드
def to_dict(self) -> Dict[str, Any]:
"""딕셔너리로 변환"""
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'StandardMetadata':
"""딕셔너리에서 생성"""
return cls(
agent=AgentInfo(**data['agent']),
execution=ExecutionInfo(**data['execution']),
input=InputInfo(**data['input']),
output=OutputInfo(**data['output']),
resources=ResourceUsage(**data['resources']),
custom=data.get('custom')
)4.2 BaseAgent에 메타데이터 통합
# core/base_agent.py 수정
import time
from datetime import datetime
from shared.metadata.schema import (
StandardMetadata, AgentInfo, ExecutionInfo,
InputInfo, OutputInfo, ResourceUsage
)
class BaseAgent(ABC):
"""메타데이터 자동 생성"""
def execute(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""실행 + 메타데이터 자동 추가"""
start_time = time.time()
status = "success"
error_message = None
try:
# 검증
self._validate_input(input)
# 전처리
processed_input = self._pre_process(input)
# 핵심 로직
result = self.process(processed_input)
# 후처리
processed_result = self._post_process(result)
except Exception as e:
status = "failure"
error_message = str(e)
self.metrics.record_error(error_message)
raise
finally:
# 메타데이터 생성
duration_ms = (time.time() - start_time) * 1000
metadata = StandardMetadata(
agent=AgentInfo(
name=self.metadata.name,
version=self.metadata.version,
domain=self.metadata.domain
),
execution=ExecutionInfo(
timestamp=datetime.utcnow().isoformat(),
duration_ms=duration_ms,
status=status,
error_message=error_message
),
input=InputInfo(
task=input.get('task', 'unknown'),
source=input.get('source', 'unknown'),
parent_agent=input.get('metadata', {}).get('agent')
),
output=OutputInfo(
confidence=processed_result.get('confidence', 0.0),
warnings=processed_result.get('warnings', []),
errors=processed_result.get('errors', [])
),
resources=self._get_resource_usage(),
custom=self._get_custom_metadata()
)
processed_result['metadata'] = metadata.to_dict()
return processed_result
def _get_resource_usage(self) -> ResourceUsage:
"""리소스 사용량 (하위 클래스에서 override 가능)"""
return ResourceUsage()
def _get_custom_metadata(self) -> Dict[str, Any]:
"""커스텀 메타데이터 (하위 클래스에서 override 가능)"""
return {}4.3 메타데이터 활용 예시
4.3.1 1. 로깅
# shared/logging/structured_logger.py
import logging
import json
class StructuredLogger:
"""구조화된 로깅"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def log_execution(self, result: Dict[str, Any]):
"""Agent 실행 결과 로깅"""
metadata = result.get('metadata', {})
log_data = {
'agent': metadata['agent']['name'],
'status': metadata['execution']['status'],
'duration_ms': metadata['execution']['duration_ms'],
'confidence': metadata['output']['confidence'],
'llm_cost': metadata['resources'].get('llm_cost', 0)
}
self.logger.info(json.dumps(log_data))4.3.2 2. 비용 추적
# shared/monitoring/cost_tracker.py
from typing import List, Dict, Any
class CostTracker:
"""LLM 비용 추적"""
def __init__(self):
self.costs: List[Dict] = []
def record(self, metadata: Dict[str, Any]):
"""메타데이터에서 비용 추출"""
resources = metadata.get('resources', {})
if resources.get('llm_cost'):
self.costs.append({
'agent': metadata['agent']['name'],
'timestamp': metadata['execution']['timestamp'],
'cost': resources['llm_cost'],
'tokens': resources.get('llm_tokens', 0)
})
def get_total_cost(self, agent_name: Optional[str] = None) -> float:
"""총 비용 계산"""
filtered = self.costs
if agent_name:
filtered = [c for c in self.costs if c['agent'] == agent_name]
return sum(c['cost'] for c in filtered)
def get_summary(self) -> Dict[str, Any]:
"""비용 요약"""
by_agent = {}
for cost in self.costs:
agent = cost['agent']
if agent not in by_agent:
by_agent[agent] = {'cost': 0, 'tokens': 0, 'calls': 0}
by_agent[agent]['cost'] += cost['cost']
by_agent[agent]['tokens'] += cost['tokens']
by_agent[agent]['calls'] += 1
return {
'total_cost': self.get_total_cost(),
'by_agent': by_agent
}4.3.3 3. 성능 분석
# shared/monitoring/performance_analyzer.py
from typing import List, Dict, Any
import numpy as np
class PerformanceAnalyzer:
"""성능 분석"""
def __init__(self):
self.executions: List[Dict] = []
def record(self, metadata: Dict[str, Any]):
"""메타데이터 기록"""
self.executions.append({
'agent': metadata['agent']['name'],
'duration_ms': metadata['execution']['duration_ms'],
'confidence': metadata['output']['confidence'],
'status': metadata['execution']['status']
})
def get_agent_stats(self, agent_name: str) -> Dict[str, Any]:
"""Agent별 통계"""
agent_execs = [
e for e in self.executions
if e['agent'] == agent_name
]
if not agent_execs:
return {}
durations = [e['duration_ms'] for e in agent_execs]
confidences = [e['confidence'] for e in agent_execs]
successes = sum(1 for e in agent_execs if e['status'] == 'success')
return {
'total_executions': len(agent_execs),
'success_rate': successes / len(agent_execs),
'avg_duration_ms': np.mean(durations),
'p95_duration_ms': np.percentile(durations, 95),
'avg_confidence': np.mean(confidences)
}5 설정 관리
5.1 환경별 설정 분리
# config/
├── base.yaml # 공통 설정
├── development.yaml # 개발 환경
├── staging.yaml # 스테이징 환경
└── production.yaml # 프로덕션 환경5.1.1 base.yaml
# config/base.yaml
platform:
name: "ai-agent-platform"
version: "1.0.0"
agents:
default_timeout: 30
max_retries: 3
llm:
default_provider: "openai"
default_model: "gpt-4"
temperature: 0.0
max_tokens: 2000
vector_store:
default_collection: "knowledge_base"
embedding_model: "text-embedding-ada-002"
monitoring:
enabled: true
log_level: "INFO"
metrics_port: 9090
prompts:
directory: "shared/prompts"
default_version: "latest"5.1.2 production.yaml
5.2 Config 로더
# shared/config/loader.py
from typing import Dict, Any
from pathlib import Path
import yaml
import os
class ConfigLoader:
"""설정 로더
환경 변수 ENV에 따라 설정 로드:
- development: base + development
- production: base + production
"""
def __init__(self, config_dir: str = "config"):
self.config_dir = Path(config_dir)
self.env = os.getenv("ENV", "development")
self.config = self._load_config()
def _load_yaml(self, file_name: str) -> Dict[str, Any]:
"""YAML 파일 로드"""
file_path = self.config_dir / file_name
if not file_path.exists():
return {}
with open(file_path) as f:
return yaml.safe_load(f) or {}
def _merge_configs(
self,
base: Dict[str, Any],
override: Dict[str, Any]
) -> Dict[str, Any]:
"""설정 병합 (재귀적)"""
result = base.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._merge_configs(result[key], value)
else:
result[key] = value
return result
def _load_config(self) -> Dict[str, Any]:
"""설정 로드 (base + env)"""
base = self._load_yaml("base.yaml")
env_config = self._load_yaml(f"{self.env}.yaml")
return self._merge_configs(base, env_config)
def get(self, key: str, default: Any = None) -> Any:
"""설정 값 조회 (dot notation)
Example:
config.get("llm.default_model") # "gpt-4"
"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def get_all(self) -> Dict[str, Any]:
"""전체 설정 반환"""
return self.config.copy()
# 전역 설정 인스턴스
_config = None
def get_config() -> ConfigLoader:
"""전역 설정 조회 (Singleton)"""
global _config
if _config is None:
_config = ConfigLoader()
return _config5.3 사용 예시
# Agent에서 설정 사용
from shared.config.loader import get_config
class DataAgent(BaseAgent):
def __init__(self, metadata):
super().__init__(metadata)
config = get_config()
# LLM 설정
self.llm = LLMClient(
provider=config.get("llm.default_provider"),
model=config.get("llm.default_model"),
temperature=config.get("llm.temperature"),
cache_enabled=config.get("llm.cache_enabled", False)
)
# Prompt 설정
self.prompt_registry = PromptRegistry(
prompts_dir=config.get("prompts.directory")
)
# 타임아웃 설정
self.timeout = config.get("agents.default_timeout")6 핵심 설계 결정 요약
6.1 프롬프트 관리
- YAML 기반 저장: 버전 이력, 성능 메트릭 포함
- PromptRegistry: 중앙집중식 관리
- A/B 테스트: 통계적 유의성 검증
- Agent 독립성: 프롬프트 변경 시 Agent 재배포 불필요
6.2 벡터 데이터 관리
- VectorStoreManager: 공통 벡터 저장소
- 증분 업데이트: 변경된 문서만 업데이트
- 해시 기반 감지: 파일 변경 자동 감지
- 메타데이터 필터링: 효율적인 검색
6.3 메타데이터 표준
- StandardMetadata: 모든 Agent 공통 스키마
- 자동 생성: BaseAgent가 메타데이터 추가
- 구조화 로깅: JSON 형식
- 비용/성능 추적: 메타데이터 활용
6.4 설정 관리
- 환경별 분리: base + development/production
- YAML 형식: 가독성
- Dot notation: 계층 구조 접근
- Singleton 패턴: 전역 설정
6.5 다음 단계
이 글에서 데이터 표준화 계층을 완성했다. 다음 글에서는:
- 6번 글: 플랫폼 운영 (CI/CD, 모니터링, 배포 전략, 보안)
6.6 참고문헌
Data Management: - Kleppmann, M. (2017). “Designing Data-Intensive Applications.” O’Reilly. - Reis, J., & Housley, M. (2022). “Fundamentals of Data Engineering.” O’Reilly.
Configuration Management: - Humble, J., & Farley, D. (2010). “Continuous Delivery.” Addison-Wesley. - Newman, S. (2021). “Building Microservices.” O’Reilly.
Vector Databases: - ChromaDB Documentation. https://docs.trychroma.com/ - Pinecone Documentation. https://docs.pinecone.io/
Prompt Engineering: - OpenAI Cookbook. https://cookbook.openai.com/ - Anthropic Claude Documentation. https://docs.anthropic.com/
shared/prompt_management/
├── registry.py # 프롬프트 저장소
├── version_control.py # 버전 관리
└── ab_testing.py # A/B 테스트 도구6.6.1 프롬프트 레지스트리
# shared/prompt_management/registry.py
from typing import Dict
import yaml
class PromptRegistry:
"""프롬프트 버전 관리"""
def __init__(self, prompts_dir: str):
self.prompts_dir = prompts_dir
self._prompts: Dict[str, Dict] = {}
self._load_prompts()
def _load_prompts(self):
"""YAML에서 프롬프트 로드"""
for file in Path(self.prompts_dir).glob("*.yaml"):
with open(file) as f:
data = yaml.safe_load(f)
self._prompts[data['name']] = data
def get(self, name: str, version: str = "latest") -> str:
"""프롬프트 조회"""
prompt_data = self._prompts[name]
if version == "latest":
return prompt_data['versions'][-1]['content']
else:
for v in prompt_data['versions']:
if v['version'] == version:
return v['content']
raise ValueError(f"Prompt {name} version {version} not found")6.6.2 프롬프트 YAML 형식
# shared/prompt_management/prompts/data_standardization.yaml
name: data_standardization
domain: data
description: "데이터 스키마 표준화 프롬프트"
versions:
- version: "1.0.0"
created_at: "2026-01-28"
author: "Platform Team"
content: |
다음 데이터 스키마를 표준화하세요:
{schema}
표준 형식:
- 컬럼명: snake_case
- 날짜: ISO 8601
- 문자열: UTF-8
- version: "1.1.0"
created_at: "2026-02-01"
author: "Platform Team"
changes: "타입 검증 규칙 추가"
content: |
다음 데이터 스키마를 표준화하세요:
{schema}
표준 형식:
- 컬럼명: snake_case
- 날짜: ISO 8601 (YYYY-MM-DD)
- 문자열: UTF-8
- 타입: string, int, float, date만 허용6.7 벡터 데이터 관리
문제: RAG 기반 Agent의 벡터 데이터 업데이트 자동화
6.7.1 벡터 저장소 관리자
# shared/vector_store/manager.py
from typing import List, Dict, Any
from chromadb import Client
class VectorStoreManager:
"""벡터 데이터 관리"""
def __init__(self, collection_name: str):
self.client = Client()
self.collection = self.client.get_or_create_collection(collection_name)
def add_documents(self, documents: List[Dict[str, Any]]):
"""문서 추가 (증분 업데이트)"""
ids = [doc['id'] for doc in documents]
texts = [doc['text'] for doc in documents]
metadatas = [doc['metadata'] for doc in documents]
self.collection.add(
ids=ids,
documents=texts,
metadatas=metadatas
)
def update_document(self, doc_id: str, new_text: str):
"""문서 업데이트"""
self.collection.update(
ids=[doc_id],
documents=[new_text]
)
def search(self, query: str, n_results: int = 5) -> List[Dict]:
"""검색"""
results = self.collection.query(
query_texts=[query],
n_results=n_results
)
return results6.8 메타데이터 표준
모든 Agent 실행 결과에 포함되는 메타데이터 스키마:
{
"agent": {
"name": "data_standardization",
"version": "1.0.0",
"domain": "data"
},
"execution": {
"timestamp": "2026-02-02T10:30:00Z",
"duration_ms": 1250,
"status": "success"
},
"input": {
"task": "standardize",
"source": "user_upload"
},
"output": {
"confidence": 0.95,
"warnings": [],
"errors": []
},
"resources": {
"llm_tokens": 1500,
"llm_cost": 0.045,
"cache_hit": false
}
}7 최종 디렉토리 구조
2번 글의 Phase 4 결과를 구체화:
ai-agent-platform/ # Monorepo 루트
├── pyproject.toml # 전체 의존성 관리
├── README.md
│
├── core/ # 플랫폼 핵심
│ ├── agent/
│ │ ├── base_agent.py # BaseAgent 추상 클래스
│ │ ├── lifecycle.py
│ │ ├── registry.py # AgentRegistry (Singleton)
│ │ └── metadata.py
│ ├── orchestrator.py # AgentOrchestrator
│ ├── monitoring.py
│ └── evaluation.py
│
├── shared/ # 공통 라이브러리
│ ├── llm/
│ │ ├── client.py # LLMClient (캐싱, 재시도)
│ │ ├── prompt_engine.py
│ │ └── context_manager.py
│ ├── validation/
│ │ ├── json_parser.py
│ │ └── schema_validator.py
│ ├── logging/
│ │ └── structured_logger.py
│ ├── monitoring/
│ │ ├── metrics.py
│ │ └── dashboard.py
│ ├── document_processing/
│ │ ├── pdf_parser.py
│ │ ├── code_parser.py
│ │ └── text_extractor.py
│ ├── vector_store/
│ │ ├── manager.py # VectorStoreManager
│ │ ├── updater.py
│ │ └── index.py
│ └── prompt_management/
│ ├── registry.py # PromptRegistry
│ ├── version_control.py
│ ├── ab_testing.py
│ └── prompts/ # YAML 프롬프트 모음
│ ├── data_standardization.yaml
│ ├── code_analysis.yaml
│ └── knowledge_qna.yaml
│
├── agents/ # 도메인 Agent들
│ ├── data_standardization/
│ │ ├── __init__.py # Agent 자동 등록
│ │ ├── agent.py # DataStandardizationAgent
│ │ ├── manifest.yaml # Agent 메타데이터
│ │ ├── rules/ # 도메인 특화 규칙
│ │ │ └── schema_rules.py
│ │ └── tests/
│ │ └── test_agent.py
│ ├── code_analysis/
│ │ ├── __init__.py
│ │ ├── agent.py
│ │ ├── manifest.yaml
│ │ ├── parsers/ # 도메인 특화 파서
│ │ │ ├── python_parser.py
│ │ │ └── java_parser.py
│ │ └── tests/
│ └── knowledge_qna/
│ ├── __init__.py
│ ├── agent.py
│ ├── manifest.yaml
│ ├── rag_pipeline.py # 도메인 특화 RAG
│ └── tests/
│
├── platform-api/ # API Gateway
│ ├── main.py # FastAPI 앱
│ ├── routes/
│ │ ├── agents.py # /api/agents/*
│ │ ├── monitoring.py # /api/metrics/*
│ │ └── management.py # /api/manage/*
│ └── middleware/
│ ├── auth.py
│ └── logging.py
│
├── infra/ # 인프라 코드
│ ├── docker/
│ │ ├── Dockerfile.base
│ │ ├── Dockerfile.agent
│ │ └── docker-compose.yml
│ ├── k8s/ # Kubernetes 매니페스트
│ │ ├── core-deployment.yaml
│ │ ├── agent-deployment.yaml
│ │ └── service.yaml
│ └── ci_cd/
│ └── .github/
│ └── workflows/
│ ├── test.yml # 변경 감지 + 테스트
│ └── deploy.yml
│
└── tests/ # 통합 테스트
├── integration/
│ ├── test_agent_chain.py # Agent 체이닝 테스트
│ └── test_orchestrator.py
└── performance/
└── test_load.py
8 CI/CD 파이프라인
8.1 변경 감지 및 선택적 테스트
# .github/workflows/test.yml
name: Test Affected Components
on: [push, pull_request]
jobs:
detect-changes:
runs-on: ubuntu-latest
outputs:
affected: ${{ steps.detect.outputs.affected }}
steps:
- uses: actions/checkout@v2
- name: Detect changed modules
id: detect
run: |
CHANGED=$(git diff --name-only HEAD~1)
if echo "$CHANGED" | grep -E '^(core|shared)/'; then
echo "affected=all" >> $GITHUB_OUTPUT
else
AGENTS=$(echo "$CHANGED" | grep 'agents/' | cut -d'/' -f2 | sort -u)
echo "affected=$AGENTS" >> $GITHUB_OUTPUT
fi
test:
needs: detect-changes
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- name: Install dependencies
run: |
pip install poetry
poetry install
- name: Run tests
run: |
if [ "${{ needs.detect-changes.outputs.affected }}" = "all" ]; then
poetry run pytest
else
for agent in ${{ needs.detect-changes.outputs.affected }}; do
poetry run pytest agents/$agent/tests/
done
fi9 다음 단계
이 글에서 “최종 플랫폼 구조”를 구체화했다면, 다음 질문은:
“실제로 어떻게 운영하고 자동화하는가?”
다음 글 “AI Agent 플랫폼 운영 자동화와 DevOps”에서는: - 배포 전략: Blue-Green, Canary, A/B 테스트 - 모니터링: Agent별 메트릭, 대시보드, 알림 - 로깅: 구조화된 로그, 분석 - 보안: 인증, 권한, API 키 관리
를 다룬다.