聊聊storm的window trigger

本文主要研究一下storm的window trigger

WindowTridentProcessor.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }
  • 这里调用了tridentWindowManager.prepare()

AbstractTridentWindowManager.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
        this.windowTaskId = windowTaskId;
        this.windowStore = windowStore;
        this.aggregator = aggregator;
        this.delegateCollector = delegateCollector;

        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        windowManager.setEvictionPolicy(evictionPolicy);
        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
        windowManager.setTriggerPolicy(triggerPolicy);
    }

    public void prepare() {
        preInitialize();

        initialize();

        postInitialize();
    }

    private void postInitialize() {
        // start trigger once the initialization is done.
        triggerPolicy.start();
    }
  • AbstractTridentWindowManager在构造器里头调用windowStrategy.getTriggerPolicy获取triggerPolicy;prepare方法调用了postInitialize,而它触发triggerPolicy.start()

SlidingDurationWindowStrategy.getTriggerPolicy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

/**
     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
     *
     * @param triggerHandler
     * @param evictionPolicy
     * @return
     */
    @Override
    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
    }
  • 以SlidingDurationWindowStrategy为例,这里创建的是TimeTriggerPolicy,其duration为windowConfig.getSlidingLength(),而triggerHandler则为WindowManager

TimeTriggerPolicy.start

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

public void start() {
        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    }

   private Runnable newTriggerTask() {
        return new Runnable() {
            @Override
            public void run() {
                // do not process current timestamp since tuples might arrive while the trigger is executing
                long now = System.currentTimeMillis() - 1;
                try {
                    /*
                     * set the current timestamp as the reference time for the eviction policy
                     * to evict the events
                     */
                    if (evictionPolicy != null) {
                        evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                    }
                    handler.onTrigger();
                } catch (Throwable th) {
                    LOG.error("handler.onTrigger failed ", th);
                    /*
                     * propagate it so that task gets canceled and the exception
                     * can be retrieved from executorFuture.get()
                     */
                    throw th;
                }
            }
        };
    }
  • start方法注册了一个调度任务,每隔duration触发(windowConfig.getSlidingLength());而run方法是触发handler.onTrigger(),即WindowManager.onTrigger()

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • 这里调用了windowLifecycleListener.onActivation(events, newEvents, expired),而windowLifecycleListener为AbstractTridentWindowManager的TridentWindowLifeCycleListener

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

/**
     * Listener to reeive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

   private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • TridentWindowLifeCycleListener.onActivation方法主要是execAggregatorAndStoreResult
  • 而execAggregatorAndStoreResult则依次调用aggregator的init、aggregate及complete方法
  • 最后将TriggerResult放入pendingTriggers

小结

  • storm在TimeTriggerPolicy.start的时候注册了定时任务TriggerTask,以SlidingDurationWindowStrategy为例,它的调度间隔为windowConfig.getSlidingLength()
  • TriggerTask定时触发WindowManager.onTrigger方法,该方法会回调windowLifecycleListener.onActivation
  • AbstractTridentWindowManager提供了TridentWindowLifeCycleListener,它的onActivation主要是调用execAggregatorAndStoreResult;而execAggregatorAndStoreResult方法主要完成对aggregator的一系列调用,先是调用init方法,然后遍历resultTuples挨个调用aggregate方法,最后complete方法(从这里可以清晰看到Aggregator接口的各个方法的调用逻辑及顺序)

doc

相关推荐