1 본 글의 위치
Python Logging (기초)가 print vs logging 비교, 5단계 레벨, basicConfig, 파일·콘솔 핸들러, 로테이션, Streamlit 환경 권장 패턴을 다룬다. 본 글은 그 토대 위에서 운영 환경에 직접 적용하는 패턴에 집중한다.
| 다루는 내용 | 기초 글 | 본 글 (운영) |
|---|---|---|
| 레벨·basicConfig | Y (자세히) | 짧게 |
| 파일·콘솔 handler | Y | 짧게 |
| structured logging (JSON) | — | Y (핵심) |
| correlation ID (요청 추적) | — | Y (핵심) |
| async 환경 안전성 (QueueHandler) | — | Y |
| log aggregator (Datadog/Loki/ELK) 친화 포맷 | — | Y |
| 시크릿 마스킹 | — | Y |
| Streamlit 패턴 | Y (자세히) | — |
- MINERVA 08-1편 스트리밍·관측성 –
perf_counter기반[timing]로그 - MINERVA 11-1편 Config 운영 패턴 –
/health/build엔드포인트 + commit hash baking - MINERVA 04편 FastAPI 서빙 – 라우터의
logger.info사용
2 structured logging — 줄 단위 JSON
전통적 로그는 사람이 읽기 위한 자유 텍스트다. 그러나 운영 환경에서는 로그를 기계가 파싱해야 한다 — Datadog·Loki·ELK 같은 aggregator가 로그를 인덱싱하고 필터·집계·알림을 만든다. 자유 텍스트는 정규식으로 파싱하는데 형식이 조금만 바뀌어도 깨진다.
해결: 각 로그 줄을 JSON으로 출력한다. aggregator가 키를 그대로 인식한다.
import json
import logging
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""모든 로그를 JSON 한 줄로 출력."""
def format(self, record: logging.LogRecord) -> str:
log_obj = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# extra= 인자로 전달한 컨텍스트도 포함
for key, value in record.__dict__.items():
if key not in {"args", "exc_info", "exc_text", "msg", "stack_info",
"name", "levelname", "levelno", "pathname", "filename",
"module", "lineno", "funcName", "created", "msecs",
"relativeCreated", "thread", "threadName", "processName",
"process", "message", "asctime"}:
log_obj[key] = value
# 예외 정보가 있으면 traceback 포함
if record.exc_info:
log_obj["exception"] = self.formatException(record.exc_info)
return json.dumps(log_obj, ensure_ascii=False, default=str)
# 적용
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(handlers=[handler], level=logging.INFO)출력 예시:
{"timestamp":"2026-05-06T10:30:45.123456Z","level":"INFO","logger":"qna_chatbot","message":"Query received","module":"agent","function":"run","line":42,"user_id":"anon-9d3f2a"}message는 사람이 읽고, 나머지 키는 기계가 인덱싱한다. Datadog에서 level:ERROR user_id:"anon-9d3f2a" 같은 쿼리가 즉시 가능해진다.
3 extra= 인자 — context 부착
logger.info(...) 호출 시 extra= 인자로 임의 키를 부착한다. JSONFormatter가 이를 자동 포함한다.
import logging
logger = logging.getLogger(__name__)
def run(query):
logger.info(
"Query received",
extra={
"user_id": query.user_id,
"experiment_id": query.experiment_id,
"arm_id": query.arm_id,
"query_length": len(query.text),
},
)이 한 줄이 다음 5개 키가 포함된 JSON 로그를 만든다 — aggregator에서 experiment_id:"reranker_v2" arm_id:"treatment"로 쿼리해 특정 실험 arm의 동작만 추적할 수 있다.
extra={"name": "..."}처럼 LogRecord 표준 속성과 같은 이름을 쓰면 KeyError가 발생한다. 충돌 가능성 있는 키(message, name, module, lineno 등)는 ctx_name 같이 prefix를 붙여 회피한다.
4 correlation ID — 요청 추적
마이크로서비스나 여러 단계 처리에서는 하나의 요청이 여러 로그 줄로 나뉘어 기록된다. 이 줄들을 같은 요청으로 묶으려면 correlation_id(또는 request_id, trace_id)를 모든 로그에 부착한다.
MINERVA의 Response.run_id가 이 패턴 그대로다 — uuid.uuid4().hex로 생성된 ID가 응답·메트릭(runs.jsonl)·피드백(feedback.jsonl)을 한 키로 묶는다.
4.1 contextvars로 자동 전파
매 로그 호출마다 extra={"run_id": ...}를 명시하기 번거롭다. contextvars로 요청 시작 시점에 한 번 설정하면 그 요청 안의 모든 로그가 자동 부착되도록 만들 수 있다.
import contextvars
import logging
import uuid
# 요청 단위 context 변수
run_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("run_id", default="-")
class CorrelationFilter(logging.Filter):
"""모든 LogRecord에 run_id를 자동 추가."""
def filter(self, record: logging.LogRecord) -> bool:
record.run_id = run_id_var.get()
return True
# 적용
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
handler.addFilter(CorrelationFilter())
logging.basicConfig(handlers=[handler], level=logging.INFO)
# 라우터에서 요청 시작 시 한 번만 설정
async def run_endpoint(request):
token = run_id_var.set(uuid.uuid4().hex)
try:
logger.info("Request received")
result = await agent.run(query) # 이 안의 모든 로그에 run_id 자동 부착
logger.info("Request completed")
return result
finally:
run_id_var.reset(token)이제 어떤 로그 줄이든 같은 요청이면 같은 run_id를 가진다. aggregator에서 한 요청의 전체 흐름을 한 쿼리로 볼 수 있다.
5 async 환경에서의 logging — QueueHandler
기본 StreamHandler나 FileHandler는 동기 I/O다. async 환경에서 이걸 호출하면 이벤트 루프가 잠깐 막힌다 — 로그 쓰기가 ms 단위면 무시할 수 있지만 stdout이 막혀 있거나 디스크가 느리면 문제가 될 수 있다.
해결: QueueHandler + 별도 스레드의 QueueListener.
import logging
import logging.handlers
import queue
# 1. 큐 + 진짜 handler를 별도 스레드에서 운영
log_queue = queue.Queue(maxsize=10000)
real_handler = logging.StreamHandler() # 진짜 출력
real_handler.setFormatter(JSONFormatter())
listener = logging.handlers.QueueListener(log_queue, real_handler)
listener.start() # 백그라운드 스레드 시작
# 2. 애플리케이션은 큐로만 push (논블로킹)
queue_handler = logging.handlers.QueueHandler(log_queue)
logging.basicConfig(handlers=[queue_handler], level=logging.INFO)
# logger.info(...) 호출 → 큐에 메시지 넣고 즉시 반환
# 별도 스레드의 listener가 큐에서 꺼내 진짜 handler에 전달이 패턴으로 async 코루틴이 logger.info(...) 호출 시 마이크로초 단위로 반환되고, 실제 I/O는 백그라운드 스레드에서 처리된다.
maxsize를 두면 큐가 가득 찼을 때 queue.Full 예외가 발생한다(block=False 시). 운영에서는 이를 피하려면:
maxsize=0(무제한) — 메모리 사용 위험maxsize=N+ 큐 가득 시 가장 오래된 메시지 drop (직접 구현 필요)maxsize=N+ 워커 처리량 충분히 높게 유지
대부분의 운영에서는 maxsize=10000 + listener의 처리량 충분이면 문제 없다.
6 log aggregator 친화 포맷
각 aggregator가 기대하는 JSON 키 이름이 약간씩 다르다.
| Aggregator | 핵심 키 |
|---|---|
| Datadog | level, message, service, host, dd.trace_id, dd.span_id |
| Loki (Grafana) | label 기반 — 메타키를 label로 추가 |
| ELK (Elasticsearch) | @timestamp, level, message, host, service.name |
| CloudWatch | level, message, AWS X-Ray trace_id |
| Azure App Insights | customDimensions 안에 모든 컨텍스트 |
JSONFormatter를 aggregator마다 약간씩 조정한다. 또는 표준 OpenTelemetry 형식을 쓰면 여러 aggregator가 공통 처리.
class OTelFormatter(logging.Formatter):
"""OpenTelemetry-ish 형식."""
def format(self, record: logging.LogRecord) -> str:
return json.dumps({
"Timestamp": datetime.utcnow().isoformat() + "Z",
"SeverityText": record.levelname,
"Body": record.getMessage(),
"Attributes": {
"logger.name": record.name,
"code.module": record.module,
"code.function": record.funcName,
"code.lineno": record.lineno,
**{k: v for k, v in record.__dict__.items()
if k.startswith("ctx_") or k in ("run_id", "user_id", "experiment_id")},
},
}, ensure_ascii=False, default=str)7 perf_counter + 구조화 timing 로그 — MINERVA 08-1 패턴
MINERVA 08-1편의 [timing] 로그는 단순 텍스트 형식이지만, structured logging으로 옮기면 aggregator가 자동 집계한다.
7.1 텍스트 (현재)
[timing] hybrid_search: azure=420ms rerank/cosine=15ms map_parents=2ms children=12 parents=6
[timing] prepare: init=0ms retrieve=437ms ctx+history=1ms chain=3ms
[ttft] qna_chatbot total=1820ms (prepare=441ms llm_first_token=1379ms)
7.2 structured (개선)
import logging
import time
logger = logging.getLogger(__name__)
class TimingScope:
"""with 블록 진입·종료 시점 측정 + 자동 로그."""
def __init__(self, name: str, **context):
self.name = name
self.context = context
def __enter__(self):
self.t0 = time.perf_counter()
return self
def __exit__(self, *args):
ms = int((time.perf_counter() - self.t0) * 1000)
logger.info(
f"timing.{self.name}",
extra={
"timing_event": self.name,
"duration_ms": ms,
**self.context,
},
)
# 사용
with TimingScope("hybrid_search", k=6):
docs = retriever.hybrid_search(query, k=6)
with TimingScope("rerank", reranker_type="cross_encoder"):
docs = reranker.rerank(docs)JSON 출력:
{"timestamp":"...", "level":"INFO", "message":"timing.hybrid_search",
"timing_event":"hybrid_search", "duration_ms":420, "k":6, "run_id":"a3f2b1c"}
{"timestamp":"...", "level":"INFO", "message":"timing.rerank",
"timing_event":"rerank", "duration_ms":15, "reranker_type":"cross_encoder", "run_id":"a3f2b1c"}aggregator에서 다음 같은 분석이 즉시 가능:
timing_event:hybrid_search의 p95 duration_msreranker_type별 latency 분포- 특정
run_id의 모든 단계 시간 합
8 로그 레벨을 환경변수로 제어
운영·개발에서 다른 레벨을 쓰려면 환경변수로 제어한다 (환경변수와 dotenv 참조).
import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))LOG_LEVEL=DEBUG로 띄우면 DEBUG 이상 모두 출력. 운영에서는 INFO 또는 WARNING. 디버깅 필요 시 일시적으로 DEBUG로 띄워 회귀 추적.
MINERVA 04편이 정확히 이 패턴 — MINERVA_LOG_LEVEL env로 제어.
9 시크릿 마스킹 — 민감 정보가 로그에 새어 나가지 않게
import logging
import re
SECRET_PATTERNS = [
re.compile(r"sk-[a-zA-Z0-9]{20,}"), # OpenAI key
re.compile(r"AKIA[A-Z0-9]{16}"), # AWS key
re.compile(r'"password"\s*:\s*"([^"]+)"'), # JSON password
]
class SecretMaskFilter(logging.Filter):
"""LogRecord의 메시지·인자에서 시크릿 패턴 마스킹."""
def filter(self, record: logging.LogRecord) -> bool:
msg = record.getMessage()
for pattern in SECRET_PATTERNS:
msg = pattern.sub("***REDACTED***", msg)
record.msg = msg
record.args = () # 이미 메시지에 흡수
return True
handler.addFilter(SecretMaskFilter())이 필터로 logger.info(f"Calling API with key={api_key}") 같은 실수도 자동 마스킹된다. 단 모든 패턴을 다 잡지 못하므로 시크릿을 로그에 의도적으로 넣지 않는 것이 1차 안전책이다.
10 자주 발생하는 오류 패턴
def handle_request(req):
print(f"Request received: {req.user_id}") # stdout으로 직행
print(f"DEBUG: arm={req.arm_id}") # 운영 로그에 디버그 섞임CORRECT:
import logging
logger = logging.getLogger(__name__)
def handle_request(req):
logger.info("Request received", extra={"user_id": req.user_id})
logger.debug("ARM info", extra={"arm_id": req.arm_id})print는 레벨·구조·context가 없다. logging은 운영에서 레벨로 디버그 메시지를 끄거나 켤 수 있다.
CORRECT:
f-string으로 값을 메시지에 담으면 aggregator가 같은 메시지를 그룹화하기 어렵다. 메시지는 고정 문자열로, 변하는 값은 extra=로.
# async 환경에서 SlackHandler 직접 사용 (HTTP 요청 동기 호출)
class SlackHandler(logging.Handler):
def emit(self, record):
requests.post(SLACK_URL, json={...}) # 동기 I/O — 이벤트 루프 차단CORRECT:
# QueueHandler + 별도 스레드의 listener에서 SlackHandler 사용
log_queue = queue.Queue(maxsize=1000)
listener = QueueListener(log_queue, SlackHandler())
listener.start()
logger.addHandler(QueueHandler(log_queue))async 코루틴 안에서 동기 I/O를 하는 handler를 직접 쓰면 이벤트 루프가 막힌다. QueueHandler로 분리.
CORRECT:
try:
do_something()
except Exception as e:
logger.error("Operation failed", exc_info=True) # traceback 포함exc_info=True로 예외 traceback이 로그에 포함된다. 디버깅 시 어디서 예외가 발생했는지 추적 가능.
11 정리
| 패턴 | 해결 |
|---|---|
| 자유 텍스트 → 기계 파싱 어려움 | JSONFormatter로 줄 단위 JSON 출력 |
| 같은 요청의 로그 여러 줄 묶기 | correlation ID (run_id) + contextvars + Filter |
| async 환경에서 I/O 차단 | QueueHandler + 별도 스레드 listener |
| aggregator마다 다른 JSON 키 | OpenTelemetry 형식 또는 aggregator별 formatter |
| 시크릿이 로그에 새어 나감 | SecretMaskFilter + 의도적 회피 |
| f-string 메시지로 그룹화 어려움 | 메시지는 고정 문자열, 값은 extra= |
| 예외 traceback 사라짐 | exc_info=True |
12 응용 분야
| MINERVA 시리즈 적용 | 본 글 패턴 |
|---|---|
08-1 [timing] 로그 |
TimingScope with 블록 + extra={"duration_ms": ...} |
04 MINERVA_LOG_LEVEL env |
환경변수로 레벨 제어 |
Response.run_id 로그 추적 |
correlation ID + contextvars |
11-1 /health/build JSON |
structured logging 자연스러운 확장 |
| FastAPI async 라우터 | QueueHandler로 이벤트 루프 차단 회피 |
| Azure OpenAI key 보호 | SecretMaskFilter |
13 관련 주제
선행 학습
- Python Logging 기초 – print vs logging, 레벨, basicConfig, 파일/콘솔 핸들러
- Python async/await 실전 패턴 – async 환경 logging의 토대
- 환경변수와 dotenv – LOG_LEVEL 환경변수 패턴
MINERVA 시리즈 응용
- MINERVA 스트리밍·관측성 (08-1) – perf_counter timing 로그
- MINERVA Config 운영 (11-1) – /health/build commit hash
- MINERVA FastAPI 서빙 (04) – 라우터 logger 사용
Phase C-9 연결 (예정)
- C34 관측성 설계 – structured log + tracing 통합