Hugging Face: PLM 생태계의 중심

실무에서 바로 사용할 수 있는 사전 학습 모델의 허브

Hugging Face는 현재 NLP 분야에서 가장 중요한 라이브러리이자 플랫폼이다. 수만 개의 사전 학습 모델을 제공하며, 몇 줄의 코드만으로 최신 PLM을 활용할 수 있게 해준다. 토크나이저부터 파인튜닝, 배포까지 전체 ML 워크플로우를 지원하는 Hugging Face의 핵심 기능들과 실무 활용 전략을 상세히 분석한다.

NLP
Deep Learning
저자

Kwangmin Kim

공개

2025년 01월 27일

한국어 혐오 발언 다중 레이블 분류 : K-MHaS (Korean Multi-label Hate Speech Dataset)

1 KorNLI 분류

데이터 셋: https://github.com/adlnlp/K-MHaS

2 데이터셋 로드 및 구조 확인


from datasets import load_dataset

dataset = load_dataset("jeanlee/kmhas_korean_hate_speech")
dataset

만약 테스트 데이터만 별도로 로드하고 싶다면 load_dataset()으로 데이터를 불러올 당시에 split=“test”의 인자를 추가적으로 사용하면 됩니다.

dataset = load_dataset("jeanlee/kmhas_korean_hate_speech", split="test")
dataset
dataset.features
print('테스트 데이터 셋의 크기 :', len(dataset['text']))
print('첫번째 샘플 출력 :', dataset['text'][0])
print('첫번째 샘플의 레이블 출력 :', dataset['label'][0])

데이터셋의 깃허브로부터 확인할 수 있는 각 레이블이 의미하는 바는 다음과 같습니다. 레이블은 0부터 8까지 총 9개의 레이블이 존재합니다.

class_label:
  names:
    0: origin (출신차별)
    1: physical (외모차별)
    2: politics (정치성향차별)
    3: profanity (혐오욕설)
    4: age (연령차별)
    5: gender (성차별)
    6: race (인종차별)
    7: religion (종교차별)
    8: not_hate_speech (혐오아님)
print('두번째 샘플 출력 :', dataset['text'][1])
print('두번째 샘플의 레이블 출력 :', dataset['label'][1])

3 전처리

from datasets import load_dataset

dataset = load_dataset("jeanlee/kmhas_korean_hate_speech")

import pandas as pd
import numpy as np
import random
import time
import datetime
from tqdm import tqdm

import csv
import os

import tensorflow as tf
import torch

# BERT 사용을 위함
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# for padding
from tensorflow.keras.preprocessing.sequence import pad_sequences 

# 전처리 및 평가 지표
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, hamming_loss

# 훈련 데이터, 검증 데이터, 테스트 데이터를 로드합니다.

train = load_dataset("jeanlee/kmhas_korean_hate_speech", split="train")
validation = load_dataset("jeanlee/kmhas_korean_hate_speech", split="validation")
test = load_dataset("jeanlee/kmhas_korean_hate_speech", split="test")

훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 [CLS] 문장 [SEP] 구조를 만듭니다. [CLS]는 분류를 하기 위해 BERT가 사용하는 첫번째 입력 토큰이며, [SEP]는 입력 문장의 종료를 나타내기 위해 사용하는 스페셜 토큰입니다.

# 훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 `[CLS] 문장 [SEP]` 구조를 만듭니다.

train_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', train['text']))
validation_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', validation['text']))
test_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', test['text']))

다중 레이블의 경우에는 모든 카테고리 문제에 대해서 이진 분류 문제를 푸는 것과 같습니다. 현재 이 문제의 경우 총 9개의 카테고리가 존재하므로 모델은 다음과 같이 모든 샘플에 대해서 아래의 질문을 모두 수행합니다.

Ex) 이 텍스트가 0번 레이블에 해당하는가? yes or no => no. 이 텍스트가 1번 레이블에 해당하는가? yes or no => no. 이 텍스트가 2번 레이블에 해당하는가? yes or no => yes. 이 텍스트가 3번 레이블에 해당하는가? yes or no => yes. 이 텍스트가 4번 레이블에 해당하는가? yes or no => no. 이 텍스트가 5번 레이블에 해당하는가? yes or no => no. 이 텍스트가 6번 레이블에 해당하는가? yes or no => no. 이 텍스트가 7번 레이블에 해당하는가? yes or no => no. 이 텍스트가 8번 레이블에 해당하는가? yes or no => no.

예측 : [2, 3]

# 정답인 레이블의 위치에는 1, 나머지 위치에는 0을 기록합니다.
# 레이블 전처리 예시)
# [8]    -> [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0] : 의미적으로는 [no, no, no, no, no, no, no, no, no, yes]
# [2, 3] -> [0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0] : 의미적으로는 [no, no, yes, yes, no, no, no, no, no, no]

from sklearn.preprocessing import MultiLabelBinarizer

enc = MultiLabelBinarizer()

def multi_label(example):
    enc_label = enc.fit_transform(example['label'])
    float_arr = np.vstack(enc_label[:]).astype(float)
    update_label = float_arr.tolist()
    return update_label

train_labels = multi_label(train)
validation_labels = multi_label(validation)
test_labels = multi_label(test)
test_sentences[:5]
# 각 레이블은 기존에 [8], [2, 3], [2], [0], [8] 이었으며 전처리 후 아래와 같이 변경됨.
test_labels[:5]

4 BERT 토크나이저를 이용한 전처리

BERT를 사용하기 위해서는 토크나이저와 모델이 반드시 맵핑 관계여야만 합니다. 다시 말해 아래의 이름에 들어가는 모델이름은 반드시 동일해야 합니다.

  • BertTokenizer.from_pretrained('모델이름')
  • BertForSequenceClassification.from_pretrained("모델이름")

토크나이저는 내부적으로 Vocabulary를 갖고 있어 정수 인코딩을 수행해주는 모듈입니다.

# 한국어 BERT 중 하나인 'klue/bert-base'를 사용.
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')
tokenized_text = tokenizer.tokenize('안녕하세요. 자연어 처리를 배울거에요.')
input_id = tokenizer.convert_tokens_to_ids(tokenized_text)

print('토큰화 된 문장 :', tokenized_text)
print('정수 인코딩 된 문장 :', input_id)

MAX_LEN = 128

def data_to_tensor (sentences, labels):
  # 정수 인코딩 과정. 각 텍스트를 토큰화한 후에 Vocabulary에 맵핑되는 정수 시퀀스로 변환한다.
  # ex) ['안녕하세요'] ==> ['안', '녕', '하세요'] ==> [231, 52, 45]
  tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
  input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

  # pad_sequences는 패딩을 위한 모듈. 주어진 최대 길이를 위해서 뒤에서 0으로 채워준다.
  # ex) [231, 52, 45] ==> [231, 52, 45, 0, 0, 0]
  input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post") 

  attention_masks = []

  for seq in input_ids:
      seq_mask = [float(i > 0) for i in seq]
      attention_masks.append(seq_mask)

  tensor_inputs = torch.tensor(input_ids)
  tensor_labels = torch.tensor(labels)
  tensor_masks = torch.tensor(attention_masks)

  return tensor_inputs, tensor_labels, tensor_masks

# 훈련 데이터, 검증 데이터, 텍스트 데이터에 대해서 data_to_tensor 함수를 통해서 정수 인코딩 된 데이터, 레이블, 어텐션 마스크를 얻습니다.
train_inputs, train_labels, train_masks = data_to_tensor(train_sentences, train_labels)
validation_inputs, validation_labels, validation_masks = data_to_tensor(validation_sentences, validation_labels)
test_inputs, test_labels, test_masks = data_to_tensor(test_sentences, test_labels)

배치 크기는 32로 하고 파이토치의 데이터로더(배치 단위로 데이터를 꺼내올 수 있도록 하는 모듈)로 변환합니다.

batch_size = 32

train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

print('훈련 데이터의 크기:', len(train_labels))
print('검증 데이터의 크기:', len(validation_labels))
print('테스트 데이터의 크기:', len(test_labels))

5 GPU가 정상 셋팅되었는지 확인.

Colab에서 GPU를 사용하기 위해서는 아래와 같이 설정이 되어있어야만 합니다.

런타임 > 런타임 유형 변경 > 하드웨어 가속기 > ‘GPU’ 선택

if torch.cuda.is_available():    
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')

6 모델로드

BERT를 사용하여 텍스트를 분류하는 BERT 아키텍처는 BertForSequenceClassification.from_pretrained(“모델이름”)을 넣어서 가능합니다.

이때 인자값으로 “problem_type”에 “multi_label_classification” 값으로 넣으면 다중 레이블 분류 형태로 모델을 로드합니다.

num_labels = 9

model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels, problem_type="multi_label_classification")
model.cuda()

# 옵티마이저 선택
optimizer = AdamW(model.parameters(),
                  lr = 2e-5,
                  eps = 1e-8
                )
# 몇 번의 에포크(전체 데이터에 대한 학습 횟수)를 할 것인지 선택
epochs = 4
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

def format_time(elapsed):
    elapsed_rounded = int(round((elapsed)))
    return str(datetime.timedelta(seconds=elapsed_rounded))  # hh:mm:ss

def multi_label_metrics(predictions, labels, threshold=0.5):
    
    # 모델의 예측에 대해서 시그모이드 함수값을 통과시킨다. (batch_size, num_labels)
    sigmoid = torch.nn.Sigmoid()
    probs = sigmoid(torch.Tensor(predictions))

    # 만약 threshold 값을 넘는 경우에는 1로 예측했다고 간주한다.
    # threshold 값은 일반적으로 로지스틱 회귀 방식에 의하여 0.5를 선택하는 것이 일반적이다.
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1

    y_true = labels

    # 사용 가능한 메트릭들을 사용한다.
    accuracy = accuracy_score(y_true, y_pred)
    f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
    f1_weighted_average = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)
    roc_auc = roc_auc_score(y_true, y_pred, average = 'micro')

    # 메트릭 결과에 대해서 리턴
    metrics = {'accuracy': accuracy,
               'f1_macro': f1_macro_average,
               'f1_micro': f1_micro_average,
               'f1_weighted': f1_weighted_average,
               'roc_auc': roc_auc}

    return metrics

7 모델 학습

# 랜덤 시드값.
seed_val = 777
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

model.zero_grad()
for epoch_i in range(0, epochs):
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    t0 = time.time()
    total_loss = 0

    model.train()

    for step, batch in tqdm(enumerate(train_dataloader)):
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask, 
                        labels=b_labels)
        
        loss = outputs[0]
        total_loss += loss.item()
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping if it is over a threshold
        optimizer.step()
        scheduler.step()

        model.zero_grad()

    avg_train_loss = total_loss / len(train_dataloader)            

    print("")
    print("  Average training loss: {0:.4f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

8 검증 데이터에 대한 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for batch in validation_dataloader:
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    for b in logits:
        accum_logits.append(list(b))

    for b in label_ids:
        accum_label_ids.append(list(b))

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = multi_label_metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))
print("ROC-AUC: {0:.4f}".format(results['roc_auc']))

9 모델 저장과 로드

%pwd

# 폴더 생성
%mkdir model

path = '/content/model/'

# 모델 저장
torch.save(model.state_dict(), path+"BERT_kornli.pt")

# 모델 로드
model.load_state_dict(torch.load(path+"BERT_kornli.pt"))

10 테스트 데이터에 대한 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for step, batch in tqdm(enumerate(test_dataloader)):
    if step % 100 == 0 and not step == 0:
        elapsed = format_time(time.time() - t0)
        print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()
    
    for b in logits:
        accum_logits.append(list(b))

    for b in label_ids:
        accum_label_ids.append(list(b))

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = multi_label_metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))
print("ROC-AUC: {0:.4f}".format(results['roc_auc']))

11 예측


from transformers import pipeline
pipe = pipeline("text-classification", model=model.cuda(), tokenizer=tokenizer, device=0, max_length=512,
                return_all_scores=True, function_to_apply='sigmoid')

result = pipe('틀니들은 왜 그렇게 민폐를 끼치냐?')
print(result)

def prediction(text):
  result = pipe(text)
  return [label_dict[res['label']] for res in result[0] if res['score'] > 0.5]

Subscribe

Enjoy this blog? Get notified of new posts by email: