Seq2Seq模型实现文本翻译¶
概述¶
序列到序列模型(sequence to sequence model),又名Seq2Seq模型。它是一种循环神经网络(Recurrent Neural Network,RNN)的变种,突破了原本RNN模型对于输入和输出序列长度的限制,做到将输入序列映射到另一个长度不同的输出序列,因此常用于机器翻译的任务。
Seq2Seq模型一般结构为编码器(encoder)+ 解码器(decoder),前者负责把输入序列编码成一个固定长度的向量,后者将这个向量转化为可变长度的向量。
后来,人们在encoder-decoder的基础上引入了注意力机制(attention),使模型在各个任务中的表现更为出色。
数据准备¶
我们本次使用的数据集为Multi30K数据集,它是一个大规模的图像-文本数据集,包含30K+图片,每张图片对应两类不同的文本描述:
英语描述,及对应的德语翻译;
五个独立的、非翻译而来的英语和德语描述,描述中包含的细节并不相同;
因其收集的不同语言对于图片的描述相互独立,所以训练出的模型可以更好地适用于有噪声的多模态内容。
图片来源:
Elliott, D., Frank, S., Sima’an, K., & Specia, L. (2016). Multi30K: Multilingual English-German Image Descriptions. CoRR, 1605.00459.
首先,我们需要安装如下依赖:
BLEU Score计算:
pip install nltk
数据下载模块¶
使用download
进行数据下载,并将tar.gz
文件解压到指定文件夹。
下载好的数据集目录结构如下:
home_path/.mindspore_examples
├─test
│ test2016.de
│ test2016.en
│ test2016.fr
│
├─train
│ train.de
│ train.en
│
└─valid
val.de
val.en
[2]:
from download import download
from pathlib import Path
from tqdm import tqdm
import os
# 训练、验证、测试数据集下载地址
urls = {
'train': 'http://www.quest.dcs.shef.ac.uk/wmt16_files_mmt/training.tar.gz',
'valid': 'http://www.quest.dcs.shef.ac.uk/wmt16_files_mmt/validation.tar.gz',
'test': 'http://www.quest.dcs.shef.ac.uk/wmt17_files_mmt/mmt_task1_test2016.tar.gz'
}
# 指定保存路径为 `home_path/.mindspore_examples`
cache_dir = Path.home() / '.mindspore_examples'
train_path = download(urls['train'], os.path.join(cache_dir, 'train'), kind='tar.gz')
valid_path = download(urls['valid'], os.path.join(cache_dir, 'valid'), kind='tar.gz')
test_path = download(urls['test'], os.path.join(cache_dir, 'test'), kind='tar.gz')
数据预处理¶
在使用数据进行模型训练等操作时,我们需要对数据进行预处理,流程如下:
加载数据集,目前数据为句子形式的文本,需要进行分词,即将句子拆解为单独的词元(token,可以为字符或者单词);
将每个词元映射到从0开始的数字索引中(为节约存储空间,可过滤掉词频低的词元),词元和数字索引所构成的集合叫做词典(vocabulary);
添加特殊占位符,标明序列的起始与结束,统一序列长度,并创建数据迭代器;
数据加载器¶
[3]:
import re
class Multi30K():
"""Multi30K数据集加载器
加载Multi30K数据集并处理为一个Python迭代对象。
"""
def __init__(self, path):
self.data = self._load(path)
def _load(self, path):
def tokenize(text):
# 对句子进行分词,统一大小写
text = text.rstrip()
return [tok.lower() for tok in re.findall(r'\w+|[^\w\s]', text)]
# 读取Multi30K数据,并进行分词
members = {i.split('.')[-1]: i for i in os.listdir(path)}
de_path = os.path.join(path, members['de'])
en_path = os.path.join(path, members['en'])
with open(de_path, 'r') as de_file:
de = de_file.readlines()[:-1]
de = [tokenize(i) for i in de]
with open(en_path, 'r') as en_file:
en = en_file.readlines()[:-1]
en = [tokenize(i) for i in en]
return list(zip(de, en))
def __getitem__(self, idx):
return self.data[idx]
def __len__(self):
return len(self.data)
[4]:
train_dataset, valid_dataset, test_dataset = Multi30K(train_path), Multi30K(valid_path), Multi30K(test_path)
对解压和分词结果进行测试,打印测试数据集第一组英德语文本,可以看到每一个单词和标点符号已经被单独分离出来。
[5]:
for de, en in test_dataset:
print(f'de = {de}')
print(f'en = {en}')
break
de = ['ein', 'mann', 'mit', 'einem', 'orangefarbenen', 'hut', ',', 'der', 'etwas', 'anstarrt', '.']
en = ['a', 'man', 'in', 'an', 'orange', 'hat', 'starring', 'at', 'something', '.']
词典¶
[7]:
class Vocab:
"""通过词频字典,构建词典"""
special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']
def __init__(self, word_count_dict, min_freq=1):
self.word2idx = {}
for idx, tok in enumerate(self.special_tokens):
self.word2idx[tok] = idx
# 过滤低词频的词元
filted_dict = {
w: c
for w, c in word_count_dict.items() if c >= min_freq
}
for w, _ in filted_dict.items():
self.word2idx[w] = len(self.word2idx)
self.idx2word = {idx: word for word, idx in self.word2idx.items()}
self.bos_idx = self.word2idx['<bos>'] # 特殊占位符:序列开始
self.eos_idx = self.word2idx['<eos>'] # 特殊占位符:序列结束
self.pad_idx = self.word2idx['<pad>'] # 特殊占位符:补充字符
self.unk_idx = self.word2idx['<unk>'] # 特殊占位符:低词频词元或未曾出现的词元
def _word2idx(self, word):
"""单词映射至数字索引"""
if word not in self.word2idx:
return self.unk_idx
return self.word2idx[word]
def _idx2word(self, idx):
"""数字索引映射至单词"""
if idx not in self.idx2word:
raise ValueError('input index is not in vocabulary.')
return self.idx2word[idx]
def encode(self, word_or_list):
"""将单个单词或单词数组映射至单个数字索引或数字索引数组"""
if isinstance(word_or_list, list):
return [self._word2idx(i) for i in word_or_list]
return self._word2idx(word_or_list)
def decode(self, idx_or_list):
"""将单个数字索引或数字索引数组映射至单个单词或单词数组"""
if isinstance(idx_or_list, list):
return [self._idx2word(i) for i in idx_or_list]
return self._idx2word(idx_or_list)
def __len__(self):
return len(self.word2idx)
通过自定义词频字典进行测试,我们可以看到词典已去除词频少于2的词元c,并加入了默认的四个特殊占位符,故词典整体长度为:4 - 1 + 4 = 7
[8]:
word_count = {'a':20, 'b':10, 'c':1, 'd':2}
vocab = Vocab(word_count, min_freq=2)
len(vocab)
[8]:
7
使用collections
中的Counter
和OrderedDict
统计英/德语每个单词在整体文本中出现的频率。构建词频字典,然后再将词频字典转为词典。
在分配数字索引时有一个小技巧:常用的词元对应数值较小的索引,这样可以节约空间。
[9]:
from collections import Counter, OrderedDict
def build_vocab(dataset):
de_words, en_words = [], []
for de, en in dataset:
de_words.extend(de)
en_words.extend(en)
de_count_dict = OrderedDict(sorted(Counter(de_words).items(), key=lambda t: t[1], reverse=True))
en_count_dict = OrderedDict(sorted(Counter(en_words).items(), key=lambda t: t[1], reverse=True))
return Vocab(de_count_dict, min_freq=2), Vocab(en_count_dict, min_freq=2)
[10]:
de_vocab, en_vocab = build_vocab(train_dataset)
print('Unique tokens in de vocabulary:', len(de_vocab))
Unique tokens in de vocabulary: 7882
数据迭代器¶
数据预处理的最后一步是创建数据迭代器,我们在进一步处理数据(包括批处理,添加起始和终止符号,统一序列长度)后,将数据以张量的形式返回。
创建数据迭代器需要如下参数:
dataset
:分词后的数据集de_vocab
:德语词典en_vocab
:英语词典batch_size
:批量大小,即一个batch中包含多少个序列max_len
:序列最大长度,为最长有效文本长度 + 2(序列开始、序列结束占位符),如不满则补齐,如超过则丢弃drop_remainder
:是否在最后一个batch未满时,丢弃该batch
[11]:
import mindspore
class Iterator():
"""创建数据迭代器"""
def __init__(self, dataset, de_vocab, en_vocab, batch_size, max_len=32, drop_reminder=False):
self.dataset = dataset
self.de_vocab = de_vocab
self.en_vocab = en_vocab
self.batch_size = batch_size
self.max_len = max_len
self.drop_reminder = drop_reminder
length = len(self.dataset) // batch_size
self.len = length if drop_reminder else length + 1 # 批量数量
def __call__(self):
def pad(idx_list, vocab, max_len):
"""统一序列长度,并记录有效长度"""
idx_pad_list, idx_len = [], []
# 当前序列度超过最大长度时,将超出的部分丢弃;当前序列长度小于最大长度时,用占位符补齐
for i in idx_list:
if len(i) > max_len - 2:
idx_pad_list.append(
[vocab.bos_idx] + i[:max_len-2] + [vocab.eos_idx]
)
idx_len.append(max_len)
else:
idx_pad_list.append(
[vocab.bos_idx] + i + [vocab.eos_idx] + [vocab.pad_idx] * (max_len - len(i) - 2)
)
idx_len.append(len(i) + 2)
return idx_pad_list, idx_len
def sort_by_length(src, trg):
"""对德/英语的字段长度进行排序"""
data = zip(src, trg)
data = sorted(data, key=lambda t: len(t[0]), reverse=True)
return zip(*list(data))
def encode_and_pad(batch_data, max_len):
"""将批量中的文本数据转换为数字索引,并统一每个序列的长度"""
# 将当前批量数据中的词元转化为索引
src_data, trg_data = zip(*batch_data)
src_idx = [self.de_vocab.encode(i) for i in src_data]
trg_idx = [self.en_vocab.encode(i) for i in trg_data]
# 统一序列长度
src_idx, trg_idx = sort_by_length(src_idx, trg_idx)
src_idx_pad, src_len = pad(src_idx, de_vocab, max_len)
trg_idx_pad, _ = pad(trg_idx, en_vocab, max_len)
return src_idx_pad, src_len, trg_idx_pad
for i in range(self.len):
# 获取当前批量的数据
if i == self.len - 1 and not self.drop_reminder:
batch_data = self.dataset[i * self.batch_size:]
else:
batch_data = self.dataset[i * self.batch_size: (i+1) * self.batch_size]
src_idx, src_len, trg_idx = encode_and_pad(batch_data, self.max_len)
# 将序列数据转换为tensor
yield mindspore.Tensor(src_idx, mindspore.int32), \
mindspore.Tensor(src_len, mindspore.int32), \
mindspore.Tensor(trg_idx, mindspore.int32)
def __len__(self):
return self.len
[12]:
train_iterator = Iterator(train_dataset, de_vocab, en_vocab, batch_size=128, max_len=32, drop_reminder=True)
valid_iterator = Iterator(valid_dataset, de_vocab, en_vocab, batch_size=128, max_len=32, drop_reminder=False)
test_iterator = Iterator(test_dataset, de_vocab, en_vocab, batch_size=128, max_len=32, drop_reminder=False)
模型构建¶
编码器(Encoder)¶
在编码器中,我们输入一个序列\(X=\{x_1, x_2, ..., x_T\}\),在embedding层将其转化为向量,循环计算隐藏状态\(H=\{h_1, h_2, ..., h_T\}\),并在最后的隐藏状态中返回上下文向量\(z=h_T\)。
实现编码器的方式有很多种,在这里我们使用的是门控循环单元模型(Gated Rrecurrent Units, GRU)。它在原始RNN的基础上引入了门机制(gate mechanism),用以控制输入隐藏状态和从隐藏状态输出的信息。其中,更新门(update gate, 又称记忆门,一般用\(z_t\)表示)用于控制前一时刻的状态信息\(h_{t-1}\)被带入到当前状态\(h_t\)中的程度。重置门(reset gate,一般用\(r_t\)表示)控制前一状态\(h_t\)有多少信息被写入到当前候选集\(n_t\)上。
在进行文本翻译类任务时,我们一般使用双向GRU,即在训练中同时考虑当前词语之前及之后的文本内容。双向GRU的每层由两个RNN构成,前向RNN由左至右循环计算隐藏状态,反向RNN从右至左计算隐藏状态,公式表达如下:
每个RNN网络在观察到句子中的最后一个词后,输出一个上下文向量,前向RNN的输出为\(z^\rightarrow=h_T^\rightarrow\),反向RNN的输出为\(z^\leftarrow=h_T^\leftarrow\)。
编码器最终会返回两项:outputs
和hidden
。
outputs
为双向GRU最上层隐藏状态,形状为[max_len, batch_size, hid_dim * num_directions]。以\(t=1\)时刻为例,其对应的output为前向RNN中\(t=1\)时刻最上层隐藏状态和反向RNN中\(t=T\)时刻的结合,即\(h_1 = [h_1^\rightarrow; h_{T}^\leftarrow]\);hidden
表示每层的最终隐藏状态,即上文提到的上下文向量。后续将作为编码器初始时刻的隐藏状态\(s_0\),但由于编码器(decoder)的结构并不是双向的,仅仅需要一个上下文向量\(z\),为了与之对应,我们将编码器中的两个向量组合起来,放入全连接层\(g\)中,并最后使用激活函数\(tanh\);
MindSpore为大家提供了GRU的接口,可以在编码器搭建中直接调用,通过设置参数bidirectional=True
使用双向GRU。
[13]:
import mindspore
import mindspore.nn as nn
import mindspore.ops as ops
class Encoder(nn.Cell):
def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, is_ascend):
super().__init__()
self.embedding = nn.Embedding(input_dim, emb_dim) # Embedding层
if is_ascend:
self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True).to_float(compute_dtype) # 双向GRU层
self.fc = nn.Dense(enc_hid_dim * 2, dec_hid_dim).to_float(compute_dtype) # 全连接层
else:
self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True) # 双向GRU层
self.fc = nn.Dense(enc_hid_dim * 2, dec_hid_dim) # 全连接层
self.dropout = nn.Dropout(p=1-dropout) # dropout,防止过拟合
def construct(self, src, src_len):
"""构建编码器
Args:
src: 源序列,为已经转换为数字索引并统一长度的序列;shape = [src len, batch_size]
src_len: 有效长度;shape = [batch_size, ]
"""
# 将输入源序列转化为向量,并进行暂退(dropout)
# shape = [src len, batch size, emb dim]
embedded = self.dropout(self.embedding(src))
# 计算输出
# shape = [src len, batch size, enc hid dim*2]
outputs, hidden = self.rnn(embedded, seq_length=src_len)
# 为适配解码器,合并两个上下文函数
# shape = [batch size, dec hid dim]
hidden = ops.tanh(self.fc(ops.concat((hidden[-2, :, :], hidden[-1, :, :]), axis=1)))
return outputs, hidden
注意力层(Attention)¶
在机器翻译中,每个生成的词可能对应源句子中不同的词,而传统的无注意力机制的Seq2Seq模型更偏向于关注句子中的最后一个词。为了进一步优化模型,我们引入了注意力机制。
注意力机制便是赋予源句子和目标句子中对应的词以更高的权重,它整合了我们目前为止编码与解码的所有信息,并输出一个表示注意力权重的向量\(a_t\),用来决定在下一步的预测\(\hat{y}_{t+}\)中应该给予哪些词更高的关注度。
首先,我们需要明确编码器中的每一个隐藏状态和解码器中上一个时刻隐藏状态之间的匹配程度\(E_t\)。
截止到当前的时刻\(t\),编码器(encoder)中的所有信息为全部前向和后向RNN的隐藏状态的组合\(H\),是一个有\(T\)个张量的序列;解码器(decoder)中的所有信息为上一时刻的隐藏状态\(s_{t-1}\),是一个单独的张量。为了统一二者的维度,我们需要将解码器中上一时刻的隐藏状态\(s_{t-1}\)重复\(T\)次,接着把处理好的解码器信息与编码器信息堆叠起来,并输入到线性层att
和激活函数\(\text{tanh}\)中,计算编码器与解码器隐藏状态之间的能量\(E_t\)。
当前\(E_t\)的每个batch中tensor的形状为[dec hid dim, src len],但是注意最终的注意力权重是需要作用在源序列之上的,所以注意力权重的维度也应该与源句子的维度[src len]相对应。为此,我们引入了一个可学习的张量\(v\)。
我们可以将\(v\)看作是所有编码器隐藏状态的加权和的权重,简单来说便是对源序列中的每个词的关注程度。\(v\)的参数是随机初始化的,它会在反向传播中与模型的其余部分一起学习。此外,\(v\)并不依赖于时间,所以在解码中每个时间步长使用的\(v\)是一致的。
最终,我们使用\(\text{softmax}\)函数,来保证注意力向量\(a_t\)中每一个元素的大小都在0-1之间,并且所有元素加和为1。
[14]:
class Attention(nn.Cell):
def __init__(self, enc_hid_dim, dec_hid_dim, is_ascend):
super().__init__()
if is_ascend:
# attention线性层
self.attn = nn.Dense((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim).to_float(compute_dtype)
# v, 用不带有bias的线性层表示
# shape = [1, dec hid dim]
self.v = nn.Dense(dec_hid_dim, 1, has_bias=False).to_float(compute_dtype)
else:
self.attn = nn.Dense((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
self.v = nn.Dense(dec_hid_dim, 1, has_bias=False)
def construct(self, hidden, encoder_outputs, mask):
"""Attention层
Args:
hidden: 解码器上一个时刻的隐藏状态;shape = [batch size, dec hid dim]
encoder_outputs: 编码器的输出,前向与反向RNN的隐藏状态;shape = [src len, batch size, enc hid dim * 2]
mask: 将<pad>占位符的注意力权重替换为0或者很小的数值;shape = [batch size, src len]
"""
src_len = encoder_outputs.shape[0]
# 重复解码器隐藏状态src len次,对齐维度
# shape = [batch size, src len, dec hid dim]
hidden = ops.tile(hidden.expand_dims(1), (1, src_len, 1))
# 将编码器输出中的第1、2维度进行交换,对齐维度
# shape = [batch size, src len, enc hid dim*2]
encoder_outputs = encoder_outputs.transpose(1, 0, 2)
# 计算E_t
# shape = [batch size, src len, dec hid dim]
energy = ops.tanh(self.attn(ops.concat((hidden, encoder_outputs), axis=2)))
# 计算v * E_t
# shape = [batch size, src len]
attention = self.v(energy).squeeze(2)
# 不需要考虑序列中<pad>占位符的注意力权重
attention = attention.masked_fill(mask == 0, -1e10)
return ops.softmax(attention, axis=1)
解码器(Decoder)¶
解码器中包含了上述的注意力层,在获得注意力权重向量\(a_t\)后,我们将其应用在编码器的隐藏状态\(H\)上,得到一个表示编码器隐藏状态加权和的向量\(w_t\)。
我们将该向量\(w_t\),连同embedding后的输入\(d(y_t)\),上一时刻的隐藏状态\(s_{t-1}\),一起放入编码器的RNN网络中,并将输出送入线性层\(f\),得到关于目标句子中下一时刻出现的单词的预测。
[15]:
class Decoder(nn.Cell):
def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention, is_ascend):
super().__init__()
self.is_ascend = is_ascend
self.output_dim = output_dim
self.attention = attention
self.embedding = nn.Embedding(output_dim, emb_dim)
if is_ascend:
self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim).to_float(compute_dtype)
self.fc_out = nn.Dense((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim).to_float(compute_dtype)
else:
self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
self.fc_out = nn.Dense((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
self.dropout = nn.Dropout(p=1-dropout)
def construct(self, inputs, hidden, encoder_outputs, mask):
"""构建解码器
Args:
input: 输入的单词;shape = [batch size]
hidden: 解码器上一时刻的隐藏状态;shape = [batch size, dec hid dim]
encoder_outputs: 编码器的输出,前向与反向RNN的隐藏状态;shape = [src len, batch size, enc hid dim * 2]
mask: 将<pad>占位符的注意力权重替换为0或者很小的数值;shape = [batch size, src len]
"""
# 为输入增加额外维度
# shape = [1, batch size]
inputs = inputs.expand_dims(0)
# 输入词的embedding输出, d(y_t)
# shape = [1, batch size, emb dim]
embedded = self.dropout(self.embedding(inputs))
if self.is_ascend:
embedded = embedded.astype(compute_dtype)
# 注意力权重向量, a_t
# shape = [batch size, src len]
a = self.attention(hidden, encoder_outputs, mask)
# 为注意力权重增加额外维度
# shape = [batch size, 1, src len]
a = a.expand_dims(1)
# 将编码器隐藏状态中的第1、2维度进行交换
# shape = [batch size, src len, enc hid dim * 2]
encoder_outputs = encoder_outputs.transpose(1, 0, 2)
# 计算w_t
# shape = [batch size, 1, enc hid dim * 2]
weighted = ops.bmm(a, encoder_outputs)
# 将w_t的第1、2维度进行交换
# shape = [1, batch size, enc hid dim * 2]
weighted = weighted.transpose(1, 0, 2)
# 将emdedded与weighted堆叠在一起,后输入进RNN层
# rnn_input shape = [1, batch size, (enc hid dim * 2) + emb dim]
# output shape = [seq len = 1, batch size, dec hid dim * n directions]
# hidden shape = [n layers (1) * n directions (1) = 1, batch size, dec hid dim]
rnn_input = ops.concat((embedded, weighted), axis=2)
output, hidden = self.rnn(rnn_input, hidden.expand_dims(0))
# 去除多余的第1维度
embedded = embedded.squeeze(0)
output = output.squeeze(0)
weighted = weighted.squeeze(0)
# 将embedded,weighted和hidden堆叠起来,并输入线性层,预测下一个词
# shape = [batch size, output dim]
prediction = self.fc_out(ops.concat((output, weighted, embedded), axis=1))
return prediction, hidden.squeeze(0), a.squeeze(1)
Seq2Seq¶
Seq2Seq封装器将我们之前创建的编码器与解码器合并起来。
简单梳理一下整体过程:
初始化空数列
outputs
,用于储存每次的预测结果;源序列\(X\)作为编码器的输入,输出\(z\)和\(H\);
解码器初始时刻的隐藏状态为编码器中输出的上下文向量,即编码器最后时刻的隐藏状态,\(s_0 = z = h_T\);
解码器最开始的输入\(y_1\)为表示序列开始的占位符<bos>;
重复以下步骤:
将此时刻\(t\)的输入\(y_t\),上一时刻的隐藏状态\(s_{t-1}\),编码器中的所有隐藏状态\(H\)作为输入;
输出对下一时刻的预测\(\hat{y}_{t+1}\),以及新的隐藏状态\(s_t\);
将预测结果存入
outputs
中确定是否使用teacher forcing,如是,\(y_{t+1} = \hat{y}_{t+1}\),如否,下一时刻的输入为目标序列中的词;
[16]:
from mindspore import Tensor
class Seq2Seq(nn.Cell):
def __init__(self, encoder, decoder, src_pad_idx, teacher_forcing_ratio):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_pad_idx = src_pad_idx
self.teacher_forcing_ratio = teacher_forcing_ratio # 使用teacher forcing的可能性
def create_mask(self, src):
"""标记出每个序列中<pad>占位符的位置"""
mask = (src != self.src_pad_idx).astype(mindspore.int32).swapaxes(1, 0)
return mask
def construct(self, src, src_len, trg, trg_len=None):
"""构建seq2seq模型
Args:
src: 源序列;shape = [src len, batch size]
src_len: 源序列长度;shape = [batch size]
trg: 目标序列;shape = [trg len, batch size]
trg_len: 目标序列长度;shape = [trg len, batch size]
"""
if trg_len is None:
trg_len = trg.shape[0]
#存储解码器输出
outputs = []
# 编码器(encoder):
# 输入:源序列、源序列长度
# 输出1:编码器中所有前向与反向RNN 的隐藏状态 encoder_outputs
# 输出2:编码器中前向与反向RNN中最后时刻的隐藏状态放入线性层后的输出 hidden
encoder_outputs, hidden = self.encoder(src, src_len)
#解码器的第一个输入是表示序列开始的占位符<bos>
inputs = trg[0]
# 标记源序列中<pad>占位符的位置
# shape = [batch size, src len]
mask = self.create_mask(src)
for t in range(1, trg_len):
# 解码器(decoder):
# 输入:源句子序列 inputs、前一时刻的隐藏状态 hidden、编码器所有前向与反向RNN的隐藏状态
# 标明每个句子中的<pad>,方便计算注意力权重时忽略该部分
# 输出:预测结果 output、新的隐藏状态 hidden、注意力权重(忽略)
output, hidden, _ = self.decoder(inputs, hidden, encoder_outputs, mask)
# 将预测结果放入之前的存储中
outputs.append(output)
#找出对应预测概率最大的词元
top1 = output.argmax(1).astype(mindspore.int32)
if self.training:
#如果目前为模型训练状态,则按照之前设定的概率使用teacher forcing
minval = Tensor(0, mindspore.float32)
maxval = Tensor(1, mindspore.float32)
teacher_force = ops.uniform((1,), minval, maxval) < self.teacher_forcing_ratio
# 如使用teacher forcing,则将目标序列中对应的词元作为下一个输入
# 如不使用teacher forcing,则将预测结果作为下一个输入
inputs = trg[t] if teacher_force else top1
else:
inputs = top1
# 将所有输出整合为tensor
outputs = ops.stack(outputs, axis=0)
return outputs.astype(dtype)
模型训练¶
模型参数,编码器,注意力层,解码器以及Seq2Seq网络初始化。
这里我们手动实现了混合精度,即在过程中以compute_dtype
(mindspore.float16)进行计算,在最后输出时将结果转换回dtype
(mindspore.float32)。
[17]:
input_dim = len(de_vocab) # 输入维度
output_dim = len(en_vocab) # 输出维度
enc_emb_dim = 256 # Encoder Embedding层维度
dec_emb_dim = 256 # Decoder Embedding层维度
enc_hid_dim = 512 # Encoder 隐藏层维度
dec_hid_dim = 512 # Decoder 隐藏层维度
enc_dropout = 0.5 # Encoder Dropout
dec_dropout = 0.5 # Decoder Dropout
src_pad_idx = de_vocab.pad_idx # 德语词典中pad占位符的数字索引
trg_pad_idx = en_vocab.pad_idx # 英语词典中pad占位符的数字索引
is_ascend = mindspore.get_context('device_target') == 'Ascend'
compute_dtype = mindspore.float16 # 计算中数据的类型
dtype = mindspore.float32 # 返回数据的类型
attn = Attention(enc_hid_dim, dec_hid_dim, is_ascend)
encoder = Encoder(input_dim, enc_emb_dim, enc_hid_dim, dec_hid_dim, enc_dropout, is_ascend)
decoder = Decoder(output_dim, dec_emb_dim, enc_hid_dim, dec_hid_dim, dec_dropout, attn, is_ascend)
model = Seq2Seq(encoder, decoder, src_pad_idx, 0.5)
损失函数、优化器初始化。
[18]:
opt = nn.Adam(model.trainable_params(), learning_rate=0.001) # 损失函数
loss_fn = nn.CrossEntropyLoss(ignore_index=trg_pad_idx) # 优化器
注意在模型训练中,可能会出现权重更新过大的情况。这会导致数值上溢或者下溢,最终造成梯度爆炸(gradient explosion)。为解决这个问题,我们需要在反向传播计算梯度之后,使用梯度裁剪(gradient clipping),再将裁剪后的梯度传入优化器进行网络更新。
[19]:
import mindspore.ops as ops
def clip_by_norm(clip_norm, t, axis=None):
"""给定张量t和裁剪参数clip_norm,对t进行正则化
使得t在axes维度上的L2-norm小于等于clip_norm。
Args:
t: tensor,数据类型为float
clip_norm: scalar,数值需大于0;梯度裁剪阈值,数据类型为float
axis: Union[None, int, tuple(int)],数据类型为int32;计算L2-norm参考的维度,如为Norm,则参考所有维度
"""
# 计算L2-norm
t2 = t * t
l2sum = t2.sum(axis=axis, keepdims=True)
pred = l2sum > 0
# 将加和中等于0的元素替换为1,避免后续出现NaN
l2sum_safe = ops.select(pred, l2sum, ops.ones_like(l2sum))
l2norm = ops.select(pred, ops.sqrt(l2sum_safe), l2sum)
# 比较L2-norm和clip_norm,如L2-norm超过阈值,进行裁剪
# 剪裁方法:output(x) = (x * clip_norm)/max(|x|, clip_norm)
intermediate = t * clip_norm
cond = l2norm > clip_norm
t_clip = intermediate / ops.select(cond, l2norm, clip_norm)
return t_clip
模型训练,训练途中使用验证数据集进行验证评估,并保存效果最好的模型。
[20]:
def forward_fn(src, src_len, trg):
"""前向网络"""
src = src.swapaxes(0, 1)
trg = trg.swapaxes(0, 1)
output = model(src, src_len, trg)
output_dim = output.shape[-1]
output = output.view(-1, output_dim)
trg = trg[1:].view(-1)
loss = loss_fn(output, trg)
return loss
# 反向传播计算梯度
grad_fn = mindspore.value_and_grad(forward_fn, None, opt.parameters)
def train_step(src, src_len, trg, clip):
"""单步训练"""
loss, grads = grad_fn(src, src_len, trg)
grads = ops.HyperMap()(ops.partial(clip_by_norm, clip), grads) # 梯度裁剪
opt(grads) # 更新网络参数
return loss
def train(iterator, clip, epoch=0):
"""模型训练"""
model.set_train(True)
num_batches = len(iterator)
total_loss = 0 # 所有batch训练loss的累加
total_steps = 0 # 训练步数
with tqdm(total=num_batches) as t:
t.set_description(f'Epoch: {epoch}')
for src, src_len, trg in iterator():
loss = train_step(src, src_len, trg, clip) # 当前batch的loss
total_loss += loss.asnumpy()
total_steps += 1
curr_loss = total_loss / total_steps # 当前的平均loss
t.set_postfix({'loss': f'{curr_loss:.2f}'})
t.update(1)
return total_loss / total_steps
def evaluate(iterator):
"""模型验证"""
model.set_train(False)
num_batches = len(iterator)
total_loss = 0 # 所有batch训练loss的累加
total_steps = 0 # 训练步数
with tqdm(total=num_batches) as t:
for src, src_len, trg in iterator():
loss = forward_fn(src, src_len, trg) # 当前batch的loss
total_loss += loss.asnumpy()
total_steps += 1
curr_loss = total_loss / total_steps # 当前的平均loss
t.set_postfix({'loss': f'{curr_loss:.2f}'})
t.update(1)
return total_loss / total_steps
[21]:
from mindspore import save_checkpoint
num_epochs = 10 # 训练迭代数
clip = 1.0 # 梯度裁剪阈值
best_valid_loss = float('inf') # 当前最佳验证损失
ckpt_file_name = os.path.join(cache_dir, 'seq2seq.ckpt') # 模型保存路径
for i in range(num_epochs):
# 模型训练,网络权重更新
train_loss = train(train_iterator, clip, i)
# 网络权重更新后对模型进行验证
valid_loss = evaluate(valid_iterator)
# 保存当前效果最好的模型
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
save_checkpoint(model, ckpt_file_name)
Epoch: 0: 100%|██████████| 226/226 [04:17<00:00, 1.14s/it, loss=4.90]
100%|██████████| 8/8 [00:06<00:00, 1.24it/s, loss=4.74]
Epoch: 1: 100%|██████████| 226/226 [02:45<00:00, 1.37it/s, loss=3.88]
100%|██████████| 8/8 [00:01<00:00, 4.60it/s, loss=3.98]
Epoch: 2: 100%|██████████| 226/226 [02:46<00:00, 1.36it/s, loss=3.19]
100%|██████████| 8/8 [00:01<00:00, 4.54it/s, loss=3.63]
Epoch: 3: 100%|██████████| 226/226 [02:47<00:00, 1.35it/s, loss=2.73]
100%|██████████| 8/8 [00:01<00:00, 4.49it/s, loss=3.46]
Epoch: 4: 100%|██████████| 226/226 [02:48<00:00, 1.34it/s, loss=2.40]
100%|██████████| 8/8 [00:01<00:00, 4.56it/s, loss=3.38]
Epoch: 5: 100%|██████████| 226/226 [02:47<00:00, 1.35it/s, loss=2.12]
100%|██████████| 8/8 [00:01<00:00, 4.50it/s, loss=3.37]
Epoch: 6: 100%|██████████| 226/226 [02:45<00:00, 1.37it/s, loss=1.91]
100%|██████████| 8/8 [00:01<00:00, 4.55it/s, loss=3.40]
Epoch: 7: 100%|██████████| 226/226 [02:45<00:00, 1.36it/s, loss=1.74]
100%|██████████| 8/8 [00:01<00:00, 4.60it/s, loss=3.44]
Epoch: 8: 100%|██████████| 226/226 [02:45<00:00, 1.37it/s, loss=1.59]
100%|██████████| 8/8 [00:01<00:00, 4.54it/s, loss=3.44]
Epoch: 9: 100%|██████████| 226/226 [02:44<00:00, 1.37it/s, loss=1.47]
100%|██████████| 8/8 [00:01<00:00, 4.57it/s, loss=3.50]
模型推理¶
[23]:
def translate_sentence(sentence, de_vocab, en_vocab, model, max_len=32):
"""给定德语句子,返回英文翻译"""
model.set_train(False)
# 对输入句子进行分词
if isinstance(sentence, str):
tokens = [tok.lower() for tok in re.findall(r'\w+|[^\w\s]', sentence.rstrip())]
else:
tokens = [token.lower() for token in sentence]
# 补充起始、终止占位符,统一序列长度
if len(tokens) > max_len - 2:
src_len = max_len
tokens = ['<bos>'] + tokens[:max_len - 2] + ['<eos>']
else:
src_len = len(tokens) + 2
tokens = ['<bos>'] + tokens + ['<eos>'] + ['<pad>'] * (max_len - src_len)
# 将德语单词转化为数字索引
src = de_vocab.encode(tokens)
src = mindspore.Tensor(src, mindspore.int32).expand_dims(1)
src_len = mindspore.Tensor([src_len], mindspore.int32)
trg = mindspore.Tensor([en_vocab.bos_idx], mindspore.int32).expand_dims(1)
# 获得预测结果,并将其转化为英语单词
outputs = model(src, src_len, trg, max_len)
trg_indexes = [int(i.argmax(1).asnumpy()) for i in outputs]
eos_idx = trg_indexes.index(en_vocab.eos_idx) if en_vocab.eos_idx in trg_indexes else -1
trg_tokens = en_vocab.decode(trg_indexes[:eos_idx])
return trg_tokens
使用测试数据集中的任意一组文本数据进行预测。
[24]:
from mindspore import load_checkpoint, load_param_into_net
# 加载之前训练好的模型
param_dict = load_checkpoint(ckpt_file_name)
load_param_into_net(model, param_dict)
# 以测试数据集中的第一组语句为例,进行测试
example_idx = 0
src = test_dataset[example_idx][0]
trg = test_dataset[example_idx][1]
print(f'src = {src}')
print(f'trg = {trg}')
src = ['ein', 'mann', 'mit', 'einem', 'orangefarbenen', 'hut', ',', 'der', 'etwas', 'anstarrt', '.']
trg = ['a', 'man', 'in', 'an', 'orange', 'hat', 'starring', 'at', 'something', '.']
查看预测结果。
[25]:
translation = translate_sentence(src, de_vocab, en_vocab, model)
print(f'predicted trg = {translation}')
predicted trg = ['a', 'man', 'in', 'an', 'orange', 'hat', ',', 'something', '.']
BLEU得分¶
双语替换评测得分(bilingual evaluation understudy,BLEU)为衡量文本翻译模型生成出来的语句好坏的一种算法,它的核心在于评估机器翻译的译文 \(\text{pred}\) 与人工翻译的参考译文 \(\text{label}\) 的相似度。通过对机器译文的片段与参考译文进行比较,计算出各个片段的的分数,并配以权重进行加和,基本规则为:
惩罚过短的预测,即如果机器翻译出来的译文相对于人工翻译的参考译文过于短小,则命中率越高,需要施加更多的惩罚;
对长段落匹配更高的权重,即如果出现长段落的完全命中,说明机器翻译的译文更贴近人工翻译的参考译文;
BLEU的公式如下:
len(label)
:人工翻译的译文长度len(pred)
:机器翻译的译文长度p_n
:n-gram的精度
[26]:
from nltk.translate.bleu_score import corpus_bleu
def calculate_bleu(dataset, de_vocab, en_vocab, model, max_len=50):
trgs = []
pred_trgs = []
for data in dataset:
src = data[0] # 源语句:德语
trg = data[1] # 目标语句:英语
# 获取模型预测结果
pred_trg = translate_sentence(src, de_vocab, en_vocab, model, max_len)
pred_trgs.append(pred_trg)
trgs.append([trg])
return corpus_bleu(trgs, pred_trgs)
# 计算BLEU Score
bleu_score = calculate_bleu(test_dataset, de_vocab, en_vocab, model)
print(f'BLEU score = {bleu_score*100:.2f}')
BLEU score = 31.54
参考文献¶
Dzmitry Bahdanau, Kyunghyun Cho, and Yoshua Bengio. 2014. Neural machine translation by jointly learning to align and translate. arXiv preprint arXiv:1409.0473.