2018年第32周-获取hive进度功能

原理

大概原理是：自己写一个 hook 并配置到 Hive 中，Hive 每次运行 SQL 时都会执行这个 hook；我们写的 hook 会通过 HTTP 请求把这条 HQL 的相关信息发送出去，所以还得写一个接口来接收 hook 发过来的信息。hook 发来的信息里包含 Hive 用来记录 MR 进度的文件名，分析这个文件即可得到 HQL 的进度。

过程

1.编写hook, JcRestHook.java

package com.jc.hive;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Hive execution hook that reports each statement to an external REST endpoint.
 *
 * <p>Intended to be registered as both a pre- and a post-execution hook. On every invocation it
 * reads the target from the Hive configuration ({@code hiveserver.execute.hook.server} and
 * {@code hiveserver.execute.hook.rest}). It then POSTs the following as a form body via
 * {@link HttpSender#doPost}:
 * <ul>
 *   <li>query id</li>
 *   <li>query text</li>
 *   <li>Hadoop job name</li>
 *   <li>hook type</li>
 *   <li>a millisecond timestamp</li>
 *   <li>the session's Hive history file name</li>
 * </ul>
 * All values are URL-encoded here because {@code doPost} joins keys and values verbatim.
 *
 * <p>If the query id/text or the endpoint configuration is missing, the hook does nothing.
 * A failed POST is logged and swallowed so that reporting problems never fail the user's query.
 */
public class JcRestHook implements ExecuteWithHookContext {
    private static final Logger logger = LoggerFactory.getLogger(JcRestHook.class);

    /** Configuration key whose value is sent as the {@code server} parameter. */
    private static final String SERVER_KEY = "hiveserver.execute.hook.server";

    /** Configuration key holding the URL the report is POSTed to. */
    private static final String REST_KEY = "hiveserver.execute.hook.rest";

    /**
     * Collects the statement's metadata and POSTs it to the configured endpoint.
     *
     * @param hookContext context supplied by Hive for the statement being executed
     * @throws Exception only if URL-encoding fails; HTTP errors are logged, not thrown
     */
    public void run(HookContext hookContext) throws Exception {
        QueryPlan queryPlan = hookContext.getQueryPlan();
        HiveConf conf = hookContext.getConf();

        String queryId = queryPlan.getQueryId();
        if (StringUtils.isEmpty(queryId)) {
            logger.warn("queryId is null or empty, return");
            return;
        }
        logger.info("queryId: {}", queryId);

        // Validate the raw text BEFORE encoding: URLEncoder.encode throws NPE on null.
        String queryStr = queryPlan.getQueryStr();
        if (StringUtils.isEmpty(queryStr)) {
            logger.warn("queryStr is null or empty, return");
            return;
        }
        logger.info("queryStr: {}", queryStr);

        String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME);
        logger.info("jobName: {}", jobName);

        // Read single keys instead of copying the whole configuration via getAllProperties().
        String server = conf.get(SERVER_KEY);
        if (StringUtils.isEmpty(server)) {
            logger.warn("server is null or empty, return");
            return;
        }
        logger.info("server: {}", server);

        String rest = conf.get(REST_KEY);
        logger.info("rest: {}", rest);
        if (StringUtils.isEmpty(rest)) {
            logger.warn("rest is null or empty, return");
            return;
        }

        Map<String, String> params = new HashMap<String, String>();
        putEncoded(params, "server", server);
        putEncoded(params, "hook", String.valueOf(hookContext.getHookType()));
        putEncoded(params, "queryId", queryId);
        putEncoded(params, "queryStr", queryStr);
        putEncoded(params, "jobName", jobName);
        putEncoded(params, "timestamp", String.valueOf(System.currentTimeMillis()));
        putEncoded(params, "histFileName", histFileName());
        try {
            HttpSender.doPost(rest, params);
        } catch (Exception e) {
            // Best effort: a reporting failure must never abort the user's query.
            logger.error("do post error", e);
        }
    }

    /**
     * Returns the current session's Hive history file name.
     *
     * <p>Returns {@code null} when there is no session state or no history object.
     * NOTE(review): the history object may also return {@code null} itself when Hive session
     * history is disabled ({@code hive.session.history.enabled}); confirm for the deployed
     * Hive version.
     */
    private static String histFileName() {
        SessionState session = SessionState.get();
        if (session == null || session.getHiveHistory() == null) {
            return null;
        }
        return session.getHiveHistory().getHistFileName();
    }

    /**
     * URL-encodes {@code value} as UTF-8 and stores it under {@code key}.
     *
     * <p>{@code null} values are skipped, so the receiver sees the parameter as absent rather
     * than as the literal string "null".
     */
    private static void putEncoded(Map<String, String> params, String key, String value)
            throws java.io.UnsupportedEncodingException {
        if (value != null) {
            params.put(key, URLEncoder.encode(value, CharEncoding.UTF_8));
        }
    }
}

