EMR Spark Runtime Filter性能优化
背景
Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
目前在SparkSQL中有Filter下推优化,包括两个维度:
生成Filter
SparkSQL会从用户的SQL语句中获取到Filter
直接显示获取
select * from A where a=1
生成Filter(a=1) on A
隐式推断
select * from A, B where A.a = B.b and A.a=1
推断出Filter(b=1) on B
Filter优化
利用生成的Filter算子可以优化,比如:
- 将Filter尽量下推到靠近DataSource端
- 如果Filter中的列是分区列,可以提前对DataSource进行分区裁剪,只扫描需要的分区数据
Runtime Filter
是针对Equi-Join场景提出的一种新的生成Filter的方式,通过动态获取Filter内容来做相关优化。
Runtime Filter原理
优化对象
Equi Join, 形如
select x,y from A join B on A.a = B.b
其中A是一个小表(如维表),B是一个大表(如事实表)
备注: A/B也可以是一个简单的子查询
优化思路
如上述小表A和大表B进行Join,Join条件为A.a=B.b,实际Join过程中需要对大表进行全表扫描才能完成Join操作,极端情况下如A.a仅仅只有一条记录,也需要对B表全表扫描,影响性能。
如果在B表扫描之前,能获取A表的a的相关信息(如所有的a值,或者a的min/max/Bloomfilter等统计信息),并在实际执行Join之前将这些信息对B表的数据进行过滤,而不是全表扫描,可以大大提高性能。
两种场景
根据大表B参与join的key(b)的属性,可以分别采集小表A参与join的key(a)的信息:
b是分区列
如上b为大表B的一个分区列,则可以提前收集A.a列的所有值
,然后利用A.a的值对B表的b列进行分区裁剪
b不是分区列
不能做分区裁剪,只能在实际数据扫描的过程中进行过滤。可以提前收集A.a列的min/max/Bloomfilter的统计信息
,然后利用这些统计信息对B表进行数据过滤,这个过滤又可以分成两种粒度:
- 可下推到存储层,减少数据扫描
如底层文件格式是Parquet/ORC, 可以将相关过滤谓词(min/max等)下推到存储层面,从而减少实际扫描的数据。 - 扫描后数据过滤
不能下推到存储层的,可以在数据被扫描后做条件过滤,减少后续参与计算的数据量(如shuffle/join等)
Runtime Filter实现
Runtime Filter的实现主要在Catalyst中,分为4个步骤:
谓词合成
在用户SQL生成的逻辑执行计划树(logical plan)中,寻找满足条件的Equi-Join节点,然后根据上面的思路,在Join的大表B侧插入一个新的Filter节点,如Filter(In(b, Seq(DynamicValue(a, A))), B)
谓词下推
上面生成的新的Filter会经过PushDownPredicate的Rule,尽量下推靠近DataSource附近
物理执行计划生成
该阶段会将上面下推的Filter(In(b, Seq(DynamicValue(a, A))), B)
转换成物理节点(FilterExec),根据上面两种场景会生成两种不同的FilterExec
b是分区列
b是分区列,采集的是a列的所有值,如:case class DynamicPartitionPruneFilterExec( child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
其中colletors就是用于采集信息的SparkPlan,因为要跑一个SQL来采集a列的所有值(select a from A group by a
);
因为有可能会有多个分区列,所以这个地方是一个Seq.
b是非分区列
b是非分区列,采集的是a列的min/max/bloomfilter统计信息,如case class DynamicMinMaxFilterExec( child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
同理上面collectors也是用户采集信息的SparkPlan,如select min(a),max(a) from A
执行
在物理执行计划实际执行的过程中,会在DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec物理算子内先执行collectors获取到a列的相关信息,然后对底层B的执行计划进行改写,比如利用采集到的信息做分区裁剪/数据过滤等。
Runtime Filter性能测试
以TPC-DS 10TB的Query54为例:
Runtime Filter 关闭
Runtime Filter 打开
经过DynamicPartitionPruneFilter对catalog_sales的分区进行了裁剪,实际对表的扫描从14,327,953,968减少到136,107,053,然后经过min/max的过滤继续减少到135,564,763;另外Runtime Filter减少了大表的扫描,shuffle的数据量以及参加Join的数据量,所以对整个集群IO/网络/CPU有比较大的节省
总结
针对Equi-Join的场景,可以额外的采集小表侧的信息,然后在Join之前对大表进行分区裁剪或者扫描后过滤,从而提高查询性能,减少资源消耗。
本文作者:寒沙牧
本文为云栖社区原创内容,未经允许不得转载。
相关推荐
hive运行在hadoop基础上。选择一个hadoop服务器、安装hadoop。connect jdbc:hive2://<host>:<port>/<db>;auth=noSasl root 123