Data Augmentation Based on the BERT Masked LM (Masked Language Model)
A walkthrough of TinyBERT's data_augmentation.py
0 Preface
Sample augmentation is used to improve a model's generalization when training samples are scarce or imbalanced, or when the model needs to be more robust. In image processing, augmentation is fairly mature; many pipelines apply it by default during preprocessing (flipping, cropping, and so on). Text augmentation is trickier, because word order itself carries important information.
For example, reordering "this is good" into "is this good" completely changes the meaning of the sentence.
Text data augmentation based on the BERT masked LM was first proposed in TinyBERT. This post walks through the key code of data_augmentation.py in the TinyBERT project. Some of the analysis may be wrong; corrections are welcome.
Code location:
https://github.com/huawei-noah/Pretrained-Language-Model/tree/master/TinyBERT/data_augmentation.py
1 Main Flow
The main flow is as follows:
Figure: the data augmentation pseudocode from the paper "TinyBERT: Distilling BERT for Natural Language Understanding"
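The figure itself is not reproduced here. As a stand-in, the sketch below paraphrases the augmentation loop based on the code analyzed in section 2; the two helper callables are hypothetical stand-ins for the BERT masked-LM prediction (used for single-piece words) and the GloVe nearest-neighbour lookup (used for multi-piece words).

import random

def augment_sketch(tokens, is_single_piece, bert_mlm_candidates, glove_candidates,
                   M=15, N=30, p=0.4):
    # Build up to M candidate replacements for every word
    candidates = {}
    for i, word in enumerate(tokens):
        if is_single_piece(word):
            candidates[i] = bert_mlm_candidates(tokens, i)[:M]
        else:
            candidates[i] = glove_candidates(word)[:M]

    augmented = [' '.join(tokens)]              # the original sentence is kept
    for _ in range(N):                          # try to generate N new sentences
        new_tokens = list(tokens)
        for i, cands in candidates.items():
            if cands and random.random() < p:   # replace each word with probability p
                new_tokens[i] = random.choice(cands)
        sent = ' '.join(new_tokens)
        if sent not in augmented:
            augmented.append(sent)
    return augmented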
2 Code Walkthrough
def main():
    # ... the rest of main() is omitted

    # Prepare data augmentor
    # Build the tokenizer
    tokenizer = BertTokenizer.from_pretrained(args.pretrained_bert_model)
    # Build the model: BertForMaskedLM
    model = BertForMaskedLM.from_pretrained(args.pretrained_bert_model)
    model.eval()

    # Load the GloVe vocabulary: emb_norm is the row-normalized embedding matrix,
    # vocab maps word -> id, ids_to_tokens maps id -> word
    emb_norm, vocab, ids_to_tokens = prepare_embedding_retrieval(args.glove_embs)

    # Build the data augmentor
    data_augmentor = DataAugmentor(model, tokenizer, emb_norm, vocab, ids_to_tokens, args.M, args.N, args.p)

    # Do data augmentation
    processor = AugmentProcessor(data_augmentor, args.data_path)
    processor.read_augment_write()
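To make the wiring concrete, here is a minimal driver equivalent to the snippet above with argparse stripped out. It assumes the functions and classes shown in this walkthrough can be imported from data_augmentation.py and that the import paths follow the TinyBERT repository layout; the GloVe and data file paths are placeholders, and 15/30/0.4 are the default M, N, p mentioned later.

from data_augmentation import prepare_embedding_retrieval, DataAugmentor, AugmentProcessor
from transformer.tokenization import BertTokenizer
from transformer.modeling import BertForMaskedLM

pretrained = 'bert-base-uncased'          # any pretrained BERT checkpoint name or path
tokenizer = BertTokenizer.from_pretrained(pretrained)
model = BertForMaskedLM.from_pretrained(pretrained)
model.eval()

# Placeholder paths for the GloVe vectors and the training file to augment
emb_norm, vocab, ids_to_tokens = prepare_embedding_retrieval('glove.6B.300d.txt')
augmentor = DataAugmentor(model, tokenizer, emb_norm, vocab, ids_to_tokens, 15, 30, 0.4)
AugmentProcessor(augmentor, 'train.tsv').read_augment_write()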
Walkthrough of the vocabulary-loading code
def prepare_embedding_retrieval(glove_file, vocab_size=100000):
    cnt = 0
    words = []
    embeddings = {}

    with open(glove_file, 'r', encoding='utf-8') as fin:
        for line in fin:
            # Each line is a word followed by its 300-d vector, e.g.
            # [the 0.04656 0.21318 -0.0074364 -0.45854 -0.035639 0.23643 -0.28836 0.21521 -0.13486 -1.6413 -0.26091 0.032434 0.056621 ....]
            items = line.strip().split()
            # Append the word to words; items[0] is e.g. 'the'
            words.append(items[0])
            # Store the word vector, e.g.
            # embeddings['the'] = [0.04656 0.21318 -0.0074364 -0.45854 -0.035639 0.23643 -0.28836 0.21521 -0.13486 -1.6413 -0.26091 0.032434 0.056621 ....]
            embeddings[items[0]] = [float(x) for x in items[1:]]
            cnt += 1
            # Keep at most vocab_size words
            if cnt == vocab_size:
                break

    # vocab maps each word to its index, e.g. vocab['the'] = 0
    vocab = {w: idx for idx, w in enumerate(words)}
    # ids_to_tokens maps each index back to its word, e.g. ids_to_tokens[0] = 'the'
    ids_to_tokens = {idx: w for idx, w in enumerate(words)}

    # Dimensionality of the word vectors, e.g. 300
    vector_dim = len(embeddings[ids_to_tokens[0]])
    # emb_matrix stores all word vectors in a [vocab_size, vector_dim] matrix, one vector per row
    emb_matrix = np.zeros((vocab_size, vector_dim))
    for word, v in embeddings.items():
        if word == '<unk>':
            continue
        emb_matrix[vocab[word], :] = v

    # Normalize each word vector: emb_matrix becomes emb_norm
    d = (np.sum(emb_matrix ** 2, 1) ** 0.5)
    emb_norm = (emb_matrix.T / d).T
    return emb_norm, vocab, ids_to_tokens
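The normalization at the end matters later: because every row of emb_norm is a unit vector, a plain dot product against the matrix yields cosine similarities, which is what the GloVe-based candidate retrieval (_word_distance, sketched after the DataAugmentor class) relies on. A small illustration, continuing from the driver sketch above ('good' is just an arbitrary example word):

import numpy as np

idx = vocab['good']
sims = emb_norm @ emb_norm[idx]                        # cosine similarity of every word to 'good'
nearest = [ids_to_tokens[i] for i in np.argsort(-sims)[:5]]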
class AugmentProcessor(object):
    def __init__(self, augmentor, data_path):
        self.augmentor = augmentor
        self.data_path = data_path

    def read_augment_write(self):
        filename = f'aug_{os.path.split(self.data_path)[1]}'
        aug_train_path = os.path.join(os.path.split(self.data_path)[0], filename)
        # Read samples line by line from data_path and write every augmented
        # sentence to aug_<filename> in the same directory
        with open(aug_train_path, 'w', newline='', encoding='utf8') as fw, open(self.data_path, 'r', encoding='utf8') as fr:
            for (i, line) in enumerate(fr):
                sent = line.strip('\n').strip()
                # Skip empty lines
                if not sent:
                    continue
                # Key call: augment(sentence) returns the list augmented_sents
                augmented_sents = self.augmentor.augment(sent)
                # Write out all newly generated sentences
                for augment_sent in augmented_sents:
                    fw.write(f'{augment_sent}\n')

                if (i + 1) % 1000 == 0:
                    logger.info("Having been processing {} examples".format(str(i + 1)))
The data augmentation implementation class
class DataAugmentor(object):
    def _masked_language_model(self, sent, word_pieces, mask_id):
        '''
        :param sent: the original sentence
        :param word_pieces: the tokenized sentence with [CLS] prepended and one piece replaced by [MASK],
                            i.e. [CLS, piece1, piece2, piece3, ...]
        :param mask_id: position of the [MASK] token; for Chinese this is the position of the selected character
        '''
        tokenized_text = self.tokenizer.tokenize(sent)
        tokenized_text = ['[CLS]'] + tokenized_text
        tokenized_len = len(tokenized_text)

        # Concatenate two segments: the masked sentence followed by the original, unmasked sentence.
        # It is not obvious why two segments are needed; presumably the unmasked copy in the second
        # segment lets the model see the original word in context, so candidates stay close to the
        # original meaning.
        # The result is [[CLS] piece1 piece2 piece3 ... [SEP] piece1 piece2 piece3 ... [SEP]]
        tokenized_text = word_pieces + ['[SEP]'] + tokenized_text[1:] + ['[SEP]']

        if len(tokenized_text) > 512:
            tokenized_text = tokenized_text[:512]

        # Convert tokens to their vocabulary ids:
        # [[CLS] piece1 piece2 ... [SEP] piece1 piece2 ... [SEP]] --> [101 722 103 ...]
        token_ids = self.tokenizer.convert_tokens_to_ids(tokenized_text)
        # Segment ids: 0 for the first sentence, 1 for the second
        segments_ids = [0] * (tokenized_len + 1) + [1] * (len(tokenized_text) - tokenized_len - 1)

        # Convert to tensors
        tokens_tensor = torch.tensor([token_ids]).to(device)
        segments_tensor = torch.tensor([segments_ids]).to(device)

        self.model.to(device)

        # The model output has shape [1, seq_len, vocab_size].
        # predictions[0, mask_id] is the vector of vocabulary scores (logits) at the [MASK]
        # position, i.e. one score per vocabulary token.
        predictions = self.model(tokens_tensor, segments_tensor)

        # Take the indices of the top self.M scores; predictions: bsz x len x vocab_size.
        # These are the M candidate words.
        word_candidates = torch.argsort(predictions[0, mask_id], descending=True)[:self.M].tolist()
        word_candidates = self.tokenizer.convert_ids_to_tokens(word_candidates)

        # x.find("##") is 0 (falsy) only for tokens that start with "##", so word-piece
        # continuations are dropped and whole-word candidates are kept
        return list(filter(lambda x: x.find("##"), word_candidates))
    def _word_augment(self, sentence, mask_token_idx, mask_token):
        '''
        :param sentence: the original sentence
        :param mask_token_idx: index of the selected word, i.e. its position in the original sentence, starting from 0
        :param mask_token: the selected word itself
        '''
        word_pieces = self.tokenizer.tokenize(sentence)
        word_pieces = ['[CLS]'] + word_pieces
        tokenized_len = len(word_pieces)

        token_idx = -1
        # Position 0 is [CLS], so start from 1.
        # mask_token_idx indexes whole words, while the tokenizer produces word pieces;
        # only pieces without "##" start a new word (e.g. 'playing' -> ['play', '##ing']),
        # so token_idx below counts whole words rather than pieces.
        for i in range(1, tokenized_len):
            if "##" not in word_pieces[i]:
                # Chinese characters never produce "##" pieces, so for Chinese input only this branch matters.
                # token_idx starts from 0
                token_idx = token_idx + 1
                if token_idx < mask_token_idx:
                    word_piece_ids = []
                elif token_idx == mask_token_idx:
                    word_piece_ids = [i]
                else:
                    break
            else:
                word_piece_ids.append(i)

        # The word maps to a single piece (always the case for all-Chinese input), so mask it
        if len(word_piece_ids) == 1:
            word_pieces[word_piece_ids[0]] = '[MASK]'
            # Get M (default 15) candidate words for the piece at word_piece_ids[0]
            candidate_words = self._masked_language_model(sentence, word_pieces, word_piece_ids[0])
        elif len(word_piece_ids) > 1:
            # Multi-piece words fall back to GloVe nearest neighbours (see the sketch after this class)
            candidate_words = self._word_distance(mask_token)
        else:
            logger.info("invalid input sentence!")
            candidate_words = []

        # If no candidate was found, fall back to the original word at the [MASK] position
        if len(candidate_words) == 0:
            candidate_words.append(mask_token)

        return candidate_words
    def augment(self, sent):
        # candidate_sents collects all generated sentences; the first entry is the original sentence
        candidate_sents = [sent]

        # Tokenize into words
        tokens = self.tokenizer.basic_tokenizer.tokenize(sent)
        candidate_words = {}
        for (idx, word) in enumerate(tokens):
            if _is_valid(word) and word not in StopWordsList:
                # Key step: build the candidate list for every word; see _word_augment above
                # for how candidates are generated. idx indexes tokens, starting from 0.
                candidate_words[idx] = self._word_augment(sent, idx, word)
        logger.info(candidate_words)

        cnt = 0
        # Generate N (default 30) new sentences
        while cnt < self.N:
            new_sent = list(tokens)
            for idx in candidate_words.keys():
                # Pick one of the M candidates for this position at random
                candidate_word = random.choice(candidate_words[idx])

                # Replace the original word with probability p (default 0.4)
                x = random.random()
                if x < self.p:
                    new_sent[idx] = candidate_word

            if " ".join(new_sent) not in candidate_sents:
                candidate_sents.append(' '.join(new_sent))
            cnt += 1

        # With the default N=30, candidate_sents holds the original sentence plus up to 30
        # new ones (duplicates are skipped), i.e. roughly a 30x expansion of the sample.
        return candidate_sents
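For multi-piece words, _word_augment above falls back to self._word_distance(mask_token), which is not quoted in this walkthrough. Below is a minimal sketch of what it does with the normalized GloVe matrix prepared earlier, written from the inputs described above; treat it as an approximation rather than the repository's exact code.

    def _word_distance(self, word):
        # Words outside the GloVe vocabulary get no candidates
        if word not in self.vocab:
            return []
        word_idx = self.vocab[word]
        # Rows of emb_norm are unit vectors, so this dot product is cosine similarity
        dist = np.dot(self.emb_norm, self.emb_norm[word_idx])
        dist[word_idx] = -np.inf                  # never propose the word itself
        candidate_ids = np.argsort(-dist)[:self.M]
        return [self.ids_to_tokens[i] for i in candidate_ids]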
The model used is BertForMaskedLM: a BERT model with the masked language modeling head.
class BertForMaskedLM(BertPreTrainedModel):
    def __init__(self, config):
        super(BertForMaskedLM, self).__init__(config)
        self.bert = BertModel(config)
        self.cls = BertOnlyMLMHead(
            config, self.bert.embeddings.word_embeddings.weight)
        self.apply(self.init_bert_weights)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, masked_lm_labels=None,
                output_att=False, infer=False):
        # sequence_output holds the hidden states of all encoder layers
        sequence_output, _ = self.bert(input_ids, token_type_ids, attention_mask,
                                       output_all_encoded_layers=True, output_att=output_att)

        if output_att:
            sequence_output, att_output = sequence_output

        # sequence_output[-1] is the last layer's hidden states, shape [batch_size, seq_len, embedding_size].
        # The call ends up in BertLMPredictionHead.forward (shown below).
        # prediction_scores has shape [batch_size, seq_len, vocab_size]
        prediction_scores = self.cls(sequence_output[-1])

        if masked_lm_labels is not None:
            loss_fct = CrossEntropyLoss(ignore_index=-1)
            masked_lm_loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size), masked_lm_labels.view(-1))
            if not output_att:
                return masked_lm_loss
            else:
                return masked_lm_loss, att_output
        else:
            if not output_att:
                return prediction_scores
            else:
                return prediction_scores, att_output
class BertOnlyMLMHead(nn.Module):
    def __init__(self, config, bert_model_embedding_weights):
        super(BertOnlyMLMHead, self).__init__()
        self.predictions = BertLMPredictionHead(
            config, bert_model_embedding_weights)

    def forward(self, sequence_output):
        prediction_scores = self.predictions(sequence_output)
        return prediction_scores
The code lives in TinyBert/transformer/modeling.py
class BertLMPredictionHead(nn.Module):
    def __init__(self, config, bert_model_embedding_weights):
        super(BertLMPredictionHead, self).__init__()
        # The transform applies a dense layer of shape [embedding_size, embedding_size]
        self.transform = BertPredictionHeadTransform(config)

        # The output weights are the same as the input embeddings, but there is
        # an output-only bias for each token.
        # The decoder maps embedding_size -> vocab_size; its weight is the
        # [vocab_size, embedding_size] embedding matrix itself
        self.decoder = nn.Linear(bert_model_embedding_weights.size(1),
                                 bert_model_embedding_weights.size(0),
                                 bias=False)
        self.decoder.weight = bert_model_embedding_weights
        # The bias has shape [vocab_size]
        self.bias = nn.Parameter(torch.zeros(bert_model_embedding_weights.size(0)))

    def forward(self, hidden_states):
        # Shape is unchanged: [seq_len, embedding_size] --> [seq_len, embedding_size]
        hidden_states = self.transform(hidden_states)
        # Fully connected layer:
        # [seq_len, embedding_size] x [embedding_size, vocab_size] + [vocab_size] = [seq_len, vocab_size]
        hidden_states = self.decoder(hidden_states) + self.bias
        # Output shape is [seq_len, vocab_size]
        return hidden_states
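The line self.decoder.weight = bert_model_embedding_weights is the usual weight tying between the input embedding and the output projection: the head introduces no new weight matrix, only the per-token output bias. A toy standalone illustration (sizes made up):

import torch.nn as nn

vocab_size, embedding_size = 8, 4                 # toy sizes
embedding = nn.Embedding(vocab_size, embedding_size)
decoder = nn.Linear(embedding_size, vocab_size, bias=False)
decoder.weight = embedding.weight                 # the same Parameter object is shared
assert decoder.weight is embedding.weight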