代码详解:用Python构建RNN模拟人脑
当你理解一句话的时候,肯定不会每次都会对里面的一词一句进行重新理解吧!这个很容易理解:因为人类拥有记忆力,所以在阅读一篇文章时,我们会根据之前对这些词的理解来掌握文本内容。
那么,算法可以复制这个特征吗?其实与之最为接近的技术是神经网络(NN)。但遗憾的是,传统的NN无法做到这一点。举个例子:当想要预测一段视频中接下来会发生什么时,传统的神经网络是很难产生准确结果的。
这就是循环神经网络(RNN)的概念能够发挥作用的地方。如今,RNN在深度学习领域越来越流行。RNN的一些实际应用包括:
· 语音识别
· 机器翻译
· 音乐创作
· 手写识别
· 语法学习
本文会快速回顾典型的RNN模型中的关键元素,然后将设置问题陈述,通过在Python中从头开始实施RNN模型来最终解决问题。
闪回:循环神经网络概念的回顾
让我们来快速回顾一下循环神经网络的概念。
我们将使用序列数据的例子,比如特定公司的股票。一个简单的机器学习模型或人工神经网络,可以学习根据一些特征预测股票价格,如股票的数量、开盘价值等。除此之外,价格还取决于该股票在之前的几个星期和几个星期是如何表现出色的。对于交易者而言,这些历史数据实际上是进行预测的主要决定因素。
在传统的前馈神经网络中,所有测试用例都被认为是独立的。在预测股价时,你能看出这是不合适的吗?NN模型不会考虑以前的股票价格——不是一个好主意!
当面对时间敏感数据时,我们可以依赖另一个概念——循环神经网络(RNN)!
一个典型的RNN是这样的:
起初这可能看起来很吓人。但是一旦开始,事情开始变得更简单了:
现在,我们可以更容易地想象出这些网络如何考虑股票价格的走势 这有助于预测当天的价格。在这里,时间t(h_t)的每个预测都取决于所有先前的预测和从它们学习的信息。相当直截了当吧?
RNN可以在很大程度上解决序列处理目的,但并非完全如此。
文本是序列数据的另一个很好的例子。能够预测给定文本之后出现的单词或短语可能是非常有用的资产。我们希望模型能够写出莎士比亚的十四行诗!
现在,RNN在涉及短或小的环境时非常棒。但是为了能够编故事并记住它,模型应该能够理解序列背后的背景,就像人脑一样。
利用RNN进行序列预测
在本文中,我们将使用RNN处理序列预测问题。对此最简单的任务之一是正弦波预测。序列包含可见趋势,使用启发式方法很容易解决。这就是正弦波的样子:
我们将首先从头开始设计一个循环神经网络来解决这个问题。RNN模型也应该能够很好地推广,以便我们可以将其应用于其他序列问题。
我们将这样制定问题——给定一个属于正弦波的50个数字的序列,预测系列中的第51个数字。是时候启动Jupyter notebook了(或者是你选择的IDE)!
使用Python对RNN进行编程
步骤0:数据准备
这是任何数据科学项目中不可避免的第一步——做任何其他事情之前准备数据。
网络模型期望数据是什么样的?它将接受长度为50的单个序列作为输入。所以输入数据的形状将是:
(number_of_records x length_of_sequence x types_of_sequences)
在这里,types_of_sequences是1,因为我们只有一种类型的序列——正弦波。
另一方面,每个记录的输出只有一个值。这当然是输入序列中的第51个值。 所以它的形状将是:
(number_of_records x types_of_sequences) #where types_of_sequences is 1
让我们深入研究代码。首先,导入必要的库:
%pylab inline
import math
要创建像数据一样的正弦波,我们将使用Python数学库中的正弦函数:
sin_wave = np.array([math.sin(x) for x in np.arange(200)])
将刚刚生成的正弦波可视化:
plt.plot(sin_wave[:50])
我们将在下面的代码块中创建数据:
X = []
Y = []
seq_len = 50
num_records = len(sin_wave) - seq_len
for i in range(num_records - 50):
X.append(sin_wave[i:i+seq_len])
Y.append(sin_wave[i+seq_len])
X = np.array(X)
X = np.expand_dims(X, axis=2)
Y = np.array(Y)
Y = np.expand_dims(Y, axis=1)
打印数据的形状:
X.shape, Y.shape
((100, 50, 1), (100, 1))
请注意,我们为(num_records - 50)循环,因为我们想要留出50条记录作为验证数据。我们现在可以创建此验证数据:
X_val = []
Y_val = []
for i in range(num_records - 50, num_records):
X_val.append(sin_wave[i:i+seq_len])
Y_val.append(sin_wave[i+seq_len])
X_val = np.array(X_val)
X_val = np.expand_dims(X_val, axis=2)
Y_val = np.array(Y_val)
Y_val = np.expand_dims(Y_val, axis=1)
步骤1:为RNN模型创建架构
下一个任务是定义将在RNN模型中使用的所有必要变量和函数。我们的模型将接受输入序列,通过100个单位的隐藏层处理它,并产生单值输出:
learning_rate = 0.0001
nepoch = 25
T = 50 # length of sequence
hidden_dim = 100
output_dim = 1
bptt_truncate = 5
min_clip_value = -10
max_clip_value = 10
然后我们将定义网络的权重:
U = np.random.uniform(0, 1, (hidden_dim, T))
W = np.random.uniform(0, 1, (hidden_dim, hidden_dim))
V = np.random.uniform(0, 1, (output_dim, hidden_dim))
在这里,
· U是输入和隐藏图层之间权重的权重矩阵
· V是隐藏层和输出层之间权重的权重矩阵
· W是RNN层(隐藏层)中共享权重的权重矩阵
最后,我们将定义要在隐藏层中使用的激活函数sigmoid:
def sigmoid(x):
return 1 / (1 + np.exp(-x))
步骤2:训练模型
既然我们已经定义了模型,终于可以继续训练序列数据了。我们可以将培训过程细分为更小的步骤,即:
· 步骤 2.1 : 检查训练数据中的损失
· 步骤 2.1.1 : 前向传播
· 步骤 2.1.2 : 计算错误
· 步骤 2.2 : 检查验证数据的损失
· 步骤 2.2.1 : 前向传播
· 步骤 2.2.2 : 计算错误
· 步骤 2.3 : 开始真正的训练
· Step 2.3.1 : 前向传播
· Step 2.3.2 : 反向传播错误
· Step 2.3.3 : 更新权重
我们需要重复这些步骤直到结束。如果模型开始过度装备,请停止!或者只是预先定义epoch的数量。
步骤2.1:检查训练数据中的损失
我们将通过RNN模型进行前向传递,并计算所有记录的预测的平方误差,以获得损失值。
for epoch in range(nepoch):
# check loss on train
loss = 0.0
# do a forward pass to get prediction
for i in range(Y.shape[0]):
x, y = X[i], Y[i] # get input, output values of each record
prev_s = np.zeros((hidden_dim, 1)) # here, prev-s is the value of the previous activation of hidden layer; which is initialized as all zeroes
for t in range(T):
new_input = np.zeros(x.shape) # we then do a forward pass for every timestep in the sequence
new_input[t] = x[t] # for this, we define a single input for that timestep
mulu = np.dot(U, new_input)
mulw = np.dot(W, prev_s)
add = mulw + mulu
s = sigmoid(add)
mulv = np.dot(V, s)
prev_s = s
# calculate error
loss_per_record = (y - mulv)**2 / 2
loss += loss_per_record
loss = loss / float(y.shape[0])
步骤2.2:检查验证数据的损失
我们将对计算验证数据的损失(在同一循环中)做同样的事情:
# check loss on val
val_loss = 0.0
for i in range(Y_val.shape[0]):
x, y = X_val[i], Y_val[i]
prev_s = np.zeros((hidden_dim, 1))
for t in range(T):
new_input = np.zeros(x.shape)
new_input[t] = x[t]
mulu = np.dot(U, new_input)
mulw = np.dot(W, prev_s)
add = mulw + mulu
s = sigmoid(add)
mulv = np.dot(V, s)
prev_s = s
loss_per_record = (y - mulv)**2 / 2
val_loss += loss_per_record
val_loss = val_loss / float(y.shape[0])
print('Epoch: ', epoch + 1, ', Loss: ', loss, ', Val Loss: ', val_loss)
你应该得到以下输出:
Epoch: 1 , Loss: [[101185.61756671]] , Val Loss: [[50591.0340148]]
...
...
步骤2.3:开始实际培训
我们现在开始对网络进行实际培训。在这里,首先进行前向传递以计算误差,然后使用后向传递来计算梯度并进行更新。
步骤2.3.1:正向传递在正向传递中:
· 首先将输入与输入和隐藏层之间的权重相乘
· 在RNN层中添加权重乘以此项。因为我们希望捕获前一个时间步的知识
· 通过sigmoid激活功能
· 将其与隐藏层和输出层之间的权重相乘
· 在输出层,对值进行线性激活,因此不会通过激活层显式传递值
· 在代码字典中保存当前图层的状态以及上一个时间步的状态
以下是正向传递的代码(注意是上一循环的延续):
# train model
for i in range(Y.shape[0]):
x, y = X[i], Y[i]
layers = []
prev_s = np.zeros((hidden_dim, 1))
dU = np.zeros(U.shape)
dV = np.zeros(V.shape)
dW = np.zeros(W.shape)
dU_t = np.zeros(U.shape)
dV_t = np.zeros(V.shape)
dW_t = np.zeros(W.shape)
dU_i = np.zeros(U.shape)
dW_i = np.zeros(W.shape)
# forward pass
for t in range(T):
new_input = np.zeros(x.shape)
new_input[t] = x[t]
mulu = np.dot(U, new_input)
mulw = np.dot(W, prev_s)
add = mulw + mulu
s = sigmoid(add)
mulv = np.dot(V, s)
layers.append({'s':s, 'prev_s':prev_s})
prev_s = s
步骤2.3.2:反向传播误差在前向传播步骤之后,我们计算每一层的梯度和反向传播的误差。我们将使用截断的反向传播(TBPTT),而不是vanilla backprop。这可能听起来很复杂但实际上非常直接。
BPTT与backprop的核心差异在于,对于RNN层中的所有时间步骤,都进行了反向传播步骤。因此,如果序列长度为50,将会反向传播当前时间步之前的所有时间步长。
如果你猜对了,那么BPTT在计算上看起来非常昂贵。因此,我们不是反向传播所有先前的时间步,而是反向传播直到x时间步以节省计算能力。考虑这在概念上类似于随机梯度下降,其中包括一批数据点而不是所有数据点。
以下是反向传播误差的代码:
# derivative of pred
dmulv = (mulv - y)
# backward pass
for t in range(T):
dV_t = np.dot(dmulv, np.transpose(layers[t]['s']))
dsv = np.dot(np.transpose(V), dmulv)
ds = dsv
dadd = add * (1 - add) * ds
dmulw = dadd * np.ones_like(mulw)
dprev_s = np.dot(np.transpose(W), dmulw)
for i in range(t-1, max(-1, t-bptt_truncate-1), -1):
ds = dsv + dprev_s
dadd = add * (1 - add) * ds
dmulw = dadd * np.ones_like(mulw)
dmulu = dadd * np.ones_like(mulu)
dW_i = np.dot(W, layers[t]['prev_s'])
dprev_s = np.dot(np.transpose(W), dmulw)
new_input = np.zeros(x.shape)
new_input[t] = x[t]
dU_i = np.dot(U, new_input)
dx = np.dot(np.transpose(U), dmulu)
dU_t += dU_i
dW_t += dW_i
dV += dV_t
dU += dU_t
dW += dW_t
步骤2.3.3:更新权重最后,使用计算的权重梯度更新权重。必须记住,如果不对它们进行检查,梯度往往会爆炸。这是训练神经网络的一个基本问题,称为爆炸梯度问题。所以必须将它们夹在一个范围内,我们可以这样做:
if dU.max() > max_clip_value:
dU[dU > max_clip_value] = max_clip_value
if dV.max() > max_clip_value:
dV[dV > max_clip_value] = max_clip_value
if dW.max() > max_clip_value:
dW[dW > max_clip_value] = max_clip_value
if dU.min() < min_clip_value:
dU[dU < min_clip_value] = min_clip_value
if dV.min() < min_clip_value:
dV[dV < min_clip_value] = min_clip_value
if dW.min() < min_clip_value:
dW[dW < min_clip_value] = min_clip_value
# update
U -= learning_rate * dU
V -= learning_rate * dV
W -= learning_rate * dW
在训练上述模型时,得到了这个输出:
Epoch: 1 , Loss: [[101185.61756671]] , Val Loss: [[50591.0340148]]
Epoch: 2 , Loss: [[61205.46869629]] , Val Loss: [[30601.34535365]]
Epoch: 3 , Loss: [[31225.3198258]] , Val Loss: [[15611.65669247]]
Epoch: 4 , Loss: [[11245.17049551]] , Val Loss: [[5621.96780111]]
Epoch: 5 , Loss: [[1264.5157739]] , Val Loss: [[632.02563908]]
Epoch: 6 , Loss: [[20.15654115]] , Val Loss: [[10.05477285]]
Epoch: 7 , Loss: [[17.13622839]] , Val Loss: [[8.55190426]]
Epoch: 8 , Loss: [[17.38870495]] , Val Loss: [[8.68196484]]
Epoch: 9 , Loss: [[17.181681]] , Val Loss: [[8.57837827]]
Epoch: 10 , Loss: [[17.31275313]] , Val Loss: [[8.64199652]]
Epoch: 11 , Loss: [[17.12960034]] , Val Loss: [[8.54768294]]
Epoch: 12 , Loss: [[17.09020065]] , Val Loss: [[8.52993502]]
Epoch: 13 , Loss: [[17.17370113]] , Val Loss: [[8.57517454]]
Epoch: 14 , Loss: [[17.04906914]] , Val Loss: [[8.50658127]]
Epoch: 15 , Loss: [[16.96420184]] , Val Loss: [[8.46794248]]
Epoch: 16 , Loss: [[17.017519]] , Val Loss: [[8.49241316]]
Epoch: 17 , Loss: [[16.94199493]] , Val Loss: [[8.45748739]]
Epoch: 18 , Loss: [[16.99796892]] , Val Loss: [[8.48242177]]
Epoch: 19 , Loss: [[17.24817035]] , Val Loss: [[8.6126231]]
Epoch: 20 , Loss: [[17.00844599]] , Val Loss: [[8.48682234]]
Epoch: 21 , Loss: [[17.03943262]] , Val Loss: [[8.50437328]]
Epoch: 22 , Loss: [[17.01417255]] , Val Loss: [[8.49409597]]
Epoch: 23 , Loss: [[17.20918888]] , Val Loss: [[8.5854792]]
Epoch: 24 , Loss: [[16.92068017]] , Val Loss: [[8.44794633]]
Epoch: 25 , Loss: [[16.76856238]] , Val Loss: [[8.37295808]]
看起来不错!是时候进行预测并绘制它们以获得设计视觉感受了。
步骤3:获得预测
通过训练的权重向前传递以获得预测:
preds = []
for i in range(Y.shape[0]):
x, y = X[i], Y[i]
prev_s = np.zeros((hidden_dim, 1))
# Forward pass
for t in range(T):
mulu = np.dot(U, x)
mulw = np.dot(W, prev_s)
add = mulw + mulu
s = sigmoid(add)
mulv = np.dot(V, s)
prev_s = s
preds.append(mulv)
preds = np.array(preds)
将预测与实际值一起绘制:
plt.plot(preds[:, 0, 0], 'g')
plt.plot(Y[:, 0], 'r')
plt.show()
这是关于培训数据的。怎么知道模型是不是过度拟合的?这是之前创建的验证集发挥作用的地方:
preds = []
for i in range(Y_val.shape[0]):
x, y = X_val[i], Y_val[i]
prev_s = np.zeros((hidden_dim, 1))
# For each time step...
for t in range(T):
mulu = np.dot(U, x)
mulw = np.dot(W, prev_s)
add = mulw + mulu
s = sigmoid(add)
mulv = np.dot(V, s)
prev_s = s
preds.append(mulv)
preds = np.array(preds)
plt.plot(preds[:, 0, 0], 'g')
plt.plot(Y_val[:, 0], 'r')
plt.show()
还不错!预测看起来令人印象深刻。验证数据的RMSE分数也是可以接受的:
from sklearn.metrics import mean_squared_error
math.sqrt(mean_squared_error(Y_val[:, 0] * max_val, preds[:, 0, 0] * max_val))
0.127191931509431