한국어 혐오 발언 다중 레이블 분류 : K-MHaS (Korean Multi-label Hate Speech Dataset)
1 KorNLI 분류
데이터 셋: https://github.com/adlnlp/K-MHaS
2 데이터셋 로드 및 구조 확인
from datasets import load_dataset
dataset = load_dataset("jeanlee/kmhas_korean_hate_speech")
dataset만약 테스트 데이터만 별도로 로드하고 싶다면 load_dataset()으로 데이터를 불러올 당시에 split=“test”의 인자를 추가적으로 사용하면 됩니다.
dataset = load_dataset("jeanlee/kmhas_korean_hate_speech", split="test")
dataset
dataset.features
print('테스트 데이터 셋의 크기 :', len(dataset['text']))
print('첫번째 샘플 출력 :', dataset['text'][0])
print('첫번째 샘플의 레이블 출력 :', dataset['label'][0])데이터셋의 깃허브로부터 확인할 수 있는 각 레이블이 의미하는 바는 다음과 같습니다. 레이블은 0부터 8까지 총 9개의 레이블이 존재합니다.
class_label:
names:
0: origin (출신차별)
1: physical (외모차별)
2: politics (정치성향차별)
3: profanity (혐오욕설)
4: age (연령차별)
5: gender (성차별)
6: race (인종차별)
7: religion (종교차별)
8: not_hate_speech (혐오아님)
3 전처리
from datasets import load_dataset
dataset = load_dataset("jeanlee/kmhas_korean_hate_speech")
import pandas as pd
import numpy as np
import random
import time
import datetime
from tqdm import tqdm
import csv
import os
import tensorflow as tf
import torch
# BERT 사용을 위함
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
# for padding
from tensorflow.keras.preprocessing.sequence import pad_sequences
# 전처리 및 평가 지표
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, hamming_loss
# 훈련 데이터, 검증 데이터, 테스트 데이터를 로드합니다.
train = load_dataset("jeanlee/kmhas_korean_hate_speech", split="train")
validation = load_dataset("jeanlee/kmhas_korean_hate_speech", split="validation")
test = load_dataset("jeanlee/kmhas_korean_hate_speech", split="test")훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 [CLS] 문장 [SEP] 구조를 만듭니다. [CLS]는 분류를 하기 위해 BERT가 사용하는 첫번째 입력 토큰이며, [SEP]는 입력 문장의 종료를 나타내기 위해 사용하는 스페셜 토큰입니다.
# 훈련 데이터, 검증 데이터, 테스트 데이터에 대해서 `[CLS] 문장 [SEP]` 구조를 만듭니다.
train_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', train['text']))
validation_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', validation['text']))
test_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', test['text']))다중 레이블의 경우에는 모든 카테고리 문제에 대해서 이진 분류 문제를 푸는 것과 같습니다. 현재 이 문제의 경우 총 9개의 카테고리가 존재하므로 모델은 다음과 같이 모든 샘플에 대해서 아래의 질문을 모두 수행합니다.
Ex) 이 텍스트가 0번 레이블에 해당하는가? yes or no => no. 이 텍스트가 1번 레이블에 해당하는가? yes or no => no. 이 텍스트가 2번 레이블에 해당하는가? yes or no => yes. 이 텍스트가 3번 레이블에 해당하는가? yes or no => yes. 이 텍스트가 4번 레이블에 해당하는가? yes or no => no. 이 텍스트가 5번 레이블에 해당하는가? yes or no => no. 이 텍스트가 6번 레이블에 해당하는가? yes or no => no. 이 텍스트가 7번 레이블에 해당하는가? yes or no => no. 이 텍스트가 8번 레이블에 해당하는가? yes or no => no.
예측 : [2, 3]
# 정답인 레이블의 위치에는 1, 나머지 위치에는 0을 기록합니다.
# 레이블 전처리 예시)
# [8] -> [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0] : 의미적으로는 [no, no, no, no, no, no, no, no, no, yes]
# [2, 3] -> [0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0] : 의미적으로는 [no, no, yes, yes, no, no, no, no, no, no]
from sklearn.preprocessing import MultiLabelBinarizer
enc = MultiLabelBinarizer()
def multi_label(example):
enc_label = enc.fit_transform(example['label'])
float_arr = np.vstack(enc_label[:]).astype(float)
update_label = float_arr.tolist()
return update_label
train_labels = multi_label(train)
validation_labels = multi_label(validation)
test_labels = multi_label(test)
test_sentences[:5]
# 각 레이블은 기존에 [8], [2, 3], [2], [0], [8] 이었으며 전처리 후 아래와 같이 변경됨.
test_labels[:5]4 BERT 토크나이저를 이용한 전처리
BERT를 사용하기 위해서는 토크나이저와 모델이 반드시 맵핑 관계여야만 합니다. 다시 말해 아래의 이름에 들어가는 모델이름은 반드시 동일해야 합니다.
BertTokenizer.from_pretrained('모델이름')BertForSequenceClassification.from_pretrained("모델이름")
토크나이저는 내부적으로 Vocabulary를 갖고 있어 정수 인코딩을 수행해주는 모듈입니다.
# 한국어 BERT 중 하나인 'klue/bert-base'를 사용.
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')
tokenized_text = tokenizer.tokenize('안녕하세요. 자연어 처리를 배울거에요.')
input_id = tokenizer.convert_tokens_to_ids(tokenized_text)
print('토큰화 된 문장 :', tokenized_text)
print('정수 인코딩 된 문장 :', input_id)
MAX_LEN = 128
def data_to_tensor (sentences, labels):
# 정수 인코딩 과정. 각 텍스트를 토큰화한 후에 Vocabulary에 맵핑되는 정수 시퀀스로 변환한다.
# ex) ['안녕하세요'] ==> ['안', '녕', '하세요'] ==> [231, 52, 45]
tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
# pad_sequences는 패딩을 위한 모듈. 주어진 최대 길이를 위해서 뒤에서 0으로 채워준다.
# ex) [231, 52, 45] ==> [231, 52, 45, 0, 0, 0]
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")
attention_masks = []
for seq in input_ids:
seq_mask = [float(i > 0) for i in seq]
attention_masks.append(seq_mask)
tensor_inputs = torch.tensor(input_ids)
tensor_labels = torch.tensor(labels)
tensor_masks = torch.tensor(attention_masks)
return tensor_inputs, tensor_labels, tensor_masks
# 훈련 데이터, 검증 데이터, 텍스트 데이터에 대해서 data_to_tensor 함수를 통해서 정수 인코딩 된 데이터, 레이블, 어텐션 마스크를 얻습니다.
train_inputs, train_labels, train_masks = data_to_tensor(train_sentences, train_labels)
validation_inputs, validation_labels, validation_masks = data_to_tensor(validation_sentences, validation_labels)
test_inputs, test_labels, test_masks = data_to_tensor(test_sentences, test_labels)
배치 크기는 32로 하고 파이토치의 데이터로더(배치 단위로 데이터를 꺼내올 수 있도록 하는 모듈)로 변환합니다.
batch_size = 32
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)
test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)
print('훈련 데이터의 크기:', len(train_labels))
print('검증 데이터의 크기:', len(validation_labels))
print('테스트 데이터의 크기:', len(test_labels))5 GPU가 정상 셋팅되었는지 확인.
Colab에서 GPU를 사용하기 위해서는 아래와 같이 설정이 되어있어야만 합니다.
런타임 > 런타임 유형 변경 > 하드웨어 가속기 > ‘GPU’ 선택
6 모델로드
BERT를 사용하여 텍스트를 분류하는 BERT 아키텍처는 BertForSequenceClassification.from_pretrained(“모델이름”)을 넣어서 가능합니다.
이때 인자값으로 “problem_type”에 “multi_label_classification” 값으로 넣으면 다중 레이블 분류 형태로 모델을 로드합니다.
num_labels = 9
model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels, problem_type="multi_label_classification")
model.cuda()
# 옵티마이저 선택
optimizer = AdamW(model.parameters(),
lr = 2e-5,
eps = 1e-8
)
# 몇 번의 에포크(전체 데이터에 대한 학습 횟수)를 할 것인지 선택
epochs = 4
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer,
num_warmup_steps = 0,
num_training_steps = total_steps)
def format_time(elapsed):
elapsed_rounded = int(round((elapsed)))
return str(datetime.timedelta(seconds=elapsed_rounded)) # hh:mm:ss
def multi_label_metrics(predictions, labels, threshold=0.5):
# 모델의 예측에 대해서 시그모이드 함수값을 통과시킨다. (batch_size, num_labels)
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(torch.Tensor(predictions))
# 만약 threshold 값을 넘는 경우에는 1로 예측했다고 간주한다.
# threshold 값은 일반적으로 로지스틱 회귀 방식에 의하여 0.5를 선택하는 것이 일반적이다.
y_pred = np.zeros(probs.shape)
y_pred[np.where(probs >= threshold)] = 1
y_true = labels
# 사용 가능한 메트릭들을 사용한다.
accuracy = accuracy_score(y_true, y_pred)
f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
f1_weighted_average = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)
roc_auc = roc_auc_score(y_true, y_pred, average = 'micro')
# 메트릭 결과에 대해서 리턴
metrics = {'accuracy': accuracy,
'f1_macro': f1_macro_average,
'f1_micro': f1_micro_average,
'f1_weighted': f1_weighted_average,
'roc_auc': roc_auc}
return metrics7 모델 학습
# 랜덤 시드값.
seed_val = 777
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)
model.zero_grad()
for epoch_i in range(0, epochs):
print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
t0 = time.time()
total_loss = 0
model.train()
for step, batch in tqdm(enumerate(train_dataloader)):
if step % 500 == 0 and not step == 0:
elapsed = format_time(time.time() - t0)
print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))
batch = tuple(t.to(device) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
outputs = model(b_input_ids,
token_type_ids=None,
attention_mask=b_input_mask,
labels=b_labels)
loss = outputs[0]
total_loss += loss.item()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # gradient clipping if it is over a threshold
optimizer.step()
scheduler.step()
model.zero_grad()
avg_train_loss = total_loss / len(train_dataloader)
print("")
print(" Average training loss: {0:.4f}".format(avg_train_loss))
print(" Training epcoh took: {:}".format(format_time(time.time() - t0)))8 검증 데이터에 대한 평가
t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []
for batch in validation_dataloader:
batch = tuple(t.to(device) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
with torch.no_grad():
outputs = model(b_input_ids,
token_type_ids=None,
attention_mask=b_input_mask)
logits = outputs[0]
logits = logits.detach().cpu().numpy()
label_ids = b_labels.to('cpu').numpy()
for b in logits:
accum_logits.append(list(b))
for b in label_ids:
accum_label_ids.append(list(b))
accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = multi_label_metrics(accum_logits, accum_label_ids)
print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))
print("ROC-AUC: {0:.4f}".format(results['roc_auc']))9 모델 저장과 로드
10 테스트 데이터에 대한 평가
t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []
for step, batch in tqdm(enumerate(test_dataloader)):
if step % 100 == 0 and not step == 0:
elapsed = format_time(time.time() - t0)
print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))
batch = tuple(t.to(device) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
with torch.no_grad():
outputs = model(b_input_ids,
token_type_ids=None,
attention_mask=b_input_mask)
logits = outputs[0]
logits = logits.detach().cpu().numpy()
label_ids = b_labels.to('cpu').numpy()
for b in logits:
accum_logits.append(list(b))
for b in label_ids:
accum_label_ids.append(list(b))
accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = multi_label_metrics(accum_logits, accum_label_ids)
print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))
print("ROC-AUC: {0:.4f}".format(results['roc_auc']))11 예측
from transformers import pipeline
pipe = pipeline("text-classification", model=model.cuda(), tokenizer=tokenizer, device=0, max_length=512,
return_all_scores=True, function_to_apply='sigmoid')
result = pipe('틀니들은 왜 그렇게 민폐를 끼치냐?')
print(result)
def prediction(text):
result = pipe(text)
return [label_dict[res['label']] for res in result[0] if res['score'] > 0.5]