write异步与流量控制 4.0.29.Final
AbstractChannelHandlerContext.class
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = this.findContextOutbound();
EventExecutor executor = next.executor();
if(executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if(flush) {
next.invokeFlush();
}
} else {
int size = this.channel.estimatorHandle().size(msg);
if(size > 0) {
ChannelOutboundBuffer task = this.channel.unsafe().outboundBuffer();
if(task != null) {
task.incrementPendingOutboundBytes((long)size);
}
}
Object task1;
if(flush) {
task1 = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task1 = AbstractChannelHandlerContext.WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, (Runnable)task1, promise, msg);
}
}对于write操作,会先判断下一个outboundhandler的执行线程是否是当前线程
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
public AbstractEventExecutor() {
}
public EventExecutor next() {
return this;
}
public boolean inEventLoop() {
return this.inEventLoop(Thread.currentThread());
}如果是,则执行invokewrite
private void invokeWrite(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
} catch (Throwable var4) {
notifyOutboundHandlerException(var4, promise);
}
}如果不是则让handler所在线程执行
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable var9) {
Throwable cause = var9;
try {
promise.setFailure(cause);
} finally {
if(msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}execute一个任务
abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable {
。。。。。。
public final void run() {
try {
if(this.size > 0) {
ChannelOutboundBuffer buffer = this.ctx.channel.unsafe().outboundBuffer();
if(buffer != null) {
buffer.decrementPendingOutboundBytes((long)this.size);
}
}
this.write(this.ctx, this.msg, this.promise);
} finally {
this.ctx = null;
this.msg = null;
this.promise = null;
}
}对于业务线程write,当时先增加水位,run时减少水位
到底层
DefaultChannelPipeline.HeadHandler
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
this.unsafe.write(msg, promise);
}
public void flush(ChannelHandlerContext ctx) throws Exception {
this.unsafe.flush();
}AbstractChannel.class
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if(outboundBuffer == null) {
this.safeSetFailure(promise, AbstractChannel.CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.estimatorHandle().size(msg);
if(size < 0) {
size = 0;
}
} catch (Throwable var6) {
this.safeSetFailure(promise, var6);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
}可以看到,最终调用ChannelOutboundBuffer.addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
ChannelOutboundBuffer.Entry entry = ChannelOutboundBuffer.Entry.newInstance(msg, size, total(msg), promise);
if(this.tailEntry == null) {
this.flushedEntry = null;
this.tailEntry = entry;
} else {
ChannelOutboundBuffer.Entry tail = this.tailEntry;
tail.next = entry;
this.tailEntry = entry;
}
if(this.unflushedEntry == null) {
this.unflushedEntry = entry;
}
this.incrementPendingOutboundBytes((long)size, false);
}又增加水位,所以业务线程write会经历增加-减少-增加 3次,而在io线程write只会增加1次
相关推荐
88254251 2020-11-01
MarukoMa 2020-09-02
88234852 2020-09-15
陈旭阳 2020-08-31
whynotgonow 2020-08-19
前端开发Kingcean 2020-07-30
whynotgonow 2020-07-29
bowean 2020-07-08
前端开发Kingcean 2020-07-08
88520191 2020-07-05
前端开发Kingcean 2020-06-27
88481456 2020-06-18
whynotgonow 2020-06-16
88520191 2020-06-13
88520191 2020-06-13
89500297 2020-06-13