用PySpark进行情感分析
Python对于数据科学建模非常有用,这要归功于其众多的模块和包来帮助实现数据科学目标。但是如果你正在处理的数据不能适合单台机器呢?也许你可以在一台机器上实现谨慎的抽样以进行分析,但使用像PySpark这样的分布式计算框架,可以高效地实现大型数据集的任务。
Spark API有多种编程语言(Scala,Java,Python和R)。
Spark通过其API提供三种不同的数据结构:RDD,Dataframe(这与Pandas数据框不同),Dataset。对于这篇文章,我将使用Dataframe以及相应的机器学习库SparkML。我首先根据Analytics Vidhya中的帖子提供的建议,决定要使用的数据结构。“Dataframe比RDD快得多,因为它具有与其关联的元数据(关于数据的一些信息),这使Spark可以优化查询计划。”
然后我发现如果我想处理Dataframe,我需要使用SparkML来代替SparkMLLib。SparkMLLib与RDD一起使用,而SparkML支持Dataframe。
为了在Jupyter Notebook中使用PySpark,您应该配置PySpark驱动程序或使用名为Findspark的包在Jupyter Notebook中提供Spark Context。您可以在命令行上通过“pip install findspark”轻松安装Findspark。我们首先加载一些我们需要的基本依赖关系。
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext
任何Apache编程的第一步是创建一个SparkContext。我们想要在集群中执行操作时需要SparkContext。SparkContext告诉Spark如何以及在哪里访问集群。这是连接Apache Cluster的第一步。
try:
# create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
sc = ps.SparkContext('local[4]')
sqlContext = SQLContext(sc)
print("Just created a SparkContext")
except ValueError:
warnings.warn("SparkContext already exists in this scope")
我将用于这篇文章的数据集是来自“ Sentiment140 ”的Tweets注释。
df = sqlContext.read.format('com.databricks.spark.csv')。options(header ='true',inferschema ='true')。load('project-capstone / Twitter_sentiment_analysis / clean_tweet.csv')
type DF)
df.show(5)
df = df.dropna()
df.count()
将数据成功加载为Spark Dataframe后,我们可以通过调用.show()来查看数据,这相当于Pandas .head()。在放弃NA之后,我们有不到160万个推文。我将分成三部分; 培训,验证,测试。由于我有大约160万个参赛作品,每个1%的验证和测试集足以测试模型。
(train_set,val_set,test_set)= df.randomSplit([0.98,0.01,0.01],seed = 2000)
HashingTF + IDF + Logistic回归
通过我之前对Pandas和Scikit-Learn进行情感分析的尝试,我了解到具有Logistic回归的TF-IDF具有相当强的组合性,并且表现出强大的性能,高达Word2Vec +卷积神经网络模型。所以在这篇文章中,我将尝试用PySpark实现TF-IDF + Logistic回归模型。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])
pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
0.86!这看起来不错,也许太好了。因为我已经尝试了与Pandas和SKLearn中的相同数据相同的技术组合,我知道具有逻辑回归的单元TF-IDF的结果准确度约为80%。由于详细的模型参数,可能会有一些细微的差别,但是,这看起来太好了。
通过查看Spark文档,我意识到BinaryClassificationEvaluator评估的是默认的areaUnderROC。
对于二进制分类,Spark不支持作为度量标准的精度。但是我仍然可以通过计算匹配标签的预测数量并将其除以总条目来计算准确性。
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy
现在看起来似乎更合理,实际上,精确度略低于SKLearn的结果。
CountVectorizer + IDF + Logistic回归
还有另一种方法可以获得IDF(逆文档频率)计算的词频。它是SparkML中的CountVectorizer。除了特征(词汇表)的可逆性之外,每个特征如何过滤顶级特征还有一个重要的区别。在HashingTF的情况下,降低了可能的冲突的维数。CountVectorizer丢弃不常用的令牌。
让我们看看如果我们使用CountVectorizer而不是HashingTF,性能是否会发生变化。
%%time
from pyspark.ml.feature import CountVectorizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)
print "Accuracy Score: {0:.4f}".format(accuracy)
print "ROC-AUC: {0:.4f}".format(roc_auc)
它看起来像使用CountVectorizer有点改进了性能。
N-gram实现
在Scikit-Learn中,n-gram的实现相当简单。当您调用TfIdf Vectorizer时,您可以定义一系列的n元组。但是对于Spark来说,这有点复杂。它不会自动合并来自不同n-gram的特征,因此我必须在流水线中使用VectorAssembler来合并我从每个n-gram获得的特征。
我首先尝试从unigram,bigram,trigram中提取大约16,000个特征。这意味着我将总共获得约48,000个功能。然后,我实施了Chi-Squared功能选择,将功能数量减少到16,000个。
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector
def build_trigrams(inputCol=["text","target"], n=3):
tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
ngrams = [
NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
for i in range(1, n + 1)
]
cv = [
CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
outputCol="{0}_tf".format(i))
for i in range(1, n + 1)
]
idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]
assembler = [VectorAssembler(
inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
outputCol="rawFeatures"
)]
label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]
lr = [LogisticRegression(maxIter=100)]
return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+selector+lr)
现在我准备好运行我上面定义的函数了。
%%time
trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(dev_set.count())
roc_auc = evaluator.evaluate(predictions)
# print accuracy, roc_auc
print "Accuracy Score: {0:.4f}".format(accuracy)
print "ROC-AUC: {0:.4f}".format(roc_auc)
准确性有所提高,但正如您可能已经注意到的那样,拟合模型需要4个小时!这主要是因为ChiSqSelector。
如果我最初从unigram,bigram,trigram中提取5,460个特征,最终总共有大约16,000个特征,如果没有Chi Squared特征选择,该怎么办?
def build_ngrams_wocs(inputCol=["text","target"], n=3):
tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
ngrams = [
NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
for i in range(1, n + 1)
]
cv = [
CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
outputCol="{0}_tf".format(i))
for i in range(1, n + 1)
]
idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]
assembler = [VectorAssembler(
inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
outputCol="features"
)]
label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
lr = [LogisticRegression(maxIter=100)]
return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)
%%time
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)
# print accuracy, roc_auc
print "Accuracy Score: {0:.4f}".format(accuracy_wocs)
print "ROC-AUC: {0:.4f}".format(roc_auc_wocs)
这给了我几乎相同的结果,稍低,但差异在第四位。考虑到没有ChiSqSelector只需要6分钟,我绝对选择没有ChiSqSelector的模型。
最后,让我们在最后的测试集上试试这个模型。
test_predictions = trigramwocs_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count()/ float(test_set.count())
test_roc_auc = evaluateator.evaluate(test_predictions)
#print accuracy,roc_auc
print“Accuracy Score:{0:.4f}”。format(test_accuracy)
print“ROC-AUC:{0:.4f}”。format(test_roc_auc)
最终测试集精度为81.22%,ROC-AUC为0.8862。
通过这篇文章,我已经用PySpark实现了一个简单的情感分析模型。