hive中数据倾斜汇总
数据倾斜是指:map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点大很多,从而导致某几个节点迟迟运行不完。
在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在job完成后所得到的Counters是整个job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因造成map处理数据量的差异过大,使得这些平均值能代表的价值降低。hive的执行时分阶段的,map处理数据量的差异取决于一个stage的reduce输出,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。
1、数据倾斜的原因
1.1操作
关键词 | 情形 | 后果 |
join | 其中一个表较小,但是key集中 | 分发都某一个或几个reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值和空值过多 | 这些空值都是由一个reduce处理,非常慢 | |
gorup by | group by维度过少,某值的数量过多 | 处理某值的reduce非常耗时 |
count distinct | 某特殊值过多 | 处理此特殊值的reduce耗时 |
order by |
1.2原因
- key分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某型sql语句本身就有数据倾斜
1.3表现
任务进度长时间维持在99%,查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。
2、数据倾斜的解决方法
2.1参数调整
- hive.map.aggr=true----map端部分聚合,相当于Combiner;
- hive.groupby.skewindata=true----有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询会有两个MR job。第一个MR job中,Map的输出结果集合会随机分不到Reduce中,每个Reduce做聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分配到不同的Reduce中,从而达到负载均衡的目的;第二个MR job再根据预处理的数据结果按照Group by key分布到reduce中(这个过程可以保证相同的Group By key分布到同一个Reduce中),最后完成最终的聚合操作。
2.2SQL语句调节
①关于join----关于驱动表的选取,选用join key分布最均匀的表作为驱动表。做好裁减和filter操作,以达到两表做join的时候,数据量相对变小的效果。
- 大小表join:使用map join让小的维度表(1000条以下的记录表)先进内存。在map端完成reduce。
- 大表join大表:把空值的key变成一个字符串加上随机数,把倾斜的数据分配到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
②count distinct大量相同特殊值
count distinct时将值为空的情况单独处理,如果是计算count distinct,可以不处理,直接过滤,在最后结果中加1.如果还有其他计算,需要进行group by,可以先将值为空记录单独处理,再和其他计算结果进行union。
③gorup by维度过小:采用sum() group by的方式来替换count(distinct)完成计算。
④特殊情况特殊处理:在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
3、典型的业务场景
3.1空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id关联,会碰到数据倾斜的问题。
解决方法1:user_id为空的不参与关联
select * from log a join users b on a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
解决方法2:赋予空值分新的key值
select * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法1效率更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,jobs是2.方法2job是1。这个优化适合无效ID产生的倾斜问题。把空值key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。
3.2小表不小不大,怎么用map join解决倾斜问题
使用map join解决小表关联大表的数据倾斜问题,这个方法使用频率非常高,单如果小表很大,大到map join会出现bug或异常,这时候需要特殊的处理。如下:
select * from log a left outer join users b on a.user_id = b.user_id;
users表有600w+的记录,把users分发到所有map上也是不小的开销,而且map join不支持这么大的小表。如果用普通的join,又会碰到数据倾斜的问题。解决方法如下:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
3.3不同数据类型关联产生数据倾斜
场景:用户表user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的join操作时,默认的hash操作会按照int类型来进行分配,这样导致所有string类型Id的记录分配到一个reduce中。
解决方法:把数字类型转换成字符串类型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
4、总结
使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:
- 采用log表,那些user_id比较倾斜,得到一个结果表temp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样时并不可少的。
- 数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就想一个社会的富人不多,奇特的人不多一样。所以temp1记录数会很少。把temp1和users做map join生成temp2,把temp2读到distribute file cache。这是一个map过程。
- map读入users和log,假设记录来自log,则检查user_id是否在temp2里,如果是,则输出到本地文件a,否则生成<user_id,value>的key,value对,假如记录来自member,生成<user_id,value>的key,value对,进入reduce阶段。
- 最终把a文件,把stage3 reduce阶段输出的文件合并起写到hdfs。
如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:
1、对于join,在判断小表不大于1G的情况下,使用map join
2、对于group by或distinct,设定 hive.groupby.skewindata=true
3、尽量使用上述的SQL语句调节进行优化