A Look at Storm's IEventLogger
Preface
This article takes a look at Storm's IEventLogger.
IEventLogger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java
```java
/**
 * EventLogger interface for logging the event info to a sink like log file or db
 * for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple
     * from the spouts or bolts that has event logging enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
```
- IEventLogger defines the log method, and also defines the EventInfo class as a wrapper for the fields to be logged
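EventInfo's toString produces the comma-separated line that the default sink writes. The sketch below reproduces that format standalone so you can see what a record looks like; the `format` helper and class name are illustrative, not part of Storm's API.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

class EventInfoFormat {
    // Standalone reproduction of EventInfo.toString(): the record reads
    // "<date>,<component>,<task>,<messageId>,<values>".
    static String format(long ts, String component, int task, Object messageId, List<?> values) {
        return new Date(ts).toString() + "," + component + "," + task + ","
               + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
    }

    public static void main(String[] args) {
        // a null messageId renders as an empty field, matching the real toString()
        System.out.println(format(System.currentTimeMillis(), "spout", 1, null, Arrays.asList("a", "b")));
    }
}
```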
FileBasedEventLogger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
```java
public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8,
                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                                                     StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }

    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();
        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };
        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS,
                                           TimeUnit.MILLISECONDS);
    }

    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }

        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();
        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
```
- The default implementation of IEventLogger is FileBasedEventLogger, which starts a scheduled task that flushes data to disk every FLUSH_INTERVAL_MILLIS (1000 ms), but only when the dirty flag is set
- The default file path is events.log under the workersArtifactRoot directory
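The dirty-flag plus daemon flush thread pattern used above can be sketched standalone. The class name `DirtyFlagFlusher` is hypothetical; Storm's real class additionally handles file creation under workersArtifactRoot and a bounded shutdown wait.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of FileBasedEventLogger's flush strategy: writes only mark a
// dirty flag, and a daemon thread flushes the buffer at a fixed interval.
class DirtyFlagFlusher {
    private final BufferedWriter writer;
    private volatile boolean dirty = false;
    private final ScheduledExecutorService scheduler;

    DirtyFlagFlusher(Writer sink, long flushIntervalMillis) {
        this.writer = new BufferedWriter(sink);
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "event-logger-flush");
            t.setDaemon(true); // daemon, like Storm's flush thread
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flushIfDirty,
            flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void log(String line) {
        try {
            writer.write(line);
            writer.newLine();
            dirty = true; // flushed later by the scheduler, not on every write
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    synchronized void flushIfDirty() {
        try {
            if (dirty) {
                writer.flush();
                dirty = false;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void close() {
        scheduler.shutdown();
        try {
            writer.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The point of the dirty flag is that a burst of log calls costs one flush per interval rather than one flush per write.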
StormCommon.addEventLogger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
```java
public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
    Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                               ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    if (numExecutors == null || numExecutors == 0) {
        return;
    }
    HashMap<String, Object> componentConf = new HashMap<>();
    componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                      ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
        eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    }
    topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

public static List<String> eventLoggerBoltFields() {
    return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                         EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}

public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    Set<String> allIds = new HashSet<String>();
    allIds.addAll(topology.get_bolts().keySet());
    allIds.addAll(topology.get_spouts().keySet());

    for (String id : allIds) {
        inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    }
    return inputs;
}
```
- numExecutors is read from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS; if that is null, the value of Config.TOPOLOGY_WORKERS is used instead. The default is 0, which disables the event logger
- Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS is also read here and used as the bolt's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
- An EventLoggerBolt is created that takes every spout and bolt as an input via fieldsGrouping("component-id") on Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), so it receives all of their tuples; the tuple fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES. At the same time, an output stream named EVENTLOGGER_STREAM_ID is declared on each spout and bolt, which is what lets their data flow to the EventLoggerBolt
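The practical effect of fieldsGrouping on "component-id" is that all tuples from one component are routed to the same EventLoggerBolt task. A rough standalone sketch of that hash-mod routing (illustrative only: Storm's real fields grouping hashes the list of selected field values, not a single string, before taking the floor-mod):

```java
class ComponentRouting {
    // Tuples with an equal "component-id" field always map to the same task
    // index, so one logger task sees a component's entire event stream.
    static int chooseTask(String componentId, int numTasks) {
        return Math.floorMod(componentId.hashCode(), numTasks);
    }
}
```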
EventLoggerBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java
```java
public class EventLoggerBolt implements IBolt {

    /*
     * The below field declarations are also used in common.clj to define the
     * event logger output fields
     */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");
        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo =
            (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}",
                  input.getSourceComponent(), input.getValues());
        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId,
                                            (List<Object>) input.getValueByField(FIELD_VALUES));
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context,
                                        List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER
                                           + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}
```
- In prepare, EventLoggerBolt reads the Config.TOPOLOGY_EVENT_LOGGER_REGISTER entries from topoConf; if registerInfo is empty it falls back to the default FileBasedEventLogger, otherwise it initializes the event loggers registered in registerInfo
- The execute method simply iterates over eventLoggers and calls log on each one
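The registration mechanism in initializeEventLoggers boils down to reflective instantiation of a configured class name. The same pattern in miniature, instantiating a JDK class so it runs without Storm on the classpath (`ReflectiveFactory` is an illustrative name):

```java
class ReflectiveFactory {
    // Mirrors EventLoggerBolt's Class.forName(className).newInstance() step;
    // any failure is wrapped in a RuntimeException naming the offending class.
    static Object instantiate(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }
}
```

This is why a registered event logger class must have a public no-arg constructor: there is no other way for the bolt to construct it.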
Summary
- To enable the EventLogger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); the default is 0, i.e. disabled. With the event logger enabled, you can click debug on a spout or bolt and then open the events link to inspect, in the UI, the tuple data captured during the debug session
- Once Config.TOPOLOGY_EVENTLOGGER_EXECUTORS is greater than 0, if Config.TOPOLOGY_EVENT_LOGGER_REGISTER has not been set explicitly, the default FileBasedEventLogger is used; when debug is enabled on a spout or bolt, EventInfo records are written to events.log under the workersArtifactRoot directory
- If Config.TOPOLOGY_EVENT_LOGGER_REGISTER is customized (conf.registerEventLogger), StormCommon uses that configuration to initialize the event loggers, and the default FileBasedEventLogger is not initialized unless it is registered as well. In addEventLogger, StormCommon adds a declareStream to every spout and bolt that outputs to EVENTLOGGER_STREAM_ID, and wires all spouts and bolts as inputs of the EventLoggerBolt, roughly equivalent to fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id")); the tuples arriving at EventLoggerBolt carry the fields EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES
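Putting the summary into code, enabling event logging from the topology side might look like the fragment below (a sketch assuming Storm 2.x's Config API; MyEventLogger stands in for your own IEventLogger implementation, and registering it means FileBasedEventLogger is no longer used unless registered too):

```java
import org.apache.storm.Config;

Config conf = new Config();
// sets TOPOLOGY_EVENTLOGGER_EXECUTORS > 0, which enables event logging (default 0 = disabled)
conf.setNumEventLoggers(1);
// optional: populate TOPOLOGY_EVENT_LOGGER_REGISTER with a custom logger;
// MyEventLogger is hypothetical and must have a public no-arg constructor
conf.registerEventLogger(MyEventLogger.class);
```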