A Look at Storm's IEventLogger

This article takes a closer look at Storm's IEventLogger.

IEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java

/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has event logging
     * enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
  • IEventLogger defines the log method as well as the EventInfo wrapper class; a minimal custom implementation is sketched below
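
A custom sink only needs to implement these three methods. Below is a minimal sketch, assuming SLF4J is available and using the hypothetical class name ConsoleEventLogger, that simply forwards each EventInfo to a logger:

import java.util.Map;

import org.apache.storm.metric.IEventLogger;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// hypothetical example, not part of Storm
public class ConsoleEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ConsoleEventLogger.class);

    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        // arguments holds whatever was supplied when registering this logger (null for the default registration)
        LOG.info("ConsoleEventLogger prepared with arguments {}", arguments);
    }

    @Override
    public void log(EventInfo e) {
        // EventInfo.toString() already renders ts, component, task, messageId and values
        LOG.info("event: {}", e);
    }

    @Override
    public void close() {
        // nothing to release
    }
}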

FileBasedEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java

public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }


    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();

        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };

        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }


    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();

        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
  • FileBasedEventLogger is the default IEventLogger implementation; it starts a scheduled task that flushes the written data to disk every FLUSH_INTERVAL_MILLIS (only if the writer is dirty)
  • The default file path is events.log under the workersArtifactRoot directory; the resulting line format is illustrated below
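
Each line appended to events.log is simply the EventInfo.toString() output. A tiny sketch to illustrate the format (the class name EventInfoFormatDemo is made up; it only assumes storm-client on the classpath):

import java.util.Arrays;

import org.apache.storm.metric.IEventLogger.EventInfo;

public class EventInfoFormatDemo {
    public static void main(String[] args) {
        // build the same kind of record that EventLoggerBolt hands to FileBasedEventLogger
        EventInfo info = new EventInfo(System.currentTimeMillis(), "word-spout", 3, "msg-1",
                                       Arrays.<Object>asList("hello", "storm"));
        // prints something like: Sat Mar 02 10:15:30 CST 2019,word-spout,3,msg-1,[hello, storm]
        System.out.println(info);
    }
}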

StormCommon.addEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
    Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                               ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    if (numExecutors == null || numExecutors == 0) {
        return;
    }
    HashMap<String, Object> componentConf = new HashMap<>();
    componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
        eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    }
    topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

public static List<String> eventLoggerBoltFields() {
    return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                         EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}

public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    Set<String> allIds = new HashSet<String>();
    allIds.addAll(topology.get_bolts().keySet());
    allIds.addAll(topology.get_spouts().keySet());

    for (String id : allIds) {
        inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    }
    return inputs;
}
  • numExecutors is read from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, falling back to Config.TOPOLOGY_WORKERS when it is null; the default is 0, which disables the event logger (a config sketch follows this list)
  • Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS is also read and used as Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS for the event logger bolt
  • An EventLoggerBolt is created that takes all spouts and bolts as its inputs via fieldsGrouping("component-id") on Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), so it receives all their tuples; the fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS and EventLoggerBolt.FIELD_VALUES. At the same time every spout and bolt gets a stream declaration that outputs to EVENTLOGGER_STREAM_ID, which is how the data flows to the EventLoggerBolt
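
In topology code the executor count is usually set through the Config helper; a minimal sketch (the class name EnableEventLogger is made up):

import org.apache.storm.Config;

public class EnableEventLogger {
    public static Config buildConf() {
        Config conf = new Config();
        // Config.TOPOLOGY_EVENTLOGGER_EXECUTORS defaults to 0, i.e. event logging disabled;
        // any value > 0 makes StormCommon.addEventLogger wire the EventLoggerBolt into the topology
        conf.setNumEventLoggers(1);
        return conf;
    }
}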

EventLoggerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java

public class EventLoggerBolt implements IBolt {

    /*
     The below field declarations are also used in common.clj to define the event logger output fields
      */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");

        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues());

        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES));

        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}
  • In prepare, EventLoggerBolt reads Config.TOPOLOGY_EVENT_LOGGER_REGISTER from topoConf; if registerInfo is empty it falls back to the default FileBasedEventLogger, otherwise it initializes the event loggers registered in registerInfo (a registration sketch follows this list)
  • The execute method simply iterates over eventLoggers and calls log on each of them
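
A custom logger (for example the hypothetical ConsoleEventLogger sketched earlier, assumed to be in the same package) can be registered through conf.registerEventLogger, which populates Config.TOPOLOGY_EVENT_LOGGER_REGISTER; a minimal sketch:

import org.apache.storm.Config;

public class RegisterCustomEventLogger {
    public static Config buildConf() {
        Config conf = new Config();
        // at least one event logger executor is still required
        conf.setNumEventLoggers(1);
        // adds an entry under Config.TOPOLOGY_EVENT_LOGGER_REGISTER; once any logger is registered,
        // the default FileBasedEventLogger is no longer added automatically
        conf.registerEventLogger(ConsoleEventLogger.class);
        return conf;
    }
}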

Summary

  • To enable the event logger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); the default is 0, i.e. disabled. With the event logger enabled, you can click debug on a spout or bolt in the UI and then open the events link to inspect the tuple data captured during the debug window.
  • Once Config.TOPOLOGY_EVENTLOGGER_EXECUTORS is greater than 0, if Config.TOPOLOGY_EVENT_LOGGER_REGISTER is not set, the default FileBasedEventLogger is used; when debug is turned on for a spout or bolt, each EventInfo is written to events.log under the workersArtifactRoot directory.
  • If Config.TOPOLOGY_EVENT_LOGGER_REGISTER is customized (conf.registerEventLogger), StormCommon uses that configuration to initialize the event loggers; the default FileBasedEventLogger is not initialized unless it is registered explicitly. In addEventLogger, StormCommon adds a declareStream to every spout and bolt that outputs to EVENTLOGGER_STREAM_ID, and wires all spouts and bolts as inputs of the EventLoggerBolt via something like fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id")); the tuples sent to the EventLoggerBolt carry the fields EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS and EventLoggerBolt.FIELD_VALUES.
