1 LSTM/GRU for Longitudinal Data
1.1 왜 RNN/LSTM인가
종단 데이터는 가변 길이 시퀀스다.
- 사용자 A: 8주 관측 (Week 1~8)
- 사용자 B: 3주 관측 (Week 1~3, 이후 이탈)
- 사용자 C: 12주 관측 (Week 1~12)
XGBoost나 로지스틱 회귀는 고정 길이 피처 벡터를 입력받는다. 가변 길이 시퀀스를 다루려면 수동으로 lag 피처를 만들거나, 시퀀스를 잘라야 한다. 이 과정에서 시간적 의존성(temporal dependency)이 손실된다.
순환 신경망(RNN)은 시퀀스 길이에 관계없이 한 시점씩 처리하면서 내부 상태(hidden state)를 업데이트한다. 이전 시점의 정보가 hidden state에 축적되므로, 과거가 현재에 미치는 영향을 자연스럽게 모델링한다.
시퀀스: x₁, x₂, x₃, ..., x_T
↓ ↓ ↓ ↓
상태: h₁ → h₂ → h₃ → ... → h_T → 예측 ŷ
1.2 Vanilla RNN의 한계: 기울기 소실/폭발
1.2.1 RNN 기본 구조
시점 \(t\)에서의 hidden state 업데이트:
\[h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b_h)\]
출력:
\[\hat{y}_t = W_{hy} h_t + b_y\]
1.2.2 기울기 소실 문제
역전파 시 \(\frac{\partial L}{\partial h_1}\)을 계산하려면 체인 룰을 적용한다:
\[\frac{\partial L}{\partial h_1} = \frac{\partial L}{\partial h_T} \prod_{t=2}^{T} \frac{\partial h_t}{\partial h_{t-1}}\]
각 항은:
\[\frac{\partial h_t}{\partial h_{t-1}} = \text{diag}(\tanh'(z_t)) \cdot W_{hh}\]
여기서 \(\tanh'(z) \in (0, 1]\)이므로, 이 곱이 \(T\)번 반복되면:
- \(\|W_{hh}\| < 1\)일 때: 기울기가 지수적으로 감소 (기울기 소실)
- \(\|W_{hh}\| > 1\)일 때: 기울기가 지수적으로 증가 (기울기 폭발)
| 시퀀스 길이 T | 기울기 크기 (\(\|W_{hh}\|=0.9\)) | 실질적 의미 |
|---|---|---|
| 10 | \(0.9^{10} = 0.35\) | 35% 전달 |
| 50 | \(0.9^{50} = 0.005\) | 0.5% 전달 |
| 100 | \(0.9^{100} \approx 0\) | 사실상 전달 불가 |
결론: Vanilla RNN은 20~30 시점 이상의 장기 의존성을 학습하지 못한다.
1.3 LSTM 아키텍처 상세
LSTM(Long Short-Term Memory)은 게이트(gate) 메커니즘으로 기울기 소실 문제를 해결한다.
1.3.1 두 가지 상태
- Cell state \(C_t\): 장기 기억. 컨베이어 벨트처럼 정보를 거의 변형 없이 전달
- Hidden state \(h_t\): 단기 기억. 현재 시점의 출력에 사용
1.3.2 Forget Gate (잊기 게이트)
이전 Cell state에서 무엇을 잊을지 결정:
\[f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)\]
- \(\sigma\): 시그모이드 함수 (출력 0~1)
- \(f_t \approx 0\): 해당 정보를 완전히 잊음
- \(f_t \approx 1\): 해당 정보를 완전히 유지
1.3.3 Input Gate (입력 게이트)
새 정보 중 무엇을 기억할지 결정:
\[i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)\]
후보 Cell state:
\[\tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)\]
1.3.4 Cell State 업데이트
\[C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t\]
- \(\odot\): 원소별 곱(Hadamard product)
- 첫째 항: 이전 기억 중 유지할 부분
- 둘째 항: 새로 추가할 정보
이 구조가 기울기 소실을 해결하는 이유: \(C_t\)의 역전파 경로에서 \(f_t\)가 곱해지는데, \(f_t \approx 1\)이면 기울기가 거의 감쇄 없이 전달된다. Vanilla RNN의 \(W_{hh}\) 반복 곱과 달리 게이트가 적응적으로 기울기 흐름을 제어한다.
1.3.5 Output Gate (출력 게이트)
현재 Hidden state로 무엇을 내보낼지 결정:
\[o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)\]
\[h_t = o_t \odot \tanh(C_t)\]
1.3.6 전체 흐름 요약
입력: x_t, h_{t-1}, C_{t-1}
↓
┌─────────────────────────────────┐
│ f_t = σ(W_f · [h_{t-1}, x_t]) │ Forget Gate
│ i_t = σ(W_i · [h_{t-1}, x_t]) │ Input Gate
│ C̃_t = tanh(W_C · [h,x]) │ Candidate
│ C_t = f_t⊙C_{t-1} + i_t⊙C̃_t │ Cell Update
│ o_t = σ(W_o · [h_{t-1}, x_t]) │ Output Gate
│ h_t = o_t ⊙ tanh(C_t) │ Hidden State
└─────────────────────────────────┘
출력: h_t, C_t
1.4 GRU: LSTM 대비 단순화
GRU(Gated Recurrent Unit)는 LSTM을 2개 게이트로 단순화한다.
1.4.1 GRU 게이트
Reset gate (리셋 게이트):
\[r_t = \sigma(W_r \cdot [h_{t-1}, x_t] + b_r)\]
Update gate (업데이트 게이트):
\[z_t = \sigma(W_z \cdot [h_{t-1}, x_t] + b_z)\]
1.4.2 GRU 상태 업데이트
후보 hidden state:
\[\tilde{h}_t = \tanh(W_h \cdot [r_t \odot h_{t-1}, x_t] + b_h)\]
최종 hidden state:
\[h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t\]
- \(z_t\)가 Forget gate와 Input gate의 역할을 동시에 수행
- Cell state가 없고 Hidden state만 존재
1.4.3 파라미터 수 비교
| 모델 | 게이트 수 | 상태 수 | hidden_size=64 기준 파라미터 |
|---|---|---|---|
| LSTM | 3 (forget, input, output) + candidate | 2 (C, h) | \(4 \times (64 \times (64 + d) + 64)\) |
| GRU | 2 (reset, update) | 1 (h) | \(3 \times (64 \times (64 + d) + 64)\) |
GRU는 LSTM 대비 약 75% 파라미터로 비슷한 성능을 낸다. 소규모 데이터에서는 GRU가 오히려 유리할 수 있다.
1.5 종단 데이터 적용: 가변 길이 처리
1.5.1 패딩과 마스킹
가변 길이 시퀀스를 미니배치로 묶으려면 패딩이 필요하다.
사용자 A: [x₁, x₂, x₃, x₄, x₅] (길이 5)
사용자 B: [x₁, x₂, x₃] (길이 3)
사용자 C: [x₁, x₂, x₃, x₄, x₅, x₆] (길이 6)
패딩 후 (max_len=6):
사용자 A: [x₁, x₂, x₃, x₄, x₅, 0 ] mask: [1,1,1,1,1,0]
사용자 B: [x₁, x₂, x₃, 0, 0, 0 ] mask: [1,1,1,0,0,0]
사용자 C: [x₁, x₂, x₃, x₄, x₅, x₆] mask: [1,1,1,1,1,1]
1.5.2 pack_padded_sequence
PyTorch의 pack_padded_sequence는 패딩된 시퀀스를 압축하여 LSTM이 패딩을 무시하도록 한다. 마스킹보다 연산 효율이 높다.
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
# 시퀀스 길이 기준 내림차순 정렬 (pack 요구 조건)
lengths = mask.sum(dim=1).long()
sorted_lengths, sorted_idx = lengths.sort(descending=True)
sorted_x = x[sorted_idx]
# 패킹
packed = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)
# LSTM 통과
packed_out, (h_n, c_n) = lstm(packed)
# 언패킹
output, _ = pad_packed_sequence(packed_out, batch_first=True)
# 원래 순서 복원
_, unsorted_idx = sorted_idx.sort()
output = output[unsorted_idx]1.6 PyTorch 구현: 완전한 학습 파이프라인
1.6.1 데이터셋
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import numpy as np
from sklearn.metrics import roc_auc_score
class LongitudinalDataset(Dataset):
"""사용자별 가변 길이 시계열 데이터셋"""
def __init__(self, df, features, target, max_len=12):
self.sequences = []
self.targets = []
self.lengths = []
for uid in df["user_id"].unique():
user_df = df[df["user_id"] == uid].sort_values("week")
seq = user_df[features].values
tgt = user_df[target].values[-1]
T = min(len(seq), max_len)
seq = seq[:T]
self.sequences.append(seq)
self.targets.append(tgt)
self.lengths.append(T)
self.max_len = max_len
self.n_features = len(features)
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
T = self.lengths[idx]
pad_len = self.max_len - T
# 패딩
if pad_len > 0:
seq = np.vstack([seq, np.zeros((pad_len, self.n_features))])
mask = np.array([1] * T + [0] * pad_len, dtype=np.float32)
return (
torch.FloatTensor(seq),
torch.FloatTensor(mask),
torch.FloatTensor([self.targets[idx]]),
torch.LongTensor([T])
)1.6.2 LSTM 이탈 예측 모델
class LSTMChurnPredictor(nn.Module):
"""LSTM 기반 이탈 예측 — 확장 버전"""
def __init__(self, input_size, hidden_size=64, n_layers=2,
dropout=0.3, bidirectional=False):
super().__init__()
self.hidden_size = hidden_size
self.n_layers = n_layers
self.bidirectional = bidirectional
self.n_directions = 2 if bidirectional else 1
# 입력 정규화
self.layer_norm = nn.LayerNorm(input_size)
# LSTM
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=n_layers,
batch_first=True,
dropout=dropout if n_layers > 1 else 0,
bidirectional=bidirectional
)
# 분류 헤드
classifier_input = hidden_size * self.n_directions
self.classifier = nn.Sequential(
nn.Linear(classifier_input, 64),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(64, 32),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(32, 1),
nn.Sigmoid()
)
def forward(self, x, mask, lengths):
batch_size = x.size(0)
# 입력 정규화
x = self.layer_norm(x)
# pack_padded_sequence
lengths_cpu = lengths.squeeze(-1).cpu()
sorted_lengths, sorted_idx = lengths_cpu.sort(descending=True)
sorted_x = x[sorted_idx]
packed = pack_padded_sequence(
sorted_x, sorted_lengths.clamp(min=1), batch_first=True
)
packed_out, (h_n, c_n) = self.lstm(packed)
output, _ = pad_packed_sequence(packed_out, batch_first=True)
# 원래 순서 복원
_, unsorted_idx = sorted_idx.sort()
output = output[unsorted_idx]
lengths_restored = lengths_cpu[unsorted_idx]
# 마지막 유효 시점의 hidden state
last_hidden = output[
torch.arange(batch_size),
(lengths_restored - 1).clamp(min=0),
:
]
return self.classifier(last_hidden)1.6.3 학습 루프 (조기 종료 + AUC 평가)
def train_model(model, train_loader, val_loader, epochs=100, patience=10, lr=1e-3):
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", patience=5, factor=0.5
)
criterion = nn.BCELoss()
best_val_loss = float("inf")
best_val_auc = 0.0
no_improve = 0
for epoch in range(epochs):
# --- Train ---
model.train()
train_loss = 0
for seq, mask, target, lengths in train_loader:
pred = model(seq, mask, lengths)
loss = criterion(pred, target)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
# --- Validation ---
model.eval()
val_loss = 0
all_preds, all_targets = [], []
with torch.no_grad():
for seq, mask, target, lengths in val_loader:
pred = model(seq, mask, lengths)
loss = criterion(pred, target)
val_loss += loss.item()
all_preds.extend(pred.numpy().flatten())
all_targets.extend(target.numpy().flatten())
val_loss /= len(val_loader)
val_auc = roc_auc_score(all_targets, all_preds)
scheduler.step(val_loss)
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}: train_loss={train_loss:.4f}, "
f"val_loss={val_loss:.4f}, val_AUC={val_auc:.4f}")
# 조기 종료
if val_loss < best_val_loss:
best_val_loss = val_loss
best_val_auc = val_auc
torch.save(model.state_dict(), "best_lstm_model.pt")
no_improve = 0
else:
no_improve += 1
if no_improve >= patience:
print(f"Early stopping at epoch {epoch+1}. Best AUC: {best_val_auc:.4f}")
break
return best_val_auc
# 사용 예
features = ["satisfaction", "turn_count", "emotion_score", "personalized"]
train_dataset = LongitudinalDataset(train_df, features, "will_churn_next_week")
val_dataset = LongitudinalDataset(val_df, features, "will_churn_next_week")
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
model = LSTMChurnPredictor(input_size=len(features), bidirectional=False)
best_auc = train_model(model, train_loader, val_loader)1.7 Bidirectional LSTM
양방향 LSTM은 시퀀스를 앞→뒤와 뒤→앞 두 방향으로 동시에 처리한다.
순방향: h₁→ → h₂→ → h₃→ → h₄→
역방향: h₁← ← h₂← ← h₃← ← h₄←
결합: h_t = [h_t→ ; h_t←] (concatenation)
1.7.1 언제 사용하는가
| 상황 | 단방향 | 양방향 |
|---|---|---|
| 실시간 예측 (미래 모름) | O | X |
| 전체 시퀀스 사후 분류 | O | O (더 좋음) |
| 시퀀스 임베딩 추출 | O | O (더 좋음) |
종단 데이터에서:
- 이탈 예측 (실시간): 단방향 — 현재까지의 데이터만 사용
- 사후 패턴 분류: 양방향 — 전체 시퀀스를 보고 판단
# 양방향 LSTM 사용
model_bi = LSTMChurnPredictor(
input_size=len(features),
hidden_size=64,
bidirectional=True # hidden 출력이 128차원이 됨
)1.8 실무 예시: AI Agent 만족도 시퀀스 → 이탈 예측
1.8.1 데이터 구조
import pandas as pd
# AI Agent 개인화 실험 데이터
# 200명 사용자 × 가변 주 (3~12주) × 4개 피처
data = {
"user_id": [],
"week": [],
"satisfaction": [], # 1~10 만족도
"turn_count": [], # 주당 대화 횟수
"emotion_score": [], # 감정 분석 점수 (-1~1)
"personalized": [], # 개인화 적용 여부 (0/1)
"will_churn_next_week": [] # 다음 주 이탈 여부
}
# 이탈자 패턴: 만족도 하락 → 대화 감소 → 이탈
# 유지자 패턴: 만족도 안정/상승 → 대화 유지1.8.2 학습 → 평가 → 해석
# 1. 데이터 분할
from sklearn.model_selection import train_test_split
user_ids = df["user_id"].unique()
train_ids, test_ids = train_test_split(user_ids, test_size=0.2, random_state=42)
train_ids, val_ids = train_test_split(train_ids, test_size=0.2, random_state=42)
train_df = df[df["user_id"].isin(train_ids)]
val_df = df[df["user_id"].isin(val_ids)]
test_df = df[df["user_id"].isin(test_ids)]
# 2. 학습
model = LSTMChurnPredictor(input_size=4, hidden_size=64, n_layers=2)
best_auc = train_model(model, train_loader, val_loader)
# 3. 테스트 평가
model.load_state_dict(torch.load("best_lstm_model.pt"))
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
for seq, mask, target, lengths in test_loader:
pred = model(seq, mask, lengths)
test_preds.extend(pred.numpy().flatten())
test_labels.extend(target.numpy().flatten())
test_auc = roc_auc_score(test_labels, test_preds)
print(f"Test AUC: {test_auc:.4f}")
# 예상 출력: Test AUC: 0.82341.9 LSTM의 한계
| 한계 | 설명 | 대안 |
|---|---|---|
| 순차 처리 | \(h_t\)가 \(h_{t-1}\)에 의존 → 병렬화 불가 | TCN, Transformer |
| 긴 시퀀스 비효율 | T > 100이면 학습 느림 | TCN (병렬 convolution) |
| 장기 의존성 한계 | 기울기 소실이 완화되었지만 완전히 해결되지 않음 | Transformer (직접 참조) |
| 불규칙 시점 | 이산 시점 가정 | Neural ODE |
| 해석 가능성 | hidden state가 블랙박스 | Transformer (Attention) |
1.10 요약
| 항목 | 내용 |
|---|---|
| Vanilla RNN | 기울기 소실로 장기 의존성 학습 불가 |
| LSTM | 3개 게이트 + Cell state로 장기 기억 유지 |
| GRU | 2개 게이트, 파라미터 75% 수준, 비슷한 성능 |
| 가변 길이 처리 | 패딩 + 마스킹 또는 pack_padded_sequence |
| 양방향 LSTM | 사후 분류에 유리, 실시간 예측에는 단방향 |
| 핵심 한계 | 순차 처리, 긴 시퀀스 비효율 |