Netty新连接接入与NioSocketChannel分析
原文链接:https://wangwei.one/posts/net...
前面的一些章节,我们分析了Netty的三大组件 —— Channel 、EventLoop、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。
本文内容主要分为以下四部分:
- 新连接检测
- NioSocketChannel创建
- NioSocketChannel初始化与注册
- NioSocketChannel注册READ兴趣集
新连接检测
前面,我们在讲 EventLoop的启动过程源码分析 时,解读过下面这段代码:
public final class NioEventLoop extends SingleThreadEventLoop { ... private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... try { ... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 读取read事件 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } ... } ... }
我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read()
接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { // 用于保存新建立的 NioSocketChannel 的集合 private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { // 确保在当前线程与EventLoop中的一致 assert eventLoop().inEventLoop(); // 获取 NioServerSocketChannel config配置 final ChannelConfig config = config(); // 获取 NioServerSocketChannel 绑定的 pipeline final ChannelPipeline pipeline = pipeline(); // 获取RecvByteBuf 分配器 Handle // 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据 // 关于allocHandle后面再去做详细介绍 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 调用后面的 doReadMessages 接口,读取到message则返回1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } // 对当前read循环所读取到的message数量计数+1 allocHandle.incMessagesRead(localRead); // 判断是否继续读取message } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 调用pipeline传播ChannelRead事件 pipeline.fireChannelRead(readBuf.get(i)); } // 清空readBuf readBuf.clear(); allocHandle.readComplete(); // 调用pipeline传播 ChannelReadComplete 事件 pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } } ... }
对于 doReadMessages(...)
的分析:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { ... // 读取消息 @Override protected int doReadMessages(List<Object> buf) throws Exception { // 获取 SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // 使用SocketChannel创建NioSocketChannel,将其存入buf list中 // 关于NioSocketChannel的创建请看后面的分析 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } ... }
对于 continueReading()
接口的分析,至于结果为什么返回false,后面会单独分析:
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator { private volatile int maxMessagesPerRead; private volatile boolean respectMaybeMoreData = true; ... public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; // 每次读取最大的消息数 private int maxMessagePerRead; private int totalMessages; private int totalBytesRead; private int attemptedBytesRead; private int lastBytesRead; private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData; private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { return attemptedBytesRead == lastBytesRead; } }; ... // 判断是否继续读取message @Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } // 判断是否继续读取message @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { // 默认情况下 config.isAutoRead() 为true // respectMaybeMoreData 默认为 true // maybeMoreDataSupplier.get() 为false // totalMessages第一次循环则为1 // maxMessagePerRead为16 // 结果返回false return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; } ... } ... }
NioSocketChannel创建
上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 NioServerSocketChannel创建 大体类似。
构造器
先来看看 NioSocketChannel 的构造函数:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { ... public NioSocketChannel(Channel parent, SocketChannel socket) { // 调用父类构造器 super(parent, socket); // 创建NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); } ... }
父类 AbstractNioByteChannel 构造器:
public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { // 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣 super(parent, ch, SelectionKey.OP_READ); } ... }
父类 AbstractNioChannel 构造器:
public abstract class AbstractNioChannel extends AbstractChannel { ... protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // 调用父类构造器 super(parent); // 设置channel this.ch = ch; // 设置兴趣集 this.readInterestOp = readInterestOp; try { // 设置为非阻塞 ch.configureBlocking(false); } catch (IOException e) { ... } } }
父类 AbstractChannel 构造器:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... protected AbstractChannel(Channel parent) { // 设置parent this.parent = parent; // 创建channelId id = newId(); // 创建unsafe unsafe = newUnsafe(); // 创建pipeline pipeline = newChannelPipeline(); } ... }
ChannelConfig创建
接着我们看看 NioSocketChannelConfig 的创建逻辑:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { ... private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { // 调用父类构造器 super(channel, javaSocket); calculateMaxBytesPerGatheringWrite(); } ... }
父类 DefaultSocketChannelConfig 构造器:
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { ... public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { // 调用父类构造器,绑定socketchannel super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } // 绑定java socket this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. // netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回true if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { // 开启 TCP_NODELAY ,开启TCP的nagle算法 // 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。 setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } } ... }
NioSocketChannel初始化与注册
上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。
在前面小节分析新连接检测的有如下小段代码:
private final class NioMessageUnsafe extends AbstractNioUnsafe { ... int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 调用pipeline传播ChannelRead事件 pipeline.fireChannelRead(readBuf.get(i)); } ... }
调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?
那这个 ServerBootstrapAcceptor 是从哪里来的呢?
在此前,我们分析 NioServerSocketChannel初始化 时,有过下面这段代码:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... // NioServerSocketChannel初始化 void init(Channel channel) throws Exception { // 获取启动器 启动时配置的option参数,主要是TCP的一些属性 final Map<ChannelOption<?>, Object> options = options0(); // 将获得到 options 配置到 ChannelConfig 中去 synchronized (options) { setChannelOptions(channel, options, logger); } // 获取 ServerBootstrap 启动时配置的 attr 参数 final Map<AttributeKey<?>, Object> attrs = attrs0(); // 配置 Channel attr,主要是设置用户自定义的一些参数 synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline ChannelPipeline p = channel.pipeline(); // 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup final EventLoopGroup currentChildGroup = childGroup; // 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; // 保存用户设置的 childOptions 到局部变量 currentChildOptions synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } // 保存用户设置的 childAttrs 到局部变量 currentChildAttrs synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 获取启动器上配置的handler ChannelHandler handler = config.handler(); if (handler != null) { // 添加 handler 到 pipeline 中 pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor // 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去 // 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } ... }
ServerBootstrapAcceptor
NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { // 子EventLoopGroup,即为workGroup private final EventLoopGroup childGroup; // ServerBootstrap启动时配置的 childHandler private final ChannelHandler childHandler; // ServerBootstrap启动时配置的 childOptions private final Entry<ChannelOption<?>, Object>[] childOptions; // ServerBootstrap启动时配置的 childAttrs private final Entry<AttributeKey<?>, Object>[] childAttrs; private final Runnable enableAutoReadTask; // 构造函数 ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; // Task which is scheduled to re-enable auto-read. // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may // not be able to load the class because of the file limit it already reached. // // See https://github.com/netty/netty/issues/1328 enableAutoReadTask = new Runnable() { @Override public void run() { channel.config().setAutoRead(true); } }; } // 处理Pipeline所传播的channelRead事件 // 也就是前面新连接检测时看到的那段代码 // pipeline.fireChannelRead(readBuf.get(i)); // ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { // 获取传播事件的对象数据,即为前面的readBuf.get(i) // readBuf.get(i)取出的对象为 NioSocketChannel final Channel child = (Channel) msg; // 向 NioSocketChannel 添加childHandler,也就是我们常看到的 // ServerBootstrap在启动时配置的代码: // ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} ) // 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler // 用于处理客户端的channel连接 child.pipeline().addLast(childHandler); // 配置 NioSocketChannel的TCP属性 setChannelOptions(child, childOptions, logger); // 配置 NioSocketChannel 一些用户自定义数据 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } // 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } ... } ... }
关于 ChannelInitializer 的讲解,可以看此前 Pipeline源码分析 文章。
后面的register逻辑,就与我们前面讲解 NioServerSocketChannel注册 大体类似了,这里简单介绍一下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... // 注册NioSocketChannel // eventLoop为childGroup @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... // 绑定eventLoop到NioSocketChannel上 AbstractChannel.this.eventLoop = eventLoop; // 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回false if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { // 这里来调用register0方法 register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // 注册 private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; // 调用 doRegister() doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接 // 所以,NioSocketChannel是处于激活状态,isActive()返回ture if (isActive()) { // 对于新连接,是第一次注册 if (firstRegistration) { // 传播ChannelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } ... } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } ... }
调用到NioSocketChannel中的doRegister()方法:
public abstract class AbstractNioChannel extends AbstractChannel { ... @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象 // 兴趣集设置为0,表示不关心任何事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ... } } } ... }
NioSocketChannel 注册OP_READ兴趣集
紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集
if (isActive()) { // 对于新连接,是第一次注册 if (firstRegistration) { // 传播ChannelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } }
通过 Pipeline的传播机制 ,最终会调用到doBeginRead()接口,如下:
public abstract class AbstractNioChannel extends AbstractChannel { ... protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { ... @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called // 保存selectionKey到局部变量 final SelectionKey selectionKey = this.selectionKey; // 判断有效性 if (!selectionKey.isValid()) { return; } readPending = true; // 获取selectionKey的兴趣集 // 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0 final int interestOps = selectionKey.interestOps(); // 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值 // 为 SelectionKey.OP_READ,也就是1 if ((interestOps & readInterestOp) == 0) { // 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ // 表示对读事件感兴趣 selectionKey.interestOps(interestOps | readInterestOp); } } ... } ... }
小结
- Netty是在哪里检测有新连接接入的?
- 新连接是怎样注册到NioEventLoop线程上的?