Custom TensorFlow Loss Functions for Advanced Machine Learning
In this article we will look at:
- Using custom loss functions in advanced machine learning (ML) applications
- Defining a custom loss function and integrating it into a basic Tensorflow neural network model
- A simple example of knowledge distillation learning
Introduction
Predefined loss functions in machine learning provide a suitable loss value for the problem you are trying to optimize. Common examples are the cross-entropy loss for classification and the mean squared error (MSE) or root mean squared error (RMSE) for regression problems. Popular machine learning (ML) packages include frontends (such as Keras) and backends (such as Tensorflow) that ship with a set of basic loss functions covering most classification and regression tasks. Occasionally, however, your problem may call for a custom loss function, constrained only by valid tensor operations.
In Keras you can technically create your own loss function, but its form is restricted to some_loss(y_true, y_pred). If you try to pass additional arguments to the loss, as in some_loss_1(y_true, y_pred, **kwargs), Keras will throw a runtime exception. There are workarounds (a common closure-based sketch follows), but in general we want an extensible way to write a loss function that accepts any valid arguments we pass to it and operates on our tensors in a standard, predictable way. We will see how to write a neural network from scratch directly in Tensorflow and build a custom loss function to train it.
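For context, one common workaround is to close over the extra arguments so that the outer signature Keras sees still matches (y_true, y_pred). A minimal sketch, assuming tf.keras is available; the factory make_weighted_bce and the weighting argument alpha are our own hypothetical names:

```python
import tensorflow as tf

# Closure workaround (sketch): the extra argument alpha is captured by the
# factory; Keras only ever sees the inner (y_true, y_pred) signature.
def make_weighted_bce(alpha):
    def weighted_bce(y_true, y_pred):
        bce = tf.keras.backend.binary_crossentropy(y_true, y_pred)
        return alpha * tf.reduce_mean(bce)
    return weighted_bce

# usage: model.compile(optimizer='sgd', loss=make_weighted_bce(0.5))
```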
Tensorflow
Tensorflow (TF) is a symbolic and numeric computation engine that lets us chain tensors together into computation graphs and backpropagate through them. Keras is an API, or frontend, running on top of Tensorflow that conveniently packages standard architectures built with Tensorflow (such as various predefined neural network layers) and abstracts away much of TF's low-level machinery. In packaging up these architectures, however, granular control and the ability to do very specific things are lost.
For simplicity, a tensor is a multidimensional array whose shape is a tuple such as (feature_dim, n_features).
An example of that lost flexibility is being able to define a custom loss function that accepts an arbitrary number of arguments, and being able to compute the loss using arbitrary tensors inside the network together with input tensors from outside it. Strictly speaking, a loss in TF does not even need to be a Python function; it only needs to be a valid composition of operations on TF tensor objects. This point matters because the power of a custom loss comes from computing it over arbitrary tensors, not just over the supervised target tensor and the network output tensor in the strict (y_true, y_pred) form. A minimal sketch follows.
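As an illustration of that idea (the tensor names here are ours, not from the examples below), any op composition that reduces to a scalar can serve as a loss, and it may consume tensors from anywhere:

```python
import tensorflow as tf

# A "loss" is just a valid composition of tensor ops (sketch with our own names).
y_pred = tf.placeholder(tf.float32, [None, 1])  # network output tensor
y_true = tf.placeholder(tf.float32, [None, 1])  # supervised target tensor
g_out = tf.placeholder(tf.float32, [None, 1])   # an arbitrary external tensor

mse_term = tf.reduce_mean(tf.square(y_true - y_pred))  # standard supervised term
match_term = tf.reduce_mean(tf.abs(y_pred - g_out))    # term over an external tensor
custom_loss = mse_term + 0.1 * match_term              # a perfectly valid loss tensor
```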
Before we get to the custom loss, let's briefly review a basic 2-layer dense network (an MLP) and see how it is defined and trained in TF. Although predefined TF layers exist, we define the weights and biases from scratch. The Python code follows:
```python
# A simple Tensorflow 2 layer dense network example
import tensorflow as tf
import numpy as np
from sklearn import datasets
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelBinarizer
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# load the sklearn breast cancer dataset
bc = datasets.load_breast_cancer()
X = bc.data[:, :]
Y = bc.target

# min-max scale the features and binarize the target labels
scaler = MinMaxScaler()
X = scaler.fit_transform(X, Y)
label = LabelBinarizer()
Y = label.fit_transform(Y)

# train fraction
frac = 0.9

# shuffle dataset
idx = np.random.randint(X.shape[0], size=len(X))
X = X[idx]
Y = Y[idx]
train_stop = int(len(X) * frac)
X_ = X[:train_stop]
Y_ = Y[:train_stop]
X_t = X[train_stop:]
Y_t = Y[train_stop:]

# plot the first 3 PCA dimensions of the sampled data
fig = plt.figure(1, figsize=(8, 6))
ax = Axes3D(fig, elev=-150, azim=110)
X_reduced = PCA(n_components=3).fit_transform(X_)
ax.scatter(X_reduced[:, 0], X_reduced[:, 1], X_reduced[:, 2], c=Y_.ravel(),
           cmap=plt.cm.Set1, edgecolor='k', s=40)
ax.set_title("First three PCA directions")
ax.set_xlabel("1st eigenvector")
ax.w_xaxis.set_ticklabels([])
ax.set_ylabel("2nd eigenvector")
ax.w_yaxis.set_ticklabels([])
ax.set_zlabel("3rd eigenvector")
ax.w_zaxis.set_ticklabels([])
plt.show()

# create the TF neural net
# some hyperparams
training_epochs = 200
n_neurons_in_h1 = 10
n_neurons_in_h2 = 10
learning_rate = 0.1
n_features = len(X[0])
labels_dim = 1

#############################################
# basic 2 layer dense net (MLP) example adapted from
# https://becominghuman.ai/creating-your-own-neural-network-using-tensorflow-fa8ca7cc4d0e

# these placeholders serve as our input tensors
x = tf.placeholder(tf.float32, [None, n_features], name='input')
y = tf.placeholder(tf.float32, [None, labels_dim], name='labels')

# TF Variables are our neural net parameter tensors; we initialize them to
# random (gaussian) values in Layer 1. Variables persist across training
# epochs and are updatable by TF operations
W1 = tf.Variable(tf.truncated_normal([n_features, n_neurons_in_h1], mean=0, stddev=1 / np.sqrt(n_features)),
                 name='weights1')
b1 = tf.Variable(tf.truncated_normal([n_neurons_in_h1], mean=0, stddev=1 / np.sqrt(n_features)),
                 name='biases1')

# the output tensor of the 1st layer is the activation applied to a linear
# transform of the layer 1 parameter tensors;
# the matmul operation calculates the dot product between the tensors
y1 = tf.sigmoid((tf.matmul(x, W1) + b1), name='activationLayer1')

# network parameters (weights and biases) are set and initialized (Layer 2)
W2 = tf.Variable(tf.random_normal([n_neurons_in_h1, n_neurons_in_h2], mean=0, stddev=1), name='weights2')
b2 = tf.Variable(tf.random_normal([n_neurons_in_h2], mean=0, stddev=1), name='biases2')
# activation function (sigmoid)
y2 = tf.sigmoid((tf.matmul(y1, W2) + b2), name='activationLayer2')

# output layer weights and biases
Wo = tf.Variable(tf.random_normal([n_neurons_in_h2, labels_dim], mean=0, stddev=1), name='weightsOut')
bo = tf.Variable(tf.random_normal([labels_dim], mean=0, stddev=1), name='biasesOut')

# the sigmoid (binary softmax) activation is absorbed into TF's
# sigmoid_cross_entropy_with_logits loss
logits = (tf.matmul(y2, Wo) + bo)
loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)

# tap a separate output that applies the sigmoid activation to the output layer
# for training accuracy readout
a = tf.nn.sigmoid(logits, name='activationOutputLayer')

# optimizer used to compute the gradient of the loss and apply the parameter updates;
# the train_step object returned is run by a TF Session to train the net
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)

# prediction accuracy:
# compare the predicted value from the network with the expected value/target
correct_prediction = tf.equal(tf.round(a), y)
# accuracy determination
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="Accuracy")

#############################################
# ***NOTE global_variables_initializer() must be called before creating a tf.Session()!***
init_op = tf.global_variables_initializer()

# create a session for training and feedforward (prediction). Sessions are how TF
# feeds data to placeholders and variables, obtains outputs, and updates parameters
with tf.Session() as sess:
    # ***initialization of all variables... NOTE this must be done before running any further sessions!***
    sess.run(init_op)

    # training loop over the number of epochs
    batch_size = 50
    batches = int(len(X_) / batch_size)
    for epoch in range(training_epochs):
        losses = 0
        accs = 0
        for j in range(batches):
            idx = np.random.randint(X_.shape[0], size=batch_size)
            X_b = X_[idx]
            Y_b = Y_[idx]
            # train the network, note the dictionary of inputs and labels
            sess.run(train_step, feed_dict={x: X_b, y: Y_b})
            # feedforward the same data and labels, but grab the accuracy and loss as outputs
            acc, l, soft_max_a = sess.run([accuracy, loss, a], feed_dict={x: X_b, y: Y_b})
            losses = losses + np.sum(l)
            accs = accs + np.sum(acc)
        print("Epoch %.8d " % epoch, "avg train loss over", batches, " batches ",
              "%.4f" % (losses / batches), "avg train acc ", "%.4f" % (accs / batches))

    # test on the holdout set
    acc, l, soft_max_a = sess.run([accuracy, loss, a], feed_dict={x: X_t, y: Y_t})
    print("Epoch %.8d " % epoch, "test loss %.4f" % np.sum(l), "test acc %.4f" % acc)
    print(soft_max_a)
```
The code above can be modified for multiclass classification by swapping the loss for softmax_cross_entropy_with_logits and replacing the final sigmoid activation with tf.nn.softmax, as sketched below.
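A hedged sketch of that swap, assuming one-hot encoded labels and a hypothetical n_classes variable (everything else as in the code above):

```python
# Multiclass variant (sketch, our assumptions): one-hot labels of width n_classes,
# softmax cross-entropy absorbing the softmax activation, and a softmax readout tap.
y = tf.placeholder(tf.float32, [None, n_classes], name='labels')
loss = tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=logits)
a = tf.nn.softmax(logits, name='activationOutputLayer')
```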
Next, to demonstrate a custom loss function over arbitrary tensors, let's implement a knowledge distillation model that optimizes a binary classification loss together with a loss between the model being trained and a reference model. Knowledge distillation is a form of transfer learning: we learn with the target model (the ML model we actually want to train), but also indirectly transfer a knowledge representation from a reference model. We will use a Gaussian process classifier (GPC) from sklearn as our reference model. To make the problem more interesting, we reduce the training data to 1% of the 569 samples in the sklearn breast cancer dataset... for both the reference and the target, and train both from scratch. This is the well-known few-shot learning problem.
In classical transfer learning, the reference is usually a wide and/or deep network pretrained on many instances/classes, while the target is a narrow/shallow network trained on the few instances/classes that are available.
The loss for this knowledge distillation scheme looks like this:
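The original equation image is not reproduced here; it is reconstructed below from the surrounding text and the code that follows, with $\lambda$ as the weighting factor (dkl_loss_rate in the code):

$$\mathcal{L} = \mathcal{L}_{\mathrm{CE}}\big(y, f(x)\big) + \lambda \, D\big(f(x), g(x)\big)$$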
The binary cross-entropy term is just the usual binary classification loss, while the second term involves another loss D between the outputs of the target f(x) and the reference g(x). We take D to be the Kullback-Leibler divergence (DKL) between f(x) and g(x):
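Reconstructed from the standard definition of the Kullback-Leibler divergence:

$$D_{\mathrm{KL}}(f \,\|\, g) = \sum_{x} f(x) \, \log \frac{f(x)}{g(x)}$$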
Put simply, the DKL quantifies how much distributions f and g differ in terms of information (roughly, information is inversely related to certainty); it is closely related to the cross-entropy between the distributions and is asymmetric. Strictly speaking, the DKL between proper distributions is non-negative, but because the code below applies it element-wise to sigmoid outputs that are not normalized distributions, the computed value can go negative, which is why the code flips its sign. By minimizing the DKL between f and g, we are essentially trying to increase the information content of f relative to g. When f and g carry the same amount of information, the log term above is 0 and so is the DKL loss. Using the DKL as a loss makes sense with a GPC reference model because a prediction from the GPC is a sample from its posterior distribution (a softmax), and our neural network, although only a rough approximation of a posterior, is also a distribution.
Note how we now need to feed the external input g(x) into the loss. In Keras this process is contrived and does not scale; in TF it is as simple as creating a new placeholder tensor, adding the necessary term to the combined loss, and feeding the extra input when running a training or prediction session. The Python code follows:
```python
# A Tensorflow 2 layer dense network with a knowledge distillation loss
import tensorflow as tf
import numpy as np
from sklearn import datasets
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelBinarizer
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# load the sklearn breast cancer dataset
bc = datasets.load_breast_cancer()
X = bc.data[:, :]
Y = bc.target

# min-max scale the features and binarize the target labels
scaler = MinMaxScaler()
X = scaler.fit_transform(X, Y)
label = LabelBinarizer()
Y = label.fit_transform(Y)

# train fraction
frac = 0.01
np.random.seed(666)

# shuffle dataset
idx = np.random.randint(X.shape[0], size=len(X))
X = X[idx]
Y = Y[idx]
train_stop = int(len(X) * frac)
test_stop = 100
X_ = X[:train_stop]
Y_ = Y[:train_stop]
# keep a 100-example holdout, comparable to the previous example
X_t = X[len(X) - test_stop:]
Y_t = Y[len(X) - test_stop:]

# plot the first 3 PCA dimensions of the sampled data
fig = plt.figure(1, figsize=(8, 6))
ax = Axes3D(fig, elev=-150, azim=110)
X_reduced = PCA(n_components=3).fit_transform(X_)
ax.scatter(X_reduced[:, 0], X_reduced[:, 1], X_reduced[:, 2], c=Y_.ravel(),
           cmap=plt.cm.Set1, edgecolor='k', s=40)
ax.set_title("First three PCA directions")
ax.set_xlabel("1st eigenvector")
ax.w_xaxis.set_ticklabels([])
ax.set_ylabel("2nd eigenvector")
ax.w_yaxis.set_ticklabels([])
ax.set_zlabel("3rd eigenvector")
ax.w_zaxis.set_ticklabels([])
plt.show()

# fit the reference model: a Gaussian process classifier with an RBF kernel
kernel = 1.0 * RBF(1.0)
gpc = GaussianProcessClassifier(kernel=kernel, multi_class='one_vs_one',
                                random_state=0).fit(X_, Y_)
# let's see how good our fit on the train set is
print(gpc.score(X_, Y_))

# create the TF neural net
# some hyperparams
training_epochs = 200
n_neurons_in_h1 = 10
n_neurons_in_h2 = 10
learning_rate = 0.01
dkl_loss_rate = 0.1
n_features = len(X[0])
labels_dim = 1

#############################################
# these placeholders serve as our input tensors
x = tf.placeholder(tf.float32, [None, n_features], name='input')
y = tf.placeholder(tf.float32, [None, labels_dim], name='labels')
# input tensor for our reference model predictions (given its own name so it
# does not collide with the 'labels' placeholder above)
y_g = tf.placeholder(tf.float32, [None, labels_dim], name='reference_labels')

# TF Variables are our neural net parameter tensors; we initialize them to
# random (gaussian) values in Layer 1. Variables persist across training
# epochs and are updatable by TF operations
W1 = tf.Variable(tf.truncated_normal([n_features, n_neurons_in_h1], mean=0, stddev=1 / np.sqrt(n_features)),
                 name='weights1')
b1 = tf.Variable(tf.truncated_normal([n_neurons_in_h1], mean=0, stddev=1 / np.sqrt(n_features)),
                 name='biases1')

# the output tensor of the 1st layer is the activation applied to a linear
# transform of the layer 1 parameter tensors;
# the matmul operation calculates the dot product between the tensors
y1 = tf.sigmoid((tf.matmul(x, W1) + b1), name='activationLayer1')

# network parameters (weights and biases) are set and initialized (Layer 2)
W2 = tf.Variable(tf.random_normal([n_neurons_in_h1, n_neurons_in_h2], mean=0, stddev=1), name='weights2')
b2 = tf.Variable(tf.random_normal([n_neurons_in_h2], mean=0, stddev=1), name='biases2')
# activation function (sigmoid)
y2 = tf.sigmoid((tf.matmul(y1, W2) + b2), name='activationLayer2')

# output layer weights and biases
Wo = tf.Variable(tf.random_normal([n_neurons_in_h2, labels_dim], mean=0, stddev=1), name='weightsOut')
bo = tf.Variable(tf.random_normal([labels_dim], mean=0, stddev=1), name='biasesOut')

# the sigmoid (binary softmax) activation is absorbed into TF's
# sigmoid_cross_entropy_with_logits loss
logits = (tf.matmul(y2, Wo) + bo)
loss_1 = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)

# tap a separate output that applies the sigmoid activation to the output layer
# for training accuracy readout
a = tf.nn.sigmoid(logits, name='activationOutputLayer')

# here's the KL-div loss; note the inputs are sigmoid probabilities, not raw logits
def kl_divergence(p, q):
    return tf.reduce_sum(p * tf.log(p / q))

loss_2 = kl_divergence(a, y_g)

# combined loss: since the computed DKL can be negative here (the inputs are not
# normalized distributions), reverse its sign when negative. Basically an abs(),
# but it demonstrates how to use tf.cond() to branch on tensor values
loss_2 = tf.cond(loss_2 < 0, lambda: -1 * loss_2, lambda: loss_2)
# the losses could also be normalized for stability, but that is not done here
norm = 1  # tf.reduce_sum(loss_1 + loss_2)
loss = loss_1 / norm + dkl_loss_rate * loss_2 / norm

# optimizer used to compute the gradient of the loss and apply the parameter updates;
# the train_step object returned is run by a TF Session to train the net
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)

# prediction accuracy:
# compare the predicted value from the network with the expected value/target
correct_prediction = tf.equal(tf.round(a), y)
# accuracy determination
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="Accuracy")

#############################################
# ***NOTE global_variables_initializer() must be called before creating a tf.Session()!***
init_op = tf.global_variables_initializer()

# create a session for training and feedforward (prediction). Sessions are how TF
# feeds data to placeholders and variables, obtains outputs, and updates parameters
with tf.Session() as sess:
    # ***initialization of all variables... NOTE this must be done before running any further sessions!***
    sess.run(init_op)

    # training loop over the number of epochs
    batch_size = 5
    batches = int(len(X_) / batch_size)
    for epoch in range(training_epochs):
        losses = 0
        dkl_losses = 0
        accs = 0
        for j in range(batches):
            idx = np.random.randint(X_.shape[0], size=batch_size)
            X_b = X_[idx]
            Y_b = Y_[idx]
            # get the GPC predictions and slice only the positive class probabilities
            Y_g = gpc.predict_proba(X_b)[:, 1].reshape((-1, 1))
            # train the network, note the dictionary of inputs and labels
            sess.run(train_step, feed_dict={x: X_b, y: Y_b, y_g: Y_g})
            # feedforward the same data and labels, but grab the accuracy and losses as outputs
            acc, l, soft_max_a, l_2 = sess.run([accuracy, loss, a, loss_2],
                                               feed_dict={x: X_b, y: Y_b, y_g: Y_g})
            losses = losses + np.sum(l)
            accs = accs + np.sum(acc)
            dkl_losses = dkl_losses + np.sum(l_2)
        print("Epoch %.8d " % epoch, "avg train loss over", batches, " batches ",
              "%.4f" % (losses / batches), "DKL loss %.4f " % (dkl_losses / batches),
              "avg train acc ", "%.4f" % (accs / batches))

    # test on the holdout set
    Y_g = gpc.predict_proba(X_t)[:, 1].reshape((-1, 1))
    acc, l, soft_max_a = sess.run([accuracy, loss, a], feed_dict={x: X_t, y: Y_t, y_g: Y_g})
    print("Epoch %.8d " % epoch, "test loss %.4f" % np.sum(l),
          "DKL loss %.4f " % dkl_losses, "test acc %.4f" % acc)
    print(soft_max_a)
```
With only 5 training examples, the example above converges faster and reaches better raw test accuracy than the same network with the DKL loss set to zero (i.e. without transfer learning). Note that the imbalance of the test set is not accounted for! The reader should add an F1 score to the accuracy readout; a sketch follows. Fortunately, our pseudo-random sample of the dataset happens to give a 2:3 split between the positive and negative classes.
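A minimal sketch of that F1 readout, assuming it runs right after the holdout session call above (soft_max_a holds the sigmoid outputs on X_t):

```python
from sklearn.metrics import f1_score

# threshold the sigmoid outputs at 0.5 and score against the holdout labels
y_hat = np.round(soft_max_a)
print("test F1 %.4f" % f1_score(Y_t.ravel(), y_hat.ravel()))
```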
It is also worth comparing the softmax outputs of the transfer learning model on the 100-example holdout against those of the non-transfer-learning model (the output printouts are not reproduced here): the transfer learning model's predictions reflect its uncertainty given the limited training information.
Conclusion
We saw how to implement a neural network from scratch in Tensorflow, how to combine tensor operations into a loss function to train it, and walked through an interesting transfer learning application. In general, working directly in Tensorflow gives data scientists far more flexibility in advanced machine learning applications.