简单的http工具类HttpSender.java

import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Small HTTP helper for POSTing form data.
 *
 * <p>The Hive hook uses it to report statements.
 */
public class HttpSender {

    /** Content type sent by {@link #doPost}; the body is ASCII once values are URL-encoded. */
    private static final String FORM_CONTENT_TYPE =
            "application/x-www-form-urlencoded; charset=UTF-8";

    /**
     * POSTs {@code param} as the raw request body to {@code url} and returns the response body.
     *
     * <p>The body is written as UTF-8 and the response is decoded as UTF-8. Response lines are
     * concatenated without line separators. The connect timeout is 5 s and the read timeout is
     * 15 s.
     *
     * @param url    target URL
     * @param param  request body, already encoded as the server expects (e.g. {@code a=1&b=2});
     *               {@code null} sends an empty body
     * @param header extra request headers, may be {@code null}
     * @return the response body with line terminators removed
     * @throws IOException if connecting, writing or reading fails (for HTTP this includes an
     *                     error status when the response is read)
     */
    public static String sendPost(String url, String param, Map<String, String> header) throws UnsupportedEncodingException, IOException {
        URLConnection conn = new URL(url).openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(15000);
        if (header != null) {
            for (Entry<String, String> entry : header.entrySet()) {
                conn.setRequestProperty(entry.getKey(), entry.getValue());
            }
        }
        conn.setDoOutput(true);
        conn.setDoInput(true);

        // Write bytes directly. A PrintWriter would silently swallow write errors and would
        // encode with the platform default charset instead of UTF-8.
        byte[] body = (param == null ? "" : param).getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }

        StringBuilder result = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                result.append(line);
            }
        }
        return result.toString();
    }

    /**
     * POSTs {@code params} to {@code rest} as an {@code application/x-www-form-urlencoded} body.
     *
     * <p>Keys and values are joined as-is ({@code k1=v1&k2=v2}), so callers must URL-encode
     * them beforehand. Parameter order follows the map's iteration order. The response body is
     * read and discarded.
     *
     * @param rest   target URL
     * @param params pre-encoded form parameters
     * @throws IOException if the request fails
     */
    public static void doPost(String rest, Map<String, String> params) throws IOException {
        StringBuilder form = new StringBuilder();
        for (Entry<String, String> entry : params.entrySet()) {
            if (form.length() > 0) {
                form.append('&');
            }
            form.append(entry.getKey()).append('=').append(entry.getValue());
        }
        sendPost(rest, form.toString(), Collections.singletonMap("Content-Type", FORM_CONTENT_TYPE));
    }
}

2.编写接口
获取hive相关信息

package com.jc.web.controller;

import com.jc.domain.ResultVO;
import com.jc.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;


/**
 * Receives the statement reports POSTed by the Hive execution hook.
 *
 * <p>Reports are forwarded to {@link TaskService#hiveCallback}.
 */
@Controller
@RequestMapping(path="/hiveserver2")
public class Hiveserver2Controller {

    private static final Logger logger = LoggerFactory.getLogger(Hiveserver2Controller.class);

    @Autowired
    private TaskService taskService;

    /**
     * Handles {@code POST /hiveserver2/rest}.
     *
     * <p>Each argument is bound from the request parameter of the same name. Every value is
     * logged and then passed unchanged to the task service.
     *
     * @return the result produced by {@link TaskService#hiveCallback}
     */
    @RequestMapping(path = "/rest", method = RequestMethod.POST)
    @ResponseBody
    public ResultVO rest(String server, String hook, String queryId, String queryStr, String jobName, String timestamp, String histFileName) {
        logger.info("server: {}", server);
        logger.info("hook: {}", hook);
        logger.info("queryId: {}", queryId);
        logger.info("queryStr: {}", queryStr);
        logger.info("jobName: {}", jobName);
        logger.info("timestamp: {}", timestamp);
        logger.info("histFileName: {}", histFileName);

        ResultVO outcome = taskService.hiveCallback(server, hook, queryId, queryStr, jobName, timestamp, histFileName);
        return outcome;
    }
}

由于涉及到敏感数据处理,TaskService.hiveCallback就简要说一下:
其中 histFileName 是关键：Hive 会以 JSON 形式把进度信息输出到 histFileName 指向的文件中。注意：该文件只有在 hive-site.xml 中开启 hive.session.history.enabled=true 时才会生成，否则 histFileName 为空。文件名大致如下：

/data/hiveDataDir/tmpdir/hive/hive_job_log_e3246d8b-8b87-4db7-96f6-34f10fe3e89c_641681466.txt

每个 Hive 会话都会生成一个这样的文件，文件名中的 UUID 即会话 ID。该会话中执行的每条 HQL，其进度信息都会以一行 JSON 的形式追加到文件中，格式如下：

Counters plan={...}

其中json中有个stageList的值,就是可以分析出当前hql的进度,如果hql比较大,会有多个stage-1、stage-2、stage-3...等等。多个stage时,进度就要所有进度相加然后除以stage的数量才是这个hql的进度

histFileName 文件可以用 Apache commons-io 组件中的 Tailer 配合 TailerListenerAdapter 来监听，实时读取新追加的进度行。

3. 这里有个小技巧：由于是异步执行，客户端只负责提交 HQL，并不知道该 HQL 对应的 queryId，更不知道 jobName。所以提交的 SQL 需要做一点小处理：把原 SQL 封装成一个子查询，并在外层 where 条件中带上任务 id，如：

SELECT * FROM (select phone from t_user where l_date>='20180101' and l_date<'20180201'  limit 10
) t where 'jc_task_id_17_'='jc_task_id_17_'

这样我们就能解析出id=17,从而找到我们对应的hql(所以每次提交hql时,本地需记录id和hql的映射)。

4. 配置 hook：先把包含 JcRestHook 和 HttpSender 的 jar 放到 HiveServer2 的 classpath 中（如 $HIVE_HOME/lib），然后修改 hive-site.xml，重启 MetaStore 和 HiveServer。

<!-- Run com.jc.hive.JcRestHook before and after every statement. These are comma-separated
     lists: append to any hooks already configured instead of replacing them. The jar that
     contains the hook class must be on HiveServer2's classpath. -->
<property>
  <name>hive.exec.pre.hooks</name>
  <value>com.jc.hive.JcRestHook</value>
  <description>
    Comma-separated list of pre-execution hooks to be invoked for each statement.
    A pre-execution hook is specified as the name of a Java class which implements the
    org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
  </description>
</property>
<property>
  <name>hive.exec.post.hooks</name>
  <value>com.jc.hive.JcRestHook</value>
  <description>
    Comma-separated list of post-execution hooks to be invoked for each statement.
    A post-execution hook is specified as the name of a Java class which implements the
    org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
  </description>
</property>
<!-- The hook reports the session's Hive history file name, and progress is parsed from that
     file. Hive only writes it when session history is enabled (disabled by default in recent
     Hive versions); otherwise histFileName is empty. -->
<property>
  <name>hive.session.history.enabled</name>
  <value>true</value>
</property>
<!-- Sent by the hook to the REST endpoint as the "server" parameter. -->
<property>
  <name>hiveserver.execute.hook.server</name>
  <value>localhost:10000</value>
</property>
<!-- URL the hook POSTs each report to (Hiveserver2Controller's /hiveserver2/rest). -->
<property>
  <name>hiveserver.execute.hook.rest</name>
  <value>http://localhost:8034/hiveserver2/rest</value>
</property>

相关推荐