2018年第32周-获取hive进度功能
原理
大概原理是:自己写个hook,配置在hive里,hive每次运行sql时都会执行这个hook,hook会以http请求发送这条hql的相关信息;所以我们还需要写一个接口来接收hook发来的信息。hive的history文件中记录了MR的进度,分析这个文件即可得到hql的进度。
过程
1.编写hook, JcRestHook.java
package com.jc.hive;

import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

/**
 * Hive execution hook (registered via hive.exec.pre.hooks / hive.exec.post.hooks)
 * that POSTs the current query's metadata -- queryId, URL-encoded HQL text, MR job
 * name, and the session history file path -- to an external REST endpoint, so query
 * progress can be tracked by tailing and parsing the history file.
 *
 * <p>The callback target is read from the Hive configuration keys
 * "hiveserver.execute.hook.server" and "hiveserver.execute.hook.rest".
 */
public class JcRestHook implements ExecuteWithHookContext {

    private static final Logger logger = LoggerFactory.getLogger(JcRestHook.class);

    @Override
    public void run(HookContext hookContext) throws Exception {
        QueryPlan queryPlan = hookContext.getQueryPlan();
        HiveConf conf = hookContext.getConf();

        String queryId = queryPlan.getQueryId();
        if (StringUtils.isEmpty(queryId)) {
            logger.warn("queryId is null or empty, return");
            return;
        }
        logger.info("queryId: {}", queryId);

        // Check for null/empty BEFORE encoding: the original encoded first, and
        // URLEncoder.encode(null, ...) throws NullPointerException.
        String rawQueryStr = queryPlan.getQueryStr();
        if (StringUtils.isEmpty(rawQueryStr)) {
            logger.warn("queryStr is null or empty, return");
            return;
        }
        String queryStr = URLEncoder.encode(rawQueryStr, CharEncoding.UTF_8);
        logger.info("queryStr: {}", queryStr);

        String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME);
        logger.info("jobName: {}", jobName);

        String server = (String) conf.getAllProperties().get("hiveserver.execute.hook.server");
        if (StringUtils.isEmpty(server)) {
            logger.warn("server is null or empty, return");
            return;
        }
        logger.info("server: {}", server);

        String rest = (String) conf.getAllProperties().get("hiveserver.execute.hook.rest");
        if (StringUtils.isEmpty(rest)) {
            logger.warn("rest is null or empty, return");
            return;
        }
        logger.info("rest: {}", rest);

        // SessionState may be absent depending on how the hook is invoked; guard
        // instead of risking an NPE that would fail the whole query.
        SessionState session = SessionState.get();
        if (session == null || session.getHiveHistory() == null) {
            logger.warn("no SessionState/HiveHistory available, return");
            return;
        }

        Map<String, String> params = new HashMap<>();
        params.put("server", server);
        params.put("hook", hookContext.getHookType().toString());
        params.put("queryId", queryId);
        params.put("queryStr", queryStr);
        params.put("jobName", jobName);
        params.put("timestamp", String.valueOf(System.currentTimeMillis()));
        params.put("histFileName", session.getHiveHistory().getHistFileName());

        // Progress reporting is best-effort: never let a callback failure break the query.
        try {
            HttpSender.doPost(rest, params);
        } catch (Exception e) {
            logger.error("do post error", e);
        }
    }
}
简单的http工具类HttpSender.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Minimal HTTP POST helper used by the Hive hook to deliver query metadata.
 */
public class HttpSender {

    /**
     * POSTs {@code param} (an already URL-encoded form body such as
     * "param1=a&amp;param2=b&amp;param3=c") to {@code url} and returns the response body.
     *
     * @param url    target URL
     * @param param  raw request body; the caller is responsible for URL-encoding values
     * @param header optional extra request headers (may be null)
     * @return the response body, decoded as UTF-8
     * @throws IOException on connection or read/write failure
     */
    public static String sendPost(String url, String param, Map<String, String> header)
            throws UnsupportedEncodingException, IOException {
        URLConnection conn = new URL(url).openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(15000);
        if (header != null) {
            for (Entry<String, String> entry : header.entrySet()) {
                conn.setRequestProperty(entry.getKey(), entry.getValue());
            }
        }
        conn.setDoOutput(true);
        conn.setDoInput(true);
        // Write the body as UTF-8 explicitly: the original wrapped the raw stream in a
        // PrintWriter, which uses the platform default charset and corrupts non-ASCII
        // SQL text on non-UTF-8 systems (while the response was read as UTF-8).
        try (PrintWriter out = new PrintWriter(
                new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8))) {
            out.print(param);
            out.flush();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                // StringBuilder instead of repeated String concatenation in the loop.
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    result.append(line);
                }
                return result.toString();
            }
        }
    }

    /**
     * Joins {@code params} into a form body ("k1=v1&amp;k2=v2" -- values must already be
     * URL-encoded by the caller) and POSTs it to {@code rest}.
     *
     * @param rest   callback URL
     * @param params form parameters to send
     * @throws IOException on connection or read/write failure
     */
    public static void doPost(String rest, Map<String, String> params) throws IOException {
        StringBuilder body = new StringBuilder();
        String delim = "";
        for (Entry<String, String> entry : params.entrySet()) {
            body.append(delim).append(entry.getKey()).append("=").append(entry.getValue());
            delim = "&";
        }
        // Declare the form content type so the receiving framework binds the parameters.
        Map<String, String> header = new HashMap<>();
        header.put("Content-Type", "application/x-www-form-urlencoded;charset=UTF-8");
        sendPost(rest, body.toString(), header);
    }
}
2.编写接口
获取hive相关信息
package com.jc.web.controller; import com.jc.domain.ResultVO; import com.jc.service.TaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping(path="/hiveserver2") public class Hiveserver2Controller { private static Logger logger = LoggerFactory.getLogger(Hiveserver2Controller.class); @Autowired private TaskService taskService; @RequestMapping(path = "/rest", method = RequestMethod.POST) @ResponseBody public ResultVO rest(String server, String hook, String queryId, String queryStr, String jobName, String timestamp, String histFileName) { logger.info("server: " + server); logger.info("hook: " + hook); logger.info("queryId: " + queryId); logger.info("queryStr: " + queryStr); logger.info("jobName: " + jobName); logger.info("timestamp: " + timestamp); logger.info("histFileName: " + histFileName); return taskService.hiveCallback(server, hook, queryId, queryStr, jobName, timestamp, histFileName); } }
由于涉及到敏感数据处理,TaskService.hiveCallback就简要说一下:
其中histFileName是关键,hive会以json形式将进度信息输出到这个histFileName文件中,文件名大概如下:
/data/hiveDataDir/tmpdir/hive/hive_job_log_e3246d8b-8b87-4db7-96f6-34f10fe3e89c_641681466.txt
此文件是每次执行hive的hql时都会生成一个文件。进度信息都会以一行json输出,格式如下:
Counters plan={...}
其中json中有个stageList的值,就是可以分析出当前hql的进度,如果hql比较大,会有多个stage-1、stage-2、stage-3...等等。多个stage时,进度就要所有进度相加然后除以stage的数量才是这个hql的进度
histFileName文件需用apache的commons-io组件中的TailerListenerAdapter来监听
3.在这里有个小技巧,由于是异步的,客户端只有提交hql,而不知道hql对应的queryId是多少,更加不知道jobName。所以sql需要做一些小动作,封装个子查询,在where语句把id加上去,如:
SELECT * FROM (select phone from t_user where l_date>='20180101' and l_date<'20180201' limit 10 ) t where 'jc_task_id_17_'='jc_task_id_17_'
这样我们就能解析出id=17,从而找到我们对应的hql(所以每次提交hql时,本地需记录id和hql的映射)。
4.配置hook,就是配置hive-site.xml,重启MetaStore和HiveServer
<property> <name>hive.exec.pre.hooks</name> <value>com.jc.hive.JcRestHook</value> <description> Comma-separated list of pre-execution hooks to be invoked for each statement. A pre-execution hook is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface. </description> </property> <property> <name>hive.exec.post.hooks</name> <value>com.jc.hive.JcRestHook</value> <description> Comma-separated list of post-execution hooks to be invoked for each statement. A post-execution hook is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface. </description> </property> <property> <name>hiveserver.execute.hook.server</name> <value>localhost:10000</value> </property> <property> <name>hiveserver.execute.hook.rest</name> <value>http://localhost:8034/hiveserver2/rest</value> </property>