Hugging Face: PLM 생태계의 중심

실무에서 바로 사용할 수 있는 사전 학습 모델의 허브

Hugging Face는 현재 NLP 분야에서 가장 중요한 라이브러리이자 플랫폼이다. 수만 개의 사전 학습 모델을 제공하며, 몇 줄의 코드만으로 최신 PLM을 활용할 수 있게 해준다. 토크나이저부터 파인튜닝, 배포까지 전체 ML 워크플로우를 지원하는 Hugging Face의 핵심 기능들과 실무 활용 전략을 상세히 분석한다.

NLP
Deep Learning
저자

Kwangmin Kim

공개

2025년 01월 27일

1 한국어 금융 뉴스 긍정, 부정 분류

dataset: https://github.com/ukairia777/finance_sentiment_corpus

!pip install transformers
!pip install datasets

# 데이터 다운로드

!wget https://raw.githubusercontent.com/ukairia777/finance_sentiment_corpus/main/finance_data.csv
import pandas as pd
df = pd.read_csv('../data/NLP/finance_data.csv')
print('샘플의 개수 :', len(df)) # 샘플의 개수 : 4846

df.head()
df['labels'] = df['labels'].replace(['neutral', 'positive', 'negative'],[0, 1, 2])
df.head()

df.to_csv('finance_data.csv', index=False, encoding='utf-8-sig') # 값을 변경한 데이터프레임을 다시 csv로 저장합니다.

# csv 파일로부터 datasets을 로드할 수 있습니다.
from datasets import load_dataset

all_data = load_dataset(
        "csv",
        data_files={
            "train": "finance_data.csv",
        },
    )

#현재 train에 모든 데이터가 저장되어져 있습니다.
all_data

#DatasetDict({
#    train: Dataset({
#        features: ['labels', 'sentence', 'kor_sentence'],
#        num_rows: 4846
#    })
#})

이를 dataset의 train_test_split() 기능을 사용하여 8:2 비율로 분리하고 훈련 데이터와 테스트 데이터로 저장합니다.


cs = all_data['train'].train_test_split(0.2)
train_cs = cs["train"]
test_cs = cs["test"]

print(train_cs)
print(test_cs)

검증 데이터를 위해 훈련 데이터를 다시 8:2로 훈련 데이터와 검증 데이터로 저장합니다.

# 훈련 데이터를 다시 8:2로 분리 후 훈련 데이터와 검증 데이터로 저장
cs = train_cs.train_test_split(0.2)
train_cs = cs["train"]
valid_cs = cs["test"]

데이터셋의 구조는 다음과 같습니다. 훈련 데이터, 검증 데이터, 테스트 데이터로 구성되며 우리가 사용할 열은 kor_text열과 labels열입니다.

print(train_cs)
print(valid_cs)
print(test_cs)

print('두번째 샘플 출력 :', train_cs['kor_sentence'][1])
print('두번째 샘플의 레이블 출력 :', train_cs['labels'][1])

1.1 데이터셋 전처리


import pandas as pd
import numpy as np
import random
import time
import datetime
from tqdm import tqdm

import csv
import os

import tensorflow as tf
import torch

# BERT 사용을 위함
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# for padding
from tensorflow.keras.preprocessing.sequence import pad_sequences 

# 전처리 및 평가 지표
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, hamming_loss

훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 [CLS] 문장 [SEP] 구조를 만듭니다. [CLS]는 분류를 하기 위해 BERT가 사용하는 첫번째 입력 토큰이며, [SEP]는 입력 문장의 종료를 나타내기 위해 사용하는 스페셜 토큰입니다.

# 훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 `[CLS] 문장 [SEP]` 구조를 만듭니다.

train_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', train_cs['kor_sentence']))
validation_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', valid_cs['kor_sentence']))
test_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', test_cs['kor_sentence']))

train_labels = train_cs['labels']
validation_labels = valid_cs['labels']
test_labels = test_cs['labels']

test_sentences[:5]

