hive-脚本增量导入数据
0 现象描述:
ct_teach_coursewares_content从mysql抽取数据到hive仓库时,抽取时间过长,以前是全量抽取,现在计划修改成增量抽取,
其中,ct_teach_coursewares_content 和 ct_teach_coursewares 通过 coursewares_id 授课课件ID 关联, ct_teach_coursewares表中 update_time是增加索引了的
1 修改流程:
1 ct_teach_coursewares(老师授课表) 这边一直是全量抽取的,这边抽取速度也挺快,几分钟就结束,逻辑不动
2 在hive中将抽取过来的 ct_teach_coursewares_content(课件内容表) 按照 select coursewares_id from ct_teach_coursewares where update_time>='${com_present_date_Y_m_d}' and update_time<'${com_after_date_Y_m_d}' 得到增量的 id 写到本地磁盘
3.0 id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*` 将本地磁盘的id列出来,其中 id格式需要拼接成 '11','22','33'这种方式--- 带单引号是因为coursewares_id是varchar类型
3.1 ct_teach_coursewares_content 在抽取的时候根据2的id去mysql中抽取,query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"
2 一些主要代码:
2.1 步骤2中,ct_teach_coursewares表抽取id并拼接:
insert overwrite local directory '${MODE_PATH}/${hive_table_column}'
ROW FORMAT DELIMITED
fields terminated by '\001'
STORED AS TEXTFILE
select
concat_ws(',',collect_set(concat('\'',coursewares_id,'\'')))
from stg_ct_teach_coursewares where day='${com_present_date_Y_m_d}' and update_time >= '${com_present_date_Y_m_d}' AND update_time < '${com_after_date_Y_m_d}'
-----> 因为查询只查询一个 coursewares_id 因此在使用 collect_set 列传行的时候 没有使用 group by
collect_set里参数是字符串,因此 coursewares_id使用了单引号来拼接, 最后用concat_ws将多个字符串中间用,间隔 最终构建成 '11','22','33'的效果
2.2 总调用sh写法:
#通过起始时间和天数获取需要处理的数据的具体时间
comAddDay $com_start_date $c
hive_table=stg_ct_teach_coursewares_content
funDropTmpTable
#0 获取待统计的授课课件ID
ct_teach_coursewares_column="coursewares_id"
exportIdFromHiveTable "${ct_teach_coursewares_column}"
echo "==============================================="
#查询文件
echo "id所在的,目录: ${MODE_PATH}/${ct_teach_coursewares_column}/ "
id_list=`cat ${MODE_PATH}/${ct_teach_coursewares_column}/*`
#echo "${id_list}"
if [ "${id_list}" == "" ];then
echo "没有待抽取的评价ID数据,退出。"
exit 0
else
echo "${id_list}"
### 1 抽取数据
query="select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content where coursewares_id in (${id_list})"
query_count="SELECT COUNT(1) FROM ct_teach_coursewares_content where coursewares_id in (${id_list})"
tableIsSharding="true"
comGetSqoopHdfs
funDirToTmpTable
tabledir="$output_dir"
echo $tabledir
stgCheck $tabledir
funTmpToStg
funDropTmpTable
fi