LSTM/GRU for Longitudinal Data

게이트 기반 순환 신경망으로 시퀀스 패턴 학습

종단 데이터의 가변 길이 시퀀스를 처리하는 LSTM/GRU를 상세히 다룬다. Vanilla RNN의 기울기 소실 문제부터 LSTM의 게이트 메커니즘, GRU의 단순화, 양방향 LSTM, 그리고 실무 구현까지 설명한다.

Statistics
Deep Learning
저자

Kwangmin Kim

공개

2026년 03월 08일

1 LSTM/GRU for Longitudinal Data

1.1 왜 RNN/LSTM인가

종단 데이터는 가변 길이 시퀀스다.

  • 사용자 A: 8주 관측 (Week 1~8)
  • 사용자 B: 3주 관측 (Week 1~3, 이후 이탈)
  • 사용자 C: 12주 관측 (Week 1~12)

XGBoost나 로지스틱 회귀는 고정 길이 피처 벡터를 입력받는다. 가변 길이 시퀀스를 다루려면 수동으로 lag 피처를 만들거나, 시퀀스를 잘라야 한다. 이 과정에서 시간적 의존성(temporal dependency)이 손실된다.

순환 신경망(RNN)은 시퀀스 길이에 관계없이 한 시점씩 처리하면서 내부 상태(hidden state)를 업데이트한다. 이전 시점의 정보가 hidden state에 축적되므로, 과거가 현재에 미치는 영향을 자연스럽게 모델링한다.

시퀀스: x₁, x₂, x₃, ..., x_T
         ↓    ↓    ↓          ↓
상태:   h₁ → h₂ → h₃ → ... → h_T → 예측 ŷ

1.2 Vanilla RNN의 한계: 기울기 소실/폭발

1.2.1 RNN 기본 구조

시점 \(t\)에서의 hidden state 업데이트:

\[h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b_h)\]

출력:

\[\hat{y}_t = W_{hy} h_t + b_y\]

1.2.2 기울기 소실 문제

역전파 시 \(\frac{\partial L}{\partial h_1}\)을 계산하려면 체인 룰을 적용한다:

\[\frac{\partial L}{\partial h_1} = \frac{\partial L}{\partial h_T} \prod_{t=2}^{T} \frac{\partial h_t}{\partial h_{t-1}}\]

각 항은:

\[\frac{\partial h_t}{\partial h_{t-1}} = \text{diag}(\tanh'(z_t)) \cdot W_{hh}\]

여기서 \(\tanh'(z) \in (0, 1]\)이므로, 이 곱이 \(T\)번 반복되면:

  • \(\|W_{hh}\| < 1\)일 때: 기울기가 지수적으로 감소 (기울기 소실)
  • \(\|W_{hh}\| > 1\)일 때: 기울기가 지수적으로 증가 (기울기 폭발)
시퀀스 길이 T 기울기 크기 (\(\|W_{hh}\|=0.9\)) 실질적 의미
10 \(0.9^{10} = 0.35\) 35% 전달
50 \(0.9^{50} = 0.005\) 0.5% 전달
100 \(0.9^{100} \approx 0\) 사실상 전달 불가

결론: Vanilla RNN은 20~30 시점 이상의 장기 의존성을 학습하지 못한다.


1.3 LSTM 아키텍처 상세

LSTM(Long Short-Term Memory)은 게이트(gate) 메커니즘으로 기울기 소실 문제를 해결한다.

1.3.1 두 가지 상태

  • Cell state \(C_t\): 장기 기억. 컨베이어 벨트처럼 정보를 거의 변형 없이 전달
  • Hidden state \(h_t\): 단기 기억. 현재 시점의 출력에 사용

1.3.2 Forget Gate (잊기 게이트)

이전 Cell state에서 무엇을 잊을지 결정:

\[f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)\]

  • \(\sigma\): 시그모이드 함수 (출력 0~1)
  • \(f_t \approx 0\): 해당 정보를 완전히 잊음
  • \(f_t \approx 1\): 해당 정보를 완전히 유지

1.3.3 Input Gate (입력 게이트)

새 정보 중 무엇을 기억할지 결정:

\[i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)\]

후보 Cell state:

\[\tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)\]

1.3.4 Cell State 업데이트

\[C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t\]

  • \(\odot\): 원소별 곱(Hadamard product)
  • 첫째 항: 이전 기억 중 유지할 부분
  • 둘째 항: 새로 추가할 정보

