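Sentiment Analysis with Neural Networks
As in the previous article, we use the Kaggle competition as the running example, this time to give a brief introduction to the three most basic kinds of neural network: the feedforward neural network, the convolutional neural network, and the recurrent neural network.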
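A neural network can be viewed as a directed acyclic graph divided into three parts: an input layer, hidden layers, and an output layer. The input layer receives the given input, the hidden layers perform a series of computations on it, and the result is handed to the output layer.
Each node (neuron) in a hidden layer receives some or all of the results of the previous layer, each weighted by a coefficient, passes the weighted sum through a function (the activation function), and sends the result on to the nodes of the next layer.
The coefficients used when receiving the inputs are the part we train.
In theory, a neural network with enough hidden nodes can approximate any continuous function on a bounded domain to arbitrary accuracy.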
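To make the description of a node concrete, here is a minimal pure-Python sketch of one hidden layer followed by one output node. The weights, the bias terms, and the helper names `relu` and `dense_layer` are illustrative assumptions, not part of the competition code; a real network would learn the coefficients instead of hard-coding them.

```python
def relu(z):
    """Activation function: keep positive values, clamp negatives to zero."""
    return max(0.0, z)


def identity(z):
    """No activation, used here for the output node."""
    return z


def dense_layer(inputs, weights, biases, activation):
    """Compute one fully connected layer.

    weights[j][i] is the coefficient node j applies to input i;
    biases[j] is an extra constant term (an assumption of this sketch).
    """
    outputs = []
    for w_row, b in zip(weights, biases):
        z = sum(w * x for w, x in zip(w_row, inputs)) + b
        outputs.append(activation(z))
    return outputs


x = [1.0, 2.0]                                  # values held by the input layer
hidden = dense_layer(x, [[0.5, -1.0], [1.0, 1.0]], [0.0, -1.0], relu)
output = dense_layer(hidden, [[1.0, 0.5]], [0.25], identity)

print(hidden)  # [0.0, 2.0]
print(output)  # [1.25]
```

The first hidden node computes 0.5 * 1 - 1 * 2 = -1.5, which ReLU clamps to 0; the second computes 1 + 2 - 1 = 2 and passes through unchanged.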
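1. Data processing
Reading the files
As before, the files are opened with pandas and the relevant rows are read, so I won't go into detail here.
Because a neural network has far more trainable parameters than a linear model, only the first 1,000 phrases are used for training here.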
import pandas as pd

# Load the tab-separated Kaggle files.
train_data = pd.read_csv('../train/train.tsv', sep='\t')
test_data = pd.read_csv('../test/test.tsv', sep='\t')

# Keep only the first 1,000 training phrases to limit the training cost.
train_phrase = list(train_data['Phrase'][:1000])
train_sentiment = list(train_data['Sentiment'][:1000])
test_phrase = list(test_data['Phrase'])
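Tokenizer
Unlike a linear model, a neural network can itself capture some word-order relationships, so here we map each word to an integer and replace the words of every sentence with their integers. This reduces the amount of data and speeds up the run.
The torchtext library provides a range of efficient tokenization tools, but this experiment does not need that much functionality, so I wrote a small tokenizer by hand.
First, extract the words from all sentences to build the vocabulary.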
def build_vocabulary(phrase):
    # Map every distinct word to an integer id, starting from 1.
    ret = dict()
    cnt = 1
    for s in phrase:
        for x in s.split():
            if x in ret:
                continue
            ret[x] = cnt
            cnt = cnt + 1
    return ret

vocabulary = build_vocabulary(train_phrase + test_phrase)
Then use the tokenizer to turn the words of each sentence into numbers.
def tokenizer(phrase, vocabulary):
    ret = []
    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)
    return ret

train_phrase = tokenizer(train_phrase, vocabulary)
test_phrase = tokenizer(test_phrase, vocabulary)
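As a quick check of what these two steps produce, the sketch below reruns the same logic on three made-up phrases. The toy phrases and the names `toy_vocabulary` and `toy_ids` are assumptions for illustration; the functions are compact restatements of the ones above.

```python
def build_vocabulary(phrases):
    """Assign ids 1, 2, 3, ... to words in order of first appearance."""
    vocabulary = {}
    for sentence in phrases:
        for word in sentence.split():
            if word not in vocabulary:
                vocabulary[word] = len(vocabulary) + 1
    return vocabulary


def tokenizer(phrases, vocabulary):
    """Replace every word by its id; an unknown word raises KeyError."""
    return [[vocabulary[word] for word in sentence.split()]
            for sentence in phrases]


toy_phrases = ["a good movie", "a bad movie", "good"]
toy_vocabulary = build_vocabulary(toy_phrases)
toy_ids = tokenizer(toy_phrases, toy_vocabulary)

print(toy_vocabulary)  # {'a': 1, 'good': 2, 'movie': 3, 'bad': 4}
print(toy_ids)         # [[1, 2, 3], [1, 4, 3], [2]]
```

Since ids start at 1, the value 0 never refers to a real word, which leaves it free to be used as a padding marker. Building the vocabulary from the training and test phrases together is also what keeps the lookup from failing on a test word that never appears in the training set.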
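Embedding
Replacing words with numbers, as the tokenizer above does, has one big problem.
The fitting works directly on these numbers, so the magnitude of each number strongly affects the result, even though the ids were assigned arbitrarily.
Words themselves carry some sentiment, and words with similar sentiment tend to cluster in certain regions, which suggests representing each word by a vector instead.
In other words, a sentence of length `length` is turned into a two-dimensional matrix of size length×dim, where dim is the dimension of the vectors.
This is the idea behind embedding.
PyTorch provides embedding as a layer that is trained together with the rest of the network, so the code is given in the network section.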
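Conceptually an embedding layer is just a lookup table with one row per word id. The sketch below shows the lookup in pure Python; `make_embedding_table` and `embed` are hypothetical helpers, and the random values stand in for vectors that a real network would learn during training.

```python
import random


def make_embedding_table(vocab_size, dim, seed=0):
    """One random dim-dimensional vector per word id (row 0 is the padding id)."""
    rng = random.Random(seed)
    return [[rng.uniform(-1.0, 1.0) for _ in range(dim)]
            for _ in range(vocab_size)]


def embed(sentence_ids, table):
    """Turn a list of word ids into a length x dim matrix by row lookup."""
    return [table[i] for i in sentence_ids]


table = make_embedding_table(vocab_size=5, dim=3)
matrix = embed([1, 4, 3], table)

print(len(matrix), len(matrix[0]))  # 3 3
```

The id now only selects a row, so its numeric size no longer matters; what the network sees is the vector stored in that row.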
Batch training
Memory is limited, so the network cannot be trained on too much data at once; the data has to be fed to the network in batches.
import torch.utils.data as Data

torch_dataset = Data.TensorDataset(train_x, train_y)
loader = Data.DataLoader(
    dataset=torch_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,     # reshuffle the samples at every epoch
    num_workers=2     # load batches in two worker processes
)

for epoch in range(EPOCH):
    for step, (batch_x, batch_y) in enumerate(loader):
        # training...
        pass
The outer loop counts the training epochs, and the inner loop fetches the data of each batch.
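For readers who want to see what the loader does without PyTorch, here is a rough pure-Python equivalent of shuffled mini-batching. The generator `iterate_minibatches` and the toy data are assumptions made for this sketch; it is not how `DataLoader` is implemented internally.

```python
import random


def iterate_minibatches(xs, ys, batch_size, rng):
    """Yield (batch_x, batch_y) pairs covering the data once, in random order."""
    order = list(range(len(xs)))
    rng.shuffle(order)
    for start in range(0, len(order), batch_size):
        idx = order[start:start + batch_size]
        yield [xs[i] for i in idx], [ys[i] for i in idx]


xs = [[i, i + 1] for i in range(10)]   # 10 toy samples
ys = [i % 5 for i in range(10)]        # toy labels in 0..4
rng = random.Random(0)

batch_sizes = []
seen = []
for epoch in range(2):
    for batch_x, batch_y in iterate_minibatches(xs, ys, 4, rng):
        batch_sizes.append(len(batch_x))
        seen.extend(batch_x)

print(batch_sizes)  # [4, 4, 2, 4, 4, 2]
```

Every sample is visited exactly once per epoch, and the last batch of an epoch is simply smaller when the data size is not a multiple of the batch size.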
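Uniform length
Sentences contain different numbers of words, but a batch has to be passed in as a single matrix, so all sentences must be brought to the same length.
An intuitive idea is to fill the empty positions with 0.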
def expand_with_zero(phrase, length):
    # Right-pad every id list with zeros up to the given length.
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret

max_length = len(max(train_phrase + test_phrase, key=len, default=''))
# Padding to max_length + 1 guarantees that every sentence ends with at least one 0.
train_phrase = expand_with_zero(train_phrase, max_length + 1)
test_phrase = expand_with_zero(test_phrase, max_length + 1)
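The problem with this is that 0 also gets its own vector in the embedding, and that vector affects the training result.
The effect is especially large for a very short sentence, which ends up consisting mostly of zeros.
One solution is to use the pad_sequence family of functions in torch.nn.utils.rnn (such as pack_padded_sequence) to compress and pack the data before passing it to the network, but the packed result cannot be fed directly into PyTorch's built-in embedding layer.
Another solution is to exploit PyTorch's dynamic graphs and process only the non-zero part of each sequence.
The code for this is in the network section below.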
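The effect of the padding vectors is easy to see on a tiny example. The sketch below compares a plain average over all positions with an average restricted to the non-zero ids; `real_length` and `masked_mean` are hypothetical helpers, and the vectors are made up (the two `[9.0, 9.0]` rows play the role of the vector that id 0 would receive).

```python
def real_length(padded_ids, pad_id=0):
    """Number of real (non-padding) tokens in a padded sentence."""
    return sum(1 for i in padded_ids if i != pad_id)


def masked_mean(padded_ids, vectors, pad_id=0):
    """Average the vectors of the real tokens only."""
    kept = [v for i, v in zip(padded_ids, vectors) if i != pad_id]
    if not kept:                      # sentence made of padding only
        return [0.0] * len(vectors[0])
    return [sum(v[d] for v in kept) / len(kept) for d in range(len(kept[0]))]


ids = [2, 5, 0, 0]
vectors = [[1.0, 3.0], [3.0, 5.0], [9.0, 9.0], [9.0, 9.0]]

naive = [sum(v[d] for v in vectors) / len(vectors) for d in range(2)]
masked = masked_mean(ids, vectors)

print(real_length(ids))  # 2
print(naive)             # [5.5, 6.5]
print(masked)            # [2.0, 4.0]
```

With half of the positions padded, the naive average is dominated by the padding vector, while the masked version depends only on the two real words.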
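2. Building the networks
Feedforward neural network
This is the simplest of the three networks: every hidden layer is fully connected to the neurons of the previous layer.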
import torch
import torch.nn.functional as function

class FNN(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(FNN, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)    # input -> hidden
        self.predict = torch.nn.Linear(n_hidden, n_output)    # hidden -> class scores

    def forward(self, x):
        x = function.relu(self.hidden(x))
        x = self.predict(x)
        return x
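Convolutional neural network
It has two special kinds of hidden layer: the convolutional layer and the pooling layer.
Put simply, a convolutional layer increases the information density of a single node (or, more crudely, enlarges one dimension), while a pooling layer reduces the size of a single node (shrinks one dimension).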
import math
import torch

class CNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_output, max_length):
        super(CNN, self).__init__()
        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        self.conv = torch.nn.Sequential(
            # kernel_size = 5 with padding = 2 keeps the sequence length unchanged
            torch.nn.Conv1d(
                in_channels=embedding_dim,
                out_channels=16,
                kernel_size=5,
                stride=1,
                padding=2
            ),
            torch.nn.ReLU(),
            # halves the length; the padding handles an odd max_length
            torch.nn.MaxPool1d(
                kernel_size=2,
                padding=max_length % 2
            )
        )
        self.out = torch.nn.Linear(math.ceil(max_length / 2) * 16, n_output)

    def forward(self, x):
        x = self.embedding(x)        # (batch, length, embedding_dim)
        x = x.permute(0, 2, 1)       # Conv1d expects (batch, channels, length)
        x = self.conv(x)
        x = x.view(x.size(0), -1)    # flatten everything except the batch dimension
        x = self.out(x)
        return x
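The embedding here is the one described above. The final view flattens the data so that everything apart from batch_size becomes a single dimension that the output layer can accept.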
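The input size of the output layer, `math.ceil(max_length / 2) * 16`, can be checked with the standard output-length formula for 1-D convolution and pooling, floor((L + 2 * padding - kernel_size) / stride) + 1, assuming dilation 1 and the default floor rounding. The helper names below are made up for this sketch.

```python
import math


def conv1d_out_length(length, kernel_size, stride=1, padding=0):
    """Output length of a 1-D convolution (dilation 1)."""
    return (length + 2 * padding - kernel_size) // stride + 1


def maxpool1d_out_length(length, kernel_size, padding=0):
    """Output length of 1-D max pooling whose stride equals its kernel size."""
    return (length + 2 * padding - kernel_size) // kernel_size + 1


def flattened_features(max_length, out_channels=16):
    """Size of the flattened tensor that reaches the final linear layer."""
    after_conv = conv1d_out_length(max_length, kernel_size=5, stride=1, padding=2)
    after_pool = maxpool1d_out_length(after_conv, kernel_size=2,
                                      padding=max_length % 2)
    return after_pool * out_channels


for length in (7, 8, 52):
    print(length, flattened_features(length), math.ceil(length / 2) * 16)
# 7 64 64
# 8 64 64
# 52 416 416
```

The convolution leaves the length unchanged (L + 4 - 5 + 1 = L), and the pooling padding of `max_length % 2` is what makes odd lengths round up instead of down.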
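Recurrent neural network
When a node is updated, it uses not only the nodes of the previous layer but also information from the earlier updates of the same layer.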
import torch

class RNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_hidden, n_output):
        super(RNN, self).__init__()
        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        self.hidden = torch.nn.LSTM(embedding_dim, n_hidden, batch_first=True)
        self.out = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        # Number of non-zero (real) tokens in each padded sentence.
        length = (x != 0).sum(dim=1)
        # Index of the last real token; an empty sentence falls back to position 0.
        last = (length - 1).clamp(min=0)
        x = self.embedding(x)
        x, _ = self.hidden(x)
        # Pick the LSTM output at the last real token of every sentence.
        t = x[torch.arange(x.size(0), device=x.device), last]
        x = self.out(t)
        # Return raw scores: CrossEntropyLoss applies log-softmax itself.
        return x
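Here forward handles sentences of different lengths: only the result of the last valid update (the position of the last non-zero id) is passed to the output layer.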
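The idea that a node reuses information from its own earlier updates can be shown with a one-dimensional recurrent cell. This is a plain (Elman-style) cell, not the LSTM used in the model, and the weights `w_x`, `w_h`, `b` are arbitrary values chosen for illustration.

```python
import math


def rnn_step(x_t, h_prev, w_x, w_h, b):
    """One update: mix the current input with the previous hidden state."""
    return math.tanh(w_x * x_t + w_h * h_prev + b)


def run_rnn(inputs, w_x=0.5, w_h=0.8, b=0.0):
    """Feed the inputs one by one and record the hidden state after each step."""
    h = 0.0
    states = []
    for x_t in inputs:
        h = rnn_step(x_t, h, w_x, w_h, b)
        states.append(h)
    return states


states_ab = run_rnn([1.0, -1.0])
states_ba = run_rnn([-1.0, 1.0])

# Same two inputs, different order, different final state.
print(states_ab[-1] < 0 < states_ba[-1])  # True
```

Because each state depends on the one before it, the state at the last real token summarizes the whole sentence in order, which is why that position is a reasonable one to hand to the output layer.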
3. Model training and prediction
The model is trained with Adam, an improved variant of gradient descent.
rnn = RNN(len(vocabulary) + 1, 16, 40, 5)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)
loss_func = torch.nn.CrossEntropyLoss()

for epoch in range(EPOCH):
    for step, (batch_x, batch_y) in enumerate(loader):
        out = rnn(batch_x)                 # forward pass
        loss = loss_func(out, batch_y)
        optimizer.zero_grad()              # clear the gradients of the previous step
        loss.backward()                    # back-propagation
        optimizer.step()                   # update the parameters
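To give a feel for what `optimizer.step()` does, here is a small pure-Python sketch of the Adam update rule applied to a one-variable function. The function `adam_minimize`, the quadratic objective, and the learning rate of 0.1 are assumptions made for this illustration; the beta and epsilon values are the commonly used defaults.

```python
import math


def adam_minimize(grad, x0, lr=0.1, beta1=0.9, beta2=0.999, eps=1e-8, steps=500):
    """Minimize a 1-D function with Adam, given its gradient function."""
    x, m, v = x0, 0.0, 0.0
    for t in range(1, steps + 1):
        g = grad(x)
        m = beta1 * m + (1 - beta1) * g          # running mean of gradients
        v = beta2 * v + (1 - beta2) * g * g      # running mean of squared gradients
        m_hat = m / (1 - beta1 ** t)             # bias correction for early steps
        v_hat = v / (1 - beta2 ** t)
        x -= lr * m_hat / (math.sqrt(v_hat) + eps)
    return x


# f(x) = (x - 3)^2 has gradient 2 * (x - 3) and its minimum at x = 3.
x_min = adam_minimize(lambda x: 2 * (x - 3.0), x0=0.0)
print(round(x_min, 2))
```

Compared with plain gradient descent, the step is scaled by the recent gradient magnitude, so the step size stays roughly bounded by the learning rate regardless of how large the raw gradient is.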
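For prediction, the index of the largest entry of the output vector (the class with the highest probability) is taken as the predicted value.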
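with torch.no_grad():    # no gradients are needed for prediction
    test_y = torch.max(rnn(test_x), 1)[1]    # index of the largest output per row

outData = pd.DataFrame({'PhraseId': test_data.PhraseId, 'Sentiment': test_y.numpy()})
outData.to_csv('../test/RNN_result.csv', index=False)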
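Taking the largest entry works the same way whether the network outputs raw scores or probabilities, because softmax preserves the ordering of its inputs. The scores below are invented, and `softmax` and `argmax` are small helpers written for this sketch.

```python
import math


def softmax(scores):
    """Turn raw scores into probabilities that sum to 1."""
    shift = max(scores)                      # subtract the max for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def argmax(values):
    """Index of the largest value."""
    return max(range(len(values)), key=lambda i: values[i])


scores = [0.1, 2.3, -1.0, 0.7, 1.9]          # one made-up score per sentiment class
probs = softmax(scores)
predicted = argmax(probs)

print(predicted)                  # 1
print(argmax(scores) == predicted)  # True
```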
Finally, here is the complete code for each network (each listing is followed by the name of its network):
import numpy
import pandas as pd
import torch
import torch.nn.functional as function
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import CountVectorizer

class FNN(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(FNN, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)
        self.predict = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        x = function.relu(self.hidden(x))
        x = self.predict(x)
        return x

if __name__ == '__main__':
    trainData = pd.read_csv("../train/train.tsv", sep='\t')
    trainPhrase = trainData['Phrase'][:1000]
    trainSentiment = trainData['Sentiment'][:1000]
    testData = pd.read_csv("../test/test.tsv", sep='\t')
    testPhrase = testData['Phrase']

    # Bag-of-words features over the joint train/test vocabulary.
    vectorizer = CountVectorizer(ngram_range=(1, 1))
    vectorizer.fit(pd.concat([trainPhrase, testPhrase]))
    trainX = vectorizer.transform(trainPhrase).toarray()
    testX = vectorizer.transform(testPhrase).toarray()
    trainY = numpy.array(list(trainSentiment))

    # Standardize, then keep the principal components that explain 90% of the variance.
    # The test set goes through the same scaler before PCA.
    scaler = StandardScaler()
    trainX = scaler.fit_transform(trainX)
    testX = scaler.transform(testX)
    pca = PCA(n_components=0.9).fit(trainX)
    trainX = pca.transform(trainX)
    testX = pca.transform(testX)
    (ndim, dim) = trainX.shape

    trainX = torch.from_numpy(trainX).type(torch.FloatTensor)
    testX = torch.from_numpy(testX).type(torch.FloatTensor)
    trainY = torch.from_numpy(trainY).type(torch.LongTensor)

    fnn = FNN(dim, dim, 5)
    optimizer = torch.optim.SGD(fnn.parameters(), lr=0.2)
    loss_func = torch.nn.CrossEntropyLoss()

    # Full-batch training for 50 iterations.
    for i in range(50):
        out = fnn(trainX)
        loss = loss_func(out, trainY)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    with torch.no_grad():
        testY = torch.max(function.softmax(fnn(testX), dim=1), 1)[1]
    outData = pd.DataFrame({'PhraseId': testData.PhraseId, 'Sentiment': testY.numpy()})
    outData.to_csv('../test/FNN_result.csv', index=False)
FNN
import math
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as function
import torch.utils.data as Data

BATCH_SIZE = 1000
EPOCH = 20

def build_vocabulary(phrase):
    ret = dict()
    cnt = 1
    for s in phrase:
        for x in s.split():
            if x in ret:
                continue
            ret[x] = cnt
            cnt = cnt + 1
    return ret

def tokenizer(phrase, vocabulary):
    ret = []
    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)
    return ret

def expand_with_zero(phrase, length):
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret

class CNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_output, max_length):
        super(CNN, self).__init__()
        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        self.conv = torch.nn.Sequential(
            torch.nn.Conv1d(
                in_channels=embedding_dim,
                out_channels=16,
                kernel_size=5,
                stride=1,
                padding=2
            ),
            torch.nn.ReLU(),
            torch.nn.MaxPool1d(
                kernel_size=2,
                padding=max_length % 2
            )
        )
        self.out = torch.nn.Linear(math.ceil(max_length / 2) * 16, n_output)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(0, 2, 1)       # Conv1d expects (batch, channels, length)
        x = self.conv(x)
        x = x.view(x.size(0), -1)    # flatten everything except the batch dimension
        x = self.out(x)
        return x

if __name__ == '__main__':
    train_data = pd.read_csv('../train/train.tsv', sep='\t')
    test_data = pd.read_csv('../test/test.tsv', sep='\t')
    train_phrase = list(train_data['Phrase'][:10000])
    train_sentiment = list(train_data['Sentiment'][:10000])
    test_phrase = list(test_data['Phrase'])

    vocabulary = build_vocabulary(train_phrase + test_phrase)
    train_phrase = tokenizer(train_phrase, vocabulary)
    test_phrase = tokenizer(test_phrase, vocabulary)
    max_length = len(max(train_phrase + test_phrase, key=len, default=''))
    train_phrase = expand_with_zero(train_phrase, max_length)
    test_phrase = expand_with_zero(test_phrase, max_length)

    train_x = torch.from_numpy(np.array(train_phrase)).type(torch.LongTensor)
    test_x = torch.from_numpy(np.array(test_phrase)).type(torch.LongTensor)
    train_y = torch.from_numpy(np.array(train_sentiment)).type(torch.LongTensor)

    cnn = CNN(len(vocabulary) + 1, 32, 5, max_length)
    optimizer = torch.optim.Adam(cnn.parameters(), lr=0.2)
    loss_func = torch.nn.CrossEntropyLoss()

    torch_dataset = Data.TensorDataset(train_x, train_y)
    loader = Data.DataLoader(
        dataset=torch_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=2
    )

    for epoch in range(EPOCH):
        for step, (batch_x, batch_y) in enumerate(loader):
            out = cnn(batch_x)
            loss = loss_func(out, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    with torch.no_grad():
        test_y = torch.max(function.softmax(cnn(test_x), dim=1), 1)[1]
    outData = pd.DataFrame({'PhraseId': test_data.PhraseId, 'Sentiment': test_y.numpy()})
    outData.to_csv('../test/CNN_result.csv', index=False)
CNN
import numpy as np
import pandas as pd
import torch
import torch.utils.data as Data

BATCH_SIZE = 1000
EPOCH = 20
LR = 0.2

def build_vocabulary(phrase):
    ret = dict()
    cnt = 1
    for s in phrase:
        for x in s.split():
            if x in ret:
                continue
            ret[x] = cnt
            cnt = cnt + 1
    return ret

def tokenizer(phrase, vocabulary):
    ret = []
    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)
    return ret

def expand_with_zero(phrase, length):
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret

class RNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_hidden, n_output):
        super(RNN, self).__init__()
        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        self.hidden = torch.nn.LSTM(embedding_dim, n_hidden, batch_first=True)
        self.out = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        # Number of non-zero (real) tokens in each padded sentence.
        length = (x != 0).sum(dim=1)
        # Index of the last real token; an empty sentence falls back to position 0.
        last = (length - 1).clamp(min=0)
        x = self.embedding(x)
        x, _ = self.hidden(x)
        # Pick the LSTM output at the last real token of every sentence.
        t = x[torch.arange(x.size(0), device=x.device), last]
        x = self.out(t)
        # Return raw scores: CrossEntropyLoss applies log-softmax itself.
        return x

if __name__ == '__main__':
    train_data = pd.read_csv('../train/train.tsv', sep='\t')
    test_data = pd.read_csv('../test/test.tsv', sep='\t')
    train_phrase = list(train_data['Phrase'][:1000])
    train_sentiment = list(train_data['Sentiment'][:1000])
    test_phrase = list(test_data['Phrase'])

    vocabulary = build_vocabulary(train_phrase + test_phrase)
    train_phrase = tokenizer(train_phrase, vocabulary)
    test_phrase = tokenizer(test_phrase, vocabulary)
    max_length = len(max(train_phrase + test_phrase, key=len, default=''))
    train_phrase = expand_with_zero(train_phrase, max_length + 1)
    test_phrase = expand_with_zero(test_phrase, max_length + 1)

    train_x = torch.from_numpy(np.array(train_phrase)).type(torch.LongTensor)
    test_x = torch.from_numpy(np.array(test_phrase)).type(torch.LongTensor)
    train_y = torch.from_numpy(np.array(train_sentiment)).type(torch.LongTensor)

    rnn = RNN(len(vocabulary) + 1, 16, 40, 5)
    optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)
    loss_func = torch.nn.CrossEntropyLoss()

    torch_dataset = Data.TensorDataset(train_x, train_y)
    loader = Data.DataLoader(
        dataset=torch_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=2
    )

    for epoch in range(EPOCH):
        for step, (batch_x, batch_y) in enumerate(loader):
            out = rnn(batch_x)
            loss = loss_func(out, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    with torch.no_grad():
        test_y = torch.max(rnn(test_x), 1)[1]
    outData = pd.DataFrame({'PhraseId': test_data.PhraseId, 'Sentiment': test_y.numpy()})
    outData.to_csv('../test/RNN_result.csv', index=False)
RNN
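In practice, because neural networks are computationally expensive to train, both the number of training epochs and the amount of training data used here fall far short of what was used for the linear model.
However, since the universal approximation theorem guarantees that any continuous function can be approximated by a neural network, the ceiling of a neural network is much higher than that of a linear model.
Because convolution can compress data, and because nearby regions of an image tend to be similar, CNNs are widely used in computer vision.
Because their neurons have memory and can reflect order relationships, RNNs are widely used in NLP.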