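Sentiment analysis with neural networks

As in the previous post, we use the same Kaggle competition, this time to give a brief introduction to the three most basic kinds of neural network: feedforward, convolutional, and recurrent.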

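A neural network can be viewed as a directed acyclic graph split into three parts: an input layer, hidden layers, and an output layer. The input layer receives the given input, the hidden layers carry out a series of computations on it, and the result is handed to the output layer.

Each node (neuron) of a hidden layer receives some or all of the previous layer's results, each multiplied by a weight, passes the resulting value through a function (the activation function), and sends the output on to the nodes of the next layer.

The weights applied on receipt are the part we train.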
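As a minimal sketch of the computation just described, here is a single neuron in plain Python. The names `relu` and `neuron`, the bias term, and all numeric values are illustrative assumptions and do not come from the code in this post.

```python
def relu(z):
    # ReLU activation: negative values become 0, positive values pass through.
    return max(0.0, z)

def neuron(inputs, weights, bias):
    # Weighted sum of the previous layer's outputs, plus a bias term.
    z = sum(w * x for w, x in zip(weights, inputs)) + bias
    # The activation function decides what is passed on to the next layer.
    return relu(z)

prev_layer = [1.0, 2.0, 3.0]    # outputs of the previous layer (made up)
weights = [0.5, -1.0, 0.25]     # trainable coefficients (made up)
bias = 1.0                      # trainable offset (made up)

activation = neuron(prev_layer, weights, bias)
print(activation)   # 0.25
```

Training changes `weights` and `bias`; the structure of the computation stays the same.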

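In theory, any continuous function on a bounded domain can be approximated as closely as desired by a neural network with enough hidden neurons.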

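1. Data processing

Reading the files

As before, the files are opened with pandas and the relevant rows are read; the details are not repeated here.

A neural network has far more trainable parameters than a linear model, so only the first thousand rows (phrases) are used for training here.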

train_data = pd.read_csv('../train/train.tsv', sep = '\t')
test_data = pd.read_csv('../test/test.tsv', sep = '\t')

train_phrase = list(train_data['Phrase'][:1000])
train_sentiment = list(train_data['Sentiment'][:1000])
test_phrase = list(test_data['Phrase'])

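Tokenizer

Unlike a linear model, a neural network can capture some word-order information by itself, so here each word is mapped to an integer and the words in every sentence are replaced by their integers. This shrinks the data and speeds up the run.

The torchtext library provides a set of efficient tokenization tools, but this experiment needs very few of those features, so I wrote a small tokenizer by hand.

First, collect the words of all sentences to build the vocabulary.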

def build_vocabulary(phrase):
    ret = dict()
    cnt = 1
    
    for s in phrase:
        for x in s.split():
            if(x in ret.keys()):
                continue
                
            ret[x] = cnt
            cnt = cnt + 1

    return ret

vocabulary = build_vocabulary(train_phrase + test_phrase)

Then the tokenizer turns the words of each sentence into integers.

def tokenizer(phrase, vocabulary):
    ret = []

    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)

    return ret

train_phrase = tokenizer(train_phrase, vocabulary)
test_phrase = tokenizer(test_phrase, vocabulary)
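To make the two steps concrete, here is a small standalone run on made-up phrases. It re-declares `build_vocabulary` and `tokenizer` in compact form so that it runs on its own; `toy_phrases`, `toy_vocab`, and `toy_ids` are hypothetical names used only for this illustration.

```python
def build_vocabulary(phrases):
    # Map every distinct whitespace-separated word to a positive integer id.
    # Id 0 is never assigned, so it stays free to be used as padding later.
    vocab = {}
    for s in phrases:
        for w in s.split():
            if w not in vocab:
                vocab[w] = len(vocab) + 1
    return vocab

def tokenizer(phrases, vocab):
    # Replace each word with its id, keeping the word order.
    return [[vocab[w] for w in s.split()] for s in phrases]

toy_phrases = ["a good movie", "a bad movie"]   # made-up data
toy_vocab = build_vocabulary(toy_phrases)
toy_ids = tokenizer(toy_phrases, toy_vocab)

print(toy_vocab)  # {'a': 1, 'good': 2, 'movie': 3, 'bad': 4}
print(toy_ids)    # [[1, 2, 3], [1, 4, 3]]
```

Because the vocabulary is built from the training and test phrases together, every test word has an id and the lookup in `tokenizer` cannot fail on an unknown word.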

Embedding

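Replacing words with integers, as the tokenizer above does, has one serious problem.

The fit is computed directly on these integers, so their magnitudes strongly affect the result, even though the ids were assigned in an arbitrary order and carry no meaning.

Words also carry some sentiment of their own, and words with similar sentiment tend to cluster in the same region. This suggests representing each word by a vector instead.

In other words, a sentence of length `length` is turned into a two-dimensional `length × dim` matrix, where `dim` is the dimension of the word vectors.

This is the idea behind embedding.

PyTorch provides embedding as a layer that is trained together with the rest of the network, so its code is given in the network section.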
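The shape change described above can be checked with a few lines. This is only a sketch: the vocabulary size and `dim` below are arbitrary illustrative values, not the ones used in this post.

```python
import torch

torch.manual_seed(0)

vocab_size, dim = 10, 4      # illustrative sizes
embedding = torch.nn.Embedding(vocab_size, dim)

# A batch of 2 sentences, each already converted to 3 word ids.
ids = torch.tensor([[1, 2, 3],
                    [1, 4, 3]])

vectors = embedding(ids)
print(vectors.shape)         # torch.Size([2, 3, 4])

# The same word id always maps to the same (trainable) vector.
print(torch.equal(vectors[0, 0], vectors[1, 0]))   # True
```

The layer is simply a trainable lookup table with one row per word id.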

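Batch training

Memory is limited, so only so much data can pass through the network at once, and the training data therefore has to be fed in batches.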

import torch.utils.data as Data

torch_dataset = Data.TensorDataset(train_x, train_y)
loader = Data.DataLoader(
    dataset = torch_dataset,
    batch_size = BATCH_SIZE,
    shuffle = True,
    num_workers = 2
)

for epoch in range(EPOCH):
    for step, (batch_x, batch_y) in enumerate(loader):
        pass  # training step goes here

The outer loop counts the training epochs, and the inner loop fetches the data of each batch.
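A small standalone sketch shows what the loader yields. The tensors `toy_x` and `toy_y` and the batch size of 4 are made up for illustration.

```python
import torch
import torch.utils.data as Data

# Made-up data: 10 samples with 3 token ids each, and one label per sample.
toy_x = torch.arange(30).view(10, 3)
toy_y = torch.arange(10) % 5

toy_loader = Data.DataLoader(
    dataset = Data.TensorDataset(toy_x, toy_y),
    batch_size = 4,
    shuffle = True,
)

batch_sizes = []
for step, (batch_x, batch_y) in enumerate(toy_loader):
    # Every batch pairs inputs with their labels; the last one may be smaller.
    batch_sizes.append(batch_x.size(0))

print(batch_sizes)   # [4, 4, 2]
```

With `shuffle = True` the order of the samples changes from epoch to epoch, but the batch sizes stay the same.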

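Unifying the length

Sentences contain different numbers of words, but a batch has to be passed in as a single matrix, so all sentences must be brought to the same length.

An intuitive idea is to fill the empty positions with 0.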

def expand_with_zero(phrase, length):
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret 

max_length = len(max(train_phrase + test_phrase, key = len, default = ''))
train_phrase = expand_with_zero(train_phrase, max_length + 1)
test_phrase = expand_with_zero(test_phrase, max_length + 1)
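A quick standalone check of the padding on made-up ids follows. `expand_with_zero` is re-declared in compact form so that the snippet runs on its own; `toy_ids`, `toy_max`, and `padded` are hypothetical names.

```python
def expand_with_zero(phrase, length):
    # Right-pad every id list with 0 until it has `length` entries.
    return [s + [0] * (length - len(s)) for s in phrase]

toy_ids = [[1, 2, 3], [4], [5, 6]]            # made-up token ids
toy_max = len(max(toy_ids, key = len))        # longest sentence: 3
padded = expand_with_zero(toy_ids, toy_max + 1)

print(padded)   # [[1, 2, 3, 0], [4, 0, 0, 0], [5, 6, 0, 0]]
```

Padding to `max_length + 1` rather than `max_length` means even the longest sentence ends in at least one 0, so the padding id always marks where a sentence stops.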

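The catch is that 0 also gets a vector of its own in the embedding, and that vector affects the training result.

The effect is largest in very short sentences, where most of the positions are zeros.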
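One possible mitigation, which the code in this post does not use, is the `padding_idx` argument of `torch.nn.Embedding`. The sketch below assumes id 0 is the padding id, as in the rest of the post; the sizes are arbitrary.

```python
import torch

torch.manual_seed(0)

# padding_idx = 0 tells the layer that id 0 is padding.
embedding = torch.nn.Embedding(10, 4, padding_idx = 0)

ids = torch.tensor([[3, 5, 0, 0]])     # one short sentence padded with zeros
vectors = embedding(ids)

# Padding positions map to an all-zero vector ...
print(bool(torch.all(vectors[0, 2] == 0)))               # True

# ... and that row of the weight matrix receives a zero gradient.
vectors.sum().backward()
print(bool(torch.all(embedding.weight.grad[0] == 0)))    # True
```

This keeps the padding vector fixed at zero; it does not stop later layers from seeing the padded positions.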

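One workaround is the pad_sequence / pack_padded_sequence family of functions, which packs the data before it is handed to the network; however, a packed sequence cannot be passed directly through PyTorch's built-in embedding layer.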
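A common way to combine the two, sketched here under the assumption that the true sentence lengths are known, is to embed the padded ids first and pack the result afterwards. This is not what the code in this post does, and the layer sizes are arbitrary.

```python
import torch
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

torch.manual_seed(0)

embedding = torch.nn.Embedding(10, 4, padding_idx = 0)
lstm = torch.nn.LSTM(4, 6, batch_first = True)

ids = torch.tensor([[1, 2, 3, 0],
                    [4, 5, 0, 0]])
lengths = torch.tensor([3, 2])          # number of real tokens per row

# Embed the padded ids first, then pack so the LSTM skips the padding.
packed = pack_padded_sequence(embedding(ids), lengths,
                              batch_first = True, enforce_sorted = False)
packed_out, (h_n, c_n) = lstm(packed)

# h_n[-1] holds the hidden state after the last real token of each sentence.
print(h_n[-1].shape)                    # torch.Size([2, 6])

# Unpack to a padded tensor again if the per-step outputs are needed.
out, out_lengths = pad_packed_sequence(packed_out, batch_first = True)
print(out.shape)                        # torch.Size([2, 3, 6])
```

With this approach the final hidden state already corresponds to the last real token, so no manual search for the end of each sentence is needed.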

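Another workaround takes advantage of PyTorch's dynamic graph and processes only the non-zero part of each sequence.

The code for this is in the network section below.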

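2. Building the networks

Feedforward neural network

This is the simplest of the three networks: every hidden layer is fully connected to the neurons of the previous layer.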

class FNN(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(FNN, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)
        self.predict = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        x = function.relu(self.hidden(x))
        x = self.predict(x)
        return x
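As a quick sanity check, the class above can be run on random input. This sketch repeats the class so that it runs on its own; the layer sizes and the names `toy`, `logits`, and `n_params` are made up for illustration.

```python
import torch
import torch.nn.functional as function

class FNN(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(FNN, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)
        self.predict = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        x = function.relu(self.hidden(x))   # fully connected layer + ReLU
        return self.predict(x)              # one raw score per class

toy = FNN(n_feature = 8, n_hidden = 16, n_output = 5)   # illustrative sizes
logits = toy(torch.randn(3, 8))                          # batch of 3 samples
print(logits.shape)   # torch.Size([3, 5])

# Weights plus biases of both layers: (8*16 + 16) + (16*5 + 5)
n_params = sum(p.numel() for p in toy.parameters())
print(n_params)       # 229
```

The parameter count grows with the product of adjacent layer sizes, which is why the input dimension is reduced before training in the full script.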

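Convolutional neural network

It has two special kinds of hidden layer: convolutional layers and pooling layers.

Put simply, a convolutional layer combines each position with its neighbors and so raises the information density of a single node (or simply think of it as enlarging one dimension; here the channel dimension goes from embedding_dim to 16), while a pooling layer shrinks the data (reduces one dimension; here it halves the sequence length).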

class CNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_output, max_length):
        super(CNN, self).__init__()

        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)

        self.conv = torch.nn.Sequential(
            torch.nn.Conv1d(
                in_channels = embedding_dim,
                out_channels = 16,
                kernel_size = 5,
                stride = 1,
                padding = 2
            ),
            torch.nn.ReLU(),
            torch.nn.MaxPool1d(
                kernel_size = 2,
                padding = max_length % 2
            )
        )
        
        self.out = torch.nn.Linear(math.ceil(max_length / 2) * 16, n_output)

    def forward(self, x):
        x = self.embedding(x)

        x = x.permute(0, 2, 1)
        x = self.conv(x)
        
        x = x.view(x.size(0), -1)
        x = self.out(x)
        
        return x

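The embedding here is the one described earlier. The final view flattens the data so that everything except batch_size becomes a single dimension, which the output layer can accept.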
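The input size of the output layer, `math.ceil(max_length / 2) * 16`, follows from the layer settings. The sketch below checks the length arithmetic on dummy input; `conv_pool_length` is a hypothetical helper written for this illustration, and the `embedding_dim` of 8 is arbitrary.

```python
import math
import torch

def conv_pool_length(max_length, embedding_dim = 8):
    # Same layer settings as in the CNN class, applied to a dummy batch of one.
    conv = torch.nn.Conv1d(embedding_dim, 16, kernel_size = 5, stride = 1, padding = 2)
    pool = torch.nn.MaxPool1d(kernel_size = 2, padding = max_length % 2)

    x = torch.zeros(1, embedding_dim, max_length)   # (batch, channels, length)
    y = conv(x)
    # kernel_size 5 with padding 2 keeps the sequence length unchanged.
    assert y.shape == (1, 16, max_length)
    # Pooling with kernel_size 2 halves the length, rounding up thanks to the padding.
    return pool(torch.relu(y)).size(2)

for n in (6, 7):
    print(n, conv_pool_length(n), math.ceil(n / 2))
# 6 3 3
# 7 4 4
```

The `padding = max_length % 2` setting is what makes odd lengths round up instead of dropping the last position.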

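Recurrent neural network

When a node is updated, it uses not only the nodes of the previous layer but also information from the earlier updates (time steps) of its own layer.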

class RNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_hidden, n_output):
        super(RNN, self).__init__()

        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        
        self.hidden = torch.nn.LSTM(embedding_dim, n_hidden, batch_first = True)
        
        self.out = torch.nn.Linear(n_hidden, n_output)

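    def forward(self, x):
        # Position of the last non-zero id in each row (0 is the padding id).
        # Padding only appears at the end, so this is (number of real tokens) - 1;
        # clamp keeps an all-padding row at position 0.
        last = ((x != 0).sum(dim = 1) - 1).clamp(min = 0)

        x = self.embedding(x)

        x, _ = self.hidden(x)

        # Keep only the LSTM output at the last valid position of every sentence.
        t = x[torch.arange(x.size(0), device = x.device), last]

        # Return raw scores: CrossEntropyLoss applies log-softmax internally,
        # so a softmax here would be applied twice during training.
        return self.out(t)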

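Here forward deals with the different sentence lengths: only the output at the last valid update (the last non-zero position) is passed on to the output layer.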
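To make the "last valid position" selection concrete, here is a small standalone sketch on made-up tensors. `outputs` stands in for the LSTM output, and all names and values are illustrative.

```python
import torch

ids = torch.tensor([[5, 2, 7, 0, 0],
                    [3, 0, 0, 0, 0],
                    [1, 4, 6, 8, 0]])

# Count the non-zero ids per row; the last valid position is that count minus 1.
last = (ids != 0).sum(dim = 1) - 1
print(last)      # tensor([2, 0, 3])

# Stand-in for the LSTM output: (batch, length, hidden) with recognisable values.
outputs = torch.arange(3 * 5 * 2, dtype = torch.float).view(3, 5, 2)

# Advanced indexing picks one time step per row in a single operation.
picked = outputs[torch.arange(ids.size(0)), last]
print(picked)
# tensor([[ 4.,  5.],
#         [10., 11.],
#         [26., 27.]])
```

Since the LSTM runs left to right, the output at the last real token does not depend on the padding that follows it.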

3. Training and prediction

The model is trained with Adam, an improved variant of gradient descent.

rnn = RNN(len(vocabulary) + 1, 16, 40, 5)
optimizer = torch.optim.Adam(rnn.parameters(), lr = LR)
loss_func = torch.nn.CrossEntropyLoss()

for epoch in range(EPOCH):
    for step, (batch_x, batch_y) in enumerate(loader):
        out = rnn(batch_x)
        loss = loss_func(out, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

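For prediction, the index of the largest entry of the output vector (the most probable class) is taken as the predicted label.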
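The sketch below, on made-up scores, shows the prediction step in isolation. It also shows that applying softmax first does not change which class is picked, because softmax preserves the ordering of the scores.

```python
import torch

# Made-up output scores for 2 phrases and 5 sentiment classes.
logits = torch.tensor([[0.1, 2.0, -1.0, 0.3, 0.0],
                       [1.5, 0.2, 0.1, 3.0, -2.0]])
probs = torch.softmax(logits, dim = 1)

# torch.max(..., 1) returns (values, indices); the indices are the predicted classes.
pred_from_logits = torch.max(logits, 1)[1]
pred_from_probs = torch.max(probs, 1)[1]

print(pred_from_logits)   # tensor([1, 3])
print(pred_from_probs)    # tensor([1, 3])
```

Softmax is therefore only needed when the probabilities themselves are of interest.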

# Convert the predicted class indices to a NumPy array before handing them to pandas.
test_y = torch.max(rnn(test_x), 1)[1].numpy()

outData = pd.DataFrame({'PhraseId':test_data.PhraseId, 'Sentiment':test_y})
outData.to_csv('../test/RNN_result.csv', index = False)

Finally, here is the complete code for each network:

import numpy
import pandas as pd
import torch
from torch.autograd import Variable
import torch.nn.functional as function
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import CountVectorizer

class FNN(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(FNN, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)
        self.predict = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        x = function.relu(self.hidden(x))
        x = self.predict(x)
        return x

if __name__ == '__main__':
    trainData = pd.read_csv("../train/train.tsv", sep = '\t')
    
    trainPhrase = trainData['Phrase'][:1000]
    trainSentiment = trainData['Sentiment'][:1000]
    
    testData = pd.read_csv("../test/test.tsv", sep = '\t')
    testPhrase = testData['Phrase']

    vectorizer = CountVectorizer(ngram_range = (1, 1))
    vectorizer.fit(pd.concat([trainPhrase, testPhrase]))

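    # toarray() gives a plain ndarray; recent scikit-learn versions reject the np.matrix returned by todense().
    trainX = vectorizer.transform(trainPhrase).toarray()
    testX = vectorizer.transform(testPhrase).toarray()
    trainY = numpy.array(list(trainSentiment))

    scaler = StandardScaler()
    trainX = scaler.fit_transform(trainX)
    # Apply the scaling fitted on the training data to the test data as well.
    testX = scaler.transform(testX)

    pca = PCA(n_components = 0.9).fit(trainX)
    trainX = pca.transform(trainX)
    testX = pca.transform(testX)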
    (ndim, dim) = trainX.shape

    trainX = torch.from_numpy(trainX).type(torch.FloatTensor)
    testX = torch.from_numpy(testX).type(torch.FloatTensor)
    trainY = torch.from_numpy(trainY).type(torch.LongTensor)

    trainX, testX, trainY = Variable(trainX), Variable(testX), Variable(trainY)

    fnn = FNN(dim, dim, 5)
    optimizer = torch.optim.SGD(fnn.parameters(), lr = 0.2)
    loss_func = torch.nn.CrossEntropyLoss()

    for i in range(50):
        out = fnn(trainX)
        loss = loss_func(out, trainY)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Convert the predicted class indices to a NumPy array before handing them to pandas.
    testY = torch.max(function.softmax(fnn(testX), dim = 1), 1)[1].numpy()

    outData = pd.DataFrame({'PhraseId':testData.PhraseId, 'Sentiment':testY})
    outData.to_csv('../test/FNN_result.csv', index = False)

FNN

import math
import numpy as np
import pandas as pd
import torch
from torch.autograd import Variable
import torch.nn.functional as function
import torch.utils.data as Data

BATCH_SIZE = 1000
EPOCH = 20

def build_vocabulary(phrase):
    ret = dict()
    cnt = 1
    
    for s in phrase:
        for x in s.split():
            if(x in ret.keys()):
                continue
                
            ret[x] = cnt
            cnt = cnt + 1

    return ret

def tokenizer(phrase, vocabulary):
    ret = []

    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)

    return ret

def expand_with_zero(phrase, length):
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret

class CNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_output, max_length):
        super(CNN, self).__init__()

        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)

        self.conv = torch.nn.Sequential(
            torch.nn.Conv1d(
                in_channels = embedding_dim,
                out_channels = 16,
                kernel_size = 5,
                stride = 1,
                padding = 2
            ),
            torch.nn.ReLU(),
            torch.nn.MaxPool1d(
                kernel_size = 2,
                padding = max_length % 2
            )
        )
        
        self.out = torch.nn.Linear(math.ceil(max_length / 2) * 16, n_output)

    def forward(self, x):
        x = self.embedding(x)

        x = x.permute(0, 2, 1)
        x = self.conv(x)
        
        x = x.view(x.size(0), -1)
        x = self.out(x)
        
        return x
    
if __name__ == '__main__':
    train_data = pd.read_csv('../train/train.tsv', sep = '\t')
    test_data = pd.read_csv('../test/test.tsv', sep = '\t')

    train_phrase = list(train_data['Phrase'][:10000])
    train_sentiment = list(train_data['Sentiment'][:10000])
    test_phrase = list(test_data['Phrase'])

    vocabulary = build_vocabulary(train_phrase + test_phrase)
    
    train_phrase = tokenizer(train_phrase, vocabulary)
    test_phrase = tokenizer(test_phrase, vocabulary)

    max_length = len(max(train_phrase + test_phrase, key = len, default = ''))
    train_phrase = expand_with_zero(train_phrase, max_length)
    test_phrase = expand_with_zero(test_phrase, max_length)

    train_x = torch.from_numpy(np.array(train_phrase)).type(torch.LongTensor)
    test_x = torch.from_numpy(np.array(test_phrase)).type(torch.LongTensor)
    train_y = torch.from_numpy(np.array(train_sentiment)).type(torch.LongTensor)

    cnn = CNN(len(vocabulary) + 1, 32, 5, max_length)
    optimizer = torch.optim.Adam(cnn.parameters(), lr = 0.2)
    loss_func = torch.nn.CrossEntropyLoss()

    torch_dataset = Data.TensorDataset(train_x, train_y)
    loader = Data.DataLoader(
        dataset = torch_dataset,
        batch_size = BATCH_SIZE,
        shuffle = True,
        num_workers = 2
    )
    
    for epoch in range(EPOCH):
        for step, (batch_x, batch_y) in enumerate(loader):
            out = cnn(batch_x)
            loss = loss_func(out, batch_y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # Convert the predicted class indices to a NumPy array before handing them to pandas.
    test_y = torch.max(function.softmax(cnn(test_x), dim = 1), 1)[1].numpy()

    outData = pd.DataFrame({'PhraseId':test_data.PhraseId, 'Sentiment':test_y})
    outData.to_csv('../test/CNN_result.csv', index = False)

CNN

import math
import numpy as np
import pandas as pd
import torch
from torch.autograd import Variable
import torch.nn.functional as function
import torch.utils.data as Data

BATCH_SIZE = 1000
EPOCH = 20
LR = 0.2

def build_vocabulary(phrase):
    ret = dict()
    cnt = 1
    
    for s in phrase:
        for x in s.split():
            if(x in ret.keys()):
                continue
                
            ret[x] = cnt
            cnt = cnt + 1

    return ret

def tokenizer(phrase, vocabulary):
    ret = []

    for s in phrase:
        tmp = []
        for x in s.split():
            tmp.append(vocabulary[x])
        ret.append(tmp)

    return ret

def expand_with_zero(phrase, length):
    ret = []
    for s in phrase:
        tmp = s.copy()
        for i in range(length - len(s)):
            tmp.append(0)
        ret.append(tmp)
    return ret

class RNN(torch.nn.Module):
    def __init__(self, n_feature, embedding_dim, n_hidden, n_output):
        super(RNN, self).__init__()

        self.embedding = torch.nn.Embedding(n_feature, embedding_dim)
        
        self.hidden = torch.nn.LSTM(embedding_dim, n_hidden, batch_first = True)
        
        self.out = torch.nn.Linear(n_hidden, n_output)

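    def forward(self, x):
        # Position of the last non-zero id in each row (0 is the padding id).
        # Padding only appears at the end, so this is (number of real tokens) - 1;
        # clamp keeps an all-padding row at position 0.
        last = ((x != 0).sum(dim = 1) - 1).clamp(min = 0)

        x = self.embedding(x)

        x, _ = self.hidden(x)

        # Keep only the LSTM output at the last valid position of every sentence.
        t = x[torch.arange(x.size(0), device = x.device), last]

        # Return raw scores: CrossEntropyLoss applies log-softmax internally,
        # so a softmax here would be applied twice during training.
        return self.out(t)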
    
if __name__ == '__main__':
    train_data = pd.read_csv('../train/train.tsv', sep = '\t')
    test_data = pd.read_csv('../test/test.tsv', sep = '\t')

    train_phrase = list(train_data['Phrase'][:1000])
    train_sentiment = list(train_data['Sentiment'][:1000])
    test_phrase = list(test_data['Phrase'])

    vocabulary = build_vocabulary(train_phrase + test_phrase)
    
    train_phrase = tokenizer(train_phrase, vocabulary)
    test_phrase = tokenizer(test_phrase, vocabulary)

    max_length = len(max(train_phrase + test_phrase, key = len, default = ''))
    train_phrase = expand_with_zero(train_phrase, max_length + 1)
    test_phrase = expand_with_zero(test_phrase, max_length + 1)

    train_x = torch.from_numpy(np.array(train_phrase)).type(torch.LongTensor)
    test_x = torch.from_numpy(np.array(test_phrase)).type(torch.LongTensor)
    train_y = torch.from_numpy(np.array(train_sentiment)).type(torch.LongTensor)

    rnn = RNN(len(vocabulary) + 1, 16, 40, 5)
    optimizer = torch.optim.Adam(rnn.parameters(), lr = LR)
    loss_func = torch.nn.CrossEntropyLoss()

    torch_dataset = Data.TensorDataset(train_x, train_y)
    loader = Data.DataLoader(
        dataset = torch_dataset,
        batch_size = BATCH_SIZE,
        shuffle = True,
        num_workers = 2
    )
    
    for epoch in range(EPOCH):
        for step, (batch_x, batch_y) in enumerate(loader):
            out = rnn(batch_x)
            loss = loss_func(out, batch_y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # Convert the predicted class indices to a NumPy array before handing them to pandas.
    test_y = torch.max(rnn(test_x), 1)[1].numpy()

    outData = pd.DataFrame({'PhraseId':test_data.PhraseId, 'Sentiment':test_y})
    outData.to_csv('../test/RNN_result.csv', index = False)

RNN

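In practice, training a neural network is computationally expensive, so both the number of training epochs and the amount of training data used here fall far short of what the linear model got.

However, the universal approximation theorem guarantees that any continuous function can be approximated by a neural network, so the ceiling for neural networks is much higher than for linear models.

Convolution can compress data, and the content of an image tends to be similar within a local neighborhood, so CNNs are widely used in CV.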

RNNs have neurons with a form of memory and can therefore reflect order relationships, so they are widely used in NLP.