我是怎样阅读 Netty Channel 源码的
Channel 功能说明
我在使用 ServerBootstrap
来创建服务的时候通过 channel(NioServerSocketChannel.class)
来设置 Channel
, 那么 Channel
的主要作用是什么呢?
在分析 Channel
之前, 需要先弄懂 NioServerSocketChannel
类, 它的功能对于 JDK NIO 类库中的 ServerSocketChannel
类, 它是用来监听TCP连接的(并不管读写).
而 NioServerSocketChannel
最主要就是实现 Channel
接口. Channel
接口, 采用 Facade
模式进行统一封装, 将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.
Channel
接口下有一个重要的抽象类 AbstractChannel
, 一些公共的基础方法都在这个抽象类中实现, 而特定一些功能, 都可以通过各个不同的实现类去实现, 这样最大程度的实现功能和接口的重用.
为什么不实用 JDK NIO 原生的 Channel
而要另起炉灶呢?
- JDK
SocketChannel
和ServerSocketChannel
没有统一的Channel
. - JDK 的
SocketChannel
和ServerSocketChannel
的主要职责就是网络 IO 操作, 由于他们的 SPI 类接口, 由具体的虚拟机厂家提供, 所以通过继承 SPI 功能类来扩展其功能的难度很大;SocketChannel
和ServerSocketChannel
抽象类, 其工作量和重新开发一个新的Channel
功能类是差不多的. - Netty 的
Channel
需要能够跟 Netty 的整体架构融合在一起, 例如 IO 模型、基于ChannelPipeline
的定制模型, 以及基于元数据描述配置化的 TCP 参数等, 这些 JDK 的SocketChannel
和ServerSocketChannel
都没有提供, 需要重新封装. - 自定义的
Channel
, 功能实现更加灵活.
Channel 方法介绍
Channel read()
: 从当前的 Channel
中读取数据到第一个 inbound
缓冲区中, 如果数据被成功读取, 触发 ChannelHandler.channelRead(ChannelHandlerContext, Object)
事件.
读取操作 API 调用完成之后, 紧接着会触发 ChannelHandler.channelReadComplete(ChannelHandlerContext)
事件, 这样业务的 ChannelHandler
可以决定是否需要继承读取数据. 如果已经有读操作请求被挂起, 则后续的读操作会被忽略.
ChannelFuture write(Object)
: 请求将当前的 msg 通过 ChannelPipeline
写入到目标 Channel
中. 注意, write
操作只是将消息存入到消息发送环数组中, 并没有真正被发送, 只有调用 flush
操作才会被写入到 Channel
中, 发送给对方.
ChannelFuture write(Object, ChannelPromise)
: 功能与 Channel read()
相同, 但是携带了 ChannelPromise
参数负责设置写入操作的结果.
ChannelFuture writeAndFlush(Objec, ChannelPromise)
: 与上面方法功能类似, 不同之处在于它会将消息写入 Channel
中发送, 等价于单独调用 write
和 flush
操作的组合.
Channel flush()
: 将之前写入到发送环形中的消息全部写入到目标 Channel
中, 发送给通信对方.
ChannelFuture close(ChannelPromise)
: 主动关闭当前连接, 通过 ChannelPromise
设置操作结果并执行结果通知, 无论操作是否成功, 都可以通过 ChannelPromise
获取操作结果. 该操作会级联触发 ChannelPipeline
中所有 ChannelHandler
的 ChannelHandler.close(ChannelHandlerContext, ChannelPromise)
事件.
ChannelFuture disconnect(ChannelPromise)
: 请求断开与远程通信对端的连接并使用 ChannelPromise
来获取操作结果的通知消息. 该方法会级联触发 ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise)
事件.
ChannelFuture connect(SocketAddress remoteAddress)
: 客户端使用指定的服务端地址 remoteAddress
发起连接请求, 如果连接因为应答超时而失败, ChannelFuture
中的操作结果就是 ConnectTimeoutException
异常; 如果连接被拒绝, 操作结果为 ConnectException
. 该方法会级联触发 ChannelHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
事件.
ChannelFuture bind(SocketAddress localAddress)
: 绑定指定的本地 Socket 地址 localAddress
, 该方法会级联触发 ChannelHandler.bind(ChannelHandlerContext, SocketAddress, ChannelPromise)
事件.
ChannelConfig config()
: 获取当前 Channel
的配置信息, 例如 CONNECT_TIMEOUT_MILLIS
.
boolean isOpen()
: 判断当前 Channel
是否已经打开.
boolean isRegistered()
: 判断当前 Channel
是否已经注册到 EventLoop
上.
boolean isActive()
: 判断当前 Channel
是否已经处于激活状态.
ChannelMetadata metadata()
: 获取当前 Channel 的元数据描述信息, 包括 TCP 参数配置等.
SocketAddress localAddress()
: 获取当前 Channel 的本地绑定地址.
SocketAddress remoteAddress()
: 获取当前 Channel 通信的远程 Socket 地址.
eventLoop()
: Channel
需要注册到 EventLoop
的多路复用器上, 用于处理 IO 事件, 通过 eventLoop()
方法可以获取到 Channel 上注册的 EventLoop. EventLoop 本质上就是处理网络读写事件的 Reactor 线程. 也可以用来执行定时任务和用户自定义 NioTask 等任务.
id()
: 它返回 ChannelId 对象, 是 Channel 的唯一标识, 它的可能生成策略如下:
- 机器的 MAC 地址(EUI-48 或 EUI-64) 等可以代表全局唯一的信息;
- 当前的进程 ID;
- 当前系统的毫秒-- System.currentTimeMillis();
- 当前系统的纳秒-- System.nanoTime();
- 32 位的随机整数;
- 32 位自增的序列数.
AbstractChannel 源码分析
AstractChannel
聚合了所有 Channel 使用到的对象, 由 AbstractChannel
提供初始化和统一封装, 如果功能和具体子类相关, 则定义成抽象方法由子类具体实现.
static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException(); static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException(); static { CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } private MessageSizeEstimator.Handle estimatorHandle; private final Channel parent; private final ChannelId id = DefaultChannelId.newInstance(); private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true); private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this); ....
protected AbstractChannel(Channel parent, EventLoop eventLoop) { this.parent = parent; this.eventLoop = validate(eventLoop); unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
每一个 Channel 中包含一个 ChannelPipeline. 在 AbstractChannel 的构造方法中初始化. AbstractChannel 也提供了一些公共 API 的具体实现, 例如 localAddress()
和 remoteAddress()
.
当 Channel 进行 IO 操作时会产生对应的 IO 事件, 然后驱动事件在 ChannelPipeline 中传播, 由对应的 ChannelHandler 对事件进行拦截和处理, 不关心的事件可以直接忽略.
网络 IO 操作直接调用 DefaultChannelPipeline 的相关方法, 由 DefaultChannelPipeline 中对应的 ChannelHandler 进行具体逻辑的处理.