A Look at Storm's WindowedBoltExecutor
Preface
This article takes a look at Storm's WindowedBoltExecutor.
WindowedBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
    /**
     * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
     */
    public class WindowedBoltExecutor implements IRichBolt {
        public static final String LATE_TUPLE_FIELD = "late_tuple";
        private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
        private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
        private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
        private final IWindowedBolt bolt;
        // package level for unit tests
        transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
        private transient WindowedOutputCollector windowedOutputCollector;
        private transient WindowLifecycleListener<Tuple> listener;
        private transient WindowManager<Tuple> windowManager;
        private transient int maxLagMs;
        private TimestampExtractor timestampExtractor;
        private transient String lateTupleStream;
        private transient TriggerPolicy<Tuple, ?> triggerPolicy;
        private transient EvictionPolicy<Tuple, ?> evictionPolicy;
        private transient Duration windowLengthDuration;

        public WindowedBoltExecutor(IWindowedBolt bolt) {
            this.bolt = bolt;
            timestampExtractor = bolt.getTimestampExtractor();
        }

        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
        }

        // NOTE: the queue has to be thread safe.
        protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                                 Collection<Event<Tuple>> queue, boolean stateful) {
            Objects.requireNonNull(topoConf);
            Objects.requireNonNull(context);
            Objects.requireNonNull(collector);
            Objects.requireNonNull(queue);
            this.windowedOutputCollector = new WindowedOutputCollector(collector);
            bolt.prepare(topoConf, context, windowedOutputCollector);
            this.listener = newWindowLifecycleListener();
            this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
            start();
            LOG.info("Initialized window manager {} ", windowManager);
        }

        @Override
        public void execute(Tuple input) {
            if (isTupleTs()) {
                long ts = timestampExtractor.extractTimestamp(input);
                if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                    windowManager.add(input, ts);
                } else {
                    if (lateTupleStream != null) {
                        windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                    } else {
                        LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                    }
                    windowedOutputCollector.ack(input);
                }
            } else {
                windowManager.add(input);
            }
        }

        @Override
        public void cleanup() {
            if (waterMarkEventGenerator != null) {
                waterMarkEventGenerator.shutdown();
            }
            windowManager.shutdown();
            bolt.cleanup();
        }

        // for unit tests
        WindowManager<Tuple> getWindowManager() {
            return windowManager;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
            }
            bolt.declareOutputFields(declarer);
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return bolt.getComponentConfiguration();
        }

        //......
    }
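- WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener, and windowManager and calls bolt.prepare; in cleanup it shuts down waterMarkEventGenerator and windowManager and calls bolt.cleanup. When TopologyBuilder.setBolt receives an IWindowedBolt implementation, it wraps it in a WindowedBoltExecutor, which takes the place of the original bolt.
- declareOutputFields declares the late tuple stream if one is configured and then delegates to bolt.declareOutputFields(declarer); getComponentConfiguration simply returns bolt.getComponentConfiguration().
- execute mainly adds the tuple to windowManager. A tuple that is not admitted to the window (a late tuple) is acked immediately.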
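The branching in execute can be sketched as a small standalone model. This is not Storm's code: the class name, the `Route` enum, and the `lastWatermarkTs` parameter are hypothetical, and it assumes (following the WaterMarkEventGenerator Javadoc) that a tuple is late when its timestamp is earlier than the last emitted watermark.

```java
/** Simplified, hypothetical model of the routing decision made in execute(). */
final class LateTupleRoutingSketch {
    enum Route { WINDOW, LATE_STREAM, LOGGED_ONLY }

    /**
     * @param ts              event timestamp extracted from the tuple
     * @param lastWatermarkTs timestamp of the last watermark (assumed to be tracked elsewhere)
     * @param lateTupleStream configured late tuple stream name, or null if none
     */
    static Route route(long ts, long lastWatermarkTs, String lateTupleStream) {
        if (ts >= lastWatermarkTs) {
            return Route.WINDOW;            // admitted: handed to the window manager
        }
        // Late tuple: it is acked right away in either case below.
        return lateTupleStream != null ? Route.LATE_STREAM : Route.LOGGED_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(route(1500L, 1000L, null));    // on-time tuple
        System.out.println(route(900L, 1000L, "late"));   // late, stream configured
        System.out.println(route(900L, 1000L, null));     // late, no stream configured
    }
}
```

In both late branches the real executor acks the input tuple, so a late tuple never waits for a window to expire.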
WindowedOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
    /**
     * Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
     */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return emit(streamId, inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            emitDirect(taskId, streamId, inputTuples, tuple);
        }
    }
- WindowedOutputCollector extends OutputCollector and overrides the emit and emitDirect methods so that emitted tuples are anchored to inputTuples by default.
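The auto-anchoring idea can be illustrated without Storm on the classpath. In the sketch below every name is hypothetical, and tuples are reduced to plain string ids; it only shows how overriding the unanchored emit lets a wrapper attach the current window's tuples as anchors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in classes; tuples are modelled as string ids. */
final class AnchoringCollectorSketch {
    /** One recorded emit: the anchors it was tied to and the emitted values. */
    static final class Emit {
        final List<String> anchors;
        final List<Object> values;

        Emit(List<String> anchors, List<Object> values) {
            this.anchors = anchors;
            this.values = values;
        }
    }

    /** Plain collector: an emit without explicit anchors is unanchored. */
    static class PlainCollector {
        final List<Emit> emitted = new ArrayList<>();

        void emit(List<String> anchors, List<Object> values) {
            emitted.add(new Emit(anchors, values));
        }

        void emit(List<Object> values) {
            emit(Collections.<String>emptyList(), values);
        }
    }

    /** Wrapper that anchors every unanchored emit to the tuples of the current window. */
    static final class WindowAnchoringCollector extends PlainCollector {
        private List<String> windowTuples = Collections.emptyList();

        void setContext(List<String> windowTuples) {
            this.windowTuples = windowTuples;
        }

        @Override
        void emit(List<Object> values) {
            emit(windowTuples, values);
        }
    }

    public static void main(String[] args) {
        WindowAnchoringCollector collector = new WindowAnchoringCollector();
        collector.setContext(Arrays.asList("t1", "t2"));
        collector.emit(Arrays.<Object>asList("sum", 3));
        System.out.println(collector.emitted.get(0).anchors);
    }
}
```

The user's bolt keeps calling the simple emit and still gets anchored output, which is why setContext is called with the window's tuples before each activation.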
WindowLifecycleListener
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
    /**
     * A callback for expiry, activation of events tracked by the {@link WindowManager}
     *
     * @param <T> The type of Event in the window (e.g. Tuple).
     */
    public interface WindowLifecycleListener<T> {
        /**
         * Called on expiry of events from the window due to {@link EvictionPolicy}
         *
         * @param events the expired events
         */
        void onExpiry(List<T> events);

        /**
         * Called on activation of the window due to the {@link TriggerPolicy}
         *
         * @param events        the list of current events in the window.
         * @param newEvents     the newly added events since last activation.
         * @param expired       the expired events since last activation.
         * @param referenceTime the reference (event or processing) time that resulted in activation
         */
        default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
            throw new UnsupportedOperationException("Not implemented");
        }

        /**
         * Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
         * state and is huge to be loaded entirely in memory.
         *
         * @param eventsIt      a supplier of iterator over the list of current events in the window
         * @param newEventsIt   a supplier of iterator over the newly added events since the last ativation
         * @param expiredIt     a supplier of iterator over the expired events since the last activation
         * @param referenceTime the reference (event or processing) time that resulted in activation
         */
        default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt,
                                  Supplier<Iterator<T>> expiredIt, Long referenceTime) {
            throw new UnsupportedOperationException("Not implemented");
        }
    }
- WindowLifecycleListener defines the callback methods onExpiry and onActivation (the latter in a list-based and an iterator-based overload).
- They are driven by the two kinds of policy: onExpiry by the EvictionPolicy and onActivation by the TriggerPolicy.
EvictionPolicy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
    /**
     * Eviction policy tracks events and decides whether an event should be evicted from the window or not.
     *
     * @param <T> the type of event that is tracked.
     */
    public interface EvictionPolicy<T, S> {
        /**
         * Decides if an event should be expired from the window, processed in the current window or kept for later processing.
         *
         * @param event the input event
         * @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
         */
        Action evict(Event<T> event);

        /**
         * Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
         *
         * @param event the input event to be tracked
         */
        void track(Event<T> event);

        /**
         * Returns the current context that is part of this eviction policy.
         *
         * @return the eviction context
         */
        EvictionContext getContext();

        /**
         * Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
         * set the reference timestamp.
         *
         * @param context the eviction context
         */
        void setContext(EvictionContext context);

        /**
         * Resets the eviction policy.
         */
        void reset();

        /**
         * Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
         *
         * @return the state
         */
        S getState();

        /**
         * Restore the eviction policy from the state that was earlier checkpointed by the framework.
         *
         * @param state the state
         */
        void restoreState(S state);

        /**
         * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
         */
        public enum Action {
            /**
             * expire the event and remove it from the queue.
             */
            EXPIRE,
            /**
             * process the event in the current window of events.
             */
            PROCESS,
            /**
             * don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
             */
            KEEP,
            /**
             * stop processing the queue, there cannot be anymore events satisfying the eviction policy.
             */
            STOP
        }
    }
- EvictionPolicy is responsible for tracking events and deciding whether an event should be evicted from the window.
- EvictionPolicy has several implementations: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, and WatermarkTimeEvictionPolicy.
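To make the Action values concrete, here is a minimal sketch of how a time-based eviction decision could look. It is a simplified model under stated assumptions, not the code of TimeEvictionPolicy: the class and method names are hypothetical, and the thresholds (expire once the event's age reaches the window length, keep events that lie ahead of the reference time) are an assumption chosen for illustration.

```java
/** Hypothetical model of a time-based eviction decision. */
final class TimeEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP }

    /**
     * @param eventTs        timestamp of the event being checked
     * @param referenceTs    reference time of the window being evaluated (its end)
     * @param windowLengthMs length of the window in milliseconds
     */
    static Action evict(long eventTs, long referenceTs, long windowLengthMs) {
        long age = referenceTs - eventTs;
        if (age >= windowLengthMs) {
            return Action.EXPIRE;   // fell out of the window: remove from the queue
        }
        if (age < 0) {
            return Action.KEEP;     // belongs to a future window: leave it queued
        }
        return Action.PROCESS;      // inside the current window
    }

    public static void main(String[] args) {
        long referenceTs = 10_000L;
        long windowLengthMs = 5_000L;
        for (long ts : new long[] {4_000L, 7_000L, 12_000L}) {
            System.out.println(ts + " -> " + evict(ts, referenceTs, windowLengthMs));
        }
    }
}
```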
TriggerPolicy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java
    /**
     * Triggers the window calculations based on the policy.
     *
     * @param <T> the type of the event that is tracked
     */
    public interface TriggerPolicy<T, S> {
        /**
         * Tracks the event and could use this to invoke the trigger.
         *
         * @param event the input event
         */
        void track(Event<T> event);

        /**
         * resets the trigger policy.
         */
        void reset();

        /**
         * Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
         */
        void start();

        /**
         * Any clean up could be handled here.
         */
        void shutdown();

        /**
         * Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
         *
         * @return the state
         */
        S getState();

        /**
         * Restore the trigger policy from the state that was earlier checkpointed by the framework.
         *
         * @param state the state
         */
        void restoreState(S state);
    }
- TriggerPolicy is responsible for deciding when the window computation is triggered.
- TriggerPolicy has several implementations: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, and WatermarkTimeTriggerPolicy.
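As a rough illustration of what "tracking events and invoking the trigger" means, the following sketch models a count-based trigger that fires a callback after every N tracked events. It is a simplified stand-in with hypothetical names and does not reproduce CountTriggerPolicy (there is no eviction context, start flag, or thread safety here).

```java
/** Hypothetical model of a count-based trigger: fire after every N tracked events. */
final class CountTriggerSketch {
    private final int slidingCount;
    private final Runnable onTrigger;
    private int sinceLastTrigger;

    CountTriggerSketch(int slidingCount, Runnable onTrigger) {
        this.slidingCount = slidingCount;
        this.onTrigger = onTrigger;
    }

    /** Called once per incoming event. */
    void track() {
        if (++sinceLastTrigger >= slidingCount) {
            sinceLastTrigger = 0;   // plays the role of reset()
            onTrigger.run();        // plays the role of the trigger handler
        }
    }

    public static void main(String[] args) {
        int[] fired = {0};
        CountTriggerSketch trigger = new CountTriggerSketch(3, () -> fired[0]++);
        for (int i = 0; i < 7; i++) {
            trigger.track();
        }
        System.out.println("windows fired: " + fired[0]);
    }
}
```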
WindowedBoltExecutor.newWindowLifecycleListener
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                windowedOutputCollector.setContext(tuples);
                boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }
        };
    }

    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
    }
- This creates an anonymous implementation of WindowLifecycleListener.
- onExpiry acks the expired tuples one by one. onActivation sets the window's tuples as the collector context and calls boltExecute, which builds a TupleWindowImpl and passes it to the wrapped bolt's execute method.
WindowedBoltExecutor.initWindowManager
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {
        WindowManager<Tuple> manager = stateful
            ? new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);
        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(),
                             TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
            } else {
                return new CountEvictionPolicy<>(windowLengthCount.value);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
            } else {
                return new TimeEvictionPolicy<>(windowLengthDuration.value);
            }
        }
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
                                                     WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            } else {
                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
            } else {
                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
            }
        }
    }
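- For WindowedBoltExecutor, stateful is false, so a plain WindowManager is created here.
- DEFAULT_MAX_LAG_MS defaults to 0, meaning no lag; DEFAULT_WATERMARK_EVENT_INTERVAL_MS defaults to 1000, that is, 1 second.
- The EvictionPolicy and TriggerPolicy are chosen according to the parameter types given for windowLength and slidingInterval. When a timestamp extractor is configured (for example through a timestamp field) and both parameters are Durations, WatermarkTimeEvictionPolicy and WatermarkTimeTriggerPolicy are created.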
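The selection logic of getEvictionPolicy and getTriggerPolicy boils down to two boolean questions per policy. The sketch below mirrors that branching but only returns the class names as strings; the class `PolicySelectionSketch` and its method names are made up for illustration.

```java
/** Hypothetical helper that mirrors the branching of getEvictionPolicy/getTriggerPolicy. */
final class PolicySelectionSketch {
    /**
     * @param countBasedLength true if the window length is a Count, false if it is a Duration
     * @param tupleTs          true if a timestamp extractor is configured (event time)
     */
    static String evictionPolicy(boolean countBasedLength, boolean tupleTs) {
        if (countBasedLength) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    /**
     * @param countBasedInterval true if the sliding interval is a Count, false if it is a Duration
     * @param tupleTs            true if a timestamp extractor is configured (event time)
     */
    static String triggerPolicy(boolean countBasedInterval, boolean tupleTs) {
        if (countBasedInterval) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean count : flags) {
            for (boolean tupleTs : flags) {
                System.out.println("count=" + count + ", tupleTs=" + tupleTs + " -> "
                    + evictionPolicy(count, tupleTs) + " / " + triggerPolicy(count, tupleTs));
            }
        }
    }
}
```

Note that window length and sliding interval are chosen independently, so a Duration-based length can be combined with a Count-based interval and vice versa.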
WindowManager
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
    /**
     * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
     * {@link TriggerPolicy}.
     *
     * @param <T> the type of event in the window.
     */
    public class WindowManager<T> implements TriggerHandler {
        protected final Collection<Event<T>> queue;
        private final AtomicInteger eventsSinceLastExpiry;

        //......

        /**
         * Add an event into the window, with the given ts as the tracking ts.
         *
         * @param event the event to track
         * @param ts    the timestamp
         */
        public void add(T event, long ts) {
            add(new EventImpl<T>(event, ts));
        }

        /**
         * Tracks a window event
         *
         * @param windowEvent the window event to track
         */
        public void add(Event<T> windowEvent) {
            // watermark events are not added to the queue.
            if (!windowEvent.isWatermark()) {
                queue.add(windowEvent);
            } else {
                LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
            }
            track(windowEvent);
            compactWindow();
        }

        /**
         * feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
         */
        private void track(Event<T> windowEvent) {
            evictionPolicy.track(windowEvent);
            triggerPolicy.track(windowEvent);
        }

        /**
         * expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
         */
        protected void compactWindow() {
            if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
                scanEvents(false);
            }
        }

        /**
         * Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
         *
         * @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
         *                 found
         * @return the list of events to be processed as a part of the current window
         */
        private List<Event<T>> scanEvents(boolean fullScan) {
            LOG.debug("Scan events, eviction policy {}", evictionPolicy);
            List<T> eventsToExpire = new ArrayList<>();
            List<Event<T>> eventsToProcess = new ArrayList<>();
            try {
                lock.lock();
                Iterator<Event<T>> it = queue.iterator();
                while (it.hasNext()) {
                    Event<T> windowEvent = it.next();
                    Action action = evictionPolicy.evict(windowEvent);
                    if (action == EXPIRE) {
                        eventsToExpire.add(windowEvent.get());
                        it.remove();
                    } else if (!fullScan || action == STOP) {
                        break;
                    } else if (action == PROCESS) {
                        eventsToProcess.add(windowEvent);
                    }
                }
                expiredEvents.addAll(eventsToExpire);
            } finally {
                lock.unlock();
            }
            eventsSinceLastExpiry.set(0);
            LOG.debug("[{}] events expired from window.", eventsToExpire.size());
            if (!eventsToExpire.isEmpty()) {
                LOG.debug("invoking windowLifecycleListener.onExpiry");
                windowLifecycleListener.onExpiry(eventsToExpire);
            }
            return eventsToProcess;
        }

        //......
    }
- WindowedBoltExecutor's execute mainly adds the tuple to windowManager.
- For an ordinary tuple wrapped in an EventImpl, isWatermark returns false, so the event is added to the queue and then track and compactWindow are run.
- track delegates to evictionPolicy and triggerPolicy. compactWindow calls scanEvents(false) once the number of events since the last expiry reaches EXPIRE_EVENTS_THRESHOLD. Without fullScan, the scan stops at the first event that is not expired. Afterwards, if eventsToExpire is not empty, windowLifecycleListener.onExpiry(eventsToExpire) is invoked.
WaterMarkEventGenerator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
    /**
     * Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
     * timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
     * be considered as late events.
     */
    public class WaterMarkEventGenerator<T> implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
        private final WindowManager<T> windowManager;
        private final int eventTsLag;
        private final Set<GlobalStreamId> inputStreams;
        private final Map<GlobalStreamId, Long> streamToTs;
        private final ScheduledExecutorService executorService;
        private final int interval;
        private ScheduledFuture<?> executorFuture;
        private volatile long lastWaterMarkTs;

        //......

        public void start() {
            this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            try {
                long waterMarkTs = computeWaterMarkTs();
                if (waterMarkTs > lastWaterMarkTs) {
                    this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                    lastWaterMarkTs = waterMarkTs;
                }
            } catch (Throwable th) {
                LOG.error("Failed while processing watermark event ", th);
                throw th;
            }
        }
    }
- WindowedBoltExecutor's start method calls the start method of WaterMarkEventGenerator.
- That method schedules the WaterMarkEventGenerator task to run every watermarkInterval milliseconds.
- Its run method computes the watermark (the minimum of the latest tuple timestamps across all input streams, minus the lag). When the result is greater than lastWaterMarkTs, it adds a WaterMarkEvent (whose isWatermark is true) to windowManager and updates lastWaterMarkTs.
- windowManager.add(new WaterMarkEvent<>(waterMarkTs)) in turn runs triggerPolicy.track(windowEvent) and the compactWindow operation.
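The watermark rule quoted in the Javadoc (minimum of the latest timestamps across the input streams, minus the lag) can be worked through with a small sketch. The elided computeWaterMarkTs is not shown in the article, so this is only a model: the class name is hypothetical, streams are identified by plain strings, and it assumes every input stream has already reported at least one timestamp.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the watermark rule: min(latest ts per stream) - lag. */
final class WatermarkSketch {
    /**
     * @param latestTsPerStream latest event timestamp seen on each input stream (must be non-empty)
     * @param lagMs             maximum allowed lag in milliseconds
     */
    static long computeWatermark(Map<String, Long> latestTsPerStream, long lagMs) {
        if (latestTsPerStream.isEmpty()) {
            throw new IllegalArgumentException("no stream has reported a timestamp yet");
        }
        // The slowest stream bounds the watermark, so take the minimum.
        return Collections.min(latestTsPerStream.values()) - lagMs;
    }

    public static void main(String[] args) {
        Map<String, Long> latest = new HashMap<>();
        latest.put("spoutA", 1_000L);
        latest.put("spoutB", 700L);
        // min(1000, 700) - 100 = 600
        System.out.println(computeWatermark(latest, 100L));
    }
}
```

With these numbers, a tuple arriving later with a timestamp earlier than 600 would be treated as late once that watermark has been emitted.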
WatermarkTimeTriggerPolicy.track
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /**
     * Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
     * so that the events falling within that window can be processed.
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
            } else {
                /*
                 * No events were found in the previous window interval.
                 * Scan through the events in the queue to find the next
                 * window intervals based on event ts.
                 */
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                LOG.debug("Next aligned window end ts {}", ts);
                if (ts == Long.MAX_VALUE) {
                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                    break;
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
     * the end ts of the next aligned window, i.e. the ts when the window should fire.
     *
     * @param startTs the start timestamp (excluding)
     * @param endTs   the end timestamp (including)
     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }
- track only reacts to watermark events: handleWaterMarkEvent fires handler.onTrigger() for every pending window whose end timestamp is not later than the watermark.
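The rounding done by getNextAlignedWindowTs is easy to check by hand. The sketch below lifts just that arithmetic into a standalone method (the class name and the `align` method are made up; the formula is the one in the listing above) and applies it to a few sample timestamps.

```java
/** Standalone copy of the alignment arithmetic used by getNextAlignedWindowTs. */
final class WindowAlignmentSketch {
    /**
     * Rounds the earliest event timestamp up to the next multiple of the sliding interval.
     *
     * @param nextTs            earliest event timestamp found, or Long.MAX_VALUE if none
     * @param slidingIntervalMs sliding interval in milliseconds
     */
    static long align(long nextTs, long slidingIntervalMs) {
        if (nextTs == Long.MAX_VALUE || nextTs % slidingIntervalMs == 0) {
            return nextTs;   // nothing to process, or already on a window boundary
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }

    public static void main(String[] args) {
        // 12300 % 5000 = 2300, so the window end is 12300 + (5000 - 2300) = 15000
        System.out.println(align(12_300L, 5_000L));
        // already aligned: stays at 10000
        System.out.println(align(10_000L, 5_000L));
    }
}
```

This is how the trigger policy skips over empty intervals: instead of stepping one sliding interval at a time, it jumps straight to the first aligned window end that can contain an event.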
WindowManager.onTrigger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
- onTrigger mainly computes three sets of data: events (everything in the current window), expired (events expired since the last activation), and newEvents (events that were not in the previous window).
- When events is not empty, it invokes windowLifecycleListener.onActivation, which in turn calls the wrapped bolt's execute method.
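How newEvents is derived from two consecutive windows can be shown with a few lines of plain Java. The helper below is hypothetical and uses strings in place of events; it keeps the same rule as onTrigger, namely that an event counts as new if it was not part of the previous window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper showing how "new since last activation" is derived. */
final class NewEventsSketch {
    static <T> List<T> newEvents(List<T> currentWindow, Collection<T> previousWindow) {
        Set<T> previous = new HashSet<>(previousWindow);
        List<T> fresh = new ArrayList<>();
        for (T event : currentWindow) {
            if (!previous.contains(event)) {
                fresh.add(event);   // not seen in the previous window
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<String> previous = Arrays.asList("a", "b", "c");
        List<String> current = Arrays.asList("b", "c", "d");
        // "a" has expired, "b" and "c" carry over, only "d" is new
        System.out.println(newEvents(current, previous));
    }
}
```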
Summary
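- WindowedBoltExecutor implements the IRichBolt interface and is itself a bolt. When TopologyBuilder.setBolt receives a user's IWindowedBolt implementation, it wraps it in a WindowedBoltExecutor, which takes its place. The executor changes what execute does: tuples that belong in a window are added through windowManager.add, tuples that should be discarded are acked, and the wrapped bolt's own execute only runs once a window is triggered.
- WindowLifecycleListener has two callbacks: onExpiry, driven by the EvictionPolicy, and onActivation, driven by the TriggerPolicy.
- Because windowLength and slidingInterval can each be given as a Duration or a Count, EvictionPolicy and TriggerPolicy both come in those two flavors. Adding the watermark dimension gives each policy four implementations. EvictionPolicy: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, WatermarkTimeEvictionPolicy. TriggerPolicy: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy.
- Besides storing the tuple, windowManager.add calls track on both policies (eviction and trigger) and then runs compactWindow. The track method of WatermarkTimeEvictionPolicy currently does nothing, whereas the track method of WatermarkTimeTriggerPolicy triggers the window when the event is a WaterMarkEvent: it calls WindowManager.onTrigger, which selects the window's data, invokes windowLifecycleListener.onActivation, and finally calls the windowed bolt's execute method.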
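- Both onTrigger and add in WindowManager end up calling scanEvents; the difference is that the former does a full scan and the latter does not. scanEvents calls evictionPolicy.evict to decide whether a tuple should be removed and then invokes windowLifecycleListener.onExpiry, which acks the tuples. In other words, a tuple is acked automatically when it expires. In principle every tuple eventually expires and is therefore acked automatically, so Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS needs to be larger than windowLength + slidingInterval; otherwise a tuple may be considered timed out before it is acked.
- When WindowedBoltExecutor starts, it starts the WaterMarkEventGenerator, which registers a scheduled task that computes the watermark every watermarkInterval milliseconds (the minimum of the latest tuple timestamps across all input streams, minus the lag). When the result is greater than lastWaterMarkTs, it adds a WaterMarkEvent (whose isWatermark is true) to windowManager and updates lastWaterMarkTs. With watermark-based policies, this is what drives WindowManager.onTrigger and therefore windowLifecycleListener.onActivation.
- Regarding acks: in WindowedBoltExecutor.execute, a late tuple that does not make it into the window queue is acked right away; if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured it is first emitted to that stream, otherwise it is only logged. Tuples in the window are acked automatically when they expire. WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and anchors emitted tuples to the input tuples of the window.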