This is a sentiment-analysis tutorial series aimed at beginners (such as the author) [1]. This post, organized by 老鸽子, covers the content of "4 - Convolutional Sentiment Analysis.ipynb".
Task of this post: use a convolutional neural network (CNN) for sentence classification.
Introduction
CNNs are typically used to analyze images. They consist of one or more convolutional layers followed by one or more linear layers. In a convolutional layer, filters scan across the image, and the result is fed into another convolutional layer or a linear layer. Each filter has a size: for example, a [3x3] filter covers a [3x3] region of the image at each step and has 9 corresponding weights.
Analogous to a filter looking at a region of an image, a [1x2] filter can look at two consecutive words in a text, i.e. a bi-gram.
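As a quick illustration of this analogy (a minimal sketch, not part of the original notebook; the numbers are made up), the snippet below slides a width-2 averaging filter over a toy sequence of 5 token scores; each output value is the weighted sum of one bi-gram.
import torch
import torch.nn.functional as F
# Toy "sentence" of 5 tokens, one scalar feature per token: [batch=1, channels=1, sent len=5].
x = torch.tensor([[[1.0, 2.0, 3.0, 4.0, 5.0]]])
# One filter of width 2 with weights (0.5, 0.5): averages each bi-gram.
w = torch.tensor([[[0.5, 0.5]]])
print(F.conv1d(x, w))  # tensor([[[1.5, 2.5, 3.5, 4.5]]]) -> 5 - 2 + 1 = 4 bi-gram outputs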
Preparing the Data
Unlike the FastText model, there is no need to explicitly create bi-grams and append them to the end of each sentence.
Because the first dimension of the convolutional layer's input is the batch dimension, set batch_first=True in the TEXT field.
Split the data into training and validation sets, build the vocabulary, and create the iterators.
import torch
from torchtext import data
from torchtext import datasets
import random
import numpy as np
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
TEXT = data.Field(tokenize='spacy', batch_first=True)
LABEL = data.LabelField(dtype=torch.float)
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
train_data, valid_data = train_data.split(random_state = random.seed(SEED))
#-----------------------------
MAX_VOCAB_SIZE = 25_000
TEXT.build_vocab(
train_data,
max_size = MAX_VOCAB_SIZE,
vectors = "glove.6B.100d",
unk_init = torch.Tensor.normal_
)
LABEL.build_vocab(train_data)
#-----------------------------
BATCH_SIZE = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
(train_data, valid_data, test_data),
batch_size=BATCH_SIZE,
device=device
)
Building the Model
Visualize the words of a sentence in two dimensions: the words lie along one axis and the embedding dimensions along the other.
A filter of size [n x emb_dim] then scans n words at a time.
For example, a sentence of 4 words, each with a 5-dimensional embedding, gives a [4x5] "image" tensor. A [2x5] filter covers two words at a time. Each element of the filter holds a weight, and the filter's output is the weighted sum of the 10 embedding elements it covers. The filter then slides down (across the sentence) to the next bi-gram and outputs another weighted sum, and slides down once more to compute the final weighted sum.
The filter is as wide as the "image", so the output is a vector whose length equals the image height minus the filter height plus one: 4 - 2 + 1 = 3.
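A quick shape check of this claim (an illustrative sketch, not from the original notebook): a single [2x5] filter applied to a random [4x5] "image" yields 4 - 2 + 1 = 3 values.
import torch
import torch.nn as nn
conv = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(2, 5))
# A "sentence image": batch=1, channel=1, 4 words, 5-dimensional embeddings.
x = torch.randn(1, 1, 4, 5)
print(conv(x).shape)  # torch.Size([1, 1, 3, 1]) -> 3 weighted sums per filter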
The model used in this post has filters of several sizes, with heights 3, 4 and 5 and 100 filters of each size, so it can look for different tri-grams, 4-grams and 5-grams that are relevant to the sentiment of a review.
To select the n-gram most relevant to the sentiment, max pooling is applied to the output of each convolutional layer.
The model thus ends up with 300 different "most important" n-grams, and the fully connected layer can be thought of as weighting these n-grams to produce the final decision.
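To make the "300 n-grams" figure concrete, here is a minimal shape sketch that mirrors the model defined below (the toy input is a single sentence of 7 random word embeddings; this snippet is illustrative and not part of the original notebook):
import torch
import torch.nn as nn
import torch.nn.functional as F
n_filters, emb_dim, sent_len = 100, 100, 7
x = torch.randn(1, 1, sent_len, emb_dim)  # [batch, 1, sent len, emb dim]
convs = [nn.Conv2d(1, n_filters, (h, emb_dim)) for h in (3, 4, 5)]
pooled = []
for conv in convs:
    c = F.relu(conv(x)).squeeze(3)  # [batch, n_filters, sent len - h + 1]
    pooled.append(F.max_pool1d(c, c.shape[2]).squeeze(2))  # keep only each filter's maximum
cat = torch.cat(pooled, dim=1)
print(cat.shape)  # torch.Size([1, 300]) -> the input size of the linear layer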
Implementation Details
The convolutional layers are implemented with nn.Conv2d: in_channels is the number of input channels, which is 3 for an image (red-green-blue) but 1 for text; out_channels is the number of filters; kernel_size is the filter size [n x emb_dim], where n is the size of the n-grams.
The batch is the second dimension for an RNN but the first dimension for a CNN. The second dimension of the CNN input is the channel dimension, so a new dimension of size 1 is added to the embedded text to match the convolutional layer's in_channels=1.
The activation after each convolution is ReLU. The max-pooling layer is what allows sentences of different lengths to be handled: the output size of a convolutional layer depends on its input, and different batches contain sentences of different lengths. Without max pooling, the input size of the linear layer would depend on the sentence length, and all sentences would have to be trimmed or padded to the same length. With max pooling, the number of inputs to the linear layer is simply the number of filters.
If a sentence is shorter than the largest filter, it must be padded to the length of that filter. Reviews in the IMDb data are all longer than 5 words, so this is not a concern during training.
Dropout is applied to the concatenated filter outputs, which are then passed through the linear layer to obtain the prediction.
import torch.nn as nn
import torch.nn.functional as F
class CNN(nn.Module):
def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
super().__init__()
self.embedding = nn.Embedding(
vocab_size,
embedding_dim,
padding_idx=pad_idx
)
self.conv_0 = nn.Conv2d(
in_channels=1,
out_channels=n_filters,
kernel_size=(filter_sizes[0], embedding_dim)
)
self.conv_1 = nn.Conv2d(
in_channels=1,
out_channels=n_filters,
kernel_size=(filter_sizes[1], embedding_dim)
)
self.conv_2 = nn.Conv2d(
in_channels=1,
out_channels=n_filters,
kernel_size=(filter_sizes[2], embedding_dim)
)
self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text):
# text: [batch size, sent len]
# embedded: [batch size, sent len, emb dim]
embedded = self.embedding(text)
# embedded: [batch size, 1, sent len, emb dim]
embedded = embedded.unsqueeze(1)
# conved_n: [batch size, n_filters, sent len - filter_sizes[n] + 1]
conved_0 = F.relu(self.conv_0(embedded).squeeze(3))
conved_1 = F.relu(self.conv_1(embedded).squeeze(3))
conved_2 = F.relu(self.conv_2(embedded).squeeze(3))
# pooled_n: [batch size, n_filters]
pooled_0 = F.max_pool1d(conved_0, conved_0.shape[2]).squeeze(2)
pooled_1 = F.max_pool1d(conved_1, conved_1.shape[2]).squeeze(2)
pooled_2 = F.max_pool1d(conved_2, conved_2.shape[2]).squeeze(2)
# cat: [batch size, n_filters * len(filter_sizes)]
cat = self.dropout(torch.cat((pooled_0, pooled_1, pooled_2), dim=1))
return self.fc(cat)
The CNN model above hard-codes only 3 filter sizes. To support an arbitrary number of filter sizes, put all the convolutional layers in an nn.ModuleList.
class CNN2d(nn.Module):
def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
super().__init__()
self.embedding = nn.Embedding(
vocab_size,
embedding_dim,
padding_idx=pad_idx
)
self.convs = nn.ModuleList([
nn.Conv2d(
in_channels=1,
out_channels=n_filters,
kernel_size=(fs, embedding_dim)
)
for fs in filter_sizes
])
self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text):
# text: [batch size, sent len]
# embedded: [batch size, sent len, emb dim]
embedded = self.embedding(text)
# embedded: [batch size, 1, sent len, emb dim]
embedded = embedded.unsqueeze(1)
# conved_n: [batch size, n_filters, sent len - filter_sizes[n] + 1]
conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
# pooled_n: [batch size, n_filters]
pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
# cat: [batch size, n_filters * len(filter_sizes)]
cat = self.dropout(torch.cat(pooled, dim=1))
return self.fc(cat)
A 1-dimensional convolutional layer can be used instead: the "depth" of each filter is the embedding dimension, and the width of the "image" is the number of words in the sentence.
class CNN1d(nn.Module):
def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
super().__init__()
self.embedding = nn.Embedding(
vocab_size,
embedding_dim,
padding_idx=pad_idx
)
self.convs = nn.ModuleList([
nn.Conv1d(
in_channels=embedding_dim,
out_channels=n_filters,
kernel_size=fs
)
for fs in filter_sizes
])
self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text):
# text: [batch size, sent len]
# embedded: [batch size, sent len, emb dim]
embedded = self.embedding(text)
# embedded: [batch size, emb dim, sent len]
embedded = embedded.permute(0, 2, 1)
# conved_n: [batch size, n_filters, sent len - filter_sizes[n] + 1]
conved = [F.relu(conv(embedded)) for conv in self.convs]
# pooled_n: [batch size, n_filters]
pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
# cat: [batch size, n_filters * len(filter_sizes)]
cat = self.dropout(torch.cat(pooled, dim=1))
return self.fc(cat)
Create an instance of the CNN2d class (or CNN, or CNN1d); print the number of trainable parameters in the model; load the pretrained embeddings and zero the initial weights of the <unk> and <pad> tokens.
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3, 4, 5]
OUTPUT_DIM = 1
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
model = CNN2d(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
#-----------------------------
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')
#-----------------------------
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
Training, Testing and User Input
This part is identical to the corresponding code in the previous sections. On the test set the loss is 0.337 and the accuracy is 85.69%.
The predictions for the two user inputs are 0.11990132927894592 (negative sentiment) and 0.9671174883842468 (positive sentiment).
import torch.optim as optim
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()
model = model.to(device)
criterion = criterion.to(device)
def binary_accuracy(preds, y):
# round predictions to the closest integer
rounded_preds = torch.round(torch.sigmoid(preds))
correct = (rounded_preds == y).float() # convert into float for division
acc = correct.sum() / len(correct)
return acc
#-----------------------------
def train(model, iterator, optimizer, criterion):
epoch_loss = 0
epoch_acc = 0
model.train()
for batch in iterator:
optimizer.zero_grad()
predictions = model(batch.text).squeeze(1)
loss = criterion(predictions, batch.label)
acc = binary_accuracy(predictions, batch.label)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
epoch_acc += acc.item()
return epoch_loss / len(iterator), epoch_acc / len(iterator)
#-----------------------------
def evaluate(model, iterator, criterion):
epoch_loss = 0
epoch_acc = 0
model.eval()
with torch.no_grad():
for batch in iterator:
predictions = model(batch.text).squeeze(1)
loss = criterion(predictions, batch.label)
acc = binary_accuracy(predictions, batch.label)
epoch_loss += loss.item()
epoch_acc += acc.item()
return epoch_loss / len(iterator), epoch_acc / len(iterator)
#-----------------------------
import time
def epoch_time(start_time, end_time):
elapsed_time = end_time - start_time
elapsed_mins = int(elapsed_time / 60)
elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
return elapsed_mins, elapsed_secs
#-----------------------------
N_EPOCHS = 5
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
start_time = time.time()
train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
end_time = time.time()
epoch_mins, epoch_secs = epoch_time(start_time, end_time)
if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'tut4-model.pt')
print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')
#=============================
# Test.
model.load_state_dict(torch.load('tut4-model.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
#=============================
# User input.
import spacy
nlp = spacy.load('en')
def predict_sentiment(model, sentence, min_len=5):
model.eval()
tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
if len(tokenized) < min_len:
    tokenized += ['<pad>'] * (min_len - len(tokenized))
indexed = [TEXT.vocab.stoi[t] for t in tokenized]
tensor = torch.LongTensor(indexed).to(device)
tensor = tensor.unsqueeze(0)
prediction = torch.sigmoid(model(tensor))
return prediction.item()
print(predict_sentiment(model, "This film is terrible"))
print(predict_sentiment(model, "This film is great"))
[1] https://github.com/bentrevett/pytorch-sentiment-analysis