Netty channelRegisteredChannelActive---源码分析

背景

最近发现ChannelOutboundHandlerAdapter的read()回调方法,在连接创建成功和读取数据后都会被回调。因此就产生了疑问“为什么建立连接和读取数据后read()方法会被调用呢?” 从网上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具体流程和一些疑问还是没有解开。
那我也尝试着从源码找到答案吧。

Demo演示

我们先写个小Demo,其中Test1OutboundHandlerAdapter是一个ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一个ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很简单,下面只贴部分代码

Test1OutboundHandlerAdapter.java

@Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        super.read(ctx);
        System.out.println("Test1OutboundHandlerAdapter------------->read");
    }

Test1HandlerAdapter.java

@Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        System.out.println("Test1HandlerAdapter-------------->channelRegistered");
    }
   
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("Test1HandlerAdapter-------------->channelActive");
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Test1HandlerAdapter-------------->channelRead");
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

然后我们建立连接,随便发一下数据,服务器收到数据,打印如下:

Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read

Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read

如果把Test1OutboundHandlerAdapter的read(xxx)回调方法注释掉,会发现服务器无法接收数据了。

源码分析

1.channelRegistered回调流程分析

Netty channelRegisteredChannelActive---源码分析

可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法调用了channelRegistered(xxx)方法,然后再查找会发现是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()

AbstractChannelHandlerContext

@Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                /**ChannelInboundHandler的register(xxx)在这里被调用*/
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

顺藤fireChannelRegistered()摸瓜,最终定位到AbstractChannel内部类AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)

AbstractUnsafe

@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

在继续找的会找到在EventLooop层面的调用了,我们可以先不用管了。在register0的方法中又调用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,这正是我们要找的,也符合打印顺序先channelRegistered后channelActive。为了验证我们可以加上断点调试,就是这儿了。

Netty channelRegisteredChannelActive---源码分析

至此我们可以总结一下:
channelRegistered流程:

Netty channelRegisteredChannelActive---源码分析

说明

  1. DefaultChannelPipeline 的fireChannelRegistered()
@Override
    public final ChannelPipeline fireChannelUnregistered() {
        AbstractChannelHandlerContext.invokeChannelUnregistered(head);
        return this;
    }

AbstractChannelHandlerContext.invokeChannelUnregistered(head);传递的参数是DefaultChannelPipeline的head,这样保证了register事件沿着pipeline从头流向尾,其对应DefaultChannelPipeline内部类HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler

DefaultChannelPipeline

final AbstractChannelHandlerContext head;
 final AbstractChannelHandlerContext tail;
 
...省略...

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

DefaultChannelPipeline的内部类HeadContext

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }
省略后边的代码

2.上图黄色的部分都是调用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的参数是DefaultChannelPipeline传递的head即HeadContext,那么也就是head.invokeChannelRegistered()
invokeChannelRegistered()方法中会调用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext类中该方法返回的就是自己(可查看上面的代码),因为HeadContext本身也是ChannelInboundHandler。 同时又将自己作为参数,调用自己的channelRegistered方法

3.HeadContext的ChannelRegister方法中调用AbstractChannelHandlerContext的fireChannelRegistered();
(还是调用的自己)该方法中调用了invokeChannelRegistered(findContextInbound()); findContextInbound()所实现的功能就是查找到下一个ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一个ChanelInboundHandler
上面的步骤不断重复,自此registered事件可以沿着pipeline在不同的InboundHandler里流动了。

2.channelActive回调流程分析

channelActive的回调流程和channelRegister流程没有什么区别,可参考上文分析。 但是在HeadContext的channelActive方法中会调用readIfIsAutoRead(); 这个是读数据的关键

3.netty读数据分析

读数据分析

相关推荐