netty基础知识2

 本文所使用JDK7和netty5.0为基础。Netty5.0

Netty 服务端开发

      

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by lianpeng on 2016/12/6.
 */
public class NettyService {

    private static final int PORT = 30088;

    public static void start() {
        //事件循环组将网络编程的线程处理部分很好的封装起来
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        //启动服务端必须实例化
        ServerBootstrap bootstrap = new ServerBootstrap();
        //指定使用的事件循环组,该组用于接受连接、接收/发送数据,等等
        //可以指定两个事件循环组,分别用于接受连接、读写数据
        bootstrap.group( bossGroup, workGroup )
                 //使用非阻塞的套接字传输,必须指定此服务器通道类型,以监听并处理客户端连接
                 .channel( NioServerSocketChannel.class )
                 //当连接被接受后,会创建NioServerSocketChannel的子通道SocketChannel
                 //childHandler方法用于指定子通道的处理器,通常为ChannelInitializer
                 .handler( new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel( SocketChannel ch ) throws Exception {
                         //管道(Pipeline)持有某个通道的全部处理器
                         ChannelPipeline pipeline = ch.pipeline();
                         //添加一个处理器
                         pipeline.addLast();
                     }
                 } );
        try {
            //绑定端口,等待直到成功
            ChannelFuture f = bootstrap.bind( PORT ).sync();
            //等待服务器通道被关闭
            f.channel().closeFuture().sync();
        } catch ( InterruptedException e ) {
            e.printStackTrace();
        } finally {
            //关闭事件循环,释放相关资源(包括创建的线程)
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }


}

class nettyServHandle extends ChannelHandlerAdapter {
    /**
     * 当从通道读取到数据后,会执行该回调
     * 注意:数据可能碎片化,分若干次读取,这种情况下,该回调会被执行多次
     */
    public void channelRead( ChannelHandlerContext ctx, Object msg ) {
        ByteBuf buf = (ByteBuf) msg;//注意这里不一定能收到完整的消息
        //将接收到的数据写回去,注意这里还没有将数据刷空以发送到对端(peer)
        //当前方法不使用channelRead0的原因:write可能在channelRead返回前尚未完成(因为异步)
        //如果使用channelRead0,那么msg对应的ByteBuf将被自动释放(release)
        ctx.write( msg );
    }

    /**
     * 当读取数据完毕(没有更多数据可读)后,会执行该回调
     */
    public void channelReadComplete( ChannelHandlerContext ctx ) {
        //刷空所有数据,并在执行完毕后,关闭通道
        ctx.writeAndFlush( Unpooled.EMPTY_BUFFER ).addListener( ChannelFutureListener.CLOSE );
    }

    /**
     * 当发生任何异常时,执行该回调
     * 至少应当有一个通道处理器覆盖此方法,以实现必要的异常处理
     */
    public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) {
        //关闭通道
        ctx.close();
    }
}

Netty 客户端开发

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
 
import java.net.InetSocketAddress;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import cc.gmem.study.network.basic.Helper;
 
 
public class NettyClient
{
 
    private static final Logger LOGGER = LoggerFactory.getLogger( NettyClient.class );
 
    private final String        host;
 
    private final int           port;
 
    public NettyClient( String host, int port )
    {
        this.host = host;
        this.port = port;
    }
 
    public void start() throws Exception
    {
 
        EventLoopGroup group = new NioEventLoopGroup();
        try
        {
            //要启动客户端,必须实例化下面这个实例
            Bootstrap b = new Bootstrap();
            b
                    //指定事件循环组
                    .group( group )
                    //指定非阻塞的套接字通道类
                    .channel( NioSocketChannel.class )
                    //指定需要连接的服务器
                    .remoteAddress( new InetSocketAddress( host, port ) )
                    //指定通道的处理器,一旦连接成功就会调用此处理器
                    .handler( new ChannelInitializer() {
 
                        public void initChannel( SocketChannel ch ) throws Exception
                        {
                            //在通道的尾部添加一个处理器
                            ch.pipeline().addLast( new EchoClientHandler() );
                        }
                    } );
            //连接、等待直到连接成功
            ChannelFuture f = b.connect().sync();
            //等待直到客户端通道被关闭
            f.channel().closeFuture().sync();
        }
        finally
        {
            //关闭事件循环,释放相关资源(包括创建的线程)
            group.shutdownGracefully().sync();
        }
    }
 
    public static void main( String[] args ) throws Exception
    {
        new EchoClient( Helper.DEFAULT_HOST, Helper.DEFAULT_PORT ).start();
    }
 
    @Sharable
    public class NettyClientHandler extends SimpleChannelInboundHandler
    {
 
        /**
         * 一旦与服务器的连接建立,即调用此回调
         */
        public void channelActive( ChannelHandlerContext ctx )
        {
            //发送一个简单的消息给服务器
            //Unpooled是一个工具类,可以分配新的缓冲,或者包装已经存在的字节数组、字节缓冲、字符串
            //copiedBuffer方法可以根据指定的字符串和编码方式,创建一个大端(big-endian)的字节缓冲
            //这里的几个汉字和标点共计21字节
            ByteBuf msg = Unpooled.copiedBuffer( "服务器,你好!", CharsetUtil.UTF_8 );
            //写入数据并刷空,如果不刷空,数据可能存在于本地缓冲,不发送给服务器
            ctx.writeAndFlush( msg );
        }
 
        /**
         * 从服务器接收到数据后,调用此回调。channelRead0会自动的释放(减少引用计数,如果为0则解构并回收)ByteBuf对象
         * 需要注意的是:接收到的字节可能是碎片化的(channelRead0),虽然上面发送了21字节给服务器
         * 但是可能分多次读取,例如:第一次读取10字节,第二次读取11字节。因此该回调可能被调用多次
         * 唯一能保证的是:在使用TCP或其它面向流的协议的情况下,数据被读取的顺序有保证
         */
        public void channelRead0( ChannelHandlerContext ctx, ByteBuf in )
        {
            int count = in.readableBytes();//可以获取本次可读的字节数
            ByteBuf buf = in.readBytes( count ); //读取为字节缓冲
            Object msg = buf.toString( CharsetUtil.UTF_8 );//转换为字符串形式
            LOGGER.debug( "Client received: {}", msg );
        }
 
        public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause )
        {
            ctx.close();
        }
    }
}

Netty 解决tcp粘包和分包

   TCP以流的方式进行数据传输,上层的应用程序为了对消息进行区分,往往采样一下此种方式

  1.消息长度固定,累计读取到长度总和为LEN的报文后,就认为读取到了一个完整的消息;讲计数器,讲计数器职位重新开始读下一个数据包;

  2.将回车符作为消息的结束符,例如:FTP协议,这种方式使用比较广泛

  3.将特殊字符作为消息的结束符;

  4.通过消息头定义长度字段来标示消息的总长度。

Netty对上面四种应用做了统一的抽象,提供了四种解码器来解决对应问题。

   1. StringDecoder     将消息解码成String类型方便读取;

    2.LineBasedFrameDecoder    以换行符为结束标记的解码器,工作原理是依次遍历ByteBuf中的可读字节,判读是否是“\n”或“\r\n”;

pipeline.addLast(new LineBasedFrameDecoder( 1024 ) );

    3.DelimiterBasedFrameDecoder   以特殊字符为消息结束符的解码器;

ByteBuf delimiter = Unpooled.copiedBuffer( "$_".getBytes() );
//第一个参数表示消息的最大长度,当消息读取到最大长度后任然没找见分割符,就抛出
//TooLongFrameException异常,防止异常码流缺失导致的内存溢出
pipeline.addLast( "framer", new DelimiterBasedFrameDecoder( Integer.MAX_VALUE, Delimiters.nulDelimiter() ) );

   4.FixedBasedFrameDecoder   以固定长度进行解码,不管一次接受多少数据包,他都是按设定的长度进行解码;

    

pipeline.addLast( new FixedBasedFrameDecoder( 1024 ) );