[‘[CLS] 덴마크 로열유니브루가 소유한 칼나필리오타우로그루페(칼나필리스타우라스그룹)는 7개월 동안 맥주 판매량이 14.5% 급증한 4050만ℓ를 기록하며 시장점유율을 23.74%에서 25.18%로 끌어올렸다. [SEP]’, ‘[CLS] 순이자 수입은 152.2 mn으로 2008년 101.0 mn에서 증가하였다. [SEP]’, ‘[CLS] 인도된 충전기 수가 6590만대로 41% 증가하면서 순매출액은 전년 대비 25.5% 증가한 59.6m를 기록했다. [SEP]’, ‘[CLS] 뉴스, 의견 또는 배포에 대한 보상 없음. [SEP]’, ‘[CLS] 국내 및 지역에서의 강력한 브랜드 가시성은 가정 판매, 차량 및 소비자 광고에서 가장 중요합니다. [SEP]’]

중립 = 0 긍정 = 1 부정 = 2

test_labels[:5]

[1, 1, 1, 0, 0]

1.2 BERT 토크나이저를 이용한 전처리

BERT를 사용하기 위해서는 토크나이저와 모델이 반드시 맵핑 관계여야만 합니다. 다시 말해 아래의 이름에 들어가는 모델이름은 반드시 동일해야 합니다.

BertTokenizer.from_pretrained('모델이름') BertForSequenceClassification.from_pretrained("모델이름")

토크나이저는 내부적으로 Vocabulary를 갖고 있어 정수 인코딩을 수행해주는 모듈입니다.

# 한국어 BERT 중 하나인 'klue/bert-base'를 사용.
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')

MAX_LEN = 128

def data_to_tensor (sentences, labels):
  # 정수 인코딩 과정. 각 텍스트를 토큰화한 후에 Vocabulary에 맵핑되는 정수 시퀀스로 변환한다.
  # ex) ['안녕하세요'] ==> ['안', '녕', '하세요'] ==> [231, 52, 45]
  tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
  input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

  # pad_sequences는 패딩을 위한 모듈. 주어진 최대 길이를 위해서 뒤에서 0으로 채워준다.
  # ex) [231, 52, 45] ==> [231, 52, 45, 0, 0, 0]
  input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post") 

  attention_masks = []

  for seq in input_ids:
      seq_mask = [float(i > 0) for i in seq]
      attention_masks.append(seq_mask)

  tensor_inputs = torch.tensor(input_ids)
  tensor_labels = torch.tensor(labels)
  tensor_masks = torch.tensor(attention_masks)

  return tensor_inputs, tensor_labels, tensor_masks

훈련 데이터, 검증 데이터, 텍스트 데이터에 대해서 data_to_tensor 함수를 통해서 정수 인코딩 된 데이터, 레이블, 어텐션 마스크를 얻습니다.

train_inputs, train_labels, train_masks = data_to_tensor(train_sentences, train_labels)
validation_inputs, validation_labels, validation_masks = data_to_tensor(validation_sentences, validation_labels)
test_inputs, test_labels, test_masks = data_to_tensor(test_sentences, test_labels)

print(train_inputs[0])
print(train_masks[0])

tokenizer.decode([2]) # [CLS]
tokenizer.decode([3]) # [SEP]

배치 크기는 32로 하고 파이토치의 데이터로더(배치 단위로 데이터를 꺼내올 수 있도록 하는 모듈)로 변환합니다.

batch_size = 32

train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

print('훈련 데이터의 크기:', len(train_labels))
print('검증 데이터의 크기:', len(validation_labels))
print('테스트 데이터의 크기:', len(test_labels))

1.3 GPU가 정상 셋팅되었는지 확인.

Colab에서 GPU를 사용하기 위해서는 아래와 같이 설정이 되어있어야만 합니다.

  • 런타임 > 런타임 유형 변경 > 하드웨어 가속기 > ‘GPU’ 선택

if torch.cuda.is_available():    
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')

1.4 모델 로드

BERT를 사용하여 텍스트를 분류하는 BERT 아키텍처는 BertForSequenceClassification.from_pretrained(“모델이름”)을 넣어서 가능합니다. 레이블 수로 num_labels라는 인자값에 레이블의 수를 기재해줍니다.

num_labels = 3

model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels)
model.cuda()

# 옵티마이저 선택
optimizer = AdamW(model.parameters(),
                  lr = 2e-5,
                  eps = 1e-8
                )


# 몇 번의 에포크(전체 데이터에 대한 학습 횟수)를 할 것인지 선택
epochs = 2
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)


def format_time(elapsed):
    elapsed_rounded = int(round((elapsed)))
    return str(datetime.timedelta(seconds=elapsed_rounded))  # hh:mm:ss

def metrics(predictions, labels):
    y_pred = predictions
    y_true = labels

    # 사용 가능한 메트릭들을 사용한다.
    accuracy = accuracy_score(y_true, y_pred)
    f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
    f1_weighted_average = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)

    # 메트릭 결과에 대해서 리턴
    metrics = {'accuracy': accuracy,
               'f1_macro': f1_macro_average,
               'f1_micro': f1_micro_average,
               'f1_weighted': f1_weighted_average}

    return metrics

1.5 모델 학습

# 랜덤 시드값.
seed_val = 777
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

model.zero_grad()
for epoch_i in range(0, epochs):
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    t0 = time.time()
    total_loss = 0

    model.train()

    for step, batch in tqdm(enumerate(train_dataloader)):
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask, 
                        labels=b_labels)
        
        loss = outputs[0]
        total_loss += loss.item()
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping if it is over a threshold
        optimizer.step()
        scheduler.step()

        model.zero_grad()

    avg_train_loss = total_loss / len(train_dataloader)            

    print("")
    print("  Average training loss: {0:.4f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

1.6 검증 데이터에 대한 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for batch in validation_dataloader:
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    for b in logits:
        # 3개의 값 중 가장 큰 값을 예측한 인덱스로 결정
        # ex) [ 3.5134246  -0.30875662 -2.111316  ] ==> 0
        accum_logits.append(np.argmax(b))

    for b in label_ids:
        accum_label_ids.append(b)

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))

1.6.1 모델 저장과 로드

%pwd
# 폴더 생성
%mkdir model

path = '/content/model/'

# 모델 저장
torch.save(model.state_dict(), path+"BERT_news_positive_negative_model.pt")

# 모델 로드
model.load_state_dict(torch.load(path+"BERT_news_positive_negative_model.pt"))

2 테스트 데이터에 대한 평가

t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []

for step, batch in tqdm(enumerate(test_dataloader)):
    if step % 100 == 0 and not step == 0:
        elapsed = format_time(time.time() - t0)
        print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, 
                        token_type_ids=None, 
                        attention_mask=b_input_mask)

    logits = outputs[0]
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()
    
    for b in logits:
        # 3개의 값 중 가장 큰 값을 예측한 인덱스로 결정
        # ex) [ 3.5134246  -0.30875662 -2.111316  ] ==> 0
        accum_logits.append(np.argmax(b))

    for b in label_ids:
        accum_label_ids.append(b)

accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = metrics(accum_logits, accum_label_ids)

print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))

2.1 예측

from transformers import pipeline
pipe = pipeline("text-classification", model=model.cuda(), tokenizer=tokenizer, device=0, max_length=512,
                return_all_scores=True, function_to_apply='softmax')
result = pipe('SK하이닉스가 매출이 급성장하였다')
print(result)

pipe = pipeline("text-classification", model=model.cuda(), tokenizer=tokenizer, device=0, max_length=512, function_to_apply='softmax')
result = pipe('SK하이닉스가 매출이 급성장하였다')
print(result)

label_dict = {'LABEL_0' : '중립', 'LABEL_1' : '긍정', 'LABEL_2' : '부정'}

def prediction(text):
  result = pipe(text)
  
  return [label_dict[result[0]['label']]]

prediction('패스트캠퍼스가 매출이 급성장하였다')
prediction('ChatGPT의 등장으로 인공지능 스타트업들은 비상이다')
prediction('인공지능 기술의 발전으로 누군가는 기회를 얻을 것이고, 누군가는 얻지 못할 것이다')

현재 데이터셋의 경우 레이블이 3개이지만 만약 레이블이 2개(긍정, 부정)인 이진 분류 문제였다면? 모델 로드 시에 num_labels를 바꿔주면 된다.

num_labels = 2

model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels)
model.cuda()

Subscribe

Enjoy this blog? Get notified of new posts by email: