Spark SQL 高效迭代计算
如下 SQL 代码用于离线计算指标。Spark 直接执行 SQL 时,每一个 create table ... select 语句都是一个 action:创建 Hive 表,数据写入 HDFS;下一个 SQL 依赖该表,需要从 HDFS 中重新读取数据。整个作业产生 13 个 action,与使用 MapReduce 引擎计算相比优势不太明显,不能最大化发挥 Spark 迭代计算的优势。为了解决这个问题,有两种方案:
- 1. 在 HDFS 与计算引擎之间增加分布式缓存([Alluxio](http://www.alluxio.org/docs/master/cn/)):create table ... select 产生的数据通过 Alluxio 缓存,下一个 SQL 读取表数据时直接从缓存中读取,减少磁盘 IO。但作业仍然产生 13 个 action,优化效果不太明显;
- 2. 解析 SQL,获取每一个 create table ... select 语句的 select 部分,调用 spark.sql 生成 DataFrame,再调用 registerTempTable 注册为临时表。这样 create table ... select 语句不再触发 action,作业只剩一个 insert 语句、只触发一次 action,最大化发挥 Spark 迭代计算优势。
采用第二种方式时,不能直接使用 Spark SQL 或者 Spark ThriftServer,需要提供一个 Spark Driver 做转换处理;同时在该 Driver 之上可以定制各种算法(例如核概率估计算法)。实现文档:https://github.com/melin/sparkdriver
-- ====================================================================
-- Offline indicator pipeline (Hive/Spark SQL, in-house dialect).
-- Builds intermediate indicator tables and writes one final partition.
-- Only the last INSERT is a real action once the driver rewrites the
-- CREATE TABLE ... SELECT statements into temp-view registrations.
-- Dialect-specific pieces (LIFECYCLE, dateadd/to_char, WM_CONCAT,
-- secods:* UDFs, FillBlankFunc) assume the in-house engine.
-- ====================================================================

-- Prepare source data: events for the target hour range.
create table tdl_eunomia_access_ddg_sht_event LIFECYCLE 10 as
select
    event_id,
    prodcateleafid,
    isvrtlprod,
    byr_byrstarlvl,
    sel_isb2cuser,
    byrid,
    substr(ds, 1, 8)  as ds,
    substr(ds, 1, 10) as ds_hour,
    dispatch_time
from secods.odl_event_crm_rights_start
where ds >= '201611300000'
  and ds <= '201611302359'
  and perf_test = 'false';

-- One indicator-key row per event (buyer id keyed).
create table tdl_eunomia_access_ddg_sht_indicator_tmp LIFECYCLE 10 as
select
    byrId   as byrId,
    'byrId' as keys_list,
    '1'     as dis_index
from tdl_eunomia_access_ddg_sht_event;

-- Distinct indicator keys.
create table tdl_eunomia_access_ddg_sht_indicator LIFECYCLE 10 as
select distinct byrId, keys_list
from tdl_eunomia_access_ddg_sht_indicator_tmp;

-- Indicators: base fact restricted to tracked buyers.
create table tdl_eunomia_calc_once_fg_sdt LIFECYCLE 10 as
select a.*
from (
    select
        dispatch_time,
        byrId,
        if((selId > 0 and byrId > 0), 1, 0) as condition_ENN,
        substr(ds, 1, 8) as ds,
        if(substr(ds, 1, 8) >= '20161116', 1, 0) as ds_ENN
    from secods.odl_event_crm_rights_start
    where ds <= '201611302359'
      and ds >= '201611160000'
      and byrId is not null
      and ((selId > 0 and byrId > 0))
) a
join (
    select byrId
    from tdl_eunomia_access_ddg_sht_indicator
    where keys_list = 'byrId'
) b on a.byrId = b.byrId;  -- statement terminator was missing in the original

-- Daily event counts per buyer (historical days only).
create table tdl_eunomia_calc_once_fg_sdt1 LIFECYCLE 10 as
select
    byrId,
    count(1) as value,
    ds,
    'ENN' as indi_code
from tdl_eunomia_calc_once_fg_sdt
where ds < substr('2016113023', 1, 8)
  and ds_ENN = 1
  and condition_ENN = 1
group by ds, byrId;

-- Intra-day running count per event (window capped at 500 rows back).
create table tdl_eunomia_calc_once_fg_sdt2 LIFECYCLE 10 as
select
    dispatch_time as event_time,
    byrId,
    count(1) over (partition by ds, byrId
                   order by dispatch_time
                   rows 500 preceding) as value,
    ds,
    'ENN' as indi_code
from tdl_eunomia_calc_once_fg_sdt
where condition_ENN = 1
  and ds >= substr('2016113000', 1, 8);

-- Hint for the driver to cache the small date-dimension table.
set spark.cache.table.tdl_pub_sec_date_cfg_ENN = true;

-- Calendar days covered by this run.
create table tdl_pub_sec_date_cfg_ENN LIFECYCLE 10 as
select date_num as bdate
from secdw.pub_sec_date_cfg
where date_num <= substr('2016113023', 1, 8)
  and date_num >= substr('2016113000', 1, 8)
  and date_type = 'date';

-- Pair each business date with daily counts in its 14-day lookback.
-- NOTE(review): the join has no ON clause (cartesian product filtered
-- in WHERE); tolerable only because the date table is tiny — confirm.
create table tdl_eunomia_calc_once_fg_sdt40 LIFECYCLE 10 as
select a.*, b.bdate
from tdl_eunomia_calc_once_fg_sdt1 a
join tdl_pub_sec_date_cfg_ENN b
where a.ds < b.bdate
  and (a.ds >= to_char(dateadd(to_date(b.bdate, 'yyyymmdd'), 1 - 15, 'dd'), 'yyyymmdd')
       and a.indi_code = 'ENN');

-- Rolling 14-day sum per buyer and business date.
-- NOTE(review): the sum is aliased "ENN", yet downstream statements
-- select a column named "value" from this table — verify the driver
-- maps the indi_code alias back to "value", otherwise this alias is a
-- latent bug.
create table tdl_eunomia_calc_once_fg_sdt4 LIFECYCLE 10 as
select
    byrId,
    sum(cast(value as bigint)) as ENN,
    bdate as ds,
    indi_code
from tdl_eunomia_calc_once_fg_sdt40
where to_char(dateadd(to_date(bdate, 'yyyymmdd'), 1 - 15, 'dd'), 'yyyymmdd') <= ds
  and indi_code = 'ENN'
group by byrId, bdate, indi_code;

-- Re-filter the intra-day counts to the run window.
-- NOTE(review): this recreates tdl_eunomia_calc_once_fg_sdt2 from
-- itself; plain Hive would reject it (table already exists). It only
-- works when the driver rewrites CREATE TABLE ... SELECT into temp-view
-- registration, where re-registering the same name overwrites it.
create table tdl_eunomia_calc_once_fg_sdt2 LIFECYCLE 10 as
select *
from tdl_eunomia_calc_once_fg_sdt2
where ds <= substr('2016113023', 1, 8)
  and ds >= substr('2016113000', 1, 8)
  and indi_code in ('ENN')
  and value < 500;

-- Combine intra-day running counts with the historical 14-day sums.
create table tdl_eunomia_calc_once_fg_sdt3 LIFECYCLE 10 as
select
    cast(event_time as bigint) as event_time,
    byrId,
    value,
    ds,
    indi_code
from (
    select
        a.event_time,
        a.byrId,
        cast(case when a.indi_code = 'ENN'
                  then coalesce(a.value, 0) + coalesce(b.value, 0)
             end as string) as value,
        a.indi_code,
        a.ds
    from tdl_eunomia_calc_once_fg_sdt2 a
    left outer join tdl_eunomia_calc_once_fg_sdt4 b
        on a.byrId = b.byrId
       and a.indi_code = b.indi_code
       and a.ds = b.ds
       and a.indi_code in ('ENN')
       and b.indi_code in ('ENN')
    union all
    select
        cast(ds as bigint) as event_time,
        byrId,
        value,
        indi_code,
        ds
    from tdl_eunomia_calc_once_fg_sdt4
    where indi_code in ('ENN')
) a;

-- Turn point-in-time values into [gmt_begin, gmt_end) validity ranges
-- (epoch millis); the last value of a day is closed at next midnight.
create table adl_eunomia_calc_once_fg_sdt LIFECYCLE 10 as
select
    byrId,
    if(gmt_begin = ds,
       unix_timestamp(to_date(ds, 'yyyymmdd')) * 1000,
       gmt_begin) as gmt_begin,
    if(gmt_end is null,
       unix_timestamp(dateadd(to_date(ds, 'yyyymmdd'), 1, 'dd')) * 1000,
       gmt_end) as gmt_end,
    value,
    ds,
    indi_code
from (
    select
        byrId,
        event_time as gmt_begin,
        lead(event_time, 1) over (partition by byrId, ds, indi_code
                                  order by event_time asc) as gmt_end,
        case when indi_code = 'ENN' then cast(value as bigint) end as value,
        indi_code,
        ds
    from tdl_eunomia_calc_once_fg_sdt3
    where indi_code in ('ENN')
) a;

-- Final (and only) action: join events with indicator validity ranges
-- and write the target partition.
insert OVERWRITE TABLE adl_eunomia_access_ddg_sht PARTITION (ds)
select
    a.event_id,
    a.prodcateleafid,
    a.isvrtlprod,
    a.byr_byrstarlvl,
    a.sel_isb2cuser,
    a.byrid,
    secods:indi_accdata_filter_value(a.dispatch_time, b.aet_a_GROUP) as indireltmaeta3d,
    a.ds_hour as ds
from tdl_eunomia_access_ddg_sht_event a
left outer join (
    select
        byrId,
        substr(ds, 1, 10) ds,
        WM_CONCAT(concat(value, ";", gmt_begin, ';', gmt_end)) aet_a_GROUP
    from adl_eunomia_calc_once_fg_sdt
    where substr(ds, 1, 10) >= '2016113000'
      and substr(ds, 1, 10) <= '2016113023'
      and indi_code = 'ENN'
    group by byrId, substr(ds, 1, 10)
) b on FillBlankFunc(a.byrId) = b.byrId
   and a.ds = substr(b.ds, 1, 8);  -- fixed garbled "substr(b ```.ds, 1, 8)"
相关推荐
adayan0 2020-05-19
tugangkai 2020-05-09
Johnson0 2020-07-28
rongwenbin 2020-06-15
sxyhetao 2020-06-12
Johnson0 2020-06-08
Hhanwen 2020-05-29
Hhanwen 2020-05-29
Johnson0 2020-05-17
登峰小蚁 2020-05-11
Hhanwen 2020-05-04
Hhanwen 2020-05-03
Oeljeklaus 2020-04-20
Hhanwen 2020-07-26
zhixingheyitian 2020-07-19
yanqianglifei 2020-07-07
Hhanwen 2020-07-05