이 구조가 기울기 소실을 해결하는 이유: \(C_t\)의 역전파 경로에서 \(f_t\)가 곱해지는데, \(f_t \approx 1\)이면 기울기가 거의 감쇄 없이 전달된다. Vanilla RNN의 \(W_{hh}\) 반복 곱과 달리 게이트가 적응적으로 기울기 흐름을 제어한다.

1.3.5 Output Gate (출력 게이트)

현재 Hidden state로 무엇을 내보낼지 결정:

\[o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)\]

\[h_t = o_t \odot \tanh(C_t)\]

1.3.6 전체 흐름 요약

입력: x_t, h_{t-1}, C_{t-1}
         ↓
    ┌─────────────────────────────────┐
    │  f_t = σ(W_f · [h_{t-1}, x_t]) │  Forget Gate
    │  i_t = σ(W_i · [h_{t-1}, x_t]) │  Input Gate
    │  C̃_t = tanh(W_C · [h,x])       │  Candidate
    │  C_t = f_t⊙C_{t-1} + i_t⊙C̃_t  │  Cell Update
    │  o_t = σ(W_o · [h_{t-1}, x_t]) │  Output Gate
    │  h_t = o_t ⊙ tanh(C_t)         │  Hidden State
    └─────────────────────────────────┘
출력: h_t, C_t

1.4 GRU: LSTM 대비 단순화

GRU(Gated Recurrent Unit)는 LSTM을 2개 게이트로 단순화한다.

1.4.1 GRU 게이트

Reset gate (리셋 게이트):

\[r_t = \sigma(W_r \cdot [h_{t-1}, x_t] + b_r)\]

Update gate (업데이트 게이트):

\[z_t = \sigma(W_z \cdot [h_{t-1}, x_t] + b_z)\]

1.4.2 GRU 상태 업데이트

후보 hidden state:

\[\tilde{h}_t = \tanh(W_h \cdot [r_t \odot h_{t-1}, x_t] + b_h)\]

최종 hidden state:

\[h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t\]

  • \(z_t\)가 Forget gate와 Input gate의 역할을 동시에 수행
  • Cell state가 없고 Hidden state만 존재

1.4.3 파라미터 수 비교

모델 게이트 수 상태 수 hidden_size=64 기준 파라미터
LSTM 3 (forget, input, output) + candidate 2 (C, h) \(4 \times (64 \times (64 + d) + 64)\)
GRU 2 (reset, update) 1 (h) \(3 \times (64 \times (64 + d) + 64)\)

GRU는 LSTM 대비 약 75% 파라미터로 비슷한 성능을 낸다. 소규모 데이터에서는 GRU가 오히려 유리할 수 있다.


1.5 종단 데이터 적용: 가변 길이 처리

1.5.1 패딩과 마스킹

가변 길이 시퀀스를 미니배치로 묶으려면 패딩이 필요하다.

사용자 A: [x₁, x₂, x₃, x₄, x₅]      (길이 5)
사용자 B: [x₁, x₂, x₃]               (길이 3)
사용자 C: [x₁, x₂, x₃, x₄, x₅, x₆]  (길이 6)

패딩 후 (max_len=6):
사용자 A: [x₁, x₂, x₃, x₄, x₅, 0 ]  mask: [1,1,1,1,1,0]
사용자 B: [x₁, x₂, x₃, 0,  0,  0 ]  mask: [1,1,1,0,0,0]
사용자 C: [x₁, x₂, x₃, x₄, x₅, x₆]  mask: [1,1,1,1,1,1]

1.5.2 pack_padded_sequence

PyTorch의 pack_padded_sequence는 패딩된 시퀀스를 압축하여 LSTM이 패딩을 무시하도록 한다. 마스킹보다 연산 효율이 높다.

from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

# 시퀀스 길이 기준 내림차순 정렬 (pack 요구 조건)
lengths = mask.sum(dim=1).long()
sorted_lengths, sorted_idx = lengths.sort(descending=True)
sorted_x = x[sorted_idx]

# 패킹
packed = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)

# LSTM 통과
packed_out, (h_n, c_n) = lstm(packed)

# 언패킹
output, _ = pad_packed_sequence(packed_out, batch_first=True)

# 원래 순서 복원
_, unsorted_idx = sorted_idx.sort()
output = output[unsorted_idx]

1.6 PyTorch 구현: 완전한 학습 파이프라인

1.6.1 데이터셋

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import numpy as np
from sklearn.metrics import roc_auc_score


class LongitudinalDataset(Dataset):
    """사용자별 가변 길이 시계열 데이터셋"""

    def __init__(self, df, features, target, max_len=12):
        self.sequences = []
        self.targets = []
        self.lengths = []

        for uid in df["user_id"].unique():
            user_df = df[df["user_id"] == uid].sort_values("week")
            seq = user_df[features].values
            tgt = user_df[target].values[-1]

            T = min(len(seq), max_len)
            seq = seq[:T]
            self.sequences.append(seq)
            self.targets.append(tgt)
            self.lengths.append(T)

        self.max_len = max_len
        self.n_features = len(features)

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        T = self.lengths[idx]
        pad_len = self.max_len - T

        # 패딩
        if pad_len > 0:
            seq = np.vstack([seq, np.zeros((pad_len, self.n_features))])

        mask = np.array([1] * T + [0] * pad_len, dtype=np.float32)

        return (
            torch.FloatTensor(seq),
            torch.FloatTensor(mask),
            torch.FloatTensor([self.targets[idx]]),
            torch.LongTensor([T])
        )

1.6.2 LSTM 이탈 예측 모델

class LSTMChurnPredictor(nn.Module):
    """LSTM 기반 이탈 예측 — 확장 버전"""

    def __init__(self, input_size, hidden_size=64, n_layers=2,
                 dropout=0.3, bidirectional=False):
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.bidirectional = bidirectional
        self.n_directions = 2 if bidirectional else 1

        # 입력 정규화
        self.layer_norm = nn.LayerNorm(input_size)

        # LSTM
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=n_layers,
            batch_first=True,
            dropout=dropout if n_layers > 1 else 0,
            bidirectional=bidirectional
        )

        # 분류 헤드
        classifier_input = hidden_size * self.n_directions
        self.classifier = nn.Sequential(
            nn.Linear(classifier_input, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(self, x, mask, lengths):
        batch_size = x.size(0)

        # 입력 정규화
        x = self.layer_norm(x)

        # pack_padded_sequence
        lengths_cpu = lengths.squeeze(-1).cpu()
        sorted_lengths, sorted_idx = lengths_cpu.sort(descending=True)
        sorted_x = x[sorted_idx]

        packed = pack_padded_sequence(
            sorted_x, sorted_lengths.clamp(min=1), batch_first=True
        )
        packed_out, (h_n, c_n) = self.lstm(packed)
        output, _ = pad_packed_sequence(packed_out, batch_first=True)

        # 원래 순서 복원
        _, unsorted_idx = sorted_idx.sort()
        output = output[unsorted_idx]
        lengths_restored = lengths_cpu[unsorted_idx]

        # 마지막 유효 시점의 hidden state
        last_hidden = output[
            torch.arange(batch_size),
            (lengths_restored - 1).clamp(min=0),
            :
        ]

        return self.classifier(last_hidden)

1.6.3 학습 루프 (조기 종료 + AUC 평가)

def train_model(model, train_loader, val_loader, epochs=100, patience=10, lr=1e-3):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", patience=5, factor=0.5
    )
    criterion = nn.BCELoss()

    best_val_loss = float("inf")
    best_val_auc = 0.0
    no_improve = 0

    for epoch in range(epochs):
        # --- Train ---
        model.train()
        train_loss = 0
        for seq, mask, target, lengths in train_loader:
            pred = model(seq, mask, lengths)
            loss = criterion(pred, target)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)

        # --- Validation ---
        model.eval()
        val_loss = 0
        all_preds, all_targets = [], []
        with torch.no_grad():
            for seq, mask, target, lengths in val_loader:
                pred = model(seq, mask, lengths)
                loss = criterion(pred, target)
                val_loss += loss.item()
                all_preds.extend(pred.numpy().flatten())
                all_targets.extend(target.numpy().flatten())

        val_loss /= len(val_loader)
        val_auc = roc_auc_score(all_targets, all_preds)
        scheduler.step(val_loss)

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}: train_loss={train_loss:.4f}, "
                  f"val_loss={val_loss:.4f}, val_AUC={val_auc:.4f}")

        # 조기 종료
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_val_auc = val_auc
            torch.save(model.state_dict(), "best_lstm_model.pt")
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}. Best AUC: {best_val_auc:.4f}")
                break

    return best_val_auc


# 사용 예
features = ["satisfaction", "turn_count", "emotion_score", "personalized"]
train_dataset = LongitudinalDataset(train_df, features, "will_churn_next_week")
val_dataset = LongitudinalDataset(val_df, features, "will_churn_next_week")

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

