Netty(二、深入理解)
reactor模式
在深入了解Netty之前,我们需要先知道reactor(反应器模式),是高性能网络编程必须知道的模式。
BIO
我们先了解下原始socket编程:
//这里可以是个多线程,每个线程对应一个socket,循环处理业务,此处代码就略了,主要讲逻辑while (true){ //new Thread()... //Server监听指定端口 ServerSocket server = new ServerSocket(8080); //socket阻塞,一直等待着连接到来 Socket socket = server.accept(); //从socket获取输入流 InputStream inputStream = socket.getInputStream(); //建立缓冲区进行读取 byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { //指定编码格式 sb.append(new String(bytes, 0, len,"UTF-8")); } System.out.println(sb); inputStream.close(); socket.close(); server.close(); }
以服务端为例,Socket建立好后不断循环监听是否有套接字连接,获取到连接后,从socket获取输入流。在发送/接收数据时,并不是直接从网络中读取或发送,而是要通过缓冲区,例如:发送数据时,现将数据写入缓冲区,然后再由TCP/IP协议将数据由缓冲区发送目标的缓冲区,目标从缓冲区中读取。
这种多线程的socket虽然通过一个线程一个socket的方式,提高了服务器的吞吐,但每个线程内部还是阻塞的,当并发量大时,线程的反复创建和销毁会对系统造成巨大的负担。针对这种情况,我们就需要用到reactor模式。
单线程reactor模式
reactor模式,基于java NIO之上,抽象出了两个组件:Reactor和Handler。
Reactor:负责响应IO事件,如新事件的连接、读写,将事件交给Handler处理。
Handler:负责事件的处理,完成channel的读取,事件逻辑处理,channel的写出。
Reactor
package com.wk.test.nettyTest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class Reactor implements Runnable { //选择器 final Selector selector; //服务端通道 final ServerSocketChannel serverChannel; //构造函数初始化 Reactor(int port) throws IOException { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); //绑定连接 serverChannel.socket().bind(new InetSocketAddress(port)); //非阻塞 serverChannel.configureBlocking(false); //将服务端的通道绑定到选择器上面,并定义事件为接收连接时间 //OP_ACCEPT:接收连接就绪事件,服务端监听到客户端,可接收连接 1<<4 //OP_CONNECT:连接就绪事件,表示客户端与服务端建立连接成功 1<<3 //OP_READ:读就绪事件,表示通道中有可读数据,可执行读操作 1<<0 //OP_WRITE:写就绪事件,表示可以向通道写数据 1<<2 SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT); //选择键通过attach方法附加一个对象 selectionKey.attach(new Acceptor()); } @Override public void run() { //不中断的线程则循环,interrupted方法,判断线程是否中断,并能释放已经中断的线程 while (!Thread.interrupted()){ try { //这里每一个request封装一个channel,所有的channel注册在一个选择器上,selector选择器不断轮询查看可读状态 selector.select(); //获取选择器的选择键集合 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectedKey = iterator.next(); //attachement方法可以获取attach方法附加的对象,这里就是前面附加进来的Handler对象,也就是事件处理类 Runnable r = (Runnable) selectedKey.attachment(); if(r!=null){ r.run(); } } } catch (IOException e) { e.printStackTrace(); } } } class Acceptor implements Runnable{ @Override public void run() { try { //获取已连接上的channel通道 SocketChannel channel = serverChannel.accept(); if(channel!=null){ //自定义Handler,事件处理类,将通道绑定到选择器上面 new Handler(selector,channel); } } catch (IOException e) { e.printStackTrace(); } } } }
Handler
package com.wk.test.nettyTest; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; public class Handler implements Runnable { //通道 final SocketChannel channel; //绑定到选择器的选择键 final SelectionKey selectionKey; //定义输入输出缓冲区 ByteBuffer inputBuffer = ByteBuffer.allocate(102400); ByteBuffer outputBuffer = ByteBuffer.allocate(102400); static final boolean READING = true, WRITING = false; //初始化定义可读就绪 boolean status = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; //非阻塞 c.configureBlocking(false); //这里将通道注册到选择器上,本应后面的int是 1(读),4(写),8(连接),16(可连接)的 //这种操作貌似是判断JDK的selector有没有立即返回或报错,并不引起任何实质操作。 //https://github.com/netty/netty/issues/1836 这个讨论问题的地址,外国友人貌似也搞不懂,似乎是个JDK NIO的BUG selectionKey = channel.register(selector, 0); //选择键将本身也就是Handler附加 selectionKey.attach(this); //定义当前选择键是读就绪状态 selectionKey.interestOps(SelectionKey.OP_READ); //唤醒选择器 selector.wakeup(); } @Override public void run() { try { if (status) { read(); } else { write(); } } catch (IOException e) { e.printStackTrace(); } } public void read() throws IOException { channel.read(inputBuffer); //一系列逻辑判定和处理 status = WRITING; selectionKey.interestOps(SelectionKey.OP_WRITE); } public void write() throws IOException { channel.write(outputBuffer); //判定写操作执行完毕后,关闭selectKey selectionKey.cancel(); } }
客户端每个请求都封装成一个channel通道连接到selector上面,并有一个selectionKey选择键,选择器的附加对象是Handler处理器,将请求分派到handler中。
单线程的缺点是当Handler阻塞时,会导致其他client的请求也阻塞,这种实际使用不多,一般使用多线程的reactor模式。
多线程reactor模式
多线程是将handler放入一个线程池,多线程的进行业务处理
具体代码就不展示了,也就是在Handler中建立一个线程池来进行读写操作。
ps:以上就是基于java NIO的reactor模式,虽然逻辑有些复杂且不易理解。但是在理解Netty之前一定要先理解它。
Netty DEMO
我们先将demo代码贴上来,通过代码来对Netty进行理解。
服务端
NettyServerTest
package com.wk.test.nettyTest; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyServerTest { private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class); public static void main(String[] args) { //实例化两个线程组 //处理服务器与客户端的连接 EventLoopGroup pGroup = new NioEventLoopGroup(1); //进行网络通信(读写) EventLoopGroup cGroup = new NioEventLoopGroup(10); //配置容器,配置相关信息 ServerBootstrap bootstrap = new ServerBootstrap() .group(pGroup,cGroup) //绑定两个线程组 .channel(NioServerSocketChannel.class) //指定NIO的模式 .childHandler(new ChannelInitializer<SocketChannel>() { //配置业务处理类 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //解码器 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //编码器 socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); //自定义事件处理器 socketChannel.pipeline().addLast(new NettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区 .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接 try { //绑定端口启动 ChannelFuture channelFuture = bootstrap.bind(8090).sync(); logger.info("服务器启动开启监听端口:{}",8090); //等待关闭 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //Netty优雅退出 pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } }
NettyServerHandler
package com.wk.test.nettyTest; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 服务端事件处理器,基础入站处理器类 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); /** * 客户端连接时触发 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel active"); } /** * 客户端发送消息时触发 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("服务端接收的消息:{}", msg.toString()); ctx.write("服务器返回"); ctx.flush(); } /** * 发生异常时触发 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端
NettyClientTest
package com.wk.test.nettyTest; import cn.jiguang.common.connection.NettyClientInitializer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyClientTest { private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class); public static void main(String[] args) { //客户端只需要定义一个读写的线程组 EventLoopGroup group = new NioEventLoopGroup(); //客户端是bootstrap,其他和服务端配置大同小异 Bootstrap bootstrap = new Bootstrap() .group(group) .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); try { //连接服务器地址 ChannelFuture future = bootstrap.connect("127.0.0.1",8090).sync(); logger.info("客户端启动成功"); //发送信息 future.channel().writeAndFlush("你好啊").sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //优雅关闭 group.shutdownGracefully(); } } }
NettyClientHandler
package com.wk.test.nettyTest; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 客户端处理类,继承入站处理适配器 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("客户端Active ....."); } /** * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("客户端接收的消息:{}", msg.toString()); } /** * 发生异常时触发 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EventLoop
//实例化两个线程组 //处理服务器与客户端的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //进行网络通信(读写) EventLoopGroup workerGroup = new NioEventLoopGroup(10);
以服务端为例,设置两个线程组。
一个线程组负责监听连接的parentChannel,定义为BossLoopGroup
另一个线程组负责客户端连接读写的childChannel,定义为WorkerLoopGroup
一个线程封装到一个EventLoop,多个EventLoop就组成了线程组。而每一个channel绑定一个EventLoop,一个EventLoop可以有多个channel。
Bootstrap
Bootstrap是Netty提供的一个工厂类,我们可以通过它来完成对Netty服务端或客户端的初始化配置,这样我们就省去了用JDK NIO繁琐的创建channel、设置、启动等步骤,将重心放在事件业务处理上面。
Bootstrap分为服务端的ServerBootstrap和客户端的Bootstrap
Bootstrap执行分为8个步骤:
ServerBootstrap bootstrap = new ServerBootstrap() //1.设置reactor线程 .group(bossGroup,workerGroup) //2.设置channel通道的类型,这里是NIO .channel(NioServerSocketChannel.class) //3.设置监听端口 .localAddress(new InetSocketAddress(8090)) //4.设置通道的选项 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区 .childOption(ChannelOption.SO_KEEPALIVE, true) //心跳检测保持连接 //5.配发事件处理器流水线 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //解码器 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //编码器 socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); //自定义事件处理器 socketChannel.pipeline().addLast(new NettyServerHandler()); } }); try { //6.绑定servr,这里使用了sync方法,直到绑定成功为止 ChannelFuture channelFuture = bootstrap.bind().sync(); logger.info("服务器启动开启监听端口:{}",8090); //7.等待关闭,直到channel关闭为止 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //8.Netty优雅退出 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
这8个步骤和我上面写的DEMO顺序方面可能稍有不同,不过不影响。
Channel
核心概念以及流程
Channel是服务端与客户端的通信通道,每一个request都可以封装成一channel。
ChannelPipeline是用于存放Handler的容器,里面存放这事件处理器流水线。
ChannelHandler是处理器,分为入站处理器和出站处理器,以客户端的角度来看,客户端到服务端是出站,服务端到客户端就是入站。
ChannelContext是通信管道的上下文,当一个入站或出站处理器处理完后,将上下文传给下一个入站或出站处理器。
ChannelHandler
//5.配发事件处理器流水线 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //解码器 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //编码器 socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); //自定义事件处理器 socketChannel.pipeline().addLast(new NettyServerHandler()); } });
这里就是配发事件处理器流水线,编码器实质上也是个处理器,出站编码,入站解码。
自定义处理器也要继承出站或入站的事件处理配置类
public class NettyServerHandler extends ChannelInboundHandlerAdapter
ByteBuf
数据在网络中传输并不是直接传输的,而是要通过缓冲区。写出时,先将数据写到缓冲区,再由TCP协议将数据从缓冲区。读取时也是一样,从缓冲区读取。
JAVA NIO中的缓冲区是ByteBuff,长度固定且只有一个索引,在读写操作的时候还需要切换读写状态。而Netty的ByteBuf则改良了这些问题。
在ByteBuf中,提供了三个索引,读索引(readIndex)、写索引(writeIndex)、最大容量(maxCapacity)
缓冲区的释放
我们再看看这张图,入站时,当走到tailHandler(最后一个Handler)的时候,会释放掉缓冲区。出站则是在headHandler释放。