Python structured logging — 운영 환경 적용 패턴

JSON 로그·correlation ID·async 안전·log aggregator — MINERVA 08-1 timing 토대

기초 logging 모듈(레벨·handler·formatter)은 별도 글에서 다루고, 본 글은 운영 환경에서 직접 쓰는 structured logging 패턴을 정리한다. JSON 포맷, correlation ID로 요청 추적, async 환경 안전성, log aggregator 친화 출력, 시크릿 마스킹, QueueHandler까지. MINERVA 08-1편 perf_counter timing 로그·11-1편 /health/build 토대.

Engineering
저자

Kwangmin Kim

공개

2026년 05월 06일

1 본 글의 위치

Python Logging (기초)가 print vs logging 비교, 5단계 레벨, basicConfig, 파일·콘솔 핸들러, 로테이션, Streamlit 환경 권장 패턴을 다룬다. 본 글은 그 토대 위에서 운영 환경에 직접 적용하는 패턴에 집중한다.

다루는 내용 기초 글 본 글 (운영)
레벨·basicConfig Y (자세히) 짧게
파일·콘솔 handler Y 짧게
structured logging (JSON) Y (핵심)
correlation ID (요청 추적) Y (핵심)
async 환경 안전성 (QueueHandler) Y
log aggregator (Datadog/Loki/ELK) 친화 포맷 Y
시크릿 마스킹 Y
Streamlit 패턴 Y (자세히)
MINERVA 시리즈에서의 활용 위치

2 structured logging — 줄 단위 JSON

전통적 로그는 사람이 읽기 위한 자유 텍스트다. 그러나 운영 환경에서는 로그를 기계가 파싱해야 한다 — Datadog·Loki·ELK 같은 aggregator가 로그를 인덱싱하고 필터·집계·알림을 만든다. 자유 텍스트는 정규식으로 파싱하는데 형식이 조금만 바뀌어도 깨진다.

해결: 각 로그 줄을 JSON으로 출력한다. aggregator가 키를 그대로 인식한다.

import json
import logging
from datetime import datetime


class JSONFormatter(logging.Formatter):
    """모든 로그를 JSON 한 줄로 출력."""

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # extra= 인자로 전달한 컨텍스트도 포함
        for key, value in record.__dict__.items():
            if key not in {"args", "exc_info", "exc_text", "msg", "stack_info",
                            "name", "levelname", "levelno", "pathname", "filename",
                            "module", "lineno", "funcName", "created", "msecs",
                            "relativeCreated", "thread", "threadName", "processName",
                            "process", "message", "asctime"}:
                log_obj[key] = value
        # 예외 정보가 있으면 traceback 포함
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj, ensure_ascii=False, default=str)


# 적용
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(handlers=[handler], level=logging.INFO)

출력 예시:

{"timestamp":"2026-05-06T10:30:45.123456Z","level":"INFO","logger":"qna_chatbot","message":"Query received","module":"agent","function":"run","line":42,"user_id":"anon-9d3f2a"}

message는 사람이 읽고, 나머지 키는 기계가 인덱싱한다. Datadog에서 level:ERROR user_id:"anon-9d3f2a" 같은 쿼리가 즉시 가능해진다.

3 extra= 인자 — context 부착

logger.info(...) 호출 시 extra= 인자로 임의 키를 부착한다. JSONFormatter가 이를 자동 포함한다.

import logging
logger = logging.getLogger(__name__)


def run(query):
    logger.info(
        "Query received",
        extra={
            "user_id": query.user_id,
            "experiment_id": query.experiment_id,
            "arm_id": query.arm_id,
            "query_length": len(query.text),
        },
    )

이 한 줄이 다음 5개 키가 포함된 JSON 로그를 만든다 — aggregator에서 experiment_id:"reranker_v2" arm_id:"treatment"로 쿼리해 특정 실험 arm의 동작만 추적할 수 있다.

extra=의 키 충돌

extra={"name": "..."}처럼 LogRecord 표준 속성과 같은 이름을 쓰면 KeyError가 발생한다. 충돌 가능성 있는 키(message, name, module, lineno 등)는 ctx_name 같이 prefix를 붙여 회피한다.

4 correlation ID — 요청 추적

마이크로서비스나 여러 단계 처리에서는 하나의 요청이 여러 로그 줄로 나뉘어 기록된다. 이 줄들을 같은 요청으로 묶으려면 correlation_id(또는 request_id, trace_id)를 모든 로그에 부착한다.

MINERVA의 Response.run_id가 이 패턴 그대로다 — uuid.uuid4().hex로 생성된 ID가 응답·메트릭(runs.jsonl)·피드백(feedback.jsonl)을 한 키로 묶는다.

4.1 contextvars로 자동 전파

매 로그 호출마다 extra={"run_id": ...}를 명시하기 번거롭다. contextvars로 요청 시작 시점에 한 번 설정하면 그 요청 안의 모든 로그가 자동 부착되도록 만들 수 있다.

import contextvars
import logging
import uuid


# 요청 단위 context 변수
run_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("run_id", default="-")


class CorrelationFilter(logging.Filter):
    """모든 LogRecord에 run_id를 자동 추가."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.run_id = run_id_var.get()
        return True


# 적용
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
handler.addFilter(CorrelationFilter())
logging.basicConfig(handlers=[handler], level=logging.INFO)


# 라우터에서 요청 시작 시 한 번만 설정
async def run_endpoint(request):
    token = run_id_var.set(uuid.uuid4().hex)
    try:
        logger.info("Request received")
        result = await agent.run(query)         # 이 안의 모든 로그에 run_id 자동 부착
        logger.info("Request completed")
        return result
    finally:
        run_id_var.reset(token)

이제 어떤 로그 줄이든 같은 요청이면 같은 run_id를 가진다. aggregator에서 한 요청의 전체 흐름을 한 쿼리로 볼 수 있다.

5 async 환경에서의 logging — QueueHandler

기본 StreamHandlerFileHandler동기 I/O다. async 환경에서 이걸 호출하면 이벤트 루프가 잠깐 막힌다 — 로그 쓰기가 ms 단위면 무시할 수 있지만 stdout이 막혀 있거나 디스크가 느리면 문제가 될 수 있다.

해결: QueueHandler + 별도 스레드의 QueueListener.

import logging
import logging.handlers
import queue


# 1. 큐 + 진짜 handler를 별도 스레드에서 운영
log_queue = queue.Queue(maxsize=10000)
real_handler = logging.StreamHandler()                      # 진짜 출력
real_handler.setFormatter(JSONFormatter())
listener = logging.handlers.QueueListener(log_queue, real_handler)
listener.start()                                             # 백그라운드 스레드 시작


# 2. 애플리케이션은 큐로만 push (논블로킹)
queue_handler = logging.handlers.QueueHandler(log_queue)
logging.basicConfig(handlers=[queue_handler], level=logging.INFO)


# logger.info(...) 호출 → 큐에 메시지 넣고 즉시 반환
# 별도 스레드의 listener가 큐에서 꺼내 진짜 handler에 전달

이 패턴으로 async 코루틴이 logger.info(...) 호출 시 마이크로초 단위로 반환되고, 실제 I/O는 백그라운드 스레드에서 처리된다.

queue 가득 참

maxsize를 두면 큐가 가득 찼을 때 queue.Full 예외가 발생한다(block=False 시). 운영에서는 이를 피하려면:

  • maxsize=0 (무제한) — 메모리 사용 위험
  • maxsize=N + 큐 가득 시 가장 오래된 메시지 drop (직접 구현 필요)
  • maxsize=N + 워커 처리량 충분히 높게 유지

대부분의 운영에서는 maxsize=10000 + listener의 처리량 충분이면 문제 없다.

6 log aggregator 친화 포맷

각 aggregator가 기대하는 JSON 키 이름이 약간씩 다르다.

Aggregator 핵심 키
Datadog level, message, service, host, dd.trace_id, dd.span_id
Loki (Grafana) label 기반 — 메타키를 label로 추가
ELK (Elasticsearch) @timestamp, level, message, host, service.name
CloudWatch level, message, AWS X-Ray trace_id
Azure App Insights customDimensions 안에 모든 컨텍스트

JSONFormatter를 aggregator마다 약간씩 조정한다. 또는 표준 OpenTelemetry 형식을 쓰면 여러 aggregator가 공통 처리.

class OTelFormatter(logging.Formatter):
    """OpenTelemetry-ish 형식."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "Timestamp": datetime.utcnow().isoformat() + "Z",
            "SeverityText": record.levelname,
            "Body": record.getMessage(),
            "Attributes": {
                "logger.name": record.name,
                "code.module": record.module,
                "code.function": record.funcName,
                "code.lineno": record.lineno,
                **{k: v for k, v in record.__dict__.items()
                   if k.startswith("ctx_") or k in ("run_id", "user_id", "experiment_id")},
            },
        }, ensure_ascii=False, default=str)

7 perf_counter + 구조화 timing 로그 — MINERVA 08-1 패턴

MINERVA 08-1편의 [timing] 로그는 단순 텍스트 형식이지만, structured logging으로 옮기면 aggregator가 자동 집계한다.

7.1 텍스트 (현재)

[timing] hybrid_search: azure=420ms rerank/cosine=15ms map_parents=2ms children=12 parents=6
[timing] prepare: init=0ms retrieve=437ms ctx+history=1ms chain=3ms
[ttft] qna_chatbot total=1820ms (prepare=441ms llm_first_token=1379ms)

7.2 structured (개선)

import logging
import time

logger = logging.getLogger(__name__)


class TimingScope:
    """with 블록 진입·종료 시점 측정 + 자동 로그."""

    def __init__(self, name: str, **context):
        self.name = name
        self.context = context

    def __enter__(self):
        self.t0 = time.perf_counter()
        return self

    def __exit__(self, *args):
        ms = int((time.perf_counter() - self.t0) * 1000)
        logger.info(
            f"timing.{self.name}",
            extra={
                "timing_event": self.name,
                "duration_ms": ms,
                **self.context,
            },
        )


# 사용
with TimingScope("hybrid_search", k=6):
    docs = retriever.hybrid_search(query, k=6)

with TimingScope("rerank", reranker_type="cross_encoder"):
    docs = reranker.rerank(docs)

JSON 출력:

{"timestamp":"...", "level":"INFO", "message":"timing.hybrid_search",
 "timing_event":"hybrid_search", "duration_ms":420, "k":6, "run_id":"a3f2b1c"}
{"timestamp":"...", "level":"INFO", "message":"timing.rerank",
 "timing_event":"rerank", "duration_ms":15, "reranker_type":"cross_encoder", "run_id":"a3f2b1c"}

aggregator에서 다음 같은 분석이 즉시 가능:

  • timing_event:hybrid_search의 p95 duration_ms
  • reranker_type별 latency 분포
  • 특정 run_id의 모든 단계 시간 합

8 로그 레벨을 환경변수로 제어

운영·개발에서 다른 레벨을 쓰려면 환경변수로 제어한다 (환경변수와 dotenv 참조).

import logging
import os

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))

LOG_LEVEL=DEBUG로 띄우면 DEBUG 이상 모두 출력. 운영에서는 INFO 또는 WARNING. 디버깅 필요 시 일시적으로 DEBUG로 띄워 회귀 추적.

MINERVA 04편이 정확히 이 패턴 — MINERVA_LOG_LEVEL env로 제어.

9 시크릿 마스킹 — 민감 정보가 로그에 새어 나가지 않게

import logging
import re


SECRET_PATTERNS = [
    re.compile(r"sk-[a-zA-Z0-9]{20,}"),                     # OpenAI key
    re.compile(r"AKIA[A-Z0-9]{16}"),                        # AWS key
    re.compile(r'"password"\s*:\s*"([^"]+)"'),              # JSON password
]


class SecretMaskFilter(logging.Filter):
    """LogRecord의 메시지·인자에서 시크릿 패턴 마스킹."""

    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage()
        for pattern in SECRET_PATTERNS:
            msg = pattern.sub("***REDACTED***", msg)
        record.msg = msg
        record.args = ()                                     # 이미 메시지에 흡수
        return True


handler.addFilter(SecretMaskFilter())

이 필터로 logger.info(f"Calling API with key={api_key}") 같은 실수도 자동 마스킹된다. 단 모든 패턴을 다 잡지 못하므로 시크릿을 로그에 의도적으로 넣지 않는 것이 1차 안전책이다.

10 자주 발생하는 오류 패턴

WRONG:

def handle_request(req):
    print(f"Request received: {req.user_id}")          # stdout으로 직행
    print(f"DEBUG: arm={req.arm_id}")                   # 운영 로그에 디버그 섞임

CORRECT:

import logging
logger = logging.getLogger(__name__)

def handle_request(req):
    logger.info("Request received", extra={"user_id": req.user_id})
    logger.debug("ARM info", extra={"arm_id": req.arm_id})

print는 레벨·구조·context가 없다. logging은 운영에서 레벨로 디버그 메시지를 끄거나 켤 수 있다.

WRONG:

logger.info(f"Request from {user_id} took {duration}ms")   # 메시지 자체에 값 포함

CORRECT:

logger.info("Request completed",
             extra={"user_id": user_id, "duration_ms": duration})

f-string으로 값을 메시지에 담으면 aggregator가 같은 메시지를 그룹화하기 어렵다. 메시지는 고정 문자열로, 변하는 값은 extra=로.

WRONG:

# async 환경에서 SlackHandler 직접 사용 (HTTP 요청 동기 호출)
class SlackHandler(logging.Handler):
    def emit(self, record):
        requests.post(SLACK_URL, json={...})              # 동기 I/O — 이벤트 루프 차단

CORRECT:

# QueueHandler + 별도 스레드의 listener에서 SlackHandler 사용
log_queue = queue.Queue(maxsize=1000)
listener = QueueListener(log_queue, SlackHandler())
listener.start()
logger.addHandler(QueueHandler(log_queue))

async 코루틴 안에서 동기 I/O를 하는 handler를 직접 쓰면 이벤트 루프가 막힌다. QueueHandler로 분리.

WRONG:

try:
    do_something()
except Exception as e:
    logger.error(f"Error: {e}")                         # traceback 사라짐

CORRECT:

try:
    do_something()
except Exception as e:
    logger.error("Operation failed", exc_info=True)     # traceback 포함

exc_info=True로 예외 traceback이 로그에 포함된다. 디버깅 시 어디서 예외가 발생했는지 추적 가능.

11 정리

패턴 해결
자유 텍스트 → 기계 파싱 어려움 JSONFormatter로 줄 단위 JSON 출력
같은 요청의 로그 여러 줄 묶기 correlation ID (run_id) + contextvars + Filter
async 환경에서 I/O 차단 QueueHandler + 별도 스레드 listener
aggregator마다 다른 JSON 키 OpenTelemetry 형식 또는 aggregator별 formatter
시크릿이 로그에 새어 나감 SecretMaskFilter + 의도적 회피
f-string 메시지로 그룹화 어려움 메시지는 고정 문자열, 값은 extra=
예외 traceback 사라짐 exc_info=True

12 응용 분야

MINERVA 시리즈 적용 본 글 패턴
08-1 [timing] 로그 TimingScope with 블록 + extra={"duration_ms": ...}
04 MINERVA_LOG_LEVEL env 환경변수로 레벨 제어
Response.run_id 로그 추적 correlation ID + contextvars
11-1 /health/build JSON structured logging 자연스러운 확장
FastAPI async 라우터 QueueHandler로 이벤트 루프 차단 회피
Azure OpenAI key 보호 SecretMaskFilter

13 관련 주제

선행 학습

MINERVA 시리즈 응용

Phase C-9 연결 (예정)

  • C34 관측성 설계 – structured log + tracing 통합

Subscribe

Enjoy this blog? Get notified of new posts by email: