聊聊storm client的netty buffer watermark

本文主要研究一下storm client的netty buffer watermark

Config

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

/**
     * Netty based messaging: The netty write buffer high watermark in bytes.
     * <p>
     * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()} will start to
     * return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK
     * low water mark}.
     * </p>
     */
    @isInteger
    @isPositiveNumber
    public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark";
    /**
     * Netty based messaging: The netty write buffer low watermark in bytes.
     * <p>
     * Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water
     * mark} and then dropped down below this value, the netty {@code Channel.isWritable()} will start to return true.
     * </p>
     */
    @isInteger
    @isPositiveNumber
    public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
  • 这里有两个相关的参数,分别是storm.messaging.netty.buffer.high.watermark以及storm.messaging.netty.buffer.low.watermark
  • defaults.yaml
# The netty write buffer high watermark in bytes.
# If the number of bytes queued in the netty's write buffer exceeds this value, the netty client will block
# until the value falls below the low water mark.
storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB
# The netty write buffer low watermark in bytes.
# Once the number of bytes queued in the write buffer exceeded the high water mark and then
# dropped down below this value, any blocked clients will unblock and start processing further messages.
storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
在defaults.yaml文件中,low.watermark默认大小为8388608,即8M;high.watermark默认大小为16777216,即16M

Client

storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java

Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
        EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
           int port) {
        this.topoConf = topoConf;
        closing = false;
        this.scheduler = scheduler;
        int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
        int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
        int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
        // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
        saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
        LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
                 host, port, bufferSize, lowWatermark, highWatermark);

        int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
        int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1);

        // Initiate connection to remote destination
        this.eventLoopGroup = eventLoopGroup;
        // Initiate connection to remote destination
        bootstrap = new Bootstrap()
            .group(this.eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, bufferSize)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
        dstAddress = new InetSocketAddress(host, port);
        dstAddressPrefixedName = prefixedName(dstAddress);
        launchChannelAliveThread();
        scheduleConnect(NO_DELAY_MS);
        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
        batcher = new MessageBuffer(messageBatchSize);
        String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
        if (clazz == null) {
            waitStrategy = new WaitStrategyProgressive();
        } else {
            waitStrategy = ReflectionUtils.newInstance(clazz);
        }
        waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
    }
  • 这里根据lowWatermark及highWatermark创建了WriteBufferWaterMark对象,设置到ChannelOption.WRITE_BUFFER_WATER_MARK

WriteBufferWaterMark

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.java

/**
 * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark}, {@link Channel#isWritable()}
 * will start to return {@code false}.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark} and then
 * dropped down below the {@linkplain #low low water mark},
 * {@link Channel#isWritable()} will start to return
 * {@code true} again.
 */
public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

    private final int low;
    private final int high;

    /**
     * Create a new instance.
     *
     * @param low low water mark for write buffer.
     * @param high high water mark for write buffer
     */
    public WriteBufferWaterMark(int low, int high) {
        this(low, high, true);
    }

    /**
     * This constructor is needed to keep backward-compatibility.
     */
    WriteBufferWaterMark(int low, int high, boolean validate) {
        if (validate) {
            if (low < 0) {
                throw new IllegalArgumentException("write buffer's low water mark must be >= 0");
            }
            if (high < low) {
                throw new IllegalArgumentException(
                        "write buffer's high water mark cannot be less than " +
                                " low water mark (" + low + "): " +
                                high);
            }
        }
        this.low = low;
        this.high = high;
    }

    /**
     * Returns the low water mark for the write buffer.
     */
    public int low() {
        return low;
    }

    /**
     * Returns the high water mark for the write buffer.
     */
    public int high() {
        return high;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(55)
            .append("WriteBufferWaterMark(low: ")
            .append(low)
            .append(", high: ")
            .append(high)
            .append(")");
        return builder.toString();
    }

}
  • 从注释里头可以看到这两个参数控制的是Channel.isWritable()方法

ChannelOutboundBuffer.setWritable

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java

private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    private volatile int unwritable;

    /**
     * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
     * not exceed the write watermark of the {@link Channel} and
     * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
     * {@code false}.
     */
    public boolean isWritable() {
        return unwritable == 0;
    }

    /**
     * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
     */
    public long bytesBeforeWritable() {
        long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
        // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
        // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
        // together. totalPendingSize will be updated before isWritable().
        if (bytes > 0) {
            return isWritable() ? 0 : bytes;
        }
        return 0;
    }

    /**
     * Decrement the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void decrementPendingOutboundBytes(long size) {
        decrementPendingOutboundBytes(size, true, true);
    }

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }

    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

    private void fireChannelWritabilityChanged(boolean invokeLater) {
        final ChannelPipeline pipeline = channel.pipeline();
        if (invokeLater) {
            Runnable task = fireChannelWritabilityChangedTask;
            if (task == null) {
                fireChannelWritabilityChangedTask = task = new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelWritabilityChanged();
                    }
                };
            }
            channel.eventLoop().execute(task);
        } else {
            pipeline.fireChannelWritabilityChanged();
        }
    }
  • bytesBeforeWritable方法先判断totalPendingSize是否大于lowWatermark,如果不大于则返回0,如果大于且isWritable返回true则返回0,否则返回差值
  • decrementPendingOutboundBytes方法会判断,如果notifyWritability为true且newWriteBufferSize < channel.config().getWriteBufferLowWaterMark(),则调用setWritablesetWritable(invokeLater)
  • setWritable会更新unwritable,如果是从非0变为0,还会触发fireChannelWritabilityChanged进行通知

ChannelOutboundBuffer.setUnwritable

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java

private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    private volatile int unwritable;
    
    /**
     * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
     * not exceed the write watermark of the {@link Channel} and
     * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
     * {@code false}.
     */
    public boolean isWritable() {
        return unwritable == 0;
    }

    /**
     * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
     */
    public long bytesBeforeUnwritable() {
        long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
        // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
        // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
        // together. totalPendingSize will be updated before isWritable().
        if (bytes > 0) {
            return isWritable() ? bytes : 0;
        }
        return 0;
    }

    /**
     * Increment the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void incrementPendingOutboundBytes(long size) {
        incrementPendingOutboundBytes(size, true);
    }

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

    private void fireChannelWritabilityChanged(boolean invokeLater) {
        final ChannelPipeline pipeline = channel.pipeline();
        if (invokeLater) {
            Runnable task = fireChannelWritabilityChangedTask;
            if (task == null) {
                fireChannelWritabilityChangedTask = task = new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelWritabilityChanged();
                    }
                };
            }
            channel.eventLoop().execute(task);
        } else {
            pipeline.fireChannelWritabilityChanged();
        }
    }
  • bytesBeforeUnwritable方法先判断highWatermark与totalPendingSize的差值,totalPendingSize大于等于highWatermark,则返回0;如果小于highWatermark,且isWritable为true,则返回差值,否则返回0
  • incrementPendingOutboundBytes方法判断如果newWriteBufferSize > channel.config().getWriteBufferHighWaterMark(),则调用setUnwritable(invokeLater)
  • setUnwritable会更新unwritable,如果是从0变为非0,还会触发fireChannelWritabilityChanged进行通知

小结

  • storm client的storm.messaging.netty.buffer.high.watermark(默认16M)以及storm.messaging.netty.buffer.low.watermark(默认8M)其实配置的是netty的ChannelOption.WRITE_BUFFER_WATER_MARK
  • netty的WriteBufferWaterMark通过lowWatermark及highWatermark参数影响了ChannelOutboundBuffer的bytesBeforeWritable以及bytesBeforeUnwritable方法的返回值(目前来看这两个方法貌似调用的比较少)
  • lowWatermark及highWatermark分别在decrementPendingOutboundBytes及incrementPendingOutboundBytes方法里头用到(目前应该是这两个方法起作用),当小于lowWatermark或者大于highWatermark的时候,分别触发setWritable及setUnwritable,更改ChannelOutboundBuffer的unwritable字段,进而影响isWritable方法;在isWritable为true的时候会立马执行写请求,当返回false的时候,写请求会被放入队列等待isWritable为true时才能执行这些堆积的写请求

doc

相关推荐