A look at pendingTriggers in Storm's TridentWindowManager

This article examines how pendingTriggers works in Storm's TridentWindowManager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • This calls finishBatch on _bolt, which has two implementations: TridentSpoutExecutor for spouts and SubtopologyBolt for ordinary bolts.

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
  • SubtopologyBolt.finishBatch calls finishBatch on each TridentProcessor of the batch group, in topological order.

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

   public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
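  • Once the bolt hosting WindowTridentProcessor has acked every tuple of a batch, it runs finishBatch.
  • WindowTridentProcessor.execute receives a tuple and appends it to the list held in processorContext.state.
  • finishBatch takes that batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples).
  • It then calls tridentWindowManager.getPendingTriggers(), drains the queue to collect the triggerValues to emit, and saves the corresponding pendingTriggerIds to the store so that a retried batch can replay them instead of draining the queue again.
  • Finally, it emits each triggerValue, prefixed with a TriggerInfo built from windowTaskId and the trigger id.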
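
The retry handling in finishBatch can be sketched in isolation. The class below is a minimal sketch, not Storm code: PendingTriggerReplaySketch, its two maps (stand-ins for windowStore) and the String results are hypothetical simplifications, chosen only to show that a retried batch replays the trigger ids recorded by its first attempt rather than draining the queue again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the finishBatch bookkeeping; not Storm code.
class PendingTriggerReplaySketch {
    // Stand-in for windowStore: batch txn id -> trigger ids emitted with that batch.
    final Map<Long, List<Integer>> inProcess = new HashMap<>();
    // Stand-in for windowStore: trigger id -> aggregated result.
    final Map<Integer, String> results = new HashMap<>();
    // Stand-in for tridentWindowManager.getPendingTriggers().
    final Queue<Integer> pendingTriggers = new ArrayDeque<>();

    // Simulates a window trigger: store the result, then queue the trigger id.
    void onWindowTriggered(int triggerId, String aggregatedResult) {
        results.put(triggerId, aggregatedResult);
        pendingTriggers.add(triggerId);
    }

    // Returns the results to emit for this batch attempt.
    List<String> finishBatch(long txnId, boolean retriedAttempt) {
        List<Integer> ids = retriedAttempt ? inProcess.get(txnId) : null;
        if (ids == null) {
            // New batch, or nothing recorded by an earlier attempt: drain the queue.
            ids = new ArrayList<>();
            Integer id;
            while ((id = pendingTriggers.poll()) != null) {
                ids.add(id);
            }
            if (!ids.isEmpty()) {
                inProcess.put(txnId, ids); // remember the ids for possible retries
            }
        }
        List<String> emitted = new ArrayList<>();
        for (Integer id : ids) {
            emitted.add(results.get(id));
        }
        return emitted;
    }
}
```

In this sketch, a trigger that fires between the first attempt and the retry stays in the queue and is picked up by the next new batch.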

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
        List<WindowsStore.Entry> entries = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
        }

        // tuples should be available in store before they are added to window manager
        windowStore.putAll(entries);

        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            addToWindowManager(i, key, tridentTuple);
        }

    }

    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
        TridentTuple actualTuple = null;
        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
            actualTuple = tridentTuple;
        }
        currentCachedTuplesSize.incrementAndGet();
        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
    }
  • StoreBasedTridentWindowManager.addTuplesBatch first writes the batch of tuples to windowStore, then adds them one by one to windowManager through addToWindowManager.
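
The "store first, window second" ordering and the cache cap can be illustrated with a small sketch. Everything here is hypothetical (the class name, the String tuples, the String[] window entries); in particular, the resolve method is my assumption about how an entry whose in-memory tuple was dropped could later be looked up by key, and is not taken from the excerpt above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of addTuplesBatch/addToWindowManager; not Storm code.
class StoreThenWindowSketch {
    final Map<String, String> store = new LinkedHashMap<>(); // stand-in for windowStore
    final List<String[]> window = new ArrayList<>();         // entries: {key, tupleOrNull}
    final int maxCachedTuples;
    int cached = 0;

    StoreThenWindowSketch(int maxCachedTuples) {
        this.maxCachedTuples = maxCachedTuples;
    }

    void addTuplesBatch(String batchKey, List<String> tuples) {
        // 1) Persist every tuple under batchKey + index before touching the window.
        for (int i = 0; i < tuples.size(); i++) {
            store.put(batchKey + i, tuples.get(i));
        }
        // 2) Hand the tuples to the window; keep the payload in memory only below the cap.
        for (int i = 0; i < tuples.size(); i++) {
            String inMemory = cached < maxCachedTuples ? tuples.get(i) : null;
            cached++;
            window.add(new String[] {batchKey + i, inMemory});
        }
    }

    // Assumed lookup: use the cached payload if present, otherwise read the store.
    String resolve(String[] entry) {
        return entry[1] != null ? entry[1] : store.get(entry[0]);
    }
}
```

Because the store is written first, any entry that reaches the window can be resolved even when its payload was not cached.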

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

private final ConcurrentLinkedQueue<Event<T>> queue;

    /**
     * Add an event into the window, with {@link System#currentTimeMillis()} as
     * the tracking ts.
     *
     * @param event the event to add
     */
    public void add(T event) {
        add(event, System.currentTimeMillis());
    }

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }
  • add puts the event into a ConcurrentLinkedQueue (watermark events are not queued), then calls track and compactWindow.

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
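  • onTrigger first calls scanEvents to obtain windowEvents, then splits them into events and newEvents (those absent from the previous window), and finally, if events is not empty, invokes the windowLifecycleListener.onActivation(events, newEvents, expired) callback.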
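
How newEvents is derived from the previous window can be shown with a minimal sketch. NewEventsSketch and its String events are hypothetical; the only point is the comparison against the events remembered from the last activation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reduction of the events/newEvents split in onTrigger; not Storm code.
class NewEventsSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events of the current window that were not in the previous one.
    List<String> onTrigger(List<String> windowEvents) {
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // Remember the current window for the next activation.
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return newEvents;
    }
}
```

With a sliding window, consecutive activations overlap, so newEvents holds only the part of the window that has not been seen before.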

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/**
     * Scan events in the queue, using the expiration policy to check
     * if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop
     *                 as soon as an event not satisfying the expiration policy is found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }
  • scanEvents iterates over the events in the ConcurrentLinkedQueue and asks the eviction policy whether each one has expired, sorting them into expiredEvents and eventsToProcess; it returns eventsToProcess.
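
The full-scan case of scanEvents can be sketched with a toy eviction policy. ScanEventsSketch is hypothetical, events are reduced to timestamps, and the evict rule (expire when older than the window length, stop on timestamps ahead of the reference time) is an assumed simplification, not the policy Storm ships.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical full-scan partitioning of queued events; not Storm code.
class ScanEventsSketch {
    enum Action { EXPIRE, PROCESS, STOP }

    final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>(); // event timestamps
    final List<Long> expired = new ArrayList<>();
    final long windowLengthMs;

    ScanEventsSketch(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    // Assumed toy policy: too old -> EXPIRE, ahead of the reference time -> STOP.
    Action evict(long ts, long now) {
        if (ts > now) {
            return Action.STOP;
        }
        return (now - ts >= windowLengthMs) ? Action.EXPIRE : Action.PROCESS;
    }

    // Removes expired events from the queue and returns the ones in the current window.
    List<Long> scan(long now) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evict(ts, now);
            if (action == Action.EXPIRE) {
                expired.add(ts);
                it.remove(); // expired events leave the queue
            } else if (action == Action.STOP) {
                break;
            } else {
                toProcess.add(ts); // events to process stay queued for later windows
            }
        }
        return toProcess;
    }
}
```

Note that only expired events are removed; events that are processed remain in the queue and can belong to later windows as well.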

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

/**
     * Listener to receive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation calls execAggregatorAndStoreResult, which runs the window's aggregator, saves the result to windowStore, and adds resultantAggregatedValue to pendingTriggers wrapped in a TriggerResult.
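
The init/aggregate/complete life cycle that feeds pendingTriggers can be sketched as follows. The SimpleAggregator interface is a hypothetical simplification (the aggregator used in the excerpt also takes a collector), and Count, TriggerResult and ActivationSketch are illustrative names only.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of onActivation/execAggregatorAndStoreResult; not Storm code.
class ActivationSketch {
    // Simplified aggregator contract, assumed for illustration.
    interface SimpleAggregator<S, R> {
        S init();
        void aggregate(S state, String tuple);
        R complete(S state);
    }

    // Example aggregator: counts the tuples in the window.
    static final class Count implements SimpleAggregator<int[], Integer> {
        public int[] init() { return new int[1]; }
        public void aggregate(int[] state, String tuple) { state[0]++; }
        public Integer complete(int[] state) { return state[0]; }
    }

    static final class TriggerResult {
        final int id;
        final Object result;
        TriggerResult(int id, Object result) {
            this.id = id;
            this.result = result;
        }
    }

    final AtomicInteger triggerId = new AtomicInteger();
    final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();

    // Runs the aggregator over the window and queues the result under a fresh trigger id.
    <S, R> void onActivation(List<String> events, SimpleAggregator<S, R> aggregator) {
        int currentTriggerId = triggerId.incrementAndGet();
        S state = aggregator.init();
        for (String event : events) {
            aggregator.aggregate(state, event);
        }
        pendingTriggers.add(new TriggerResult(currentTriggerId, aggregator.complete(state)));
    }
}
```

The sketch leaves out the write to windowStore; the excerpt persists the result there before queueing it.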

Summary

  • The TridentBoltExecutor hosting WindowTridentProcessor calls the processor's execute method when it receives a tuple from the spout, which caches the tuple in the ProcessorContext; once the whole chain of processors has run execute, the tuple is acked.
  • Once that TridentBoltExecutor has acked every tuple of a batch, it triggers checkFinish and then finishBatch, which in turn calls finishBatch on a chain of TridentProcessors (for example WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor).
  • WindowTridentProcessor.finishBatch takes the batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples), which writes them to windowStore and then adds them to windowManager's ConcurrentLinkedQueue. It then calls tridentWindowManager.getPendingTriggers(), saves the pendingTriggerIds to the store, collects the triggerValues to emit, and emits each one together with its TriggerInfo.
  • WindowManager.onTrigger, in contrast, is called when the window's trigger fires (for example when a time window elapses); it collects the windowEvents from windowManager's ConcurrentLinkedQueue and passes them to TridentWindowLifeCycleListener.onActivation.
  • TridentWindowLifeCycleListener.onActivation runs the window aggregator's init, aggregate and complete steps to obtain the aggregated result resultantAggregatedValue and adds it to pendingTriggers, which completes the link between the window trigger and WindowTridentProcessor.
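
The hand-off summarized above amounts to a producer and a consumer sharing one queue. The sketch below assumes the trigger fires on a separate thread, as it would with a timer-driven time window; HandOffSketch and its integer trigger ids are hypothetical, and the join only serves to make the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical two-thread hand-off through a shared queue; not Storm code.
class HandOffSketch {
    static List<Integer> run() {
        Queue<Integer> pendingTriggers = new ConcurrentLinkedQueue<>();

        // Producer: stands in for the trigger thread calling onActivation.
        Thread trigger = new Thread(() -> {
            for (int id = 1; id <= 3; id++) {
                pendingTriggers.add(id);
            }
        });
        trigger.start();
        try {
            trigger.join(); // wait here only to keep the example deterministic
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Consumer: stands in for finishBatch draining whatever has accumulated.
        List<Integer> emitted = new ArrayList<>();
        Integer id;
        while ((id = pendingTriggers.poll()) != null) {
            emitted.add(id);
        }
        return emitted;
    }
}
```

A concurrent queue fits this pattern because neither side has to hold a lock while the other adds or removes elements.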
