如何在Python中使用Dask Dataframes进行并行数据分析
有时你用Python的Pandas打开一个大数据集,尝试获得一些指标,整个事情会变的很糟糕。
如果您使用大数据(用Pandas),那么您可以等待一分钟来获得一个简单的平均值。这只是几百万行!当数据达到数亿行时,你最好开始使用Spark或其他东西。
什么是Dask?
Dask是一个开源项目,它为您提供NumPy Arrays,Pandas Dataframes和常规列表的abstractions,允许您使用多核处理并行运行它们。
Dask提供模仿NumPy,lists和Pandas的高级Array,Bag和DataFrame集合,但可以在不适合主内存的数据集上并行运行。Dask的高级集合是大型数据集的NumPy和Pandas的替代品。
我开始为这篇文章尝试Dask Dataframes,并对它们进行了几个基准测试。
阅读文档
我首先做的是阅读官方文档,看看在Dask中建议做什么。以下是官方文档中的相关部分:
- 操纵大型数据集,即使这些数据集不适合内存
- 通过使用多核心来加速计算
- 使用标准Pandas操作(如groupby,join和时间序列计算)对大型数据集进行分布式计算
然后在下面,它列出了一些在使用Dask Dataframes时非常快的东西:
- 算术运算(multiplying or adding to a Series)
- 常见聚合(平均值,最小值,最大值,总和等)
- 调用apply(as long as it’s along the index -that is, not after a groupby(‘y’) where ‘y’ is not the index-)
- 调用value_counts(),drop_duplicates()或corr()
- 使用loc,isin和行方式选择进行过滤
Python示例代码
#returns only the rows where x is >5, by reference (writing on them alters original df)
df2 = df.loc[df['x'] > 5]
#returns only the rows where x is 0,1,2,3 or 4, by reference
df3 = df.x.isin(range(4))
#returns only the rows where x is >5, by read-only reference (can't be written on)
df4 = df[df['x']>5]
使用Dask Dataframes
Dask dataframe和panda dataframe有相同的API,只是聚合和applys被延迟计算,需要通过调用compute方法来计算。为了生成Dask Dataframe,您可以简单地调用read_csv方法,就像在panda中那样,或者,给定一个panda Dataframe df,您可以调用
dd = ddf.from_pandas(df, npartitions=N)
其中ddf是您导入Dask Dataframes的名称,而npartitions是一个参数,告诉Dataframe如何对其进行分区。
根据StackOverflow的说法,建议将Dataframe划分为与计算机所拥有的核心数量相同的分区,或者是该数量的几倍,因为每个分区将在不同的线程上运行。
测试
基准测试可以在Github上找到(https://github.com/StrikingLoo/dask-dataframe-benchmarking),但主要有(Python实现):
def get_big_mean():
return dfn.salary.mean().compute()
def get_big_mean_old():
return df3.salary.mean()
def get_big_max():
return dfn.salary.max().compute()
def get_big_max_old():
return df3.salary.max()
def get_big_sum():
return dfn.salary.sum().compute()
def get_big_sum_old():
return df3.salary.sum()
def filter_df():
df = dfn[dfn['salary']>5000]
def filter_df_old():
df = df3[df3['salary']>5000]
这里df3是一个普通的Pandas Dataframe,拥有2500万行。我拿了50行数据集并连接了500000次。
importimport dask.dataframedask.da as ddf
import time
import pandas as pd
dfo = pd.read_csv('random_people.csv')
dfodfo[['bonus''bonus'] = dfo['salary']*.5
df2 = pd.concat([dfo for _ in range(1000)])
df3 = pd.concat([df2 for _ in range(500)])
dfn = ddf.from_pandas(df3, npartitions=8)
dfn就是基于df3的Dask Dataframe 。
第一批结果:不太乐观
我在Dask上的测试结果很糟糕,我不得不等待很长时间才能得到结果,但我担心可能是因为我做的分区太少了:
204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old
您可以看到,当我使用Dask时,大多数操作变得非常慢。这提示我可能需要使用更多的分区。生成惰性评估所花费的时间也可以忽略不计(在某些情况下不到半秒)。
我也用apply method尝试了这个测试:
def f(x):
return (13*x+5)%7
def apply_random_old():
df3['random']= df3['salary'].apply(f)
def apply_random():
dfn['random']= dfn['salary'].apply(f).compute()
并有非常相似的结果:
369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old
尝试更多分区
在这些令人沮丧的结果之后,我决定我可能只是没有使用足够的分区。我尝试了8个分区的相同测试,这就是我得到的(我省略了非并行数据帧的结果,因为它们基本相同):
3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test
那就对了!大多数操作的运行速度比常规Dataframe快十倍,甚至申请速度也更快!我还运行了value_count测试,它只调用salary系列上的value_count方法。对于上下文,请记住,在经过十分钟的等待后,我在常规Dataframe上运行此测试时必须终止该过程。这次只用了50秒! 所以基本上我只是使用错误的工具,而且它非常快。比常规Dataframe快很多。