Netty write 的异步执行与流量控制(基于 4.0.29.Final)
AbstractChannelHandlerContext.class
/**
 * Passes a write (optionally followed by a flush) to the next outbound context.
 *
 * <p>When the calling thread is the next context's executor thread, the write is
 * invoked directly. Otherwise the estimated message size is added to the channel's
 * pending outbound bytes and the write is wrapped in a task that is handed to that
 * executor.
 *
 * @param msg     the message to write
 * @param flush   whether a flush should follow the write
 * @param promise the promise associated with this write
 */
private void write(Object msg, boolean flush, ChannelPromise promise) {
    final AbstractChannelHandlerContext next = this.findContextOutbound();
    final EventExecutor executor = next.executor();

    // Fast path: already on the target executor's thread, so call straight through.
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
        return;
    }

    // Off-thread path: account for the bytes now, before the task is queued.
    final int size = this.channel.estimatorHandle().size(msg);
    if (size > 0) {
        final ChannelOutboundBuffer outboundBuffer = this.channel.unsafe().outboundBuffer();
        // The buffer may be null, in which case there is nothing to account against.
        if (outboundBuffer != null) {
            outboundBuffer.incrementPendingOutboundBytes((long) size);
        }
    }

    final Object task = flush
            ? AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, msg, size, promise)
            : AbstractChannelHandlerContext.WriteTask.newInstance(next, msg, size, promise);
    safeExecute(executor, (Runnable) task, promise, msg);
}
对于 write 操作,会先判断下一个 outbound handler 所绑定的 executor 线程是否就是当前线程(executor.inEventLoop()):
// Excerpt of AbstractEventExecutor (the class body is truncated in this snippet).
// next() returns this executor itself; the no-arg inEventLoop() delegates to
// inEventLoop(Thread) with the calling thread, i.e. it asks whether the current
// thread is this executor's thread.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { public AbstractEventExecutor() { } public EventExecutor next() { return this; } public boolean inEventLoop() { return this.inEventLoop(Thread.currentThread()); }
如果是,则直接在当前线程执行 invokeWrite:
/**
 * Invokes {@code write} on this context's handler, treating it as a
 * {@code ChannelOutboundHandler}.
 *
 * <p>Any {@link Throwable} raised while doing so is passed, together with the
 * promise, to {@code notifyOutboundHandlerException}.
 *
 * @param msg     the message to write
 * @param promise the promise associated with this write
 */
