聊聊storm client的netty buffer watermark
序
本文主要研究一下storm client的netty buffer watermark
Config
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/** * Netty based messaging: The netty write buffer high watermark in bytes. * <p> * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()} will start to * return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK * low water mark}. * </p> */ @isInteger @isPositiveNumber public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark"; /** * Netty based messaging: The netty write buffer low watermark in bytes. * <p> * Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water * mark} and then dropped down below this value, the netty {@code Channel.isWritable()} will start to return true. * </p> */ @isInteger @isPositiveNumber public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
- 这里有两个相关的参数,分别是storm.messaging.netty.buffer.high.watermark以及storm.messaging.netty.buffer.low.watermark
- defaults.yaml
# The netty write buffer high watermark in bytes. # If the number of bytes queued in the netty's write buffer exceeds this value, the netty client will block # until the value falls below the low water mark. storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB # The netty write buffer low watermark in bytes. # Once the number of bytes queued in the write buffer exceeded the high water mark and then # dropped down below this value, any blocked clients will unblock and start processing further messages. storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB在defaults.yaml文件中,low.watermark默认大小为8388608,即8M;high.watermark默认大小为16777216,即16M
Client
storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host, int port) { this.topoConf = topoConf; closing = false; this.scheduler = scheduler; int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK)); int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK)); // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false)); LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}", host, port, bufferSize, lowWatermark, highWatermark); int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1); // Initiate connection to remote destination this.eventLoopGroup = eventLoopGroup; // Initiate connection to remote destination bootstrap = new Bootstrap() .group(this.eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_SNDBUF, bufferSize) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark)) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf)); dstAddress = new InetSocketAddress(host, port); dstAddressPrefixedName = prefixedName(dstAddress); launchChannelAliveThread(); scheduleConnect(NO_DELAY_MS); int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); batcher = new MessageBuffer(messageBatchSize); String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY); if (clazz == null) { waitStrategy = new WaitStrategyProgressive(); } else { waitStrategy = ReflectionUtils.newInstance(clazz); } waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT); }
- 这里根据lowWatermark及highWatermark创建了WriteBufferWaterMark对象,设置到ChannelOption.WRITE_BUFFER_WATER_MARK
WriteBufferWaterMark
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.java
/** * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer. * <p> * If the number of bytes queued in the write buffer exceeds the * {@linkplain #high high water mark}, {@link Channel#isWritable()} * will start to return {@code false}. * <p> * If the number of bytes queued in the write buffer exceeds the * {@linkplain #high high water mark} and then * dropped down below the {@linkplain #low low water mark}, * {@link Channel#isWritable()} will start to return * {@code true} again. */ public final class WriteBufferWaterMark { private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024; private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024; public static final WriteBufferWaterMark DEFAULT = new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false); private final int low; private final int high; /** * Create a new instance. * * @param low low water mark for write buffer. * @param high high water mark for write buffer */ public WriteBufferWaterMark(int low, int high) { this(low, high, true); } /** * This constructor is needed to keep backward-compatibility. */ WriteBufferWaterMark(int low, int high, boolean validate) { if (validate) { if (low < 0) { throw new IllegalArgumentException("write buffer's low water mark must be >= 0"); } if (high < low) { throw new IllegalArgumentException( "write buffer's high water mark cannot be less than " + " low water mark (" + low + "): " + high); } } this.low = low; this.high = high; } /** * Returns the low water mark for the write buffer. */ public int low() { return low; } /** * Returns the high water mark for the write buffer. */ public int high() { return high; } @Override public String toString() { StringBuilder builder = new StringBuilder(55) .append("WriteBufferWaterMark(low: ") .append(low) .append(", high: ") .append(high) .append(")"); return builder.toString(); } }
- 从注释里头可以看到这两个参数控制的是Channel.isWritable()方法
ChannelOutboundBuffer.setWritable
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); private volatile int unwritable; /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. */ public long bytesBeforeWritable() { long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark(); // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? 0 : bytes; } return 0; } /** * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ void decrementPendingOutboundBytes(long size) { decrementPendingOutboundBytes(size, true, true); } private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } } private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }
- bytesBeforeWritable方法先判断totalPendingSize是否大于lowWatermark,如果不大于则返回0,如果大于且isWritable返回true则返回0,否则返回差值
- decrementPendingOutboundBytes方法会判断,如果notifyWritability为true且newWriteBufferSize < channel.config().getWriteBufferLowWaterMark(),则调用setWritablesetWritable(invokeLater)
- setWritable会更新unwritable,如果是从非0变为0,还会触发fireChannelWritabilityChanged进行通知
ChannelOutboundBuffer.setUnwritable
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); private volatile int unwritable; /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. */ public long bytesBeforeUnwritable() { long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize; // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? bytes : 0; } return 0; } /** * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ void incrementPendingOutboundBytes(long size) { incrementPendingOutboundBytes(size, true); } private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } } private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }
- bytesBeforeUnwritable方法先判断highWatermark与totalPendingSize的差值,totalPendingSize大于等于highWatermark,则返回0;如果小于highWatermark,且isWritable为true,则返回差值,否则返回0
- incrementPendingOutboundBytes方法判断如果newWriteBufferSize > channel.config().getWriteBufferHighWaterMark(),则调用setUnwritable(invokeLater)
- setUnwritable会更新unwritable,如果是从0变为非0,还会触发fireChannelWritabilityChanged进行通知
小结
- storm client的storm.messaging.netty.buffer.high.watermark(
默认16M
)以及storm.messaging.netty.buffer.low.watermark(默认8M
)其实配置的是netty的ChannelOption.WRITE_BUFFER_WATER_MARK - netty的WriteBufferWaterMark通过lowWatermark及highWatermark参数影响了ChannelOutboundBuffer的bytesBeforeWritable以及bytesBeforeUnwritable方法的返回值(
目前来看这两个方法貌似调用的比较少
) - lowWatermark及highWatermark分别在decrementPendingOutboundBytes及incrementPendingOutboundBytes方法里头用到(
目前应该是这两个方法起作用
),当小于lowWatermark或者大于highWatermark的时候,分别触发setWritable及setUnwritable,更改ChannelOutboundBuffer的unwritable字段,进而影响isWritable方法;在isWritable为true的时候会立马执行写请求,当返回false的时候,写请求会被放入队列等待isWritable为true时才能执行这些堆积的写请求
doc
相关推荐
枫叶上的雨露 2020-05-02
LandryBean 2020-03-12
一名java从业者 2020-01-09
weeniebear 2013-03-25
weeniebear 2014-05-28
sfqbluesky 2019-12-12
AbnerSunYH 2016-08-12
weeniebear 2016-08-11
Stereo 2016-07-27
芒果先生Mango 2018-05-31
dykun 2019-08-16
GimmeS 2016-10-11
benbendy 2016-09-30
Johnhao 2016-09-30
AbnerSunYH 2016-04-28
benbendy 2016-04-15