A look at Storm's maxSpoutPending
Preface
This article takes a look at Storm's maxSpoutPending.
TOPOLOGY_MAX_SPOUT_PENDING
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
 * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
 * spouts or topologies as a whole.
 *
 * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
 * no effect for unreliable spouts that don't tag their tuples with a message id.
 */
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
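- TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of tuples that a single spout task may have emitted and still be waiting on (not yet acked or failed). It only takes effect for spouts that emit reliable tuples, i.e. tuples emitted with a msgId
- In defaults.yaml, topology.max.spout.pending defaults to null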
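The sketch below makes the "pending" definition concrete. It is a minimal standalone model of one spout task's pending set, not Storm code. The class names PendingTracker and PendingTrackerDemo are hypothetical.

A tuple counts toward the limit from the moment it is emitted with a msgId until it is acked or failed. Tuples emitted without a msgId are never counted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a single spout task's "pending" set (not Storm's real classes).
// A tuple emitted with a non-null messageId stays pending until it is acked or failed;
// tuples emitted without a messageId are never tracked.
class PendingTracker {
    private final Map<Object, String> pending = new HashMap<>();

    /** Records an emit; only reliable emits (messageId != null) become pending. */
    void emit(Object messageId, String value) {
        if (messageId != null) {
            pending.put(messageId, value);
        }
    }

    /** An ack removes the tuple from the pending set. */
    void ack(Object messageId) {
        pending.remove(messageId);
    }

    /** A fail also removes it; whether it gets replayed is up to the spout's fail() logic. */
    void fail(Object messageId) {
        pending.remove(messageId);
    }

    int pendingCount() {
        return pending.size();
    }
}

class PendingTrackerDemo {
    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        tracker.emit(1L, "a");     // reliable: pending
        tracker.emit(2L, "b");     // reliable: pending
        tracker.emit(null, "c");   // unreliable: not tracked
        System.out.println(tracker.pendingCount()); // prints 2
        tracker.ack(1L);
        tracker.fail(2L);
        System.out.println(tracker.pendingCount()); // prints 0
    }
}
```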
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
    this.threadId = Thread.currentThread().getId();
    executorTransfer.initLocalRecvQueues();
    while (!stormActive.get()) {
        Utils.sleep(100);
    }
    LOG.info("Opening spout {}:{}", componentId, taskIds);
    this.idToTask = idToTask;
    this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
    //......
}

public Callable<Long> call() throws Exception {
    init(idToTask, idToTaskBase);
    return new Callable<Long>() {
        final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
        int recvqCheckSkips = 0;
        int swIdleCount = 0; // counter for spout wait strategy
        int bpIdleCount = 0; // counter for back pressure wait strategy
        int rmspCount = 0;

        @Override
        public Long call() throws Exception {
            int receiveCount = 0;
            if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                receiveCount = receiveQueue.consume(SpoutExecutor.this);
                recvqCheckSkips = 0;
            }
            long currCount = emittedCount.get();
            boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
            boolean isActive = stormActive.get();
            if (!isActive) {
                inactiveExecute();
                return 0L;
            }
            if (!lastActive.get()) {
                lastActive.set(true);
                activateSpouts();
            }
            boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
            boolean noEmits = true;
            long emptyStretch = 0;
            if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                    spouts.get(j).nextTuple();
                }
                noEmits = (currCount == emittedCount.get());
                if (noEmits) {
                    emptyEmitStreak.increment();
                } else {
                    emptyStretch = emptyEmitStreak.get();
                    emptyEmitStreak.set(0);
                }
            }
            if (reachedMaxSpoutPending) {
                if (rmspCount == 0) {
                    LOG.debug("Reached max spout pending");
                }
                rmspCount++;
            } else {
                if (rmspCount > 0) {
                    LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                }
                rmspCount = 0;
            }
            if (receiveCount > 1) {
                // continue without idling
                return 0L;
            }
            if (!pendingEmits.isEmpty()) { // then facing backpressure
                backPressureWaitStrategy();
                return 0L;
            }
            bpIdleCount = 0;
            if (noEmits) {
                spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                return 0L;
            }
            swIdleCount = 0;
            return 0L;
        }

        private void backPressureWaitStrategy() throws InterruptedException {
            long start = Time.currentTimeMillis();
            if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
            }
            bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
            spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
        }

        private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
            emptyEmitStreak.increment();
            long start = Time.currentTimeMillis();
            swIdleCount = spoutWaitStrategy.idle(swIdleCount);
            if (reachedMaxSpoutPending) {
                spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
            } else {
                if (emptyStretch > 0) {
                    LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                }
            }
        }

        // returns true if pendingEmits is empty
        private boolean tryFlushPendingEmits() {
            for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                if (executorTransfer.tryTransfer(t, null)) {
                    pendingEmits.poll();
                } else { // to avoid reordering of emits, stop at first failure
                    return false;
                }
            }
            return true;
        }
    };
}
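- init reads Config.TOPOLOGY_MAX_SPOUT_PENDING from topoConf, using 0 if it is not set. It multiplies this by the number of tasks in the executor (idToTask.size()) to get maxSpoutPending
- In the call method, maxSpoutPending drives the reachedMaxSpoutPending flag: the spouts' nextTuple is only invoked when !reachedMaxSpoutPending && pendingEmitsIsEmpty. When the limit has been reached, nothing is emitted, so noEmits stays true. The executor then idles in spoutWaitStrategy and records the idle time via spoutThrottlingMetrics.skippedMaxSpoutMs
- Note that reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending) depends on the size of pending. A maxSpoutPending of 0 (the unset case) disables the check entirely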
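The config resolution and the gate can be condensed into the standalone sketch below. MaxPendingGate and its methods are hypothetical names. resolveLimit assumes the configured value is a Number; Storm's ObjectReader.getInt is more lenient. The demo loop assumes every nextTuple call emits exactly one reliable tuple and that no acks arrive.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of SpoutExecutor's max-spout-pending gate (not Storm code).
class MaxPendingGate {
    static final String KEY = "topology.max.spout.pending";

    /** Mirrors getInt(conf.get(KEY), 0) * numTasks: an unset value becomes 0, i.e. no limit. */
    static int resolveLimit(Map<String, Object> topoConf, int numTasks) {
        Object raw = topoConf.get(KEY);
        int perTask = (raw == null) ? 0 : ((Number) raw).intValue();
        return perTask * numTasks;
    }

    /** Same condition as reachedMaxSpoutPending: a limit of 0 disables the check. */
    static boolean reachedMax(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }
}

class MaxPendingGateDemo {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(MaxPendingGate.KEY, 5);
        int limit = MaxPendingGate.resolveLimit(conf, 2); // 5 per task * 2 tasks = 10
        int pending = 0;
        int nextTupleCalls = 0;
        for (int iteration = 0; iteration < 100; iteration++) {
            if (!MaxPendingGate.reachedMax(limit, pending)) {
                nextTupleCalls++; // pretend nextTuple() emitted one reliable tuple
                pending++;        // no acks arrive in this simulation
            }
        }
        System.out.println(limit + " " + nextTupleCalls); // prints "10 10"
    }
}
```

The check runs before nextTuple is called. A spout that emits several tuples in one nextTuple call, or an executor running several spout tasks, can therefore push pending somewhat past the limit. The setting behaves as a throttle rather than a hard cap.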
SpoutOutputCollectorImpl
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
    try {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    } catch (InterruptedException e) {
        LOG.warn("Spout thread interrupted during emit().");
        throw new RuntimeException(e);
    }
}

@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
    try {
        sendSpoutMsg(streamId, tuple, messageId, taskId);
    } catch (InterruptedException e) {
        LOG.warn("Spout thread interrupted during emitDirect().");
        throw new RuntimeException(e);
    }
}

private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws InterruptedException {
    emittedCount.increment();
    List<Integer> outTasks;
    if (outTaskId != null) {
        outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
    } else {
        outTasks = taskData.getOutgoingTasks(stream, values);
    }
    final boolean needAck = (messageId != null) && hasAckers;
    final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
    final long rootId = needAck ? MessageId.generateId(random) : 0;
    for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
        Integer t = outTasks.get(i);
        MessageId msgId;
        if (needAck) {
            long as = MessageId.generateId(random);
            msgId = MessageId.makeRootId(rootId, as);
            ackSeq.add(as);
        } else {
            msgId = MessageId.makeUnanchored();
        }
        final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
        AddressedTuple adrTuple = new AddressedTuple(t, tuple);
        executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
    }
    if (isEventLoggers) {
        taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
    }
    if (needAck) {
        boolean sample = executor.samplerCheck();
        TupleInfo info = new TupleInfo();
        info.setTaskId(this.taskId);
        info.setStream(stream);
        info.setMessageId(messageId);
        if (isDebug) {
            info.setValues(values);
        }
        if (sample) {
            info.setTimestamp(System.currentTimeMillis());
        }
        pending.put(rootId, info);
        List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
        taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
    } else if (messageId != null) {
        // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
        if (isDebug) {
            if (spoutExecutorThdId != Thread.currentThread().getId()) {
                throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                           "Spout Output Collector should only emit from the main spout executor thread.");
            }
        }
        globalTupleInfo.clear();
        globalTupleInfo.setStream(stream);
        globalTupleInfo.setValues(values);
        globalTupleInfo.setMessageId(messageId);
        globalTupleInfo.setTimestamp(0);
        globalTupleInfo.setId("0:");
        Long timeDelta = 0L;
        executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
    }
    return outTasks;
}
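- Both the emit and emitDirect methods of this collector end up calling sendSpoutMsg
- sendSpoutMsg computes needAck = (messageId != null) && hasAckers. In other words, if no messageId is passed or the topology has no ackers, needAck is false. If a messageId is given but there are no ackers, the spout message is acked right away via executor.ackSpoutMsg
- When needAck is false, nothing is added to pending. The SpoutExecutor condition reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending) therefore has its pending.size() >= maxSpoutPending part never true, reachedMaxSpoutPending stays false, and the maxSpoutPending mechanism never kicks in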
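The branching in sendSpoutMsg can be modeled as follows. CollectorModel is a hypothetical stand-in, not Storm code. It uses a simple counter instead of MessageId.generateId for root ids, and it replaces the real transfer and ackSpoutMsg calls with bookkeeping fields.

As an assumption for illustration, hasAckers == false corresponds to a topology with no acker executors, for example topology.acker.executors set to 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the needAck branch in sendSpoutMsg (not Storm code).
class CollectorModel {
    final Map<Long, Object> pending = new HashMap<>();
    final boolean hasAckers;
    int immediateAcks = 0;
    private long nextRootId = 1; // simplification: Storm generates random root ids

    CollectorModel(boolean hasAckers) {
        this.hasAckers = hasAckers;
    }

    void emit(Object messageId) {
        boolean needAck = (messageId != null) && hasAckers;
        if (needAck) {
            pending.put(nextRootId++, messageId); // tracked until the acker reports back
        } else if (messageId != null) {
            immediateAcks++; // no ackers: the spout message is acked right away
        }
        // messageId == null: neither tracked nor acked
    }
}

class CollectorModelDemo {
    public static void main(String[] args) {
        CollectorModel withAckers = new CollectorModel(true);
        withAckers.emit("m1");
        withAckers.emit(null);
        CollectorModel noAckers = new CollectorModel(false);
        noAckers.emit("m1");
        noAckers.emit("m2");
        // prints "1 0 2": only the reliable emit with ackers ends up in pending
        System.out.println(withAckers.pending.size() + " " + noAckers.pending.size() + " " + noAckers.immediateAcks);
    }
}
```

With no ackers, pending never grows, so any positive maxSpoutPending limit is never reached.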
MasterBatchCoordinator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
    for (String spoutId : _managedSpoutIds) {
        _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
    }
    _currTransaction = getStoredCurrTransaction();
    _collector = collector;
    Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
    if (active == null) {
        _maxTransactionActive = 1;
    } else {
        _maxTransactionActive = active.intValue();
    }
    _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
    for (int i = 0; i < _spouts.size(); i++) {
        String txId = _managedSpoutIds.get(i);
        _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
    }
    LOG.debug("Opened {}", this);
}

private void sync() {
    // note that sometimes the tuples active may be less than max_spout_pending, e.g.
    // max_spout_pending = 3
    // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
    // and there won't be a batch for tx 4 because there's max_spout_pending tx active
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
    }
    if (_active) {
        if (_activeTx.size() < _maxTransactionActive) {
            Long curr = _currTransaction;
            for (int i = 0; i < _maxTransactionActive; i++) {
                if (!_activeTx.containsKey(curr) && isReady(curr)) {
                    // by using a monotonically increasing attempt id, downstream tasks
                    // can be memory efficient by clearing out state for old attempts
                    // as soon as they see a higher attempt id for a transaction
                    Integer attemptId = _attemptIds.get(curr);
                    if (attemptId == null) {
                        attemptId = 0;
                    } else {
                        attemptId++;
                    }
                    _attemptIds.put(curr, attemptId);
                    for (TransactionalState state : _states) {
                        state.setData(CURRENT_ATTEMPTS, _attemptIds);
                    }
                    TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                    final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                    _activeTx.put(curr, newTransactionStatus);
                    _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                    LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                    _throttler.markEvent();
                }
                curr = nextTransactionId(curr);
            }
        }
    }
}
- MasterBatchCoordinator's open method reads Config.TOPOLOGY_MAX_SPOUT_PENDING from conf into _maxTransactionActive, defaulting to 1 if it is null
- sync only emits new batches on BATCH_STREAM_ID while _activeTx.size() < _maxTransactionActive
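The window behavior described in the sync comment can be reproduced with a simplified model. BatchWindow is hypothetical, not Storm code. It collapses the PROCESSED, COMMITTING and commit-ack steps into a single processed() call, which commits immediately whenever the batch is the oldest active one. It also ignores isReady, attempt ids and the emit throttler.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of MasterBatchCoordinator's batch window (not Storm code).
class BatchWindow {
    enum Status { PROCESSING, PROCESSED }

    final int maxTransactionActive;
    final Map<Long, Status> activeTx = new LinkedHashMap<>();
    long currTransaction = 1; // oldest uncommitted transaction id
    int committed = 0;

    BatchWindow(Integer maxSpoutPending) {
        // null (the default) falls back to 1, as in open()
        this.maxTransactionActive = (maxSpoutPending == null) ? 1 : maxSpoutPending;
    }

    /** Starts new batches only while fewer than maxTransactionActive are active. */
    void sync() {
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxTransactionActive; i++) {
                activeTx.putIfAbsent(curr, Status.PROCESSING);
                curr++; // simplification of nextTransactionId(curr)
            }
        }
    }

    /** Marks a batch as processed; batches commit strictly in order, oldest first. */
    void processed(long txId) {
        activeTx.replace(txId, Status.PROCESSED);
        while (activeTx.get(currTransaction) == Status.PROCESSED) {
            activeTx.remove(currTransaction);
            committed++;
            currTransaction++;
        }
    }
}

class BatchWindowDemo {
    public static void main(String[] args) {
        BatchWindow w = new BatchWindow(3);
        w.sync();                                // tx 1, 2, 3 become active
        w.processed(2);                          // tx 2 done, but tx 1 is not committed yet
        w.sync();                                // still 3 active: no tx 4
        System.out.println(w.activeTx.keySet()); // prints [1, 2, 3]
        w.processed(1);                          // commits tx 1, then tx 2
        w.sync();                                // window slides: tx 4 and 5 start
        System.out.println(w.activeTx.keySet()); // prints [3, 4, 5]
    }
}
```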
Summary
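- Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only applies to spouts that emit reliable messages (with a msgId). For a regular spout without acking, SpoutOutputCollectorImpl adds nothing to pending, so reachedMaxSpoutPending stays false and the maxSpoutPending mechanism is never triggered. Trident spouts by default use TransactionAttempt.getTransactionId() as the batchId and are tracked per transaction
- For a regular spout, the setting is the maximum number of tuples awaiting ack per task. Once pending reaches this value, SpoutExecutor stops calling the spout's nextTuple to emit data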
- For a Trident spout, it is the number of batches processed concurrently. The next batch can only start after these batches have succeeded or failed
doc
- Trident Spouts
- A look at Storm's IWaitStrategy