A Look at Storm's reportError
Preface
This article looks at how Storm's reportError works.
IErrorReporter
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java
    public interface IErrorReporter {
        void reportError(Throwable error);
    }
- The ISpoutOutputCollector, IOutputCollector, and IBasicOutputCollector interfaces all extend IErrorReporter
ISpoutOutputCollector
storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java
    public interface ISpoutOutputCollector extends IErrorReporter {
        /**
         * Returns the task ids that received the tuples.
         */
        List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

        void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

        long getPendingCount();
    }
- Implementations of ISpoutOutputCollector include SpoutOutputCollector and SpoutOutputCollectorImpl
IOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
    public interface IOutputCollector extends IErrorReporter {
        /**
         * Returns the task ids that received the tuples.
         */
        List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

        void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

        void ack(Tuple input);

        void fail(Tuple input);

        void resetTimeout(Tuple input);

        void flush();
    }
- Implementations of IOutputCollector include OutputCollector and BoltOutputCollectorImpl
IBasicOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
    public interface IBasicOutputCollector extends IErrorReporter {
        List<Integer> emit(String streamId, List<Object> tuple);

        void emitDirect(int taskId, String streamId, List<Object> tuple);

        void resetTimeout(Tuple tuple);
    }
- IBasicOutputCollector is implemented by BasicOutputCollector
reportError
SpoutOutputCollectorImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }
BoltOutputCollectorImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }
- Both SpoutOutputCollectorImpl.reportError and BoltOutputCollectorImpl.reportError increment the reported-error counter and then call executor.getReportError().report(error)
ReportError.report
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
    public class ReportError implements IReportError {

        private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

        private final Map<String, Object> topoConf;
        private final IStormClusterState stormClusterState;
        private final String stormId;
        private final String componentId;
        private final WorkerTopologyContext workerTopologyContext;

        private int maxPerInterval;
        private int errorIntervalSecs;
        private AtomicInteger intervalStartTime;
        private AtomicInteger intervalErrors;

        public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                           WorkerTopologyContext workerTopologyContext) {
            this.topoConf = topoConf;
            this.stormClusterState = stormClusterState;
            this.stormId = stormId;
            this.componentId = componentId;
            this.workerTopologyContext = workerTopologyContext;
            this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
            this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
            this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
            this.intervalErrors = new AtomicInteger(0);
        }

        @Override
        public void report(Throwable error) {
            LOG.error("Error", error);
            if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
                intervalErrors.set(0);
                intervalStartTime.set(Time.currentTimeSecs());
            }
            if (intervalErrors.incrementAndGet() <= maxPerInterval) {
                try {
                    stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                                  workerTopologyContext.getThisWorkerPort().longValue(), error);
                } catch (UnknownHostException e) {
                    throw Utils.wrapInRuntime(e);
                }
            }
        }
    }
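- report first checks whether the current interval has expired and needs to be reset, then checks whether the number of errors in the interval exceeds maxPerInterval; only if it does not is stormClusterState.reportError called to write the error to the state store (ZooKeeper, for example). Errors beyond the limit are still logged but are not written to the store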
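The throttling decision can be isolated from Storm and exercised on its own. The following is a minimal sketch, not Storm code: ErrorThrottle, shouldReport, and the injectable clock are hypothetical names introduced here so the logic can run without a cluster, and the limits passed in main are illustrative values only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

/** Hypothetical stand-in for the throttling part of ReportError.report. */
class ErrorThrottle {
    private final int maxPerInterval;     // plays the role of TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL
    private final int errorIntervalSecs;  // plays the role of TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
    private final IntSupplier clockSecs;  // injectable clock (assumption, replaces Time.currentTimeSecs())
    private final AtomicInteger intervalStartTime;
    private final AtomicInteger intervalErrors = new AtomicInteger(0);

    ErrorThrottle(int maxPerInterval, int errorIntervalSecs, IntSupplier clockSecs) {
        this.maxPerInterval = maxPerInterval;
        this.errorIntervalSecs = errorIntervalSecs;
        this.clockSecs = clockSecs;
        this.intervalStartTime = new AtomicInteger(clockSecs.getAsInt());
    }

    /** Returns true if this error would be forwarded to the cluster state. */
    boolean shouldReport() {
        int now = clockSecs.getAsInt();
        // Start a new interval once the old one is strictly older than errorIntervalSecs.
        if (now - intervalStartTime.get() > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(now);
        }
        // Count every error, but only let the first maxPerInterval through.
        return intervalErrors.incrementAndGet() <= maxPerInterval;
    }

    public static void main(String[] args) {
        int[] now = {100};
        ErrorThrottle throttle = new ErrorThrottle(2, 10, () -> now[0]);
        System.out.println(throttle.shouldReport()); // first error in the interval
        System.out.println(throttle.shouldReport()); // second error in the interval
        System.out.println(throttle.shouldReport()); // third error, over the limit
        now[0] = 111;                                // more than 10 seconds later
        System.out.println(throttle.shouldReport()); // new interval
    }
}
```

Note that the comparison is strict: an error arriving exactly errorIntervalSecs after the interval started still counts against the old interval.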
StormClusterStateImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }
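- ClusterUtils.errorPath(stormId, componentId) gives the directory under which each error is appended as a sequential node, and ClusterUtils.lastErrorPath(stormId, componentId) gives the path of the node that is overwritten with the most recent error
- Because ZooKeeper is not suited to storing large amounts of data, the method prunes the directory whenever it holds more than 10 children: it sorts the children in ascending order of the number in the node name (substring(1) drops the leading e) and deletes the oldest ones one by one until 10 remain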
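The pruning step can be tried out on plain node names without a ZooKeeper connection. This is a sketch under that assumption: ErrorNodePruner and selectForDeletion are hypothetical names, and the method only returns what would be deleted instead of calling delete_node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper mirroring the sort-and-trim loop in reportError. */
class ErrorNodePruner {
    static final int MAX_KEPT = 10; // same limit as the while loop in the source

    /** Returns the child names that would be deleted, oldest first. */
    static List<String> selectForDeletion(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        // Names look like "e0000000291": drop the leading 'e' and compare the sequence numbers.
        sorted.sort(Comparator.comparingLong((String name) -> Long.parseLong(name.substring(1))));
        List<String> toDelete = new ArrayList<>();
        while (sorted.size() > MAX_KEPT) {
            toDelete.add(sorted.remove(0)); // the smallest sequence number is the oldest error
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
            "e0000000291", "e0000000290", "e0000000295", "e0000000294",
            "e0000000293", "e0000000292", "e0000000299", "e0000000298",
            "e0000000297", "e0000000296", "e0000000289", "e0000000288");
        System.out.println(selectForDeletion(children));
    }
}
```

Sorting numerically rather than lexicographically matters only if the names ever differ in length; with zero-padded sequence numbers both orders agree, and the source chooses the numeric comparison.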
ClusterUtils.errorPath
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
    public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }
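- errorPath resolves to /errors/{stormId}/{componentId}, relative to Storm's root in ZooKeeper (/storm in the zkCli session below). EPHEMERAL_SEQUENTIAL nodes whose names start with e are created under this directory: each error is first appended there, and if the directory then holds more than 10 nodes the oldest ones are deleted
- lastErrorPath resolves to /errors/{stormId}/{componentId}-last-error and stores the most recent error of that componentId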
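To make the two paths concrete, the path-building logic can be reproduced with the JDK alone. This is a sketch, assuming only what the ClusterUtils excerpt shows: ErrorPaths is a hypothetical class name, and the topology and component ids are the ones from the demo in this article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical re-creation of the path helpers shown in the ClusterUtils excerpt. */
class ErrorPaths {
    static final String SEPARATOR = "/";
    static final String ERRORS_SUBTREE = SEPARATOR + "errors";

    static String errorPath(String stormId, String componentId) {
        try {
            // The component id is URL-encoded so that characters such as '/' cannot break the path.
            return ERRORS_SUBTREE + SEPARATOR + stormId + SEPARATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always available on the JVM
        }
    }

    static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static void main(String[] args) {
        String topologyId = "reportErrorDemo-1-1540260375";
        System.out.println(errorPath(topologyId, "print"));
        System.out.println(lastErrorPath(topologyId, "print"));
        System.out.println(errorPath(topologyId, "my bolt/1"));
    }
}
```

Because the last-error node is a sibling of the component directory rather than a child of it, overwriting it never affects the count of sequential nodes that the pruning loop looks at.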
Inspecting with zkCli
    [zk: localhost:2181(CONNECTED) 21] ls /storm/errors
    [DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
    [zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
    [print, print-last-error]
    [zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
    [e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
    [zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
    []
    [zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
    []
storm-ui
curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
- storm-ui calls the endpoint above to fetch the topology data; each spout and bolt entry in the response includes a lastError field holding the most recent error
StormApiResource
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
    /**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                    UIHelpers.getTopologySummary(
                            nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                            window, config,
                            servletRequest.getRemoteUser()
                    ),
                    callback
            );
        }
    }
- The handler calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys)
Nimbus.getTopologyPageInfo
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
    @Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, exec2NodePort, nodeToHost,
                                                           workerToResources, includeSys, true); //this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId, exec2NodePort, taskToComp, beats,
                                                                        topology, window, includeSys, state);

            //......

            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }
- Nimbus calls StatsUtil.aggTopoExecsStats to build the TopologyPageInfo
StatsUtil.aggTopoExecsStats
storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
    /**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }
- StatsUtil.aggTopoExecsStats finishes by calling postAggregateTopoStats
StatsUtil.postAggregateTopoStats
storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
    private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                           String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id));

            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
        }

        //......

        return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }
- For each component, the method adds a last-error entry obtained through getLastError, and thriftifyBoltAggStats then converts the map into a thrift object
- getLastError calls stormClusterState.lastError(stormId, compId) to fetch the last error
UIHelpers.getTopologySummary
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java
    /**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }
- Once the TopologyPageInfo has been fetched, UIHelpers.getTopologySummary unpacks it with unpackTopologyInfo
UIHelpers.unpackTopologyInfo
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java
    /**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id());

        //......

        Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
        List<Map> spoutStats = new ArrayList();
        for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();
        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats);

        //......

        result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }
- Note that spouts go through getTopologySpoutAggStatsMap and bolts go through getTopologyBoltAggStatsMap
- Both methods pass lastError through getTruncatedErrorString, which keeps at most the first 200 characters (substring(0, 200))
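The practical effect of the 200-character limit is easiest to see on a real stack trace. The sketch below assumes that the stored error string is the printed stack trace of the Throwable (an assumption about what ClusterUtils.stringifyError produces, which the excerpts above do not show); UiErrorPreview and its methods are hypothetical names.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical illustration of what a 200-character lastError preview retains. */
class UiErrorPreview {
    static final int MAX_LEN = 200; // same limit as getTruncatedErrorString

    /** Renders a Throwable as text; assumed here to resemble the stored error string. */
    static String stringify(Throwable error) {
        StringWriter out = new StringWriter();
        try (PrintWriter writer = new PrintWriter(out)) {
            error.printStackTrace(writer);
        }
        return out.toString();
    }

    /** Same truncation rule as in UIHelpers: keep at most the first MAX_LEN characters. */
    static String truncate(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), MAX_LEN));
    }

    static String preview(Throwable error) {
        return truncate(stringify(error));
    }

    public static void main(String[] args) {
        Throwable cause = new ClassCastException("java.lang.String cannot be cast to java.lang.Integer");
        // Only the exception header and perhaps the first frame survive the cut.
        System.out.println(preview(new RuntimeException(cause)));
    }
}
```

Under that assumption, the topology page would typically show the exception class, its message, and little else; the frames that point at the failing bolt code are cut off.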
crash log
    2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
        at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
        at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
        ... 6 more
    2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
        at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
        at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
        ... 6 more
    2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
    java.lang.RuntimeException: ("Worker died")
        at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
    2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
    2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
    2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
    2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
    2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
    2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
    2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
    2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
    2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
    2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!
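Summary
- If an exception escapes from a spout or bolt method, the whole worker dies. The exception is still recorded in ZooKeeper automatically, but the price is a worker that dies and keeps being restarted
- Calling reportError from a try/catch block keeps the worker alive after an exception while still recording the error. Only the 10 most recent errors are kept for each component of a topology; they are stored as EPHEMERAL_SEQUENTIAL nodes, which are destroyed when the worker dies, whereas lastError is stored in a PERSISTENT node. Both are deleted when the topology is killed
- storm-ui shows the lastError of each component, truncated to at most 200 characters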
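The try/catch pattern from the summary can be sketched without a Storm dependency. Everything here is a stand-in: ErrorReporter is a hypothetical local interface with the same single method as IErrorReporter, and SafeExecuteSketch.execute only imitates the body of a bolt's execute method that hits the ClassCastException seen in the crash log. In a real bolt the collector passed to execute would play the role of the reporter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in with the same shape as IErrorReporter. */
interface ErrorReporter {
    void reportError(Throwable error);
}

class SafeExecuteSketch {
    /**
     * Imitates a bolt's execute body. A bad field is reported through the
     * collector instead of being rethrown, so the calling thread keeps running.
     * Returns true if the field was processed, false if an error was reported.
     */
    static boolean execute(Object field, ErrorReporter collector) {
        try {
            Integer value = (Integer) field; // ClassCastException if the field is a String
            System.out.println("value = " + value);
            return true;
        } catch (Exception e) {
            collector.reportError(e); // record the error, do not let it escape
            return false;
        }
    }

    public static void main(String[] args) {
        List<Throwable> reported = new ArrayList<>();
        execute(42, reported::add);
        execute("not a number", reported::add);
        System.out.println("reported errors: " + reported.size());
    }
}
```

Whether a caught exception should also fail the tuple, or simply be skipped, depends on the topology's delivery guarantees; this sketch leaves that decision out.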