创业还靠转“锦鲤”?学会Spark,你就是锦鲤
点击上方关注,All in AI中国
作者——Ben Weber
这是我的创业数据科学系列的第三部分,在这一期我们主要关注的是Python。
前两篇文章链接:
数据科学的初创公司介绍(链接:https://towardsdatascience.com/data-science-for-startups-introduction-80d022a18aec )
数据科学的初创企业:R->Python(链接:https://towardsdatascience.com/data-science-for-startups-r-python-2ca2cd149c5c)
Spark是一个很好的工具,可以让数据科学家将研究代码转换为生产代码,而PySpark使这个环境更易于访问。虽然我一直很喜欢用谷歌的云数据流来生产模型,但它缺乏一个交互环境,没能够让原型和部署的数据科学模型更容易上手。Spark对于初创公司来说是一个很好的工具,因为它既提供了执行分析的交互环境,也提供了将模型投入生产的可伸缩性。本文讨论了如何在GCP上旋转一个集群并连接到Jupyter上,以便使用Spark。
备注:GCP指Google Cloud Platform/谷歌云平台。其中,Google Kubernetes Engine 让你可以使用完全托管的 Kubernetes 集群来大规模部署、管理和协调容器。Google App Engine 是一种灵活的"平台即服务",让你可以专心处理代码工作,不必为部署和管理基础架构的运营细节而费心。 提供的虚拟机全都运行在 Google 先进的数据中心和覆盖全球的光纤网络之中。链接:https://cloud.google.com/。
Spark有许多不同的生态系统可供使用,从自托管到供应商选择。以下是我在过去探索过的Spark的一些选项:
1.Apache Ambari + Apache Zeppelin
2.GCP DataProc + Jupyter
3.AWS EMR + SageMaker
4.供应商:DataBricks, Cloudera
除了SageMaker,我一直在处理(以上)所有这些设置。在与大型企业合作时,我更喜欢供应商的解决方案,但初创企业可能希望避免这种选择。最好的方法取决于你需要处理多少数据,你是怎样定位你的公司的,以及你需要多少数据科学家来支持这个基础架构。
本文的目标是展示如何使用PySpark的云解决方案尽可能快地启动和运行(达到让你的公司快速发展的目的)。我决定使用Spark GCP,因为这个平台提供了免费的学习资源,而且使用Jupyter Notebook的环境很简单。下面的帖子是你学习Spark GCP的一个很好的起点:https://cloud.google.com/blog/products/gcp/google-cloud-platform-for-data-scientists-using-jupyter-notebooks-with-apache-spark-on-google-cloud
此示例的完整源代码可在点击GitHub:https://github.com/bgweber/StartupDataScience/blob/master/Spark/PySpark_Natality.ipynb
设置
首先,我们需要建立一个与Jupyter连接的集群。我们将保留大多数默认设置,让它们创建一个具有一个主节点和两个工作节点的集群。我们需要做出改动的是运行位于谷歌存储区的脚本,该脚本为集群进行Jupyter设置(https://console.cloud.google.com/storage/browser/dataproc-initialization-actions/jupyter?prefix=jupyter.sh&project=gameanalytics-199018)。
1.从GCP控制台选择"Hamburger menu",然后选择"DataProc"
2.从"DataProc"中选择"create cluster"
3.分配集群名称:"pyspark"
4.单击"Advanced Options"/即,"高级选项"。然后单击"Add Initialization Option"/即,"添加初始化选项"
5.添加以下命令(如下所示):
g:/ / dataproc-initialization-actions / jupyter / jupyter.sh
6.点击"开始"
添加jupyter初始化步骤
集群需要几分钟的时间才能启动。一旦就绪,状态将从"Provisioning"更改为"Running"。在开始使用Jupyter之前,我们需要为集群设置连接规则。推荐的方法是设置这里描述的SSH隧道。为了快速启动和运行,我们将修改防火墙以接受来自特定IP的连接。
初始化的Spark集群
修改防火墙以接受来自你计算机的连接:
1.单击群集"pyspark"
2.点击"VM Instances"
3.对于"pyspark-m"而言,我们需要点击"View network details"/即,"查看网络详细信息"。
4.从左边选择 "Firewall Rules"/即,"防火墙规则"
5.选择"Create Firewall Rule"/即,"创建防火墙规则"
6.对规则使用以下设置
——名称:jupyter
-目标标签:http-server
-IP范围:v4 IP
- tcp: 8123(脚本将在这个端口上进行Jupyter设置)
7.点击"创建"
现在你应该能够测试到DataProc集群的连接。浏览并返回到"VM Instances",然后单击"pyspark-m"以获得集群的外部IP。然后点击"编辑",启用"允许HTTP流量",然后单击"保存"。为集群添加":8123"外部IP的末尾,并将结果粘贴到浏览器中。你应该会看到如下所示的Jupyter。
为GCP DataProc建立Jupyter笔记本
我们现在已经为PySpark的云部署设置了Jupyter笔记本!
PySpark
一旦设置了Jupyter环境,你就可以通过选择"New"并通过"PySpark"来创建新的Jupyter Notebook。虽然已经安装了matplotlib和numpy等一些常用库,但你可能希望通过pip添加其他库。我们可以这样做:
上面的代码片段在集群中安装了Pandas,以及Pandas的BigQueryConnector和PySpark,我们将使用它们来获得对Spark Context的引用。我们将遵循GCP示例(https://cloud.google.com/blog/products/gcp/google-cloud-platform-for-data-scientists-using-jupyter-notebooks-with-apache-spark-on-google-cloud),把从BigQuery中的数据提取到Panda DataFrame上。此时,我们实际上并没有充分利用Spark的功能,因为使用Panda需要将所有数据加载到驱动节点上的内存中。但这是在我们担心Spark的复杂性之前,快速启动并运行它的一种方法。
下面的代码片段展示了如何在BigQuery上运行查询并将结果拉到驱动节点上的PandasDataFrame中。在处理较大数据集时,我们应该使用BigQueryConnector(https://cloud.google.com/dataproc/docs/concepts/connectors/bigquery)将结果返回为SparkDataFrame,而不是PandasDataFrame。
因为我们以PandasDataFrame的形式返回结果,所以我们可以使用hist函数来绘制不同属性的分布。下图显示了出生体重和母亲年龄的分布。
不同属性的数据集
为了充分利用Spark的分布式计算能力,在使用PySpark时最好避免使用Pandas。这意味着我们最好避免使用toPandas(),而是直接将数据加载到Spark而并非Pandas data frames中。然而,Pandas UDFs(https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)是在Spark环境中与熊猫合作的一种很好的方式,但是调试起来很有挑战性。
首先,我们将展示如何将Pabdas DataFrame转换为Spark DataFrame。关键的区别是Spark data frames 被延迟评估并分布在集群中,这意味着在需要计算结果之前不执行任何操作。在使用Pabdas DataFrame时,所有操作都以一种快速的模式执行,并立即进入内存,即使在后面的步骤中没有使用结果。而使用Spark DataFrame之前,我们首先需要获得对Spark Context的引用,如下面的代码片段所示。一旦获得,我们就可以用它使Pandas DataFrame变为 Spark DataFrame。
虽然许多Spark的操作类似于Panda操作,但是执行流程有很大的不同。上面的示例展示了如何使用show函数执行与head操作类似的结果。Spark的主要优点是:执行是分布式和延迟的,从而产生可伸缩的管道。
结论
Spark是构建数据管道的强大工具,而PySpark使这个"生态系统"更易于访问。虽然使用Spark有多种选择,但本文关注的是如何使用GCP的DataProc和Jupyter初始化脚本以快速访问Spark集群。如果你想高效的使用PySpark的话,你可能需要重新学习如何在Python中执行数据科学的标准方法,虽然结果是大量可伸缩的数据管道和分析。