聊聊storm的reportError

本文主要研究一下storm的reportError

IErrorReporter

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

public interface IErrorReporter {
    void reportError(Throwable error);
}
  • ISpoutOutputCollector、IOutputCollector、IBasicOutputCollector接口均继承了IErrorReporter接口

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

public interface ISpoutOutputCollector extends IErrorReporter{
    /**
        Returns the task ids that received the tuples.
    */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();
}
  • ISpoutOutputCollector的实现类有SpoutOutputCollector、SpoutOutputCollectorImpl等

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}
  • IOutputCollector的实现类有OutputCollector、BoltOutputCollectorImpl等

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}
  • IBasicOutputCollector的实现类有BasicOutputCollector

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

@Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

@Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }
可以看到SpoutOutputCollectorImpl及BoltOutputCollectorImpl的reportError方法,均调用了executor.getReportError().report(error);

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

public class ReportError implements IReportError {

    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

    private final Map<String, Object> topoConf;
    private final IStormClusterState stormClusterState;
    private final String stormId;
    private final String componentId;
    private final WorkerTopologyContext workerTopologyContext;

    private int maxPerInterval;
    private int errorIntervalSecs;
    private AtomicInteger intervalStartTime;
    private AtomicInteger intervalErrors;

    public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                       WorkerTopologyContext workerTopologyContext) {
        this.topoConf = topoConf;
        this.stormClusterState = stormClusterState;
        this.stormId = stormId;
        this.componentId = componentId;
        this.workerTopologyContext = workerTopologyContext;
        this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
        this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
        this.intervalErrors = new AtomicInteger(0);
    }

    @Override
    public void report(Throwable error) {
        LOG.error("Error", error);
        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(Time.currentTimeSecs());
        }
        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
            try {
                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                              workerTopologyContext.getThisWorkerPort().longValue(), error);
            } catch (UnknownHostException e) {
                throw Utils.wrapInRuntime(e);
            }

        }
    }
}
  • 可以看到这里先判断interval是否需要重置,然后再判断error是否超过interval的最大次数,没有超过的话,则调用stormClusterState.reportError写入到存储,比如zk

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

@Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }
  • 这里使用ClusterUtils.errorPath(stormId, componentId)获取写入的目录,再通过ClusterUtils.lastErrorPath(stormId, componentId)获取写入的路径
  • 由于zk不适合存储大量数据,因而这里会判断如果childrens超过10的时候,会删除多余的节点,这里先按照节点名substring(1)升序排序,然后挨个删除

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }
  • errorPath的路径为/errors/{stormId}/{componentId},该目录下创建了以e开头的EPHEMERAL_SEQUENTIAL节点,error信息首先追加到该目录下,然后再判断如果超过10个则删除旧的节点
  • lastErrorPath的路径为/errors/{stormId}/{componentId}-last-error,用于存储该componentId的最后一个error

zkCli查看

[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]

storm-ui

curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
  • storm-ui请求了如上的接口,获取了topology相关的数据,其中spout或bolt中包括了lastError,展示了最近一个的error信息

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

/**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                    UIHelpers.getTopologySummary(
                            nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                            window, config,
                            servletRequest.getRemoteUser()
                    ),
                    callback
            );
        }
    }
  • 这里调用了nimbusClient.getClient().getTopologyPageInfo(id, window, sys)方法

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

@Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId,
                                                           topoName,
                                                           taskToComp,
                                                           beats,
                                                           exec2NodePort,
                                                           nodeToHost,
                                                           workerToResources,
                                                           includeSys,
                                                           true); //this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                                                                        exec2NodePort,
                                                                        taskToComp,
                                                                        beats,
                                                                        topology,
                                                                        window,
                                                                        includeSys,
                                                                        state);

            //......
            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }
  • 这里调用了StatsUtil.aggTopoExecsStats来获取TopologyPageInfo

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

/**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }
  • StatsUtil.aggTopoExecsStats方法最后调用了postAggregateTopoStats方法

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                          String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id));

            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
        }

        //......

        return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }
  • 这里有添加last-error,通过getLastError调用,之后再通过thriftifyBoltAggStats转化到thrift对象
  • 这里调用了stormClusterState.lastError(stormId, compId)获取last-error

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

/**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }
  • 获取到TopologyPageInfo之后,UIHelpers.getTopologySummary对其进行unpackTopologyInfo

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

/**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id());
        //......

        Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
        List<Map> spoutStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats);

        //......
        result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }
  • 注意这里对spout调用了getTopologySpoutAggStatsMap,对bolt调用了getTopologyBoltAggStatsMap
  • 这两个方法对lastError都进行了getTruncatedErrorString处理,最大只substring(0,200)

crash log

2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!

小结

  • spout或bolt的方法里头如果抛出异常会导致整个worker die掉,同时也会自动记录异常到zk但是代价就是worker die掉不断被重启
  • reportError可以通过try catch结合使用,使得有异常之后,worker不会die掉,同时也把error信息记录起来;不过一个topology的同一个component也只记录最近10个异常,采用的是EPHEMERAL_SEQUENTIAL节点来保存,随着worker的die而销毁;lastError采用的是PERSISTENT节点。两者在topology被kill的时候相关信息都会被删掉。
  • storm-ui展示了每个component的lastError信息,展示的时候错误信息的长度最大为200

doc

相关推荐