bongminkim

chatbot_py files

import torch
from get_data import tokenizer1
from chatspace import ChatSpace

spacer = ChatSpace()


def inference(device, args, TEXT, LABEL, model, sa_model):
    from KoBERT.Sentiment_Analysis_BERT_main import bert_inference

    sentence = input("문장을 입력하세요 : ")  # "Enter a sentence: "
    se_list = [sentence]

    # https://github.com/SKTBrain/KoBERT
    # Classify the input sentence as positive/negative with the KoBERT
    # sentiment-analysis model released by SKT.
    sa_label = int(bert_inference(sa_model, se_list))

    # Choose the extra encoder input token according to the SA label.
    if sa_label == 0:
        sa_token = TEXT.vocab.stoi['<nega>']
    else:
        sa_token = TEXT.vocab.stoi['<posi>']

    enc_input = tokenizer1(sentence)
    enc_input_index = []
    for tok in enc_input:
        enc_input_index.append(TEXT.vocab.stoi[tok])

    # Convert the encoder input to an index tensor, padded with <pad> up to max_len.
    if args.per_soft:
        enc_input_index.append(sa_token)
    for _ in range(args.max_len - len(enc_input_index)):
        enc_input_index.append(TEXT.vocab.stoi['<pad>'])
    enc_input_index = torch.LongTensor([enc_input_index])

    dec_input = torch.LongTensor([[LABEL.vocab.stoi['<sos>']]])
    # print("positive" if sa_label == 1 else "negative")

    model.eval()
    pred = []
    for _ in range(args.max_len):
        y_pred = model(enc_input_index.to(device), dec_input.to(device))
        y_pred_ids = y_pred.max(dim=-1)[1]
        if y_pred_ids[0, -1] == LABEL.vocab.stoi['<eos>']:
            y_pred_ids = y_pred_ids.squeeze(0)
            print(">", end=" ")
            for idx in range(len(y_pred_ids)):
                if LABEL.vocab.itos[y_pred_ids[idx]] == '<eos>':
                    pred_sentence = "".join(pred)
                    pred_str = spacer.space(pred_sentence)
                    print(pred_str)
                    break
                else:
                    pred.append(LABEL.vocab.itos[y_pred_ids[idx]])
            return 0
        # Greedy decoding: feed the last predicted token back into the decoder.
        dec_input = torch.cat(
            [dec_input.to(torch.device('cpu')),
             y_pred_ids[0, -1].unsqueeze(0).unsqueeze(0).to(torch.device('cpu'))], dim=-1)
    return 0
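
# The loop above is plain greedy decoding: start from <sos>, take the argmax
# at the last position, and append it until <eos> or args.max_len is reached.
# Below is a minimal, self-contained sketch of that same pattern; DummyModel,
# the vocabulary size, and the token ids are made up for illustration and are
# not part of this repository.
if __name__ == '__main__':
    class DummyModel(torch.nn.Module):
        """Stand-in mapping (enc_input, dec_input) -> (batch, dec_len, vocab) logits."""
        def __init__(self, vocab_size=10):
            super().__init__()
            self.vocab_size = vocab_size

        def forward(self, enc_input, dec_input):
            return torch.randn(dec_input.size(0), dec_input.size(1), self.vocab_size)

    EOS = 3                              # assumed <eos> index for this sketch
    toy_model = DummyModel()
    toy_dec = torch.LongTensor([[2]])    # assumed <sos> index
    for _ in range(20):                  # stands in for args.max_len
        logits = toy_model(torch.zeros(1, 5, dtype=torch.long), toy_dec)
        next_id = logits.max(dim=-1)[1][0, -1]  # greedy: argmax at the last position
        if next_id.item() == EOS:
            break
        toy_dec = torch.cat([toy_dec, next_id.view(1, 1)], dim=-1)
    print(toy_dec)                       # generated token ids, starting with <sos>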
import torch
from torchtext import data
from torchtext.data import TabularDataset
from torchtext.data import BucketIterator
from torchtext.vocab import Vectors
from konlpy.tag import Mecab
import re

from Styling import styling, make_special_token


# Tokenizer: strip punctuation/special characters, then split the text into
# morphemes with Mecab.
def tokenizer1(text):
    result_text = re.sub('[-=+.,#/\:$@*\"※&%ㆍ!?』\\‘|\(\)\[\]\<\>`\'…》;]', '', text)
    return Mecab().morphs(result_text)
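
# Illustrative check of tokenizer1 (assumes the konlpy Mecab backend is
# installed; the example sentence is made up and the exact morpheme split
# depends on the installed Mecab dictionary):
#
#     >>> tokenizer1("오늘 날씨가 정말 좋네요!")
#     ['오늘', '날씨', '가', '정말', '좋', '네요']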
# Preprocess the data and return the fields and data loaders.
def data_preprocessing(args, device):
    # ID is unused; SA holds the sentiment-analysis label (0 or 1).
    ID = data.Field(sequential=False,
                    use_vocab=False)

    TEXT = data.Field(sequential=True,
                      use_vocab=True,
                      tokenize=tokenizer1,
                      batch_first=True,
                      fix_length=args.max_len,
                      dtype=torch.int32
                      )

    LABEL = data.Field(sequential=True,
                       use_vocab=True,
                       tokenize=tokenizer1,
                       batch_first=True,
                       fix_length=args.max_len,
                       init_token='<sos>',
                       eos_token='<eos>',
                       dtype=torch.int32
                       )

    SA = data.Field(sequential=False,
                    use_vocab=False)

    train_data, test_data = TabularDataset.splits(
        path='.', train='chatbot_0325_ALLLABEL_train.txt', test='chatbot_0325_ALLLABEL_test.txt', format='tsv',
        fields=[('id', ID), ('text', TEXT), ('target_text', LABEL), ('SA', SA)], skip_header=True
    )

    vectors = Vectors(name="kr-projected.txt")

    # Build the special tokens that TEXT and LABEL need.
    text_specials, label_specials = make_special_token(args)

    TEXT.build_vocab(train_data, vectors=vectors, max_size=15000, specials=text_specials)
    LABEL.build_vocab(train_data, vectors=vectors, max_size=15000, specials=label_specials)

    train_loader = BucketIterator(dataset=train_data, batch_size=args.batch_size, device=device, shuffle=True)
    test_loader = BucketIterator(dataset=test_data, batch_size=args.batch_size, device=device, shuffle=True)

    return TEXT, LABEL, train_loader, test_loader
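
# A minimal sketch of how the returned loaders are typically consumed. The
# args namespace here is hypothetical (the real hyperparameters come from the
# training script, and make_special_token may read further fields), and the
# two data files above must exist on disk.
if __name__ == '__main__':
    from types import SimpleNamespace

    args = SimpleNamespace(max_len=40, batch_size=64)  # assumed values
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    TEXT, LABEL, train_loader, test_loader = data_preprocessing(args, device)
    for batch in train_loader:
        enc_input = batch.text           # (batch, max_len) question indices
        dec_target = batch.target_text   # (batch, max_len) answer with <sos>/<eos>
        sa_label = batch.SA              # sentiment label, 0 or 1
        break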
import torch


# Compute accuracy, ignoring padding positions.
def acc(yhat, y):
    with torch.no_grad():
        yhat = yhat.max(dim=-1)[1]  # [0]: max value, [1]: index of max value
        acc = (yhat == y).float()[y != 1].mean()  # index 1 is <pad>; exclude it from accuracy
    return acc
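
# Worked example (made-up tensors): one sequence of length 3 over a 4-token
# vocabulary, where the last target position is padding (index 1):
#
#     >>> yhat = torch.tensor([[[0.1, 0.2, 3.0, 0.0],     # argmax -> 2
#     ...                       [0.1, 0.2, 0.0, 3.0],     # argmax -> 3
#     ...                       [3.0, 0.2, 0.0, 0.0]]])   # argmax -> 0 (padded)
#     >>> acc(yhat, torch.tensor([[2, 2, 1]]))
#     tensor(0.5000)
#
# Only the two non-pad positions count, and one of them is correct.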
# During training, print the model's input and its predicted output for the
# first few steps.
def train_test(step, y_pred, dec_output, real_value_index, enc_input, args, TEXT, LABEL):
    if 0 <= step < 3:
        _, ix = y_pred[real_value_index].data.topk(1)
        train_Q = enc_input[0]
        print("<<Q>> :", end=" ")
        for i in train_Q:
            if TEXT.vocab.itos[i] == "<pad>":
                break
            print(TEXT.vocab.itos[i], end=" ")

        print("\n<<trg A>> :", end=" ")
        for jj, jx in enumerate(dec_output[real_value_index]):
            if LABEL.vocab.itos[jx] == "<eos>":
                break
            print(LABEL.vocab.itos[jx], end=" ")

        print("\n<<pred A>> :", end=" ")
        for jj, pred_ix in enumerate(ix):
            if jj == args.max_len:
                break
            if LABEL.vocab.itos[pred_ix] == '<eos>':
                break
            print(LABEL.vocab.itos[pred_ix], end=" ")
        print("\n")
import torch
import torch.nn as nn
import math

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


class Transformer(nn.Module):
    def __init__(self, args, SRC_vocab, TRG_vocab):
        super(Transformer, self).__init__()
        self.d_model = args.embedding_dim
        self.n_head = args.nhead
        self.num_encoder_layers = args.nlayers
        self.num_decoder_layers = args.nlayers
        self.dim_feedforward = args.embedding_dim
        self.dropout = args.dropout
        self.SRC_vo = SRC_vocab
        self.TRG_vo = TRG_vocab

        self.pos_encoder = PositionalEncoding(self.d_model, self.dropout)

        self.src_embedding = nn.Embedding(len(self.SRC_vo.vocab), self.d_model)
        self.trg_embedding = nn.Embedding(len(self.TRG_vo.vocab), self.d_model)

        self.transformer = torch.nn.Transformer(d_model=self.d_model,
                                                nhead=self.n_head,
                                                num_encoder_layers=self.num_encoder_layers,
                                                num_decoder_layers=self.num_decoder_layers,
                                                dim_feedforward=self.dim_feedforward,
                                                dropout=self.dropout)
        self.proj_vocab_layer = nn.Linear(
            in_features=self.dim_feedforward, out_features=len(self.TRG_vo.vocab))

        # self.apply(self._initialize)

    def forward(self, en_input, de_input):
        x_en_embed = self.src_embedding(en_input.long()) * math.sqrt(self.d_model)
        x_de_embed = self.trg_embedding(de_input.long()) * math.sqrt(self.d_model)
        x_en_embed = self.pos_encoder(x_en_embed)
        x_de_embed = self.pos_encoder(x_de_embed)

        # Masking: hide <pad> positions from attention, and apply the causal
        # (square subsequent) mask to the decoder input.
        src_key_padding_mask = en_input == self.SRC_vo.vocab.stoi['<pad>']
        tgt_key_padding_mask = de_input == self.TRG_vo.vocab.stoi['<pad>']
        memory_key_padding_mask = src_key_padding_mask
        tgt_mask = self.transformer.generate_square_subsequent_mask(de_input.size(1))

        # nn.Transformer expects (seq_len, batch, d_model), so swap the first
        # two dimensions before the call.
        x_en_embed = torch.einsum('ijk->jik', x_en_embed)
        x_de_embed = torch.einsum('ijk->jik', x_de_embed)

        feature = self.transformer(src=x_en_embed,
                                   tgt=x_de_embed,
                                   src_key_padding_mask=src_key_padding_mask,
                                   tgt_key_padding_mask=tgt_key_padding_mask,
                                   memory_key_padding_mask=memory_key_padding_mask,
                                   tgt_mask=tgt_mask.to(device))

        logits = self.proj_vocab_layer(feature)
        logits = torch.einsum('ijk->jik', logits)  # back to (batch, seq_len, vocab)
        return logits

    def _initialize(self, layer):
        if isinstance(layer, nn.Linear):
            nn.init.kaiming_uniform_(layer.weight)


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=15000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # pe: (1, max_len, d_model), so it broadcasts over the batch dimension
        # of batch-first input and varies along the sequence dimension.
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model); slice the encoding by sequence length.
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)
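
# A quick smoke test of the model above, using a hypothetical args namespace
# and stub vocab objects in place of the torchtext fields (only the attributes
# the model actually reads are stubbed):
if __name__ == '__main__':
    from types import SimpleNamespace

    class StubVocab:
        def __init__(self, size, pad_index=1):
            self.stoi = {'<pad>': pad_index}
            self._size = size

        def __len__(self):
            return self._size

    class StubField:
        def __init__(self, size):
            self.vocab = StubVocab(size)

    args = SimpleNamespace(embedding_dim=32, nhead=4, nlayers=2, dropout=0.1)
    model = Transformer(args, StubField(100), StubField(100)).to(device)

    enc = torch.randint(2, 100, (2, 10)).to(device)  # (batch=2, src_len=10)
    dec = torch.randint(2, 100, (2, 7)).to(device)   # (batch=2, tgt_len=7)
    print(model(enc, dec).shape)                     # torch.Size([2, 7, 100])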
from torch.optim.lr_scheduler import _LRScheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau


class GradualWarmupScheduler(_LRScheduler):
    """Gradually warm up (increase) the learning rate in the optimizer.
    Proposed in 'Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour'.

    Args:
        optimizer (Optimizer): Wrapped optimizer.
        multiplier: target learning rate = base lr * multiplier
        total_epoch: the target learning rate is reached gradually at total_epoch
        after_scheduler: scheduler to use after total_epoch (e.g. ReduceLROnPlateau)
    """

    def __init__(self, optimizer, multiplier, total_epoch, after_scheduler=None):
        self.multiplier = multiplier
        if self.multiplier <= 1.:
            raise ValueError('multiplier should be greater than 1.')
        self.total_epoch = total_epoch
        self.after_scheduler = after_scheduler
        self.finished = False
        super().__init__(optimizer)

    def get_lr(self):
        if self.last_epoch > self.total_epoch:
            if self.after_scheduler:
                if not self.finished:
                    self.after_scheduler.base_lrs = [base_lr * self.multiplier for base_lr in self.base_lrs]
                    self.finished = True
                return self.after_scheduler.get_lr()
            return [base_lr * self.multiplier for base_lr in self.base_lrs]
        return [base_lr * ((self.multiplier - 1.) * self.last_epoch / self.total_epoch + 1.) for base_lr in self.base_lrs]

    def step_ReduceLROnPlateau(self, metrics, epoch=None):
        if epoch is None:
            epoch = self.last_epoch + 1
        # ReduceLROnPlateau is called at the end of an epoch, whereas the
        # other schedulers are called at the beginning.
        self.last_epoch = epoch if epoch != 0 else 1
        if self.last_epoch <= self.total_epoch:
            warmup_lr = [base_lr * ((self.multiplier - 1.) * self.last_epoch / self.total_epoch + 1.) for base_lr in self.base_lrs]
            for param_group, lr in zip(self.optimizer.param_groups, warmup_lr):
                param_group['lr'] = lr
        else:
            # epoch is never None here (it was reassigned above), so hand off
            # to the plateau scheduler with the warmup epochs subtracted.
            self.after_scheduler.step(metrics, epoch - self.total_epoch)

    def step(self, epoch=None, metrics=None):
        if type(self.after_scheduler) != ReduceLROnPlateau:
            if self.finished and self.after_scheduler:
                if epoch is None:
                    self.after_scheduler.step(None)
                else:
                    self.after_scheduler.step(epoch - self.total_epoch)
            else:
                return super(GradualWarmupScheduler, self).step(epoch)
        else:
            self.step_ReduceLROnPlateau(metrics, epoch)
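
# Typical wiring for this scheduler: warm up for a few epochs, then hand off
# to ReduceLROnPlateau. The model, learning rate, and epoch count below are
# placeholders for illustration.
if __name__ == '__main__':
    import torch

    model = torch.nn.Linear(10, 2)  # placeholder model
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    plateau = ReduceLROnPlateau(optimizer, factor=0.5, patience=2)
    scheduler = GradualWarmupScheduler(optimizer, multiplier=8,
                                       total_epoch=5, after_scheduler=plateau)

    for epoch in range(1, 21):
        val_loss = 0.0  # ... train and evaluate here ...
        scheduler.step(epoch, metrics=val_loss)
        print(epoch, optimizer.param_groups[0]['lr'])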