private void invokeWrite(Object msg, ChannelPromise promise) {
    try {
        // The cast stays inside the try so a failing cast is reported like any other error.
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) this.handler();
        outboundHandler.write(this, msg, promise);
    } catch (Throwable cause) {
        notifyOutboundHandlerException(cause, promise);
    }
}
如果不是,则把写操作封装成任务,通过 safeExecute 提交给该 handler 的 executor 线程去执行:
/**
 * Submits {@code runnable} to {@code executor}.
 *
 * <p>If submission throws, the promise is failed with that cause and the message
 * (when non-null) is released.
 *
 * @param executor the executor that should run the task
 * @param runnable the task to submit
 * @param promise  the promise to fail if submission is rejected
 * @param msg      the message to release if submission is rejected; may be {@code null}
 */
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable submitFailure) {
        try {
            promise.setFailure(submitFailure);
        } finally {
            // Release in finally so the message is released even if setFailure throws.
            if (null != msg) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}
被 execute 的任务(WriteTask / WriteAndFlushTask)的 run 逻辑如下:
// Excerpt of AbstractWriteTask: part of the class is elided by the article
// ("。。。。。。") and the class body is not closed in this snippet.
// run(): when size > 0 and the channel's outbound buffer is non-null, it first
// decrements the buffer's pending outbound bytes by size, then calls
// write(ctx, msg, promise). The finally block always nulls out ctx, msg and
// promise afterwards.
abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable { 。。。。。。 public final void run() { try { if(this.size > 0) { ChannelOutboundBuffer buffer = this.ctx.channel.unsafe().outboundBuffer(); if(buffer != null) { buffer.decrementPendingOutboundBytes((long)this.size); } } this.write(this.ctx, this.msg, this.promise); } finally { this.ctx = null; this.msg = null; this.promise = null; } }
对于业务线程(非 EventLoop 线程)发起的 write:提交任务时先增加水位(incrementPendingOutboundBytes),任务 run 时再减少水位(decrementPendingOutboundBytes)。
再往下,写操作会传到 pipeline 最底层的 HeadHandler:
DefaultChannelPipeline.HeadHandler
/**
 * Forwards the write to the {@code unsafe} field of this handler.
 *
 * @param ctx     the handler context (not used by this method)
 * @param msg     the message to write
 * @param promise the promise associated with this write
 */
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

/**
 * Forwards the flush to the {@code unsafe} field of this handler.
 *
 * @param ctx the handler context (not used by this method)
 */
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
AbstractChannel.class
/**
 * Filters and sizes the message, then appends it to the outbound buffer.
 *
 * <p>If the outbound buffer is {@code null}, the promise is failed with
 * {@code CLOSED_CHANNEL_EXCEPTION} and the message is released. If filtering or
 * size estimation throws, the promise is failed with that cause and the message is
 * released. A negative estimated size is treated as zero.
 *
 * @param msg     the message to write
 * @param promise the promise associated with this write
 */
public final void write(Object msg, ChannelPromise promise) {
    final ChannelOutboundBuffer buffer = this.outboundBuffer;
    if (buffer == null) {
        this.safeSetFailure(promise, AbstractChannel.CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        // msg is reassigned so that whatever the filter returned is what gets buffered
        // (or released, if size estimation fails afterwards).
        msg = AbstractChannel.this.filterOutboundMessage(msg);
        size = Math.max(AbstractChannel.this.estimatorHandle().size(msg), 0);
    } catch (Throwable cause) {
        this.safeSetFailure(promise, cause);
        ReferenceCountUtil.release(msg);
        return;
    }

    buffer.addMessage(msg, size, promise);
}
可以看到,最终调用的是 ChannelOutboundBuffer.addMessage:
/**
 * Wraps the message in a new entry, appends it to the tail of the entry list and
 * increments the pending outbound byte count by {@code size}.
 *
 * @param msg     the message to enqueue
 * @param size    the size to account for this message
 * @param promise the promise associated with this message
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    final ChannelOutboundBuffer.Entry added =
            ChannelOutboundBuffer.Entry.newInstance(msg, size, total(msg), promise);

    final ChannelOutboundBuffer.Entry previousTail = this.tailEntry;
    if (previousTail != null) {
        // Non-empty list: link the new entry after the current tail.
        previousTail.next = added;
    } else {
        // Empty list: there is no flushed entry either.
        this.flushedEntry = null;
    }
    this.tailEntry = added;

    // The first entry added since the last flush becomes the head of the unflushed part.
    if (this.unflushedEntry == null) {
        this.unflushedEntry = added;
    }

    this.incrementPendingOutboundBytes((long) size, false);
}
addMessage 中又增加了一次水位。所以业务线程发起的 write 会经历“增加 → 减少 → 增加”共 3 次水位变更,而在 IO 线程中发起的 write 只会增加 1 次。
相关推荐
nmgxzm00 2020-11-10
xixixi 2020-11-11
88254251 2020-11-01
MarukoMa 2020-09-02
88234852 2020-09-15
陈旭阳 2020-08-31
whynotgonow 2020-08-19
前端开发Kingcean 2020-07-30
whynotgonow 2020-07-29
bowean 2020-07-08
前端开发Kingcean 2020-07-08
88520191 2020-07-05
前端开发Kingcean 2020-06-27
88481456 2020-06-18
whynotgonow 2020-06-16
88520191 2020-06-13
88520191 2020-06-13
89500297 2020-06-13