如何在Apache Pyspark中运行Scikit-learn模型

如何在Apache Pyspark中运行Scikit-learn模型

在本文中,我们将了解如何在Apache Pyspark中运行Scikit-learn模型,并讨论有关每个步骤的细节。如果您已经准备好了机器学习模型,则可以直接跳到“ pyspark wrapper”部分,也可以通过以下步骤创建一个简单的scikit learn机器学习模型。

scikit learn机器学习模型:

我们使用Python创建一个简单的机器学习模型:

import numpy as npimport matplotlib.pyplot as pltfrom sklearn.linear_model import LogisticRegressionfrom sklearn import datasets# 导入数据iris = datasets.load_iris()# 我们只选择两个特征X = iris.data[:, :2]  Y = iris.targetlogreg = LogisticRegression(C=1e5)# 创建一个逻辑回归分类器实例并对数据进行拟合logreg.fit(X, Y)#进行预测predict(X[0:1])

如何在Apache Pyspark中运行Scikit-learn模型

将机器学习模型保存到磁盘

import picklepickle.dump(logreg, open( "model save path", "wb" ) )

如何在Apache Pyspark中运行Scikit-learn模型

pyspark wrapper

让我们考虑一下pyspark dataframe (df)中提供的运行预测所需的特征

如何在Apache Pyspark中运行Scikit-learn模型

创建一个python函数,该函数接受这四个特性作为参数,并将预测的分数作为输出进行返回

def predictor(s_l, s_w, p_l, p_w):    #open picked model    serialized_model = open("model save path", "rb")    model = pickle.load(serialized_model)    serialized_model.close()    #call predict method for model    return model.predict([s_l, s_w, p_l, p_w])

如何在Apache Pyspark中运行Scikit-learn模型

将python函数转换为pyspark UDF。这里有两种方式:

1、简单地将python函数注册为pyspark的UDF

from pyspark.sql.functions import udffrom pyspark.sql.types import FloatTypeudf_predictor = udf(predictor, FloatType())#apply the udf to dataframedf_prediction = df.withColumn("prediction",                                udf_predictor(df.sepal_length                                                  ,df.sepal_width                                              , df.petal_length                                              , df.petal_width))

如何在Apache Pyspark中运行Scikit-learn模型

这是最简单的选项,但是每行的每个pyspark执行程序都将调用python函数(预测变量),如果你有一个像我们这样的非常小的机器学习模型是可以的,但是如果你有一个大的序列化机器学习模型(> 100mb),从磁盘一次又一次地打开和读取模型是非常低效的。

2、将模型广播给spark执行者并进行预测

为了消除1中的低效率,此技术允许从磁盘读取一次模型并将模型发送到所有spark执行程序。

#open picked modelserialized_model = open("model save path", "rb")model = pickle.load(serialized_model)serialized_model.close()#broadcast model to spark executors using spark context(sc)sc.broadcast(model)#update prediction methoddef predictor(s_l, s_w, p_l, p_w):    #call predict method for model    return model.predict([s_l, s_w, p_l, p_w])#register python method as spark UDF and call over dataframefrom pyspark.sql.functions import udffrom pyspark.sql.types import FloatTypeudf_predictor = udf(predictor, FloatType())#apply the udf to dataframedf_prediction = df.withColumn("prediction",                                udf_predictor(df.sepal_length                                                  ,df.sepal_width                                              , df.petal_length                                              , df.petal_width))

如何在Apache Pyspark中运行Scikit-learn模型

相关推荐