model = LSTMChurnPredictor(input_size=len(features), bidirectional=False)
best_auc = train_model(model, train_loader, val_loader)

1.7 Bidirectional LSTM

양방향 LSTM은 시퀀스를 앞→뒤뒤→앞 두 방향으로 동시에 처리한다.

순방향: h₁→ → h₂→ → h₃→ → h₄→
역방향: h₁← ← h₂← ← h₃← ← h₄←

결합: h_t = [h_t→ ; h_t←]  (concatenation)

1.7.1 언제 사용하는가

상황 단방향 양방향
실시간 예측 (미래 모름) O X
전체 시퀀스 사후 분류 O O (더 좋음)
시퀀스 임베딩 추출 O O (더 좋음)

종단 데이터에서:

  • 이탈 예측 (실시간): 단방향 — 현재까지의 데이터만 사용
  • 사후 패턴 분류: 양방향 — 전체 시퀀스를 보고 판단
# 양방향 LSTM 사용
model_bi = LSTMChurnPredictor(
    input_size=len(features),
    hidden_size=64,
    bidirectional=True  # hidden 출력이 128차원이 됨
)

1.8 실무 예시: AI Agent 만족도 시퀀스 → 이탈 예측

1.8.1 데이터 구조

import pandas as pd

# AI Agent 개인화 실험 데이터
# 200명 사용자 × 가변 주 (3~12주) × 4개 피처
data = {
    "user_id": [],
    "week": [],
    "satisfaction": [],     # 1~10 만족도
    "turn_count": [],       # 주당 대화 횟수
    "emotion_score": [],    # 감정 분석 점수 (-1~1)
    "personalized": [],     # 개인화 적용 여부 (0/1)
    "will_churn_next_week": []  # 다음 주 이탈 여부
}

# 이탈자 패턴: 만족도 하락 → 대화 감소 → 이탈
# 유지자 패턴: 만족도 안정/상승 → 대화 유지

1.8.2 학습 → 평가 → 해석

# 1. 데이터 분할
from sklearn.model_selection import train_test_split

user_ids = df["user_id"].unique()
train_ids, test_ids = train_test_split(user_ids, test_size=0.2, random_state=42)
train_ids, val_ids = train_test_split(train_ids, test_size=0.2, random_state=42)

train_df = df[df["user_id"].isin(train_ids)]
val_df = df[df["user_id"].isin(val_ids)]
test_df = df[df["user_id"].isin(test_ids)]

# 2. 학습
model = LSTMChurnPredictor(input_size=4, hidden_size=64, n_layers=2)
best_auc = train_model(model, train_loader, val_loader)

# 3. 테스트 평가
model.load_state_dict(torch.load("best_lstm_model.pt"))
model.eval()

test_preds, test_labels = [], []
with torch.no_grad():
    for seq, mask, target, lengths in test_loader:
        pred = model(seq, mask, lengths)
        test_preds.extend(pred.numpy().flatten())
        test_labels.extend(target.numpy().flatten())

test_auc = roc_auc_score(test_labels, test_preds)
print(f"Test AUC: {test_auc:.4f}")
# 예상 출력: Test AUC: 0.8234

1.9 LSTM의 한계

한계 설명 대안
순차 처리 \(h_t\)\(h_{t-1}\)에 의존 → 병렬화 불가 TCN, Transformer
긴 시퀀스 비효율 T > 100이면 학습 느림 TCN (병렬 convolution)
장기 의존성 한계 기울기 소실이 완화되었지만 완전히 해결되지 않음 Transformer (직접 참조)
불규칙 시점 이산 시점 가정 Neural ODE
해석 가능성 hidden state가 블랙박스 Transformer (Attention)

1.10 요약

항목 내용
Vanilla RNN 기울기 소실로 장기 의존성 학습 불가
LSTM 3개 게이트 + Cell state로 장기 기억 유지
GRU 2개 게이트, 파라미터 75% 수준, 비슷한 성능
가변 길이 처리 패딩 + 마스킹 또는 pack_padded_sequence
양방향 LSTM 사후 분류에 유리, 실시간 예측에는 단방향
핵심 한계 순차 처리, 긴 시퀀스 비효율

다음: 26 — TCN: Dilated Causal Convolution으로 시퀀스 병렬 처리

Subscribe

Enjoy this blog? Get notified of new posts by email: