Hidden Markov Model (HMM) for Longitudinal Data

관측 불가능한 잠재 상태 전환의 발견

종단 데이터에서 관찰 가능한 행동 이면의 잠재 상태를 발견하는 Hidden Markov Model을 다룬다. HMM의 수학적 구조(초기 확률 π, 전환 행렬 A, 방출 분포 B), 3대 핵심 문제(Forward, Viterbi, Baum-Welch), 잠재 상태 수 결정(BIC/AIC), AI Agent 사용자 상태 전환 실무 예시를 Python(hmmlearn)과 R(depmixS4)로 구현한다.

Statistics
Machine Learning
Latent Variable Models
저자

Kwangmin Kim

공개

2026년 03월 08일

1 Hidden Markov Model (HMM) for Longitudinal Data

1.1 왜 HMM인가: 잠재 상태의 발견

1.1.1 관찰 가능한 행동 vs 관찰 불가능한 상태

종단 데이터에서 우리가 측정하는 것은 관찰 가능한 행동이다:

  • 만족도 점수, 세션 길이, 질문 유형, 감정 점수, 클릭 패턴

하지만 이 행동들 이면에는 관찰 불가능한 잠재 상태(latent state)가 존재한다:

  • “탐색 중”, “학습 중”, “문제 해결 중”, “이탈 직전”

이 잠재 상태를 명시적으로 모델링하면:

  1. 사용자 여정(journey)을 이해할 수 있다 — “어떤 경로로 이탈에 도달하는가?”
  2. 이탈 직전 상태를 감지할 수 있다 — 아직 이탈하지 않았지만 위험한 사용자
  3. 개입 시점을 결정할 수 있다 — “이 상태에 진입하면 즉시 개입”

1.1.2 LMM/XGBoost와의 차이

모델 접근 한계
LMM 관측값의 평균 구조를 모델링 잠재 상태 개념 없음
XGBoost 관측값으로 이탈을 직접 예측 “왜” 이탈하는지 설명 못 함
HMM 잠재 상태 전환을 모델링 상태 간 전환 패턴을 직접 학습
LMM/XGBoost: 관측값 → 예측
HMM:         관측값 → 잠재 상태 추론 → 상태 전환 패턴 → 예측 + 해석

1.2 HMM 수학적 구조

1.2.1 모델 정의

HMM은 다음 3요소로 정의된다: \(\lambda = (\pi, A, B)\)

1.2.1.1 1. 초기 상태 확률 \(\pi\)

\(t=1\)에서 각 상태에 있을 확률:

\[\pi_k = P(S_1 = k), \quad k = 1, \ldots, K\]

\[\sum_{k=1}^{K} \pi_k = 1\]

예시 (4개 잠재 상태):

\[\pi = \begin{pmatrix} 0.40 \\ 0.35 \\ 0.20 \\ 0.05 \end{pmatrix} \quad \leftarrow \begin{matrix} \text{탐색 중} \\ \text{학습 중} \\ \text{문제 해결} \\ \text{이탈 직전} \end{matrix}\]

대부분의 사용자가 “탐색 중” 또는 “학습 중”으로 시작한다.

1.2.1.2 2. 상태 전환 행렬 \(A\)

시점 \(t\)에서 상태 \(i\)였을 때, 시점 \(t+1\)에서 상태 \(j\)로 전환할 확률:

\[a_{ij} = P(S_{t+1} = j \mid S_t = i)\]

\[A = \begin{pmatrix} a_{11} & a_{12} & \cdots & a_{1K} \\ a_{21} & a_{22} & \cdots & a_{2K} \\ \vdots & \vdots & \ddots & \vdots \\ a_{K1} & a_{K2} & \cdots & a_{KK} \end{pmatrix}\]

각 행의 합 = 1 (확률 분포):

\[\sum_{j=1}^{K} a_{ij} = 1 \quad \forall i\]

핵심 가정 (1차 Markov 속성):

\[P(S_{t+1} \mid S_t, S_{t-1}, \ldots, S_1) = P(S_{t+1} \mid S_t)\]

다음 상태는 현재 상태에만 의존하고, 과거 이력에는 의존하지 않는다.

1.2.1.3 3. 방출 분포 \(B\) (Emission Distribution)

상태 \(k\)에서 관측값 \(O_t\)가 생성되는 분포:

\[b_k(O_t) = P(O_t \mid S_t = k)\]

Gaussian HMM (연속형 관측):

\[O_t \mid S_t = k \sim \mathcal{N}(\mu_k, \Sigma_k)\]

각 상태 \(k\)는 고유한 평균 벡터 \(\mu_k\)와 공분산 행렬 \(\Sigma_k\)를 가진다.

예시:

상태 평균 만족도 평균 세션 길이 평균 감정 점수
탐색 중 3.5 5분 0.1
학습 중 4.2 15분 0.4
문제 해결 3.0 20분 -0.1
이탈 직전 2.2 3분 -0.5

1.2.2 HMM의 그래프 구조

잠재 상태: S₁ → S₂ → S₃ → S₄ → S₅ → ... (Markov chain)
             ↓    ↓    ↓    ↓    ↓
관측값:     O₁   O₂   O₃   O₄   O₅ → ... (조건부 독립)

→ 관측값은 직접 연결되지 않음 (상태를 통해서만 연결)
→ 상태가 주어지면 관측값은 독립 (조건부 독립 가정)

1.3 HMM의 3가지 핵심 문제

1.3.1 문제 1: Evaluation — Forward Algorithm

질문: 모델 \(\lambda = (\pi, A, B)\)가 주어졌을 때, 관측 시퀀스 \(O = (O_1, \ldots, O_T)\)의 확률 \(P(O \mid \lambda)\)는?

용도: 모델 비교 (어떤 모델이 데이터를 더 잘 설명하는가)

Forward 변수:

\[\alpha_t(k) = P(O_1, O_2, \ldots, O_t, S_t = k \mid \lambda)\]

재귀 계산:

\[\alpha_1(k) = \pi_k \cdot b_k(O_1)\]

\[\alpha_{t+1}(j) = \left[\sum_{k=1}^{K} \alpha_t(k) \cdot a_{kj}\right] \cdot b_j(O_{t+1})\]

최종 확률:

\[P(O \mid \lambda) = \sum_{k=1}^{K} \alpha_T(k)\]

계산 복잡도: \(O(K^2 T)\) — 열거(enumerate)의 \(O(K^T)\)에 비해 효율적

1.3.2 문제 2: Decoding — Viterbi Algorithm

질문: 관측 시퀀스 \(O\)가 주어졌을 때, 가장 확률 높은 상태 시퀀스 \(S^* = (S_1^*, \ldots, S_T^*)\)는?

용도: 각 시점에서 사용자가 어떤 상태에 있는지 추론

Viterbi 변수:

\[\delta_t(k) = \max_{S_1, \ldots, S_{t-1}} P(S_1, \ldots, S_{t-1}, S_t = k, O_1, \ldots, O_t \mid \lambda)\]

재귀 계산:

\[\delta_1(k) = \pi_k \cdot b_k(O_1)\]

\[\delta_{t+1}(j) = \left[\max_{k} \delta_t(k) \cdot a_{kj}\right] \cdot b_j(O_{t+1})\]

역추적 (Backtracking):

\[\psi_{t+1}(j) = \arg\max_{k} \delta_t(k) \cdot a_{kj}\]

\[S_T^* = \arg\max_{k} \delta_T(k)\]

\[S_t^* = \psi_{t+1}(S_{t+1}^*)\]

1.3.3 문제 3: Learning — Baum-Welch (EM) Algorithm

질문: 관측 시퀀스 \(O\)로부터 모델 파라미터 \(\lambda = (\pi, A, B)\)를 어떻게 추정하는가?

용도: 데이터에서 HMM 학습

Baum-Welch는 EM 알고리즘의 HMM 특수 버전이다:

E-step: Forward-Backward 알고리즘으로 사후 확률 계산

\[\gamma_t(k) = P(S_t = k \mid O, \lambda)\]

\[\xi_t(i, j) = P(S_t = i, S_{t+1} = j \mid O, \lambda)\]

M-step: 파라미터 업데이트

\[\hat{\pi}_k = \gamma_1(k)\]

\[\hat{a}_{ij} = \frac{\sum_{t=1}^{T-1} \xi_t(i, j)}{\sum_{t=1}^{T-1} \gamma_t(i)}\]

\[\hat{\mu}_k = \frac{\sum_{t=1}^{T} \gamma_t(k) \cdot O_t}{\sum_{t=1}^{T} \gamma_t(k)}\]

\[\hat{\Sigma}_k = \frac{\sum_{t=1}^{T} \gamma_t(k) (O_t - \hat{\mu}_k)(O_t - \hat{\mu}_k)^{\top}}{\sum_{t=1}^{T} \gamma_t(k)}\]

수렴: log-likelihood가 더 이상 개선되지 않을 때까지 E-M 반복

1.3.4 3문제 요약

문제 알고리즘 입력 출력 용도
Evaluation Forward \(O, \lambda\) \(P(O \mid \lambda)\) 모델 비교
Decoding Viterbi \(O, \lambda\) \(S^*\) (최적 상태 시퀀스) 상태 추론
Learning Baum-Welch \(O\) \(\hat{\lambda}\) 파라미터 추정

1.4 잠재 상태 수 결정

HMM에서 가장 중요한 하이퍼파라미터는 잠재 상태 수 \(K\)이다. 이는 데이터로부터 자동으로 결정되지 않으므로 모델 선택이 필요하다.

1.4.1 BIC/AIC 기반 선택

다양한 \(K\) 값으로 HMM을 적합하고 정보 기준(information criterion)으로 비교한다.

\[\text{AIC} = -2 \log L + 2p\]

\[\text{BIC} = -2 \log L + p \log N\]

여기서:

  • \(\log L\): 로그 우도
  • \(p\): 파라미터 수 = \(K^2 + K \cdot d + K \cdot d(d+1)/2 + K - 1\)
    • \(K^2\): 전환 행렬 (실제로는 \(K(K-1)\), 행 합 제약)
    • \(K \cdot d\): 평균 벡터
    • \(K \cdot d(d+1)/2\): 공분산 행렬 (full)
    • \(K - 1\): 초기 확률
  • \(N\): 총 관측 수
from hmmlearn import hmm
import numpy as np

def select_n_states(X, lengths, k_range=range(2, 8)):
    """BIC/AIC로 최적 상태 수 선택"""
    results = []

    for k in k_range:
        model = hmm.GaussianHMM(
            n_components=k,
            covariance_type="full",
            n_iter=200,
            random_state=42
        )
        try:
            model.fit(X, lengths)
            log_likelihood = model.score(X, lengths)

            # 파라미터 수 계산
            d = X.shape[1]  # 관측 변수 차원
            n_params = (k * k      # 전환 행렬
                       + k * d     # 평균
                       + k * d * (d + 1) // 2  # 공분산 (full)
                       + k - 1)    # 초기 확률

            n_obs = X.shape[0]
            aic = -2 * log_likelihood + 2 * n_params
            bic = -2 * log_likelihood + n_params * np.log(n_obs)

            results.append({
                "k": k,
                "log_likelihood": log_likelihood,
                "aic": aic,
                "bic": bic,
                "n_params": n_params
            })
        except Exception as e:
            print(f"k={k} failed: {e}")

    return pd.DataFrame(results)

results = select_n_states(X_hmm, lengths)
print(results.to_string(index=False))
예시 출력:
 k  log_likelihood      aic       bic  n_params
 2       -4521.3     9082.6    9152.1       20
 3       -4102.7     8271.4    8398.2       33
 4       -3891.2     7882.4    8069.5       50   ← BIC 최소
 5       -3852.1     7844.2    8101.6       70
 6       -3831.5     7843.0    8178.8       90
 7       -3818.3     7856.6    8278.8      110

BIC 선택 기준: BIC가 가장 작은 \(K\)를 선택 (BIC는 모델 복잡도에 더 큰 페널티)

1.4.2 도메인 지식 활용

통계적 기준만으로는 부족할 수 있다. 도메인 지식을 결합한다:

\(K\) 해석 가능성 비고
2 “활성” vs “비활성” 너무 단순
3 “참여” vs “중립” vs “이탈” 적절한 시작점
4 “탐색” vs “학습” vs “문제해결” vs “이탈직전” 풍부한 해석
5+ 상태 간 구분이 불명확해짐 과적합 위험

권장: BIC 최소 + 상태별 방출 분포가 해석 가능한 \(K\)를 선택


1.5 실무 예시: AI Agent 사용자 상태 전환

1.5.1 데이터 준비

import numpy as np
import pandas as pd
from hmmlearn import hmm

# AI Agent 사용자 행동 데이터
np.random.seed(42)
n_users = 150
max_weeks = 12

obs_sequences = []
lengths = []
user_ids = []

for uid in range(n_users):
    # 사용자별 시퀀스 길이 (4~12주)
    n_weeks = np.random.randint(4, max_weeks + 1)

    # 관측 피처: [만족도, 세션 길이(분), 감정 점수]
    base_satisfaction = np.random.normal(3.5, 0.5)
    trajectory = np.random.choice(["stable", "declining", "growing"], p=[0.4, 0.35, 0.25])

    obs = []
    for w in range(n_weeks):
        if trajectory == "declining":
            sat = base_satisfaction - 0.15 * w + np.random.normal(0, 0.3)
        elif trajectory == "growing":
            sat = base_satisfaction + 0.1 * w + np.random.normal(0, 0.2)
        else:
            sat = base_satisfaction + np.random.normal(0, 0.3)

        session_len = max(1, 10 + 3 * sat + np.random.normal(0, 3))
        emotion = 0.3 * (sat - 3) + np.random.normal(0, 0.2)

        obs.append([np.clip(sat, 1, 5), session_len, np.clip(emotion, -1, 1)])

    obs_array = np.array(obs)
    obs_sequences.append(obs_array)
    lengths.append(n_weeks)
    user_ids.append(uid)

X_hmm = np.vstack(obs_sequences)
print(f"총 관측 수: {X_hmm.shape[0]}, 피처 수: {X_hmm.shape[1]}")
print(f"사용자 수: {n_users}, 시퀀스 길이 범위: {min(lengths)}~{max(lengths)}")

1.5.2 HMM 적합

# Gaussian HMM (4개 잠재 상태)
model_hmm = hmm.GaussianHMM(
    n_components=4,
    covariance_type="full",
    n_iter=200,
    random_state=42,
    verbose=False
)
model_hmm.fit(X_hmm, lengths)

print(f"수렴 여부: {model_hmm.monitor_.converged}")
print(f"Log-likelihood: {model_hmm.score(X_hmm, lengths):.1f}")

1.5.3 학습된 파라미터 해석

# 초기 상태 확률
print("초기 상태 확률 π:")
for k in range(4):
    print(f"  State {k}: {model_hmm.startprob_[k]:.3f}")

# 각 상태의 방출 분포 (평균)
feature_names = ["만족도", "세션길이(분)", "감정점수"]
print("\n방출 분포 평균 μ_k:")
for k in range(4):
    means = model_hmm.means_[k]
    print(f"  State {k}: " + ", ".join(
        f"{fn}={m:.2f}" for fn, m in zip(feature_names, means)
    ))
초기 상태 확률 π:
  State 0: 0.412   ← "탐색 중" (가장 흔한 시작)
  State 1: 0.338   ← "학습 중"
  State 2: 0.195   ← "문제 해결"
  State 3: 0.055   ← "이탈 직전" (드문 시작)

방출 분포 평균 μ_k:
  State 0: 만족도=3.52, 세션길이=12.8분, 감정점수=0.08
  State 1: 만족도=4.21, 세션길이=18.3분, 감정점수=0.42
  State 2: 만족도=2.98, 세션길이=15.1분, 감정점수=-0.12
  State 3: 만족도=2.15, 세션길이=5.2분,  감정점수=-0.48

1.5.4 상태 전환 행렬 시각화

import seaborn as sns
import matplotlib.pyplot as plt

trans_mat = model_hmm.transmat_
state_names = ["탐색 중", "학습 중", "문제 해결", "이탈 직전"]

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    trans_mat,
    xticklabels=state_names,
    yticklabels=state_names,
    annot=True, fmt=".2f",
    cmap="Blues",
    vmin=0, vmax=1,
    ax=ax
)
ax.set_title("상태 전환 행렬 A")
ax.set_ylabel("현재 상태 (t)")
ax.set_xlabel("다음 상태 (t+1)")
plt.tight_layout()
plt.show()
상태 전환 행렬 (예시):
                탐색     학습    문제해결  이탈직전
탐색 중    │  0.65    0.20     0.10     0.05
학습 중    │  0.10    0.72     0.12     0.06
문제 해결  │  0.08    0.15     0.60     0.17   ← 이탈 위험 경로
이탈 직전  │  0.05    0.03     0.10     0.82   ← 자기 흡수성 높음

핵심 인사이트:
1. "이탈 직전" → "이탈 직전" = 0.82 : 한번 진입하면 빠져나오기 어려움
2. "문제 해결" → "이탈 직전" = 0.17 : 문제 해결 단계가 이탈의 주요 진입점
3. "학습 중" → "학습 중" = 0.72 : 학습 상태는 안정적

1.5.5 개별 사용자 상태 시퀀스 추론

# Viterbi 디코딩: 각 사용자의 최적 상태 시퀀스
state_sequences = {}
offset = 0
for uid, length in zip(user_ids, lengths):
    obs = X_hmm[offset:offset + length]
    states = model_hmm.predict(obs)
    state_sequences[uid] = states
    offset += length

# 예시 출력
for uid in [0, 1, 2, 5, 10]:
    seq = state_sequences[uid]
    state_labels = [state_names[s] for s in seq]
    print(f"User {uid:3d}: {' → '.join(state_labels)}")
User   0: 탐색 중 → 탐색 중 → 학습 중 → 학습 중 → 학습 중 → 학습 중
User   1: 탐색 중 → 탐색 중 → 문제 해결 → 문제 해결 → 이탈 직전 → 이탈 직전
User   2: 학습 중 → 학습 중 → 학습 중 → 학습 중 → 학습 중 → 탐색 중
User   5: 탐색 중 → 문제 해결 → 이탈 직전 → 이탈 직전 → 이탈 직전 → 이탈 직전
User  10: 학습 중 → 학습 중 → 학습 중 → 학습 중 → 학습 중 → 학습 중

1.5.6 상태별 사용자 분포 시각화

# 마지막 시점의 상태 분포
last_states = {uid: seq[-1] for uid, seq in state_sequences.items()}
state_dist = pd.Series(last_states).value_counts().sort_index()

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 1. 마지막 시점 상태 분포
axes[0].bar([state_names[i] for i in state_dist.index], state_dist.values,
            color=["#3498db", "#2ecc71", "#f39c12", "#e74c3c"])
axes[0].set_title("마지막 시점 상태 분포")
axes[0].set_ylabel("사용자 수")

# 2. 시간에 따른 상태 비율 변화
state_by_week = []
offset = 0
for uid, length in zip(user_ids, lengths):
    for w in range(length):
        state_by_week.append({
            "week": w + 1,
            "state": state_names[state_sequences[uid][w]]
        })
    offset += length

df_states = pd.DataFrame(state_by_week)
state_props = df_states.groupby(["week", "state"]).size().unstack(fill_value=0)
state_props = state_props.div(state_props.sum(axis=1), axis=0)

state_props.plot.area(ax=axes[1], color=["#3498db", "#2ecc71", "#f39c12", "#e74c3c"])
axes[1].set_title("시간에 따른 상태 비율 변화")
axes[1].set_xlabel("주 (Week)")
axes[1].set_ylabel("비율")
axes[1].legend(title="상태")

plt.tight_layout()
plt.show()

1.6 R 코드: depmixS4

library(depmixS4)
library(ggplot2)
library(dplyr)
library(tidyr)

# 데이터 준비
set.seed(42)
n_users <- 150
records <- list()
idx <- 1

for (uid in 1:n_users) {
  n_weeks <- sample(4:12, 1)
  base_sat <- rnorm(1, 3.5, 0.5)
  trajectory <- sample(c("stable", "declining", "growing"),
                        1, prob = c(0.4, 0.35, 0.25))

  for (w in 1:n_weeks) {
    if (trajectory == "declining") {
      sat <- base_sat - 0.15 * w + rnorm(1, 0, 0.3)
    } else if (trajectory == "growing") {
      sat <- base_sat + 0.1 * w + rnorm(1, 0, 0.2)
    } else {
      sat <- base_sat + rnorm(1, 0, 0.3)
    }
    sat <- pmin(pmax(sat, 1), 5)
    session_len <- max(1, 10 + 3 * sat + rnorm(1, 0, 3))
    emotion <- pmin(pmax(0.3 * (sat - 3) + rnorm(1, 0, 0.2), -1), 1)

    records[[idx]] <- data.frame(
      user_id = uid, week = w,
      satisfaction = sat,
      session_length = session_len,
      emotion_score = emotion
    )
    idx <- idx + 1
  }
}

df <- bind_rows(records)

# depmixS4로 HMM 적합
# 각 사용자가 하나의 시퀀스
n_states <- 4

# 응답 변수 3개 (Gaussian)
mod <- depmix(
  list(satisfaction ~ 1,
       session_length ~ 1,
       emotion_score ~ 1),
  data = df,
  nstates = n_states,
  family = list(gaussian(), gaussian(), gaussian()),
  ntimes = table(df$user_id)  # 각 사용자의 시퀀스 길이
)

# EM으로 적합
fit_mod <- fit(mod)
summary(fit_mod)

# BIC/AIC
cat(sprintf("AIC: %.1f\n", AIC(fit_mod)))
cat(sprintf("BIC: %.1f\n", BIC(fit_mod)))

1.6.1 모델 선택 (R)

# 다양한 상태 수로 BIC 비교
bic_results <- data.frame(k = integer(), bic = numeric())

for (k in 2:7) {
  tryCatch({
    mod_k <- depmix(
      list(satisfaction ~ 1,
           session_length ~ 1,
           emotion_score ~ 1),
      data = df,
      nstates = k,
      family = list(gaussian(), gaussian(), gaussian()),
      ntimes = table(df$user_id)
    )
    fit_k <- fit(mod_k, verbose = FALSE)
    bic_results <- rbind(bic_results,
                          data.frame(k = k, bic = BIC(fit_k)))
  }, error = function(e) {
    cat(sprintf("k=%d failed: %s\n", k, e$message))
  })
}

# BIC가 최소인 k
best_k <- bic_results$k[which.min(bic_results$bic)]
cat(sprintf("최적 상태 수: %d\n", best_k))

ggplot(bic_results, aes(x = k, y = bic)) +
  geom_line() + geom_point(size = 3) +
  geom_vline(xintercept = best_k, linetype = "dashed", color = "red") +
  labs(title = "BIC에 의한 잠재 상태 수 선택",
       x = "상태 수 (K)", y = "BIC") +
  theme_minimal()

1.6.2 상태 추론 (R)

# Viterbi 디코딩
states <- posterior(fit_mod)

# 각 사용자의 상태 시퀀스
df$state <- states$state

# 상태별 평균 관측값
df %>%
  group_by(state) %>%
  summarise(
    mean_satisfaction = mean(satisfaction),
    mean_session_length = mean(session_length),
    mean_emotion = mean(emotion_score),
    n = n()
  ) %>%
  arrange(state)

1.7 HMM 결과 활용: 이탈 직전 상태 감지 → 개입 전략

1.7.1 실시간 상태 감지

def detect_at_risk_users(model_hmm, obs_sequences, lengths, user_ids,
                          risk_state=3, threshold=0.5):
    """이탈 직전 상태에 있을 확률이 높은 사용자를 감지"""
    at_risk = []

    offset = 0
    for uid, length in zip(user_ids, lengths):
        obs = X_hmm[offset:offset + length]

        # 마지막 시점의 상태 확률 (posterior)
        posteriors = model_hmm.predict_proba(obs)
        last_prob = posteriors[-1]  # 마지막 시점

        risk_prob = last_prob[risk_state]
        current_state = np.argmax(last_prob)

        if risk_prob > threshold:
            at_risk.append({
                "user_id": uid,
                "current_state": state_names[current_state],
                "risk_probability": risk_prob,
                "weeks_active": length,
            })

        offset += length

    return pd.DataFrame(at_risk).sort_values("risk_probability", ascending=False)

at_risk_users = detect_at_risk_users(model_hmm, obs_sequences, lengths, user_ids)
print(f"이탈 위험 사용자: {len(at_risk_users)}명 / {n_users}명")
print(at_risk_users.head(10))

1.7.2 상태 전환 기반 개입 규칙

# 전환 행렬에서 위험 경로 식별
trans_mat = model_hmm.transmat_

print("위험 전환 경로 (이탈 직전으로의 전환 확률):")
for i, name in enumerate(state_names):
    prob_to_churn = trans_mat[i, 3]  # state 3 = 이탈 직전
    risk_level = "🔴 높음" if prob_to_churn > 0.15 else (
                 "🟡 중간" if prob_to_churn > 0.08 else "🟢 낮음")
    print(f"  {name:10s} → 이탈직전: {prob_to_churn:.2f}  ({risk_level})")

# 개입 전략
intervention_rules = {
    "문제 해결": {
        "trigger": "이 상태 2주 이상 지속",
        "action": "전문 상담 에이전트로 연결, 문제 해결 지원 강화",
        "priority": "HIGH"
    },
    "이탈 직전": {
        "trigger": "이 상태 진입 즉시",
        "action": "할인 쿠폰 제공, 온보딩 재시작, 관리자 연락",
        "priority": "CRITICAL"
    },
    "탐색 중": {
        "trigger": "3주 이상 탐색만 지속",
        "action": "맞춤형 콘텐츠 추천, 학습 경로 제안",
        "priority": "MEDIUM"
    }
}

1.7.3 이탈 예측 확률 계산

def predict_churn_probability(model_hmm, obs, n_steps=4, churn_state=3):
    """향후 n_steps 이내에 이탈 직전 상태에 도달할 확률"""
    # 현재 상태 확률
    posteriors = model_hmm.predict_proba(obs)
    current_state_prob = posteriors[-1]  # 마지막 시점의 상태 확률

    # 전환 행렬의 거듭제곱으로 n_steps 후 상태 확률 계산
    trans_mat = model_hmm.transmat_
    future_prob = current_state_prob

    for _ in range(n_steps):
        future_prob = future_prob @ trans_mat

    return future_prob[churn_state]

# 모든 사용자의 4주 이내 이탈 확률
offset = 0
churn_probs = []
for uid, length in zip(user_ids, lengths):
    obs = X_hmm[offset:offset + length]
    prob = predict_churn_probability(model_hmm, obs, n_steps=4)
    churn_probs.append({"user_id": uid, "churn_prob_4w": prob})
    offset += length

df_churn = pd.DataFrame(churn_probs)
print(f"4주 이내 이탈 확률 > 50%: {(df_churn['churn_prob_4w'] > 0.5).sum()}명")

1.8 HMM의 한계와 확장

1.8.1 HMM의 한계

한계 설명
1차 Markov 가정 다음 상태가 현재 상태에만 의존 → 장기 의존성 포착 불가
상태 수 고정 \(K\)를 미리 지정해야 함 → 데이터에 따라 달라짐
관측 독립 가정 상태가 주어지면 관측값이 독립 → 관측 간 자기상관 무시
정상성(Stationarity) 전환 행렬이 시간에 따라 변하지 않음 → 시간 가변 전환 불가
공변량 미반영 기본 HMM은 외부 공변량(세그먼트, 개인화 여부)을 포함하지 않음

1.8.2 확장 모델

1.8.2.1 1. Input-Output HMM (IOHMM)

전환 확률과 방출 분포가 외부 입력(공변량)에 의존한다.

\[a_{ij}(u_t) = P(S_{t+1} = j \mid S_t = i, u_t)\]

예: 개인화 여부(u_t)에 따라 전환 확률이 달라짐
    개인화 O → "학습 중" → "학습 중" 전환 확률 증가
    개인화 X → "학습 중" → "이탈 직전" 전환 확률 증가

1.8.2.2 2. Hierarchical HMM (HHMM)

상태 자체가 하위 HMM을 포함하는 계층 구조:

상위 상태: "활성" vs "비활성"
   └── "활성" 내 하위 상태: "탐색" → "학습" → "문제해결"
   └── "비활성" 내 하위 상태: "휴면" → "이탈준비" → "이탈"

1.8.2.3 3. Non-homogeneous HMM

전환 행렬이 시간에 따라 변한다:

\[A_t = f(t, \text{covariates})\]

depmixS4에서 구현 가능:

# 전환 확률이 week에 의존
mod_nh <- depmix(
  list(satisfaction ~ 1, session_length ~ 1),
  data = df,
  nstates = 4,
  family = list(gaussian(), gaussian()),
  ntimes = table(df$user_id),
  transition = ~ week  # 전환 확률이 시간에 의존
)
fit_nh <- fit(mod_nh)

1.8.2.4 4. Hidden Semi-Markov Model (HSMM)

각 상태의 체류 시간(sojourn time)을 명시적으로 모델링:

\[d_k \sim \text{NegBin}(\mu_k, r_k)\]

기본 HMM에서는 체류 시간이 기하분포 (memoryless)이지만, HSMM은 이 가정을 완화한다.

1.8.3 확장 모델 비교

모델 장점 구현
기본 HMM 단순, 빠름 hmmlearn, depmixS4
IOHMM 공변량 반영 depmixS4 (transition formula)
HHMM 계층 구조 직접 구현 필요
Non-homogeneous 시간 가변 전환 depmixS4
HSMM 유연한 체류 시간 hsmm (R), pyhsmm (Python)

1.9 요약

핵심 내용
HMM은 관측 불가능한 잠재 상태의 전환을 모델링하여 종단 데이터의 숨겨진 구조를 발견
3요소 초기 확률 \(\pi\), 전환 행렬 \(A\), 방출 분포 \(B\)
3문제 Evaluation (Forward), Decoding (Viterbi), Learning (Baum-Welch)
상태 수 BIC/AIC + 도메인 지식으로 결정
실무 활용 이탈 직전 상태 감지 → 개입 시점 결정 → 전환 확률 기반 예측
한계 1차 Markov, 정상성, 공변량 미반영 → IOHMM, HSMM으로 확장

다음: 24 — 고차원 종단 데이터의 정규화 기법 — Lasso, Elastic Net, glmmLasso

Subscribe

Enjoy this blog? Get notified of new posts by email: