RNNsearch 모델은 기존의 neural machine translation 시스템들이 보조적인 역할에 국한되었던 것에 반해, attention 개념의 alignment를 통해 기존 모델들과 달리 긴 문장 입력의 경우에도 효과적으로 번역하는 독립적인 시스템을 선보인 모델이다.
본 포스트에서는 RNNsearch 모델이 소개된 Neural_Machine_Translation_by_Jointly_Learning_to_Align_and_Translate 논문을 참고하여 구현해볼 것이다. 논문에서 사용한 데이터셋의 경우에는 영어와 프랑스어 사이의 번역 데이터였지만, 다른 포스트에서 구축한 한국어 - 영어 번역 데이터를 이용해볼 것이다.
먼저 모델의 입력과 출력이다. 모델은 K 차원의 coded word vector들을 입력으로 받고 번역한 결과를 출력으로 만들어 낸다. 따라서 우리는 입력된 단어들을 임베딩 레이어를 이용하여 K 차원의 coded word vector들로 임베딩할 것이다.
그리고 encoder는 위와 같이 bidirectional rnn을 사용하여 구현하고 그것의 정방향과 역방향 hidden unit들은 torch.cat 메소드를 이용하여 7번 수식과 같이 묶어줄 것이다.
마지막으로 논문에 따르면 decoder의 첫 번째 hidden state, 즉 encoder가 생성해서 decoder로 넘겨주는 hidden state는 Linear Layer를 거친 후에 tanh activation을 적용해준다고 나와있다. 따라서 encoder의 forward propagation의 마지막 과정에는 이 연산을 수행하도록 구현할 것이다.
지금까지 살펴본 것처럼 하면 다음과 같이 RNNsearch 모델의 encoder가 완성된다.
class Encoder(nn.Module):
def __init__(self, n_inputs, n_embeddings, n_hiddens):
super().__init__()
self.n_hiddens = n_hiddens
self.embedding = nn.Embedding(n_inputs, n_embeddings)
self.bidirectional_gru = nn.GRU(n_embeddings, n_hiddens, bidirectional=True)
self.fc = nn.Linear(n_hiddens * 2, n_hiddens)
def init_hidden(self, batch_size):
weight = next(self.parameters())
h0 = weight.new_zeros(2, batch_size, self.n_hiddens)
return h0
def forward(self, x):
x = self.embedding(x)
output, hidden = self.bidirectional_gru(x)
hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
hidden = torch.tanh(self.fc(hidden))
return output, hidden
Encoder가 구현이 끝났으니 이제는 Decoder를 구현할 차례이다.
먼저 Decoder는 다음과 같은 수식으로 표현이 된다.
RNNsearch의 Decoder도 Encoder와 마찬가지로 GRU를 사용하지만, 단순히 Encoder의 출력 값과 hidden state만 GRU의 입력으로 들어가는 것이 아니라 Attention을 의미하는 context vector c도 함께 들어간다. 따라서 Decoder를 구현하기 위해서는 context vector를 구해하는 것에 필요한 engergy를 의미하는 $e_{ij}$ 혹은 $a(s_{i-1},h_j)$를 만들어내는 alignment model을 먼저 구현한 후에 softmax 연산을 통해 $a_{ij}$를 구하고 마지막으로 context vector를 구할 수 있다.
Alignment 모델은 논문에서 다음 부분을 참조하면 간단히 구현할 수 있다.
완성된 Alignment Model은 다음과 같다.
class Alignment(nn.Module):
def __init__(self, n_hiddens):
super().__init__()
self.n_hiddens = n_hiddens
self.v = nn.Parameter(nn.init.uniform_(torch.empty(n_hiddens)))
self.align = nn.Linear(self.n_hiddens * 3, self.n_hiddens)
def forward(self, h , s):
e = torch.cat([h, s], dim = 2)
e = torch.tanh(self.align(e))
e = e.transpose(1, 2)
v = self.v.repeat(s.size(0), 1).unsqueeze(1)
e = torch.bmm(v, e)
return e.squeeze(1)
위와 같이 alignment model을 구현하였으면 이제는 이를 이용해서 각 hidden state의 attention을 구해야 한다.
Attention은 alignment model에서 구해낸 energy에 soft max를 적용하면 구할 수 있다.
앞서 구현한 alignment model을 활용하여 attention의 구현은 다음과 같다.
class Attention(nn.Module):
def __init__(self, n_hiddens):
super().__init__()
self.n_hiddens = n_hiddens
self.align = Alignment(self.n_hiddens)
def forward(self, h, s):
time_step = s.shape[0]
h = h.unsqueeze(1)
h = h.repeat(1, time_step, 1)
s = s.permute(1, 0, 2)
energy = self.align(h, s)
return F.softmax(energy, dim=1).unsqueeze(1)
이제 본격적으로 Decoder를 구현할 차례이다. Decoder는 앞서 봤던 논문의 다음 부분을 참조하면 된다.
또한 논문에 따르면 Decoder의 출력을 Maxout Layer를 통해 후처리 한다고 나와있다.
Maxout Layer는 Goodfellow의 논문에서 아래의 부분을 참조하여 직접 구현할 수 있는데
나는 이것까지 구현하기는 너무 귀찮아서 그냥 아래 링크에서 가져다 썼다.
class Maxout(nn.Module):
def __init__(self, d_in, d_out, pool_size):
super().__init__()
self.d_in, self.d_out, self.pool_size = d_in, d_out, pool_size
self.lin = nn.Linear(d_in, d_out * pool_size)
def forward(self, inputs):
shape = list(inputs.size())
shape[-1] = self.d_out
shape.append(self.pool_size)
max_dim = len(shape) - 1
out = self.lin(inputs)
m, i = out.view(*shape).max(max_dim)
return m
Maxout Layer를 구현하여 $t_i$를 구하였으니 이제 드디어 Decoder를 구현할 차례이다.
논문에 따르면 $i$번째 단어 $y_i$의 확률은 softmax를 활용하여 구한다고 되어있다.
그래서 이를 반영한 Decoder는 다음과 같다.
class Decoder(nn.Module):
def __init__(self, n_outputs, n_embeddings, n_hiddens, n_maxout):
super().__init__()
self.n_hiddens = n_hiddens
self.embedding = nn.Embedding(n_outputs, n_embeddings)
self.attention_layer = Attention(self.n_hiddens)
self.gru = nn.GRU(n_embeddings + n_hiddens * 2, n_hiddens)
self.maxout = Maxout(n_hiddens * 3 + n_embeddings, n_maxout, 2)
self.out = nn.Linear(n_maxout, n_outputs)
def forward(self, input, h, s):
embedded = self.embedding(input)
attention = self.attention_layer(h, s)
context = attention.bmm(s.transpose(0, 1)).transpose(0, 1)
embedded = embedded.unsqueeze(0)
input = torch.cat([embedded, context], 2)
h = h.unsqueeze(0)
out, hidden = self.gru(input, h)
maxout_input = torch.cat([h, embedded, context], dim=2)
out = self.maxout(maxout_input).squeeze(0)
out = self.out(out)
out = F.log_softmax(out, dim=1)
return out, hidden.squeeze(0)
마지막으로 지금까지 구현한 Encoder와 Decoder를 이용하여 구현한 RNNsearch 모델은 다음과 같다.
아래 코드에서 임의의 확률로 decoder가 정답 문장을 참조하도록 하였는데, 논문에는 언급되지 않았지만, 이렇게 하면 모델의 학습이 조금 더 원활해지기에 임의로 집어넣었다.
class RNNsearch(nn.Module):
def __init__(self, n_inputs, n_outputs, n_embeddings, n_hiddens, n_maxout, device):
super().__init__()
self.n_outputs = n_outputs
self.device = device
self.encoder = Encoder(n_inputs, n_embeddings, n_hiddens)
self.decoder = Decoder(n_outputs, n_embeddings, n_hiddens, n_maxout)
def forward(self, x, target, teacher_forcing_ratio):
encoder_outputs, hiddens = self.encoder(x)
output = target[0, :]
outputs = torch.zeros(target.shape[0], target.shape[1], self.n_outputs).to(self.device)
for t in range(1, target.shape[0]):
output, hiddens = self.decoder(output, hiddens, encoder_outputs)
outputs[t] = output
teacher_force = random.random() < teacher_forcing_ratio
output = target[t] if teacher_force else output.argmax(1)
return outputs
RNNsearch 모델의 구현이 끝났으니 이제는 모델을 학습시켜볼 차례이다. 그러기 위해서는 먼저 모델을 생성해야 하는데, RNNsearch 모델 생성자는 인자로 hidden layer와 input, output, embedding layer, maxout layer의 크기를 요구한다.
RNNsearch은 한국어를 영어로 번역한다. 따라서 input과 output의 크기는 한국어와 영어의 vocab의 크기이다. 그리고 hidden layer의 크기와 embedding, maxout layer의 크기는 논문의 appendix에 상세히 나와있다.
그리고 optimizer로는 $\epsilon = 10^{-6}$, $\rho = 0.95$를 가지는 Adadelta를 사용하였다고 나와있다.
from rnnsearch import RNNsearch
n_hiddens = 1000 #1000
n_inputs = len(kor_vocab)
n_outputs = len(eng_vocab)
n_embeddings = 620 #620
n_maxout = 500 # 500
model = RNNsearch(n_inputs, n_outputs, n_embeddings, n_hiddens, n_hiddens, 'cuda')
model.to('cuda')
optimizer = optim.Adadelta(model.parameters(), eps=1e-6, rho=0.95)
#optimizer = optim.Adam(model.parameters())
이제는 모델을 학습시키는 코드를 구현해볼 차례이다. 그런데 내가 못 찾은 건지는 몰라도 모델을 학습시키는 방법은 cost function의 gradient의 $L_2-norm$을 normalize 하고 batch size는 80으로 정한다는 것 외에는 찾을 수 없었다. 그래서 그냥 임의로 구현해주었다.
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
BATCH_SIZE = 8
PAD_IDX = kor_vocab['<pad>']
BOS_IDX = kor_vocab['<bos>']
EOS_IDX = kor_vocab['<eos>']
device = torch.device('cuda')
def generate_batch(data_batch):
kor_batch, eng_batch = [], []
for (kor_item, eng_item) in data_batch:
kor_batch.append(torch.cat([torch.tensor([BOS_IDX]), kor_item, torch.tensor([EOS_IDX])], dim=0))
eng_batch.append(torch.cat([torch.tensor([BOS_IDX]), eng_item, torch.tensor([EOS_IDX])], dim=0))
kor_batch = pad_sequence(kor_batch, padding_value=PAD_IDX)
eng_batch = pad_sequence(eng_batch, padding_value=PAD_IDX)
return kor_batch, eng_batch
train_iter = DataLoader(train_data, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=generate_batch, num_workers=0)
valid_iter = DataLoader(valid_data, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=generate_batch, num_workers=0)
test_iter = DataLoader(test_data, batch_size=BATCH_SIZE,
shuffle=True, collate_fn=generate_batch, num_workers=0)
import math
import time
writer = SummaryWriter('attention_logs')
step = 1
accumulation_step = 10
PAD_IDX = eng_vocab['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
def train(model: nn.Module,
iterator: torch.utils.data.DataLoader,
optimizer: optim.Optimizer,
criterion: nn.Module,
clip: float):
global step
model.train()
epoch_loss = 0
for _, (src, trg) in tqdm(enumerate(iterator)):
src, trg = src.to(device), trg.to(device)
output = model(src, trg, 0.5)
output = output[1:].view(-1, output.shape[-1])
trg = trg[1:].view(-1)
loss = criterion(output, trg)
loss.backward()
if (_ + 1) % accumulation_step == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
optimizer.step()
optimizer.zero_grad()
writer.add_scalar('attention_log/train_loss', loss.item(), step)
step += 1
epoch_loss += loss.item()
return epoch_loss / len(iterator)
def evaluate(model: nn.Module,
iterator: torch.utils.data.DataLoader,
criterion: nn.Module):
model.eval()
epoch_loss = 0
with torch.no_grad():
for _, (src, trg) in enumerate(iterator):
src, trg = src.to(device), trg.to(device)
output = model(src, trg, 0) #turn off teacher forcing
output = output[1:].view(-1, output.shape[-1])
trg = trg[1:].view(-1)
loss = criterion(output, trg)
epoch_loss += loss.item()
return epoch_loss / len(iterator)
def epoch_time(start_time: int,
end_time: int):
elapsed_time = end_time - start_time
elapsed_mins = int(elapsed_time / 60)
elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
return elapsed_mins, elapsed_secs
N_EPOCHS = 1 # 10
CLIP = 1
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
start_time = time.time()
train_loss = train(model, train_iter, optimizer, criterion, CLIP)
valid_loss = evaluate(model, valid_iter, criterion)
end_time = time.time()
epoch_mins, epoch_secs = epoch_time(start_time, end_time)
print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
test_loss = evaluate(model, test_iter, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')
일단 Loss Function은 Cross Entropy를 사용하였으며, padding 한 부분은 무시하도록 하였다. 또한 batch size가 16으로 설정하면 학습 도중에 Out of Memory로 프로그램이 죽어버려서 batch size는 8로 설정하고 gradient accumation을 이용하여 논문에 나온 것처럼 80개의 문장마다 학습하도록 하였다. 그리고 논문에서 언급된 것처럼 gradient를 normalize 하기 위해 pytorch의 clip_grad_norm_ 메소드를 사용하였다.
학습 결과는 다음과 같다.
1 epoch만 학습시키는 것만 해도 10시간 정도 소요되어 학습은 1 epoch만 하였다.
그래서 그런지 아래의 inference 함수를 구현하여 임의의 문장을 번역하도록 하였는데 상당히 안 좋은 결과가 나왔다...
다른 사람이 구현한 것을 가져와서 학습시켜도 결과가 좋지 못한 것을 보면 vocab의 크기가 모델에 비해 너무 커서 그런 거 같기도 하고
def inference(model, src_vocab, trg_vocab, src_tokenizer, srcs, device):
tokens = []
for src in srcs:
tokens.append([src_vocab[s] for s in src_tokenizer(src)])
tokens = torch.LongTensor(tokens).cuda().transpose(0, 1)
v = list(trg_vocab.get_stoi().values())
k = list(trg_vocab.get_stoi().keys())
trg_dict = {}
for i in tqdm(range(len(trg_vocab.get_stoi()))):
trg_dict[v[i]] = k[i]
model.eval()
with torch.no_grad():
out = model(tokens, torch.LongTensor([[t for t in range(50)] for t in range(1)]).transpose(0, 1).cuda(), 0).detach().cpu()
out = out.transpose(0, 1)
res = []
for o in out:
res.append(' '.join([trg_dict[t.item()] for t in F.softmax(o, dim=1).argmax(1) if not trg_dict[t.item()] == '<eos>']))
return res
RNNsearch의 전체적인 코드는 다음 링크에 있다.