A look at Storm's window trigger
Preface
This article takes a look at Storm's window trigger.
WindowTridentProcessor.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
```java
public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
    this.topologyContext = context;
    List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
    if (parents.size() != 1) {
        throw new RuntimeException("Aggregation related operation can only have one parent");
    }

    Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

    this.tridentContext = tridentContext;
    collector = new FreshCollector(tridentContext);
    projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

    windowStore = windowStoreFactory.create(stormConf);
    windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
    windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

    tridentWindowManager = storeTuplesInStore ?
            new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator,
                    tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
            : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator,
                    tridentContext.getDelegateCollector());

    tridentWindowManager.prepare();
}
```
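- Depending on storeTuplesInStore, this creates either a StoreBasedTridentWindowManager or an InMemoryTridentWindowManager, and then calls tridentWindowManager.prepare()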
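The decisions made in prepare() can be sketched in isolation. The following is a simplified, hypothetical model rather than Storm code: the class `PrepareSketch` and its helpers are made up, and the `"|"` separator is an assumption standing in for `WindowsStore.KEY_SEPARATOR`. It shows the single-parent check, how windowTaskId is composed, and which manager type the `storeTuplesInStore` flag selects.

```java
import java.util.List;

// Hypothetical, simplified model of WindowTridentProcessor.prepare(); not Storm's API.
class PrepareSketch {
    // Assumed separator; Storm uses its own WindowsStore.KEY_SEPARATOR constant.
    static final String KEY_SEPARATOR = "|";

    // Mirrors the "only one parent" check at the top of prepare().
    static void checkSingleParent(List<?> parents) {
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }
    }

    // windowId + SEP + taskId + SEP, the same composition as in prepare().
    static String windowTaskId(String windowId, int taskId) {
        return windowId + KEY_SEPARATOR + taskId + KEY_SEPARATOR;
    }

    // Names the manager class prepare() would instantiate for the given flag.
    static String managerKind(boolean storeTuplesInStore) {
        return storeTuplesInStore ? "StoreBasedTridentWindowManager" : "InMemoryTridentWindowManager";
    }

    public static void main(String[] args) {
        checkSingleParent(List.of("parent-0"));
        System.out.println(windowTaskId("w1", 7)); // w1|7|
        System.out.println(managerKind(true));     // StoreBasedTridentWindowManager
    }
}
```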
AbstractTridentWindowManager.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
```java
public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                    Aggregator aggregator, BatchOutputCollector delegateCollector) {
    this.windowTaskId = windowTaskId;
    this.windowStore = windowStore;
    this.aggregator = aggregator;
    this.delegateCollector = delegateCollector;

    windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

    windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

    WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
    EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
    windowManager.setEvictionPolicy(evictionPolicy);
    triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
    windowManager.setTriggerPolicy(triggerPolicy);
}

public void prepare() {
    preInitialize();

    initialize();

    postInitialize();
}

private void postInitialize() {
    // start trigger once the initialization is done.
    triggerPolicy.start();
}
```
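- AbstractTridentWindowManager obtains triggerPolicy in its constructor by calling windowStrategy.getTriggerPolicy, but does not start it there; the prepare method runs preInitialize, initialize and postInitialize in order, and postInitialize is what calls triggerPolicy.start()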
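The prepare() method is a template method: the order of steps is fixed and the trigger is started last. The sketch below is a hypothetical illustration (`ManagerSketch` and `InMemoryManagerSketch` are made-up names, and the steps only record their names instead of doing real work); it shows that the trigger cannot be started before initialize() has run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template method in AbstractTridentWindowManager.prepare().
// Only the call order mirrors the Storm code; the class names are illustrative.
abstract class ManagerSketch {
    final List<String> calls = new ArrayList<>();
    boolean triggerStarted = false;

    // prepare() fixes the order; subclasses customize individual steps.
    final void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    protected void preInitialize() {
        calls.add("preInitialize");
    }

    protected abstract void initialize();

    private void postInitialize() {
        // started last, so no trigger can fire before initialize() has finished
        calls.add("triggerPolicy.start");
        triggerStarted = true;
    }
}

// Stand-in for a concrete manager such as InMemoryTridentWindowManager.
class InMemoryManagerSketch extends ManagerSketch {
    @Override
    protected void initialize() {
        calls.add("initialize");
    }

    public static void main(String[] args) {
        InMemoryManagerSketch m = new InMemoryManagerSketch();
        m.prepare();
        System.out.println(m.calls); // [preInitialize, initialize, triggerPolicy.start]
    }
}
```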
SlidingDurationWindowStrategy.getTriggerPolicy
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
```java
/**
 * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
 *
 * @param triggerHandler
 * @param evictionPolicy
 * @return
 */
@Override
public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
    return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
}
```
- Taking SlidingDurationWindowStrategy as an example, it creates a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager passed in by AbstractTridentWindowManager
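To make the relationship concrete, here is a hypothetical sketch with simplified stand-in types (not Storm's `WindowConfig`, `TriggerHandler` or `TimeTriggerPolicy`). The strategy hands the sliding length, not the window length, to the trigger policy; the helper `nominalFireTimes` is an illustrative addition that lists when a fixed-rate schedule with initial delay and period both equal to the duration would nominally fire.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; Storm's real types have different fields and methods.
interface TriggerHandlerSketch {
    boolean onTrigger();
}

class TimeTriggerPolicySketch {
    final long durationMs;
    final TriggerHandlerSketch handler;

    TimeTriggerPolicySketch(long durationMs, TriggerHandlerSketch handler) {
        this.durationMs = durationMs;
        this.handler = handler;
    }

    // Nominal fire times for a fixed-rate schedule whose initial delay and period
    // both equal durationMs: first after one duration, then once per duration.
    List<Long> nominalFireTimes(long horizonMs) {
        List<Long> times = new ArrayList<>();
        for (long t = durationMs; t <= horizonMs; t += durationMs) {
            times.add(t);
        }
        return times;
    }
}

class SlidingDurationStrategySketch {
    final long windowLengthMs;   // not used for the trigger interval
    final long slidingLengthMs;

    SlidingDurationStrategySketch(long windowLengthMs, long slidingLengthMs) {
        this.windowLengthMs = windowLengthMs;
        this.slidingLengthMs = slidingLengthMs;
    }

    // Like SlidingDurationWindowStrategy.getTriggerPolicy: the interval is the sliding length.
    TimeTriggerPolicySketch getTriggerPolicy(TriggerHandlerSketch handler) {
        return new TimeTriggerPolicySketch(slidingLengthMs, handler);
    }

    public static void main(String[] args) {
        SlidingDurationStrategySketch s = new SlidingDurationStrategySketch(10_000, 2_000);
        TimeTriggerPolicySketch p = s.getTriggerPolicy(() -> true);
        System.out.println(p.nominalFireTimes(10_000)); // [2000, 4000, 6000, 8000, 10000]
    }
}
```

With a 10 s window sliding every 2 s, the trigger fires every 2 s, and each firing looks at whatever the eviction policy still keeps in the window.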
TimeTriggerPolicy.start
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java
```java
public void start() {
    executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
}

private Runnable newTriggerTask() {
    return new Runnable() {
        @Override
        public void run() {
            // do not process current timestamp since tuples might arrive while the trigger is executing
            long now = System.currentTimeMillis() - 1;
            try {
                /*
                 * set the current timestamp as the reference time for the eviction policy
                 * to evict the events
                 */
                if (evictionPolicy != null) {
                    evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                }
                handler.onTrigger();
            } catch (Throwable th) {
                LOG.error("handler.onTrigger failed ", th);
                /*
                 * propagate it so that task gets canceled and the exception
                 * can be retrieved from executorFuture.get()
                 */
                throw th;
            }
        }
    };
}
```
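- The start method registers a periodic task that first fires after duration milliseconds and then every duration milliseconds (here duration is windowConfig.getSlidingLength()); each run sets the eviction policy's reference time to the current time minus 1 ms and then calls handler.onTrigger(), i.e. WindowManager.onTrigger(). If the handler throws, the Throwable is logged and rethrown, which cancels the periodic task so that the exception can be retrieved from executorFuture.get()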
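The scheduling behavior can be reproduced with a plain `ScheduledExecutorService`. This is a minimal sketch modeled on TimeTriggerPolicy, not the Storm class: a `BooleanSupplier` stands in for WindowManager.onTrigger(), an `AtomicLong` stands in for the eviction context, and the helpers `firesAtLeast` and `failureSurfacesViaFuture` are hypothetical demo methods. The second helper shows the JDK behavior the Storm comment relies on: once a periodic task throws, later runs are suppressed and the cause is reported by `get()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Minimal sketch modeled on TimeTriggerPolicy.start()/newTriggerTask(); names are illustrative.
class TimeTriggerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long durationMs;
    private final BooleanSupplier handler;              // stands in for WindowManager.onTrigger()
    final AtomicLong referenceTime = new AtomicLong();  // stands in for the eviction context
    private ScheduledFuture<?> executorFuture;

    TimeTriggerSketch(long durationMs, BooleanSupplier handler) {
        this.durationMs = durationMs;
        this.handler = handler;
    }

    void start() {
        // first run after one duration, then once every duration
        executorFuture = executor.scheduleAtFixedRate(this::runTrigger, durationMs, durationMs, TimeUnit.MILLISECONDS);
    }

    private void runTrigger() {
        // skip the current millisecond: tuples might still arrive while the trigger runs
        long now = System.currentTimeMillis() - 1;
        try {
            referenceTime.set(now);
            handler.getAsBoolean();
        } catch (Throwable th) {
            System.err.println("handler.onTrigger failed: " + th);
            // rethrow so the periodic task is canceled and the cause surfaces via executorFuture.get()
            throw th;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Demo helper: true if the handler ran at least n times within timeoutMs.
    static boolean firesAtLeast(int n, long durationMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(n);
        TimeTriggerSketch t = new TimeTriggerSketch(durationMs, () -> { latch.countDown(); return true; });
        t.start();
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            t.shutdown();
        }
    }

    // Demo helper: a throwing handler stops the periodic task and get() reports the cause.
    static boolean failureSurfacesViaFuture(long durationMs) {
        TimeTriggerSketch t = new TimeTriggerSketch(durationMs, () -> { throw new IllegalStateException("boom"); });
        t.start();
        try {
            t.executorFuture.get(5, TimeUnit.SECONDS);
            return false; // a periodic task never completes normally
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException | TimeoutException e) {
            return false;
        } finally {
            t.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firesAtLeast(3, 10, 5_000));
        System.out.println(failureSurfacesViaFuture(10));
    }
}
```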
WindowManager.onTrigger
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java
```java
/**
 * The callback invoked by the trigger policy.
 */
@Override
public boolean onTrigger() {
    List<Event<T>> windowEvents = null;
    List<T> expired = null;
    try {
        lock.lock();
        /*
         * scan the entire window to handle out of order events in
         * the case of time based windows.
         */
        windowEvents = scanEvents(true);
        expired = new ArrayList<>(expiredEvents);
        expiredEvents.clear();
    } finally {
        lock.unlock();
    }
    List<T> events = new ArrayList<>();
    List<T> newEvents = new ArrayList<>();
    for (Event<T> event : windowEvents) {
        events.add(event.get());
        if (!prevWindowEvents.contains(event)) {
            newEvents.add(event.get());
        }
    }
    prevWindowEvents.clear();
    if (!events.isEmpty()) {
        prevWindowEvents.addAll(windowEvents);
        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
        windowLifecycleListener.onActivation(events, newEvents, expired);
    } else {
        LOG.debug("No events in the window, skipping onActivation");
    }
    triggerPolicy.reset();
    return !events.isEmpty();
}
```
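- onTrigger scans the whole window under the lock, takes over the accumulated expired events, and marks as newEvents those events that were not in the previous window. If the window is non-empty, it calls windowLifecycleListener.onActivation(events, newEvents, expired), where windowLifecycleListener is AbstractTridentWindowManager's TridentWindowLifeCycleListener; otherwise it skips activation. In both cases it finally calls triggerPolicy.reset()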
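The newEvents bookkeeping is easy to misread, so here is a single-threaded, hypothetical model of it. Assumptions: events are plain strings compared with `equals()`, the caller passes in the window contents in place of `scanEvents(true)`, and recording fields replace the real listener call. Note that an empty window clears `prevWindowEvents`, so an event seen before that empty trigger counts as new again afterwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified, single-threaded model of the bookkeeping in WindowManager.onTrigger().
// Assumptions: events are strings compared by equals(); the window contents are passed in.
class OnTriggerSketch {
    private final Set<String> prevWindowEvents = new LinkedHashSet<>();
    List<String> lastEvents = List.of();
    List<String> lastNewEvents = List.of();
    int activations = 0;

    boolean onTrigger(List<String> windowEvents) {
        List<String> events = new ArrayList<>();
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            events.add(event);
            // "new" means: not present in the window seen at the previous trigger
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            // stands in for windowLifecycleListener.onActivation(events, newEvents, expired)
            lastEvents = events;
            lastNewEvents = newEvents;
            activations++;
        }
        // an empty window skips activation and leaves prevWindowEvents empty
        return !events.isEmpty();
    }

    public static void main(String[] args) {
        OnTriggerSketch w = new OnTriggerSketch();
        w.onTrigger(List.of("a", "b"));
        w.onTrigger(List.of("b", "c"));
        System.out.println(w.lastNewEvents); // [c]
    }
}
```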
TridentWindowLifeCycleListener.onActivation
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
```java
/**
 * Listener to receive any activation/expiry of windowing events and take further action on them.
 */
class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

    @Override
    public void onExpiry(List<T> expiredEvents) {
        LOG.debug("onExpiry is invoked");
        onTuplesExpired(expiredEvents);
    }

    @Override
    public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
        LOG.debug("onActivation is invoked with events size: [{}]", events.size());
        // trigger occurred, create an aggregation and keep them in store
        int currentTriggerId = triggerId.incrementAndGet();
        execAggregatorAndStoreResult(currentTriggerId, events);
    }
}

private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
    List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

    // run aggregator to compute the result
    AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
    Object state = aggregator.init(currentTriggerId, collector);
    for (TridentTuple resultTuple : resultTuples) {
        aggregator.aggregate(state, resultTuple, collector);
    }
    aggregator.complete(state, collector);

    List<List<Object>> resultantAggregatedValue = collector.values;

    ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(
            new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
            new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId),
                    resultantAggregatedValue));
    windowStore.putAll(entries);

    pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
}
```
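- TridentWindowLifeCycleListener.onActivation increments triggerId and then delegates to execAggregatorAndStoreResult
- execAggregatorAndStoreResult calls the aggregator's init, aggregate and complete methods in turn, then writes the next trigger count and the aggregated result to windowStore
- Finally, it adds a TriggerResult to pendingTriggers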
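The aggregation step can be modeled without Storm. In the hypothetical sketch below, `AggregatorSketch` is a trimmed-down contract with the same init → aggregate (per tuple) → complete lifecycle, a plain `List` stands in for AccumulatedTuplesCollector, a `HashMap` stands in for WindowsStore, and the store key formats are made up for illustration (Storm builds its keys with TRIGGER_COUNT_PREFIX and generateWindowTriggerKey).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down aggregator contract with the init -> aggregate -> complete lifecycle.
// A plain List stands in for the collector.
interface AggregatorSketch<S> {
    S init(Object batchId, List<List<Object>> collector);
    void aggregate(S state, Object tuple, List<List<Object>> collector);
    void complete(S state, List<List<Object>> collector);
}

// Example aggregator: sums numeric tuples and emits a single value in complete().
class SumAggregator implements AggregatorSketch<long[]> {
    public long[] init(Object batchId, List<List<Object>> collector) {
        return new long[1];
    }

    public void aggregate(long[] state, Object tuple, List<List<Object>> collector) {
        state[0] += ((Number) tuple).longValue();
    }

    public void complete(long[] state, List<List<Object>> collector) {
        collector.add(List.<Object>of(state[0]));
    }
}

class ExecAggregatorSketch {
    // A HashMap stands in for WindowsStore; the key formats are illustrative only.
    final Map<String, Object> windowStore = new HashMap<>();
    final List<Map.Entry<Integer, List<List<Object>>>> pendingTriggers = new ArrayList<>();
    private final String windowTaskId;

    ExecAggregatorSketch(String windowTaskId) {
        this.windowTaskId = windowTaskId;
    }

    <S> void execAggregatorAndStoreResult(int currentTriggerId, List<?> tuples, AggregatorSketch<S> aggregator) {
        List<List<Object>> collector = new ArrayList<>();
        S state = aggregator.init(currentTriggerId, collector);
        for (Object tuple : tuples) {
            aggregator.aggregate(state, tuple, collector);
        }
        aggregator.complete(state, collector);
        // persist the next trigger count and this trigger's result, mirroring the Storm code
        windowStore.put("triggerCount:" + windowTaskId, currentTriggerId + 1);
        windowStore.put("trigger:" + windowTaskId + currentTriggerId, collector);
        pendingTriggers.add(Map.entry(currentTriggerId, collector));
    }

    public static void main(String[] args) {
        ExecAggregatorSketch ex = new ExecAggregatorSketch("w1|7|");
        ex.execAggregatorAndStoreResult(1, List.of(1, 2, 3), new SumAggregator());
        System.out.println(ex.windowStore.get("trigger:w1|7|1")); // [[6]]
    }
}
```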
Summary
- When TimeTriggerPolicy.start is called, Storm registers a scheduled TriggerTask; for SlidingDurationWindowStrategy, the scheduling interval is windowConfig.getSlidingLength()
- The TriggerTask periodically calls WindowManager.onTrigger, which in turn calls back windowLifecycleListener.onActivation
- AbstractTridentWindowManager provides TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the aggregator: it first calls init, then calls aggregate on each tuple in resultTuples, and finally calls complete (this makes the invocation logic and order of the Aggregator interface's methods clear)