netty基础知识2
本文所使用JDK7和netty5.0为基础。Netty5.0
Netty 服务端开发
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by lianpeng on 2016/12/6. */ public class NettyService { private static final int PORT = 30088; public static void start() { //事件循环组将网络编程的线程处理部分很好的封装起来 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); //启动服务端必须实例化 ServerBootstrap bootstrap = new ServerBootstrap(); //指定使用的事件循环组,该组用于接受连接、接收/发送数据,等等 //可以指定两个事件循环组,分别用于接受连接、读写数据 bootstrap.group( bossGroup, workGroup ) //使用非阻塞的套接字传输,必须指定此服务器通道类型,以监听并处理客户端连接 .channel( NioServerSocketChannel.class ) //当连接被接受后,会创建NioServerSocketChannel的子通道SocketChannel //childHandler方法用于指定子通道的处理器,通常为ChannelInitializer .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel( SocketChannel ch ) throws Exception { //管道(Pipeline)持有某个通道的全部处理器 ChannelPipeline pipeline = ch.pipeline(); //添加一个处理器 pipeline.addLast(); } } ); try { //绑定端口,等待直到成功 ChannelFuture f = bootstrap.bind( PORT ).sync(); //等待服务器通道被关闭 f.channel().closeFuture().sync(); } catch ( InterruptedException e ) { e.printStackTrace(); } finally { //关闭事件循环,释放相关资源(包括创建的线程) bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } class nettyServHandle extends ChannelHandlerAdapter { /** * 当从通道读取到数据后,会执行该回调 * 注意:数据可能碎片化,分若干次读取,这种情况下,该回调会被执行多次 */ public void channelRead( ChannelHandlerContext ctx, Object msg ) { ByteBuf buf = (ByteBuf) msg;//注意这里不一定能收到完整的消息 //将接收到的数据写回去,注意这里还没有将数据刷空以发送到对端(peer) //当前方法不使用channelRead0的原因:write可能在channelRead返回前尚未完成(因为异步) //如果使用channelRead0,那么msg对应的ByteBuf将被自动释放(release) ctx.write( msg ); } /** * 当读取数据完毕(没有更多数据可读)后,会执行该回调 */ public void channelReadComplete( ChannelHandlerContext ctx ) { //刷空所有数据,并在执行完毕后,关闭通道 ctx.writeAndFlush( Unpooled.EMPTY_BUFFER ).addListener( ChannelFutureListener.CLOSE ); } /** * 当发生任何异常时,执行该回调 * 至少应当有一个通道处理器覆盖此方法,以实现必要的异常处理 */ public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { //关闭通道 ctx.close(); } }
Netty 客户端开发
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cc.gmem.study.network.basic.Helper; public class NettyClient { private static final Logger LOGGER = LoggerFactory.getLogger( NettyClient.class ); private final String host; private final int port; public NettyClient( String host, int port ) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { //要启动客户端,必须实例化下面这个实例 Bootstrap b = new Bootstrap(); b //指定事件循环组 .group( group ) //指定非阻塞的套接字通道类 .channel( NioSocketChannel.class ) //指定需要连接的服务器 .remoteAddress( new InetSocketAddress( host, port ) ) //指定通道的处理器,一旦连接成功就会调用此处理器 .handler( new ChannelInitializer() { public void initChannel( SocketChannel ch ) throws Exception { //在通道的尾部添加一个处理器 ch.pipeline().addLast( new EchoClientHandler() ); } } ); //连接、等待直到连接成功 ChannelFuture f = b.connect().sync(); //等待直到客户端通道被关闭 f.channel().closeFuture().sync(); } finally { //关闭事件循环,释放相关资源(包括创建的线程) group.shutdownGracefully().sync(); } } public static void main( String[] args ) throws Exception { new EchoClient( Helper.DEFAULT_HOST, Helper.DEFAULT_PORT ).start(); } @Sharable public class NettyClientHandler extends SimpleChannelInboundHandler { /** * 一旦与服务器的连接建立,即调用此回调 */ public void channelActive( ChannelHandlerContext ctx ) { //发送一个简单的消息给服务器 //Unpooled是一个工具类,可以分配新的缓冲,或者包装已经存在的字节数组、字节缓冲、字符串 //copiedBuffer方法可以根据指定的字符串和编码方式,创建一个大端(big-endian)的字节缓冲 //这里的几个汉字和标点共计21字节 ByteBuf msg = Unpooled.copiedBuffer( "服务器,你好!", CharsetUtil.UTF_8 ); //写入数据并刷空,如果不刷空,数据可能存在于本地缓冲,不发送给服务器 ctx.writeAndFlush( msg ); } /** * 从服务器接收到数据后,调用此回调。channelRead0会自动的释放(减少引用计数,如果为0则解构并回收)ByteBuf对象 * 需要注意的是:接收到的字节可能是碎片化的(channelRead0),虽然上面发送了21字节给服务器 * 但是可能分多次读取,例如:第一次读取10字节,第二次读取11字节。因此该回调可能被调用多次 * 唯一能保证的是:在使用TCP或其它面向流的协议的情况下,数据被读取的顺序有保证 */ public void channelRead0( ChannelHandlerContext ctx, ByteBuf in ) { int count = in.readableBytes();//可以获取本次可读的字节数 ByteBuf buf = in.readBytes( count ); //读取为字节缓冲 Object msg = buf.toString( CharsetUtil.UTF_8 );//转换为字符串形式 LOGGER.debug( "Client received: {}", msg ); } public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { ctx.close(); } } }
Netty 解决tcp粘包和分包
TCP以流的方式进行数据传输,上层的应用程序为了对消息进行区分,往往采样一下此种方式
1.消息长度固定,累计读取到长度总和为LEN的报文后,就认为读取到了一个完整的消息;讲计数器,讲计数器职位重新开始读下一个数据包;
2.将回车符作为消息的结束符,例如:FTP协议,这种方式使用比较广泛
3.将特殊字符作为消息的结束符;
4.通过消息头定义长度字段来标示消息的总长度。
Netty对上面四种应用做了统一的抽象,提供了四种解码器来解决对应问题。
1. StringDecoder 将消息解码成String类型方便读取;
2.LineBasedFrameDecoder 以换行符为结束标记的解码器,工作原理是依次遍历ByteBuf中的可读字节,判读是否是“\n”或“\r\n”;
pipeline.addLast(new LineBasedFrameDecoder( 1024 ) );
3.DelimiterBasedFrameDecoder 以特殊字符为消息结束符的解码器;
ByteBuf delimiter = Unpooled.copiedBuffer( "$_".getBytes() ); //第一个参数表示消息的最大长度,当消息读取到最大长度后任然没找见分割符,就抛出 //TooLongFrameException异常,防止异常码流缺失导致的内存溢出 pipeline.addLast( "framer", new DelimiterBasedFrameDecoder( Integer.MAX_VALUE, Delimiters.nulDelimiter() ) );
4.FixedBasedFrameDecoder 以固定长度进行解码,不管一次接受多少数据包,他都是按设定的长度进行解码;
pipeline.addLast( new FixedBasedFrameDecoder( 1024 ) );