如何与Keras和Apache Spark并行训练您的神经网络
Randomly generated computer hexadecimal machine
Apache Spark上的集群计算
在大多数现实世界的机器学习场景中,数据本身是从物联网传感器或多媒体平台实时生成的,并使用HDFS,ObjectStore,NoSQL或SQL数据库的云解决方案以适当的格式存储。快速,强大和可靠的机器学习工作流程可以利用大型计算集群,无需费力地编辑代码并使其适应并行处理。这就是着名的Hadoop分布式文件系统的秘诀,它允许您将所有磁盘的组合存储容量检索为一个大型虚拟文件系统。
HDFS基本上做的是将数据划分为相同大小的块并将它们分布在物理磁盘上,同时在这些块之上创建虚拟视图,以便可以将其视为跨越整个集群的单个大型文件。该技术的一个主要优点来自数据局部性的概念。由于HDFS跟踪文件的各个块的位置,因此可以使用驻留在同一物理工作节点上的CPU或GPU并行地执行计算。
你有一个很好的计算集群,有256个节点,每个节点8个CPU,每个CPU 16个CPU核心和每个核心4个超线程。并发运行并行线程的最大可能数量是多少?
答案:131 072同时运行并行线程,每个线程都在做你的工作!(= 256个节点*每个节点8个CPU *每个CPU 16个CPU核心*每个核心4个超线程)。因此,Apache spark以这种方式提供了一个开源的分布式通用集群计算框架,允许您操作数据并并行执行计算。
弹性
ApacheSpark中的主要数据抽象是弹性分布式数据集(RDD)。这是一个分布式不可变集合或列表数据,可以使用字符串或双精度值编写,可以使用各种数据存储解决方案,从开源的MongoDB数据库,到更高级的SQL和NoSQL解决方案。您甚至可以从本地文件系统创建RDD。在创建之后,RDD驻留在不同工作节点的主存储器中。最后,你会发现RDD非常懒惰。这意味着只有在确实需要执行某个计算时,才会从底层存储系统读取数据。默认情况下,您应用于data-frame的任何转换(例如删除变量或归一化特征)都不会立即执行。相反,您的选择会被记住,并且仅在操作需要时才会计算 结果将返回到驱动程序。事实证明,这并不是一个坏主意,并且为真正的大提升节省了相当多的计算能力。
Apache Spark的三个特性,使其成为强大的数据科学和机器学习工具::
1.结构化数据检索:Spark SQL
无数的数据科学家,分析师和商业智能用户依靠交互式SQL查询来探索数据。值得庆幸的是,Spark很清楚这一点,并附带了用于结构化数据处理的Spark模块,称为Spark SQL。它提供了我们都非常熟悉的编程抽象,即DataFrame。Spark SQL还可以充当分布式SQL查询引擎,并使未修改的Hadoop Hive查询在现有部署和数据上运行速度提高100倍。它还提供与Spark生态系统其余部分的强大集成(例如,将SQL查询处理与机器学习集成)。
Spark开源生态系统
2.机器学习:MLlib
机器学习很快成为挖掘大数据以获取可操作见解的关键部分。MLlib构建于Spark 之上,是一个可扩展的机器学习库,可提供高质量的算法和“超快”的速度,如官方所说(比MapReduce快100倍)。该库可作为Spark应用程序的一部分在Java,Scala和Python中使用,因此您可以将其包含在完整的工作流程中。
最棒的是,Apache SystemML为使用大到大数据的机器学习提供了一个最佳工作场所,因为它不仅提供了使用广泛定制算法的方法,而且还允许您使用一些很好的预先实现的算法(如Gradient Boosted trees,K-Nearest Neighbors等)。它与Keras和Caffe等各种突出的深度学习框架可以相连接。
3. Streaming Analytics:Spark Streaming
如今许多应用程序不仅需要能够处理和分析批量数据,还需要实时处理和分析新数据流。Spark Streaming运行在Spark之上,可以跨流和历史数据实现强大的交互式和分析式应用程序,同时继承Spark的易用性和容错特性。它可以与各种流行的数据源集成,包括HDFS,Flume,Kafka和Twitter。
使用SystemML缩放机器学习
SystemML,这是大数据世界七大奇迹之一。这种灵活的机器学习系统能够自动扩展到Spark和Hadoop集群。实际上,根据数据大小,稀疏性,计算群集大小以及本地计算机的内存配置,SystemML将决定是编译单个节点计划,还是编译Hadoop或Spark计划。它带有一种类似R的函数编程语言,称为Declarative Machine Learning,它允许您实现首选的机器学习算法,或者从头开始设计自定义的算法。据说SystemML有一个免费的编译器,可以自动生成混合运行时执行计划,它由单个节点和分布式操作组成。它可以在Apache Spark之上运行,它可以逐行自动扩展数据,确定您的代码是应该在驱动程序还是Apache Spark集群上运行。
在SystemML上实现深度学习模型
在SystemML中实现深度学习模型有三种不同的方法:
- 使用DML身体的NN库:该库允许用户充分利用DML语言的灵活性来实现您的神经网络。
- 使用Caffe2DML API:此API允许将以Caffe原型格式表示的模型导入SystemML。此API 不需要在您的SystemML上安装Caffe。
- **使用Keras2DML API:此API允许将Keras API中表示的模型导入SystemML。但是,此API要求在您的驱动程序上安装Keras。**
SystemML开发包括使用GPU功能的额外深度学习,例如导入和运行神经网络架构以及用于训练的预训练模型。本文的以下部分将介绍如何使用MLContext API序列化和训练Keras模型。
IBM Watson Studio上的Apache Spark
现在,我们将最终使用实验性的Keras2DML API来训练我们的Keras模型。要执行以下代码,您需要在IBM cloud帐户上创建一个免费的层帐户并登录以激活Watson studio。
一旦您有了一个带有active Spark计划的Watson studio帐户,您就可以在平台上创建一个Jupyter笔记本,选择一个云机配置(cpu和RAM的数量)和一个Spark计划,然后就可以开始了!
Watson studio提供了一个免费的spark plan,其中包括2名spark工作人员。虽然这足以用于现在的演示目的,但对于现实世界的场景,强烈建议获得付费的Spark plan。
SystemML上的手写数字识别
MNIST数据集上的卷积神经网络
1.我们首先导入一些库:
import tensorflow as tf import keras from keras.models import Sequential from keras.layers import Input, Dense, Conv1D, Conv2D, MaxPooling2D, Dropout,Flatten from keras import backend as K from keras.models import Model import numpy as np import matplotlib.pyplot as plt
Randomly generated computer hexadecimal machine
2.加载数据,Python代码如下:
我们现在可以使用下面这段简单的代码加载来自Keras的MNIST数据集。
from keras.datasets import mnist (X_train, y_train), (X_test, y_test) = mnist.load_data() # Expect to see a numpy n-dimentional array of (60000, 28, 28) type(X_train), X_train.shape, type(X_train)
Randomly generated computer hexadecimal machine
3.Shape数据
在这里,我们做一些reshape最适合我们的神经网络。我们将每个28x28的图像重新排列成一个784像素的向量。
#This time however, we flatten each of our 28 X 28 images to a vector of 1, 784 X_train = X_train.reshape(-1, 784) X_test = X_test.reshape(-1, 784) # expect to see a numpy n-dimentional array of : (60000, 784) for Traning Data shape # and (10000, 784) for Test Data shape type(X_train), X_train.shape, X_test.shape
Randomly generated computer hexadecimal machine
4.归一化您的数据
然后我们使用Scikit-Learn的MinMaxScaler来归一化像素数据,通常范围为0-255。归一化后,值的范围为0-1,这大大改善了结果。
#We also use sklearn's MinMaxScaler for normalizing from sklearn.preprocessing import MinMaxScaler def scaleData(data): # normalize features scaler = MinMaxScaler(feature_range=(0, 1)) return scaler.fit_transform(data) X_train = scaleData(X_train) X_test = scaleData(X_test)
Randomly generated computer hexadecimal machine
5.构建神经网络模型
接下来,我们使用Keras构建我们的神经网络,定义适当的输入形状,然后堆叠一些Convolutional,Max Pooling,Dense和dropout层,如下所示。(一些神经网络基础:确保你的最后一层与你的输出类具有相同数量的神经元。由于我们预测手写数字,范围从0到9,所以我们在这里的最后一层是由10个神经元组成的Dense层),Python代码如下:
# We define the same Keras model as earlier input_shape = (1,28,28) if K.image_data_format() == 'channels_first' else (28,28, 1) keras_model = Sequential() keras_model.add(Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=input_shape, padding='same')) keras_model.add(MaxPooling2D(pool_size=(2, 2))) keras_model.add(Conv2D(64, (5, 5), activation='relu', padding='same')) keras_model.add(MaxPooling2D(pool_size=(2, 2))) keras_model.add(Flatten()) keras_model.add(Dense(512, activation='relu')) keras_model.add(Dropout(0.5)) keras_model.add(Dense(10, activation='softmax')) keras_model.summary()
Randomly generated computer hexadecimal machine
你可以看到下面这个Keras模型的摘要
Randomly generated computer hexadecimal machine
6.创建SystemML模型
使用Keras2DML wrapper,
把我们刚建好的Keras网络提供给它。这是通过调用Keras2DML方法并将您的spark会话、Keras模型、输入形状和预定义变量提供给它来完成的。变量“epoch”表示算法遍历数据的次数。接下来,我们有“batch_size”,它表示我们的网络将在每个学习批中看到的训练示例的数量。最后,“sample”只是对我们训练集中的sample的数量进行编码。我们还要求每10次迭代显示一次训练结果。
然后,我们在新定义的SystemML模型上使用fit参数,并将训练数组和标签传递给它,以启动我们的训练会话。
# Import the Keras to DML wrapper and define some basic variables from systemml.mllearn import Keras2DML epochs = 5 batch_size = 100 samples = 60000 max_iter = int(epochs*math.ceil(samples/batch_size)) # Now create a SystemML model by calling the Keras2DML method and feeding it your spark session, Keras model, its input shape, and the # predefined variables. We also ask to be displayed the traning results every 10 iterations. sysml_model = Keras2DML(spark, keras_model, input_shape=(1,28,28), weights='weights_dir', batch_size=batch_size, max_iter=max_iter, test_interval=0, display=10) # Initiate traning. More spark workers and better machine configuration means faster training! sysml_model.fit(X_train, y_train)
Randomly generated computer hexadecimal machine
现在,您应该会看到屏幕上显示的内容:
Randomly generated computer hexadecimal machine
7.score
我们通过简单地调用经过训练的SystemML模型的score参数来实现此目的,如下所示:
sysml_model.score(X_test, y_test)
等待spark 作业执行,然后,你应该看到你在测试集上的准确性出现了。正如你在下面看到的,我们已经实现了98.76中的一个,也不算太差。
Randomly generated computer hexadecimal machine
请注意,我们能够通过SystemML的Keras2DML wrapper部署Keras模型,该wrapper基本上将您的神经网络模型序列化为Caffe模型,然后将该模型转换为declarative 机器学习脚本。如果您选择使用Keras训练它,那么相同的Keras模型将受到单个JVM资源的约束,而不会显着调整您的代码以进行并行处理。您现在可以在本地GPU上训练您的神经网络,或者在Watson studio那样使用云计算机。