Netty(二、深入理解)

reactor模式

在深入了解Netty之前,我们需要先知道reactor(反应器模式),是高性能网络编程必须知道的模式。

BIO

我们先了解下原始socket编程:

//这里可以是个多线程,每个线程对应一个socket,循环处理业务,此处代码就略了,主要讲逻辑while (true){       //new Thread()...
            //Server监听指定端口
            ServerSocket server = new ServerSocket(8080);
            //socket阻塞,一直等待着连接到来
            Socket socket = server.accept();
            //从socket获取输入流
            InputStream inputStream = socket.getInputStream();
            //建立缓冲区进行读取
            byte[] bytes = new byte[1024];
            int len;
            StringBuilder sb = new StringBuilder();
            while ((len = inputStream.read(bytes)) != -1) {
                //指定编码格式
                sb.append(new String(bytes, 0, len,"UTF-8"));
            }
            System.out.println(sb);
            inputStream.close();
            socket.close();
            server.close();
}

以服务端为例,Socket建立好后不断循环监听是否有套接字连接,获取到连接后,从socket获取输入流。在发送/接收数据时,并不是直接从网络中读取或发送,而是要通过缓冲区,例如:发送数据时,现将数据写入缓冲区,然后再由TCP/IP协议将数据由缓冲区发送目标的缓冲区,目标从缓冲区中读取。

这种多线程的socket虽然通过一个线程一个socket的方式,提高了服务器的吞吐,但每个线程内部还是阻塞的,当并发量大时,线程的反复创建和销毁会对系统造成巨大的负担。针对这种情况,我们就需要用到reactor模式。

单线程reactor模式

reactor模式,基于java NIO之上,抽象出了两个组件:Reactor和Handler。

Reactor:负责响应IO事件,如新事件的连接、读写,将事件交给Handler处理。

Handler:负责事件的处理,完成channel的读取,事件逻辑处理,channel的写出。

Reactor

package com.wk.test.nettyTest;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable {

    //选择器
    final Selector selector;
    //服务端通道
    final ServerSocketChannel serverChannel;

    //构造函数初始化
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        //绑定连接
        serverChannel.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverChannel.configureBlocking(false);
        //将服务端的通道绑定到选择器上面,并定义事件为接收连接时间
        //OP_ACCEPT:接收连接就绪事件,服务端监听到客户端,可接收连接 1<<4
        //OP_CONNECT:连接就绪事件,表示客户端与服务端建立连接成功 1<<3
        //OP_READ:读就绪事件,表示通道中有可读数据,可执行读操作 1<<0
        //OP_WRITE:写就绪事件,表示可以向通道写数据 1<<2
        SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        //选择键通过attach方法附加一个对象
        selectionKey.attach(new Acceptor());

    }



    @Override
    public void run() {
        //不中断的线程则循环,interrupted方法,判断线程是否中断,并能释放已经中断的线程
        while (!Thread.interrupted()){
            try {
                //这里每一个request封装一个channel,所有的channel注册在一个选择器上,selector选择器不断轮询查看可读状态
                selector.select();
                //获取选择器的选择键集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectedKey = iterator.next();
                    //attachement方法可以获取attach方法附加的对象,这里就是前面附加进来的Handler对象,也就是事件处理类
                    Runnable r = (Runnable) selectedKey.attachment();
                    if(r!=null){
                        r.run();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    class Acceptor implements Runnable{

        @Override
        public void run() {
            try {
                //获取已连接上的channel通道
                SocketChannel channel = serverChannel.accept();
                if(channel!=null){
                    //自定义Handler,事件处理类,将通道绑定到选择器上面
                    new Handler(selector,channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Handler

package com.wk.test.nettyTest;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {

    //通道
    final SocketChannel channel;
    //绑定到选择器的选择键
    final SelectionKey selectionKey;
    //定义输入输出缓冲区
    ByteBuffer inputBuffer = ByteBuffer.allocate(102400);
    ByteBuffer outputBuffer = ByteBuffer.allocate(102400);
    static final boolean READING = true, WRITING = false;
    //初始化定义可读就绪
    boolean status = READING;

    Handler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        //非阻塞
        c.configureBlocking(false);
        //这里将通道注册到选择器上,本应后面的int是 1(读),4(写),8(连接),16(可连接)的
        //这种操作貌似是判断JDK的selector有没有立即返回或报错,并不引起任何实质操作。
        //https://github.com/netty/netty/issues/1836 这个讨论问题的地址,外国友人貌似也搞不懂,似乎是个JDK NIO的BUG
        selectionKey = channel.register(selector, 0);
        //选择键将本身也就是Handler附加
        selectionKey.attach(this);
        //定义当前选择键是读就绪状态
        selectionKey.interestOps(SelectionKey.OP_READ);
        //唤醒选择器
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            if (status) {
                read();
            } else {
                write();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void read() throws IOException {
        channel.read(inputBuffer);
        //一系列逻辑判定和处理
        status = WRITING;
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    public void write() throws IOException {
        channel.write(outputBuffer);
        //判定写操作执行完毕后,关闭selectKey
        selectionKey.cancel();
    }
}

Netty(二、深入理解)

  客户端每个请求都封装成一个channel通道连接到selector上面,并有一个selectionKey选择键,选择器的附加对象是Handler处理器,将请求分派到handler中。

单线程的缺点是当Handler阻塞时,会导致其他client的请求也阻塞,这种实际使用不多,一般使用多线程的reactor模式。

多线程reactor模式

多线程是将handler放入一个线程池,多线程的进行业务处理

Netty(二、深入理解)

 具体代码就不展示了,也就是在Handler中建立一个线程池来进行读写操作。

ps:以上就是基于java NIO的reactor模式,虽然逻辑有些复杂且不易理解。但是在理解Netty之前一定要先理解它。

Netty DEMO

我们先将demo代码贴上来,通过代码来对Netty进行理解。

服务端

NettyServerTest

package com.wk.test.nettyTest;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);

    public static void main(String[] args) {

        //实例化两个线程组
        //处理服务器与客户端的连接
        EventLoopGroup pGroup = new NioEventLoopGroup(1);
        //进行网络通信(读写)
        EventLoopGroup cGroup = new NioEventLoopGroup(10);
        //配置容器,配置相关信息
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(pGroup,cGroup)                                   //绑定两个线程组
                .channel(NioServerSocketChannel.class)                  //指定NIO的模式
                .childHandler(new ChannelInitializer<SocketChannel>() { //配置业务处理类
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //解码器
                        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                        //编码器
                        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        //自定义事件处理器
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1024)         //设置TCP缓冲区
                .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接
        try {
            //绑定端口启动
            ChannelFuture channelFuture = bootstrap.bind(8090).sync();
            logger.info("服务器启动开启监听端口:{}",8090);
            //等待关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //Netty优雅退出
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }

    }
}

NettyServerHandler

package com.wk.test.nettyTest;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 服务端事件处理器,基础入站处理器类
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * 客户端连接时触发
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Channel active");
    }

    /**
     * 客户端发送消息时触发
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("服务端接收的消息:{}", msg.toString());
        ctx.write("服务器返回");
        ctx.flush();
    }

    /**
     * 发生异常时触发
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

客户端

NettyClientTest

package com.wk.test.nettyTest;

import cn.jiguang.common.connection.NettyClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClientTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);

    public static void main(String[] args) {
        //客户端只需要定义一个读写的线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //客户端是bootstrap,其他和服务端配置大同小异
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        try {
            //连接服务器地址
            ChannelFuture future = bootstrap.connect("127.0.0.1",8090).sync();
            logger.info("客户端启动成功");
            //发送信息
            future.channel().writeAndFlush("你好啊").sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //优雅关闭
            group.shutdownGracefully();
        }
    }
}

NettyClientHandler

package com.wk.test.nettyTest;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 客户端处理类,继承入站处理适配器
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端Active .....");
    }

    /**
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("客户端接收的消息:{}", msg.toString());
    }

    /**
     * 发生异常时触发
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 EventLoop

//实例化两个线程组
//处理服务器与客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//进行网络通信(读写)
EventLoopGroup workerGroup = new NioEventLoopGroup(10);

以服务端为例,设置两个线程组。

一个线程组负责监听连接的parentChannel,定义为BossLoopGroup

另一个线程组负责客户端连接读写的childChannel,定义为WorkerLoopGroup

一个线程封装到一个EventLoop,多个EventLoop就组成了线程组。而每一个channel绑定一个EventLoop,一个EventLoop可以有多个channel。

Netty(二、深入理解)

 Bootstrap

Bootstrap是Netty提供的一个工厂类,我们可以通过它来完成对Netty服务端或客户端的初始化配置,这样我们就省去了用JDK NIO繁琐的创建channel、设置、启动等步骤,将重心放在事件业务处理上面。

Bootstrap分为服务端的ServerBootstrap和客户端的Bootstrap

Bootstrap执行分为8个步骤:

ServerBootstrap bootstrap = new ServerBootstrap()
                //1.设置reactor线程
                .group(bossGroup,workerGroup)
                //2.设置channel通道的类型,这里是NIO
                .channel(NioServerSocketChannel.class)
                //3.设置监听端口
                .localAddress(new InetSocketAddress(8090))
                //4.设置通道的选项
                .option(ChannelOption.SO_BACKLOG, 1024)         //设置TCP缓冲区
                .childOption(ChannelOption.SO_KEEPALIVE, true) //心跳检测保持连接
                //5.配发事件处理器流水线
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //解码器
                        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                        //编码器
                        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        //自定义事件处理器
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });

        try {
            //6.绑定servr,这里使用了sync方法,直到绑定成功为止
            ChannelFuture channelFuture = bootstrap.bind().sync();
            logger.info("服务器启动开启监听端口:{}",8090);
            //7.等待关闭,直到channel关闭为止
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //8.Netty优雅退出
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

这8个步骤和我上面写的DEMO顺序方面可能稍有不同,不过不影响。

Channel

核心概念以及流程

Channel是服务端与客户端的通信通道,每一个request都可以封装成一channel。

ChannelPipeline是用于存放Handler的容器,里面存放这事件处理器流水线。

ChannelHandler是处理器,分为入站处理器和出站处理器,以客户端的角度来看,客户端到服务端是出站,服务端到客户端就是入站。

ChannelContext是通信管道的上下文,当一个入站或出站处理器处理完后,将上下文传给下一个入站或出站处理器。

Netty(二、深入理解)

 ChannelHandler

//5.配发事件处理器流水线
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //解码器
                        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                        //编码器
                        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        //自定义事件处理器
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });

这里就是配发事件处理器流水线,编码器实质上也是个处理器,出站编码,入站解码。

自定义处理器也要继承出站或入站的事件处理配置类

public class NettyServerHandler extends ChannelInboundHandlerAdapter

ByteBuf

数据在网络中传输并不是直接传输的,而是要通过缓冲区。写出时,先将数据写到缓冲区,再由TCP协议将数据从缓冲区。读取时也是一样,从缓冲区读取。

Netty(二、深入理解)

 JAVA NIO中的缓冲区是ByteBuff,长度固定且只有一个索引,在读写操作的时候还需要切换读写状态。而Netty的ByteBuf则改良了这些问题。

在ByteBuf中,提供了三个索引,读索引(readIndex)、写索引(writeIndex)、最大容量(maxCapacity)

缓冲区的释放

Netty(二、深入理解)

我们再看看这张图,入站时,当走到tailHandler(最后一个Handler)的时候,会释放掉缓冲区。出站则是在headHandler释放。