[Netty from the Source] Netty Channel
Where Channel Fits
Note: unless stated otherwise, "Channel" in this article means the Netty Channel (io.netty.channel).
A week spent studying the Channel family once had me questioning everything: is digging into this one method even useful? Is learning Netty a bit like leaving the highway to take a country road? Why am I reading source code at all?
These doubts arose partly because I have an overactive inner monologue, but the real cause was that I had not figured out where Channel fits in the Netty architecture. Not clearly understanding Netty's own positioning quietly made things worse.
Some first-principles thinking: Netty is an NIO framework, one grafted on top of java NIO.
At a high level it can be pictured as in the diagram below:
Before diving into Channel, let's review how IO has evolved, focusing on how the structure of IO frameworks changed. Once that is clear, the role Channel plays in the IO world becomes obvious!
The Evolution of IO
BIO
The diagram above already shows an optimized BIO setup: it uses a thread pool. Even so, every client costs the server one Thread. With the pool in place the thread count becomes a hard cap, so a surge of clients still leaves demand unmet.
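To make the per-connection cost concrete, here is a minimal thread-pool BIO echo server; the class name, port, and pool size are made up for illustration:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal BIO echo server: every client occupies one worker thread for its whole lifetime,
// so the pool size is a hard ceiling on concurrent clients.
public class BioEchoServer {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(50);
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();       // blocks until a client connects
                pool.execute(() -> {
                    try (InputStream in = socket.getInputStream();
                         OutputStream out = socket.getOutputStream()) {
                        byte[] buf = new byte[1024];
                        int n;
                        while ((n = in.read(buf)) != -1) { // blocking read ties up the thread
                            out.write(buf, 0, n);          // echo the bytes back
                        }
                    } catch (Exception ignored) {
                        // connection reset etc.; nothing to do in this sketch
                    }
                });
            }
        }
    }
}
```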
NIO
- The Acceptor registers with a Selector and listens for accept events
- When a client connects, an accept event fires
- The server builds the corresponding Channel and registers it with the Selector to listen for read/write events
- When a read/write event occurs, the corresponding read/write handling is performed (see the sketch below)
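These four steps map almost one-to-one onto the java.nio API. A stripped-down, illustrative selector loop (the port and buffer size are arbitrary):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Single-threaded NIO echo server: one Selector multiplexes accept and read events for all channels.
public class NioEchoServer {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8080));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    // acceptor listens for accept events

        while (true) {
            selector.select();                                       // blocks until some channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                            // a client connected
                    SocketChannel client = serverChannel.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ); // the new channel listens for read events
                } else if (key.isReadable()) {                       // a client sent data
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = client.read(buf);
                    if (n == -1) {                                   // peer closed the connection
                        key.cancel();
                        client.close();
                    } else {
                        buf.flip();
                        client.write(buf);                           // echo the bytes back
                    }
                }
            }
        }
    }
}
```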
Single-threaded Reactor
This looks much like the NIO model, and it naturally shares the same problem: a single selector/reactor thread handles every operation on many channels, so if event handling on one channel stalls, the other channels suffer.
Multi-threaded Reactor
Keep the IO work (read/write) where it is, and give the non-IO work (business logic) its own thread pool; this evolves into the multi-threaded reactor model:
With this architecture the system bottleneck shifts to the Reactor, which so far has been dutifully doing two jobs:
1. Accepting client connection requests
2. Handling IO reads and writes
Main-Sub Reactor
Split the job of accepting client connections out once more:
Netty is exactly a practitioner of the main-sub reactor model; think of the code used to create a server:
```java
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 ...
```
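For reference, a minimal runnable version of that bootstrap, with bossGroup as the main reactor (accepting connections) and workerGroup as the sub reactors (handling IO on the accepted channels); the handler and port are placeholders of mine, not part of the original snippet:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class MainSubReactorServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // main reactor: accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // sub reactors: read/write on accepted channels
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new LoggingHandler()); // placeholder handler
                 }
             });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```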
The channel (the java channel) first appears in the model diagrams of the NIO era; its job is the low-level interactions such as connect, write, read and close. In short, the java channel sits below the selector and above the socket. The netty channel, in turn, treats the java channel as its underlying layer.
Source Code Analysis
Class Structure
With Channel's role clear, let's analyze its commonly used APIs.
First, the class diagram:
Channel actually contains another hierarchy of its own, the Unsafe family:
Unsafe is an inner interface of Channel; the heavy lifting of interacting with the java channel ultimately falls to Unsafe.
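To give a feel for how low-level those responsibilities are, here is an abridged view of the Channel.Unsafe contract as I recall it from Netty 4.x; treat the exact signatures as approximate and check the source:

```java
// Abridged sketch of io.netty.channel.Channel.Unsafe (Netty 4.x); not the complete interface.
interface Unsafe {
    void register(EventLoop eventLoop, ChannelPromise promise); // tie this channel to an EventLoop (selector thread)
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void beginRead();                                            // start reading, i.e. set the read interest op
    void write(Object msg, ChannelPromise promise);              // stage data in the ChannelOutboundBuffer
    void flush();                                                // push staged data down to the java channel
    ChannelOutboundBuffer outboundBuffer();
}
```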
The write method
write only puts the data into the ChannelOutboundBuffer; nothing is actually sent yet. The data is written to the java channel and sent to the peer only when flush is called. Below is the write method (defined in AbstractChannel's inner class AbstractUnsafe), with the noteworthy spots annotated:
```java
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg); // wrap/convert the message, e.g. into a ByteBuf
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise); // write the msg into the ChannelOutboundBuffer
}
```
In the last line above, the msg is appended at the ChannelOutboundBuffer's tail node tailEntry, and unflushedEntry is assigned at the same time to stash it as pending data. Expanded, the code looks like this:
```java
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) { // note 1: mark the data as "not yet flushed"
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(entry.pendingSize, false);
}
```
The ChannelOutboundBuffer class
A brief word on the ChannelOutboundBuffer class; as usual, start with the class-level javadoc.
```java
/**
 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 *
 * ... (remainder omitted)
 */
```
As mentioned earlier, the write method stores messages in the ChannelOutboundBuffer as a staging area; the later flush then pushes them to the java channel and on to the client.
A diagram to make this easier to picture:
The three fields shown in the diagram are the key players as data flows through write -> ChannelOutboundBuffer -> flush. What is Entry? A static inner class of ChannelOutboundBuffer, and a classic linked-list node:
```java
static final class Entry {
    Entry next;
    // ...
}
```
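To make the pointer movement concrete, here is my own text sketch of the three fields, first after a few write calls and then after addFlush:

```
After three write() calls (nothing flushed yet):

    flushedEntry = null
    unflushedEntry ──► E1 ──► E2 ──► E3 ◄── tailEntry

After addFlush():

    flushedEntry ──► E1 ──► E2 ──► E3 ◄── tailEntry
    unflushedEntry = null

flush0()/doWrite() then consumes entries starting from flushedEntry, removing each one
as it is fully written to the java channel.
```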
The final part of the write method (at note 1) calls outboundBuffer.addMessage(msg, size, promise), which assigns the Entry wrapping the msg to tailEntry and unflushedEntry; the flush method then calls outboundBuffer.addFlush() (at note 2, below), which indirectly hands unflushedEntry over to flushedEntry.
```java
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
```
The flush method
Let's start directly from the flush method in AbstractChannel (again in its inner class AbstractUnsafe); starting from Channel's flush would go through the pipeline and a much longer call chain, which is omitted here:
```java
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush(); // note 2: mark the data as "flushed"
    flush0();                  // actually process the data
}
```
The outboundBuffer.addFlush() method has already been covered. Following the call chain flush0 -> doWrite, let's look at AbstractNioByteChannel's doWrite method:
```java
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount(); // spin counter limiting the loop, default 16
    do {
        Object msg = in.current(); // the msg held by flushedEntry
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
```
writeSpinCount is a spin counter, similar in spirit to a spin lock. It keeps the current IO thread from looping over the write operation indefinitely (because of network conditions, for example), which would leave the thread effectively hung and waste resources.
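The spin limit is configurable per channel; if the default of 16 does not suit your traffic pattern, it can be overridden through the standard channel option (the value below is only illustrative):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public class WriteSpinTuning {
    static ServerBootstrap tune(ServerBootstrap b) {
        // writeSpinCount defaults to 16; a smaller value gives up sooner and reschedules the flush.
        return b.childOption(ChannelOption.WRITE_SPIN_COUNT, 8);
    }
}
```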
Now look at doWriteInternal; as before, the key spots carry comments:
```java
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) { // writerIndex - readerIndex > 0 ? true : false
            in.remove();
            return 0;
        }

        final int localFlushedAmount = doWriteBytes(buf); // number of bytes actually written to the java channel
        if (localFlushedAmount > 0) { // the write succeeded
            in.progress(localFlushedAmount);
            /*
             * 1. Everything was written: call in.remove()
             * 2. Partial-write ("half packet") case: simply return 1.
             *    The caller's spin counter writeSpinCount drops to 15 and this method runs again on the next iteration.
             */
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        // handling of FileRegion (file-type) messages omitted ..
    } else {
        // Should not reach here.
        throw new Error();
    }
    return WRITE_STATUS_SNDBUF_FULL; // the send buffer is full; value = Integer.MAX_VALUE
}
```
Back in doWrite, the final step executes incompleteWrite(writeSpinCount < 0):
```java
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}
```
The design here is rather interesting:
- If setOpWrite = (writeSpinCount < 0) = true, i.e. doWriteInternal returned WRITE_STATUS_SNDBUF_FULL (the send buffer is full), the write interest bit is set:
```java
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
```
In effect this just sets the OP_WRITE bit on the SelectionKey, so the write will be attempted again on the selector/reactor's next poll.
- If setOpWrite = (writeSpinCount < 0) = false, i.e. doWriteInternal kept returning 1 and 16 partial writes still did not get the message out, the flush is re-submitted to the event loop as a task and executed again:
```java
public Channel flush() {
    pipeline.flush();
    return this;
}
```
Conclusion: in the first case the send buffer is full and no more data can be written, so we pin our hopes on the selector's next poll; in the second case the incomplete send was probably just due to the limited spin count, so the flush task is re-queued on the event loop and driven through the pipeline again, without waiting on the selector.
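For completeness, the selector-side counterpart of the first branch lives in NioEventLoop.processSelectedKey, which reacts to OP_WRITE readiness by forcing another flush; the excerpt below is paraphrased from Netty 4.x, so consult the source for the exact code:

```java
// Inside NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch), roughly:
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // The socket became writable again: force a flush, which also clears OP_WRITE
    // once there is nothing left to write.
    ch.unsafe().forceFlush();
}
```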
This is clearly an optimization; the craftsmanship shows in the details!