Netty channelRegisteredChannelActive---源码分析
背景
最近发现ChannelOutboundHandlerAdapter的read()回调方法,在连接创建成功和读取数据后都会被回调。因此就产生了疑问“为什么建立连接和读取数据后read()方法会被调用呢?” 从网上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具体流程和一些疑问还是没有解开。
那我也尝试着从源码找到答案吧。
Demo演示
我们先写个小Demo,其中Test1OutboundHandlerAdapter是一个ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一个ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很简单,下面只贴部分代码
Test1OutboundHandlerAdapter.java
@Override public void read(ChannelHandlerContext ctx) throws Exception { super.read(ctx); System.out.println("Test1OutboundHandlerAdapter------------->read"); }
Test1HandlerAdapter.java
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); System.out.println("Test1HandlerAdapter-------------->channelRegistered"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println("Test1HandlerAdapter-------------->channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Test1HandlerAdapter-------------->channelRead"); ctx.writeAndFlush(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); }
然后我们建立连接,随便发一下数据,服务器收到数据,打印如下:
Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read
Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read
如果把Test1OutboundHandlerAdapter的read(xxx)回调方法注释掉,会发现服务器无法接收数据了。
源码分析
1.channelRegistered回调流程分析
可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法调用了channelRegistered(xxx)方法,然后再查找会发现是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()
AbstractChannelHandlerContext
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } private void invokeChannelRegistered() { if (invokeHandler()) { try { /**ChannelInboundHandler的register(xxx)在这里被调用*/ ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
顺藤fireChannelRegistered()摸瓜,最终定位到AbstractChannel内部类AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)
AbstractUnsafe
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
在继续找的会找到在EventLooop层面的调用了,我们可以先不用管了。在register0的方法中又调用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,这正是我们要找的,也符合打印顺序先channelRegistered后channelActive。为了验证我们可以加上断点调试,就是这儿了。
至此我们可以总结一下:
channelRegistered流程:
说明
- DefaultChannelPipeline 的fireChannelRegistered()
@Override public final ChannelPipeline fireChannelUnregistered() { AbstractChannelHandlerContext.invokeChannelUnregistered(head); return this; }
AbstractChannelHandlerContext.invokeChannelUnregistered(head);传递的参数是DefaultChannelPipeline的head,这样保证了register事件沿着pipeline从头流向尾,其对应DefaultChannelPipeline内部类HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler
DefaultChannelPipeline
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...省略... protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
DefaultChannelPipeline的内部类HeadContext
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } 省略后边的代码
2.上图黄色的部分都是调用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的参数是DefaultChannelPipeline传递的head即HeadContext,那么也就是head.invokeChannelRegistered()。
invokeChannelRegistered()方法中会调用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext类中该方法返回的就是自己(可查看上面的代码),因为HeadContext本身也是ChannelInboundHandler。 同时又将自己作为参数,调用自己的channelRegistered方法
3.HeadContext的ChannelRegister方法中调用AbstractChannelHandlerContext的fireChannelRegistered();
(还是调用的自己)该方法中调用了invokeChannelRegistered(findContextInbound()); findContextInbound()所实现的功能就是查找到下一个ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一个ChanelInboundHandler
上面的步骤不断重复,自此registered事件可以沿着pipeline在不同的InboundHandler里流动了。
2.channelActive回调流程分析
channelActive的回调流程和channelRegister流程没有什么区别,可参考上文分析。 但是在HeadContext的channelActive方法中会调用readIfIsAutoRead(); 这个是读数据的关键
3.netty读数据分析
读数据分析