Spark SQL 高效迭代计算

如下 SQL 代码用于离线计算指标。Spark 直接执行这些 SQL 时,每一个 create table ... as select 语句都是一个 action:创建 Hive 表并把数据写入 HDFS,下一个依赖该表的 SQL 需要从 HDFS 中重新读取数据。整个作业共产生 13 个 action,与使用 MapReduce 引擎计算相比优势不明显,不能最大化发挥 Spark 迭代计算的优势。为了解决这个问题,有两种方案:

  • 1. 在 HDFS 与计算引擎之间增加分布式缓存([Alluxio](http://www.alluxio.org/docs/master/cn/)),create table ... as select 产生的数据通过 Alluxio 缓存,下一个 SQL 直接从缓存中读取表数据,减少磁盘 IO。但作业仍然产生 13 个 action,优化效果不明显;
  • 2. 解析 SQL,取出每一个 create table ... as select 语句的 select 部分,调用 spark.sql 生成 DataFrame,再调用 registerTempTable 注册为临时表。这样 create table ... as select 语句不再触发 action,整个作业只有最后的 insert 语句触发一次 action,最大化发挥 Spark 迭代计算的优势。

采用第二种方式时,不能直接使用 Spark SQL 或者 Spark Thrift Server,需要提供一个 Spark driver 做转换处理,同时可以在该 driver 之上定制各种算法,例如核概率估计算法。实现文档:https://github.com/melin/sparkdriver

-- Prepare data: tdl_eunomia_access_ddg_sht_event
-- Copies the event attributes needed by the final insert from
-- secods.odl_event_crm_rights_start, restricted to ds values between
-- 201611300000 and 201611302359 and to rows whose perf_test is the string false.
-- The first 8 characters of ds are kept as ds and the first 10 as ds_hour.
CREATE TABLE tdl_eunomia_access_ddg_sht_event LIFECYCLE 10 AS
SELECT
    event_id
    ,prodcateleafid
    ,isvrtlprod
    ,byr_byrstarlvl
    ,sel_isb2cuser
    ,byrid
    ,substr(ds, 1, 8) AS ds
    ,substr(ds, 1, 10) AS ds_hour
    ,dispatch_time
FROM secods.odl_event_crm_rights_start
WHERE perf_test = 'false'
  AND ds >= '201611300000'
  AND ds <= '201611302359';

-- One output row per prepared event: the buyer id, the literal key name byrId
-- in keys_list, and the constant string 1 in dis_index.
CREATE TABLE tdl_eunomia_access_ddg_sht_indicator_tmp LIFECYCLE 10 AS
SELECT
    evt.byrId AS byrId
    ,'byrId' AS keys_list
    ,'1' AS dis_index
FROM tdl_eunomia_access_ddg_sht_event evt;

-- Distinct (byrId, keys_list) pairs taken from the temporary indicator table.
CREATE TABLE tdl_eunomia_access_ddg_sht_indicator LIFECYCLE 10 AS
SELECT DISTINCT
    byrId
    ,keys_list
FROM tdl_eunomia_access_ddg_sht_indicator_tmp;

-- Indicator
-- Base rows for the ENN indicator. Reads secods.odl_event_crm_rights_start for
-- ds between 201611160000 and 201611302359, keeps rows with a non-null byrId
-- where both selId and byrId are positive, and inner-joins them to the buyer
-- ids listed in tdl_eunomia_access_ddg_sht_indicator under keys_list = byrId.
-- condition_ENN and ds_ENN are 1/0 flags derived from the same predicates.
-- Fix: the statement terminator was missing, which made this statement run
-- into the following CREATE TABLE.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt LIFECYCLE 10 AS
SELECT a.*
FROM (
    SELECT
        dispatch_time
        ,byrId
        ,if((selId > 0 AND byrId > 0), 1, 0) AS condition_ENN
        ,substr(ds, 1, 8) AS ds
        ,if(substr(ds, 1, 8) >= '20161116', 1, 0) AS ds_ENN
    FROM secods.odl_event_crm_rights_start
    WHERE ds <= '201611302359'
      AND ds >= '201611160000'
      AND byrId IS NOT NULL
      AND (selId > 0 AND byrId > 0)
) a
JOIN (
    SELECT byrId
    FROM tdl_eunomia_access_ddg_sht_indicator
    WHERE keys_list = 'byrId'
) b
ON a.byrId = b.byrId;

-- Row count per (ds, byrId) for days strictly before the first 8 characters of
-- 2016113023, limited to rows flagged ds_ENN = 1 and condition_ENN = 1.
-- Every output row is tagged with indi_code ENN.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt1 LIFECYCLE 10 AS
SELECT
    byrId
    ,COUNT(1) AS value
    ,ds
    ,'ENN' AS indi_code
FROM tdl_eunomia_calc_once_fg_sdt
WHERE condition_ENN = 1
  AND ds_ENN = 1
  AND ds < substr('2016113023', 1, 8)
GROUP BY
    ds
    ,byrId;

-- Running count per (ds, byrId) ordered by dispatch_time, for days on or after
-- the first 8 characters of 2016113000 and rows flagged condition_ENN = 1.
-- The window frame is the current row plus up to 500 preceding rows.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt2 LIFECYCLE 10 AS
SELECT
    dispatch_time AS event_time
    ,byrId
    ,COUNT(1) OVER (
        PARTITION BY ds, byrId
        ORDER BY dispatch_time
        ROWS 500 PRECEDING
    ) AS value
    ,ds
    ,'ENN' AS indi_code
FROM tdl_eunomia_calc_once_fg_sdt
WHERE ds >= substr('2016113000', 1, 8)
  AND condition_ENN = 1;

-- NOTE(review): presumably asks the custom driver to cache the table created
-- below. Confirm against the sparkdriver documentation.
SET spark.cache.table.tdl_pub_sec_date_cfg_ENN = true;

-- Calendar rows of type date from secdw.pub_sec_date_cfg whose date_num lies
-- between the first 8 characters of 2016113000 and of 2016113023, exposed as bdate.
CREATE TABLE tdl_pub_sec_date_cfg_ENN LIFECYCLE 10 AS
SELECT
    date_num AS bdate
FROM secdw.pub_sec_date_cfg
WHERE date_type = 'date'
  AND date_num >= substr('2016113000', 1, 8)
  AND date_num <= substr('2016113023', 1, 8);

-- Pairs every daily ENN count with each calendar date bdate that it contributes to.
-- The join has no ON clause and the pairing is done entirely by the WHERE filter:
-- the count day must be before bdate and not earlier than bdate shifted by 1-15 days.
-- NOTE(review): assumes dateadd, to_date and to_char follow MaxCompute semantics,
-- which would make this a window of the 14 days before bdate. Confirm.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt40 LIFECYCLE 10 AS
SELECT
    a.*
    ,b.bdate
FROM tdl_eunomia_calc_once_fg_sdt1 a
JOIN tdl_pub_sec_date_cfg_ENN b
WHERE a.ds < b.bdate
  AND (
        a.ds >= to_char(dateadd(to_date(b.bdate, 'yyyymmdd'), 1-15, 'dd'), 'yyyymmdd')
    AND a.indi_code = 'ENN'
  );

-- Sums the paired daily counts per (byrId, bdate, indi_code). The sum is
-- exposed in column ENN and bdate is renamed to ds.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt4 LIFECYCLE 10 AS
SELECT
    byrId
    ,SUM(cast(value AS bigint)) AS ENN
    ,bdate AS ds
    ,indi_code
FROM tdl_eunomia_calc_once_fg_sdt40
WHERE indi_code = 'ENN'
  AND to_char(dateadd(to_date(bdate, 'yyyymmdd'), 1-15, 'dd'), 'yyyymmdd') <= ds
GROUP BY
    byrId
    ,bdate
    ,indi_code;

-- Re-declares tdl_eunomia_calc_once_fg_sdt2 from itself, keeping only rows whose ds lies
-- between the two day keys, whose indi_code is ENN and whose value is below 500.
-- NOTE(review): a table with this name is already created earlier in the script and this
-- statement reads from the very name it creates. Presumably this relies on the driver
-- re-registering the name as a temporary table. Confirm, since a plain CTAS would fail here.
create table tdl_eunomia_calc_once_fg_sdt2 LIFECYCLE 10 as
select * from tdl_eunomia_calc_once_fg_sdt2
where ds<=substr('2016113023',1,8) and ds>=substr('2016113000',1,8) and indi_code in('ENN') and value<500;

-- Builds the ENN time series per buyer and day from two sources:
--   1. every running-count row of tdl_eunomia_calc_once_fg_sdt2, with the matching
--      historical sum from tdl_eunomia_calc_once_fg_sdt4 added to it (0 when no match)
--   2. one row per historical sum, using the day key itself as event_time
-- Fix: tdl_eunomia_calc_once_fg_sdt4 exposes its sum as column ENN and has no
-- column named value, so both reads of that table now use ENN. The second branch
-- casts the sum to string so both UNION ALL branches yield the same type for value.
CREATE TABLE tdl_eunomia_calc_once_fg_sdt3 LIFECYCLE 10 AS
SELECT
    cast(event_time AS bigint) AS event_time
    ,byrId
    ,value
    ,ds
    ,indi_code
FROM (
    SELECT
        a.event_time
        ,a.byrId
        ,cast(CASE
            WHEN a.indi_code = 'ENN' THEN coalesce(a.value, 0) + coalesce(b.ENN, 0)
        END AS string) AS value
        ,a.indi_code
        ,a.ds
    FROM tdl_eunomia_calc_once_fg_sdt2 a
    LEFT OUTER JOIN tdl_eunomia_calc_once_fg_sdt4 b
        ON a.byrId = b.byrId
        AND a.indi_code = b.indi_code
        AND a.ds = b.ds
        AND a.indi_code IN ('ENN')
        AND b.indi_code IN ('ENN')
    UNION ALL
    SELECT
        cast(ds AS bigint) AS event_time
        ,byrId
        ,cast(ENN AS string) AS value
        ,indi_code
        ,ds
    FROM tdl_eunomia_calc_once_fg_sdt4
    WHERE indi_code IN ('ENN')
) a;

-- Turns the ENN time series into intervals per (byrId, ds, indi_code).
-- Inner query: gmt_begin is the event_time of a row and gmt_end is the event_time
-- of the next row in the same partition, obtained with lead.
-- Outer query: a gmt_begin equal to ds is replaced by the unix timestamp of that day
-- times 1000, and a missing gmt_end by the unix timestamp of the following day times 1000.
CREATE TABLE adl_eunomia_calc_once_fg_sdt LIFECYCLE 10 AS
SELECT
    byrId
    ,if(
        gmt_begin = ds,
        unix_timestamp(to_date(ds, 'yyyymmdd')) * 1000,
        gmt_begin
    ) AS gmt_begin
    ,if(
        gmt_end IS NULL,
        unix_timestamp(dateadd(to_date(ds, 'yyyymmdd'), 1, 'dd')) * 1000,
        gmt_end
    ) AS gmt_end
    ,value
    ,ds
    ,indi_code
FROM (
    SELECT
        byrId
        ,event_time AS gmt_begin
        ,lead(event_time, 1) OVER (
            PARTITION BY byrId, ds, indi_code
            ORDER BY event_time ASC
        ) AS gmt_end
        ,CASE
            WHEN indi_code = 'ENN' THEN cast(value AS bigint)
        END AS value
        ,indi_code
        ,ds
    FROM tdl_eunomia_calc_once_fg_sdt3
    WHERE indi_code IN ('ENN')
) a;

-- Final output: overwrites the dynamic ds partitions of adl_eunomia_access_ddg_sht.
-- Each prepared event is left-joined to the ENN intervals of its buyer and day,
-- which are concatenated per group with WM_CONCAT and passed to the
-- indi_accdata_filter_value function together with the event dispatch_time.
-- Fix: a stray markdown code fence had split substr(b.ds, 1, 8) across two lines,
-- leaving the join condition syntactically broken.
-- NOTE(review): ds in adl_eunomia_calc_once_fg_sdt appears to be an 8-character day key,
-- in which case substr(ds, 1, 10) returns 8 characters and the lower bound against a
-- 10-character literal may never match. Confirm the intended key width.
INSERT OVERWRITE TABLE adl_eunomia_access_ddg_sht PARTITION(ds)
SELECT
    a.event_id,
    a.prodcateleafid,
    a.isvrtlprod,
    a.byr_byrstarlvl,
    a.sel_isb2cuser,
    a.byrid,
    secods:indi_accdata_filter_value(a.dispatch_time, b.aet_a_GROUP) AS indireltmaeta3d,
    a.ds_hour AS ds
FROM tdl_eunomia_access_ddg_sht_event a
LEFT OUTER JOIN (
    SELECT
        byrId,
        substr(ds, 1, 10) ds,
        WM_CONCAT(concat(value, ";", gmt_begin, ';', gmt_end)) aet_a_GROUP
    FROM adl_eunomia_calc_once_fg_sdt
    WHERE
        substr(ds, 1, 10) >= '2016113000'
        AND substr(ds, 1, 10) <= '2016113023'
        AND indi_code = 'ENN'
    GROUP BY byrId, substr(ds, 1, 10)
) b ON
    FillBlankFunc(a.byrId) = b.byrId
    AND a.ds = substr(b.ds, 1, 8);

相关推荐