基于netty的群聊

基于netty的群聊

学了一段时间的netty知识,现在通过这个基于console的程序来对netty的相关接口做个简单的应用。

准备

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.44.Final</version>
</dependency>

代码实现

我们都知道,一个典型的netty程序绝大部分是基于以下三部曲来走的:

  1. server/client 启动类
  2. xxxInitializer (extends ChannelInitializer<?>)
  3. xxxChannelHandler (extends SimpleChannelInboundHandler<?>)

按照以上的三部曲思路,就可以实现自己的网络程序了。

Server端实现

server启动类
/**
 * Entry point of the console group-chat server.
 *
 * <p>Binds a Netty server to port 8899 and blocks until the server channel is
 * closed. Each accepted connection gets its pipeline from
 * {@link MyChatServerInitializer}.
 */
public class MyChatServer {

    /**
     * Starts the server and blocks until its listening channel closes.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Fixed: the initializer class is MyChatServerInitializer.
                    .childHandler(new MyChatServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            // Block the main thread until the server channel is closed.
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() may rethrow the underlying failure (e.g. the port is already in use).
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
serverChannelInitializer
/**
 * Builds the server-side pipeline for every accepted client connection:
 * line-delimited framing, UTF-8 string decoding/encoding, then the chat handler.
 */
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // Frame decoder for sticky/split packets: one frame per line, at most 4096 bytes.
                .addLast("DelimiterBasedFrameDecoder",
                        new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                .addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyServerChannelHandler());
    }
}
serverChannelHandler
/**
 * Server-side chat handler: tracks every connected channel in a shared group
 * and relays each received line to all of them.
 */
public class MyServerChannelHandler extends SimpleChannelInboundHandler<String> {

    /**
     * All currently tracked client channels, shared by every handler instance.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss:SSS";
    /**
     * Cached formatter; DateTimeFormatter is immutable and thread-safe, so it is built once.
     */
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(DATE_PATTERN);

    /**
     * Returns the current local time formatted with {@link #DATE_PATTERN}.
     */
    private static String timestamp() {
        return DATE_FORMATTER.format(LocalDateTime.now());
    }

    /**
     * Relays a received message to every channel in the group. The sender gets
     * an echo prefixed with "[自己]"; everyone else gets the message prefixed
     * with the sender's remote address.
     *
     * @param ctx context of the channel that sent the message
     * @param msg the decoded text line
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        // The sender address is the same for every recipient, so read it once.
        SocketAddress senderAddress = channel.remoteAddress();
        channelGroup.forEach(ch -> {
            // Channels other than the sender receive the broadcast form.
            if (channel != ch) {
                ch.writeAndFlush(senderAddress + ", 发送的消息" + msg + "\n");
            } else {
                ch.writeAndFlush("[自己] " + msg + " \n");
            }
        });
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Logs on the server console that a client came online.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 上线了!");
    }

    /**
     * Logs on the server console that a client went offline.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 离开了!");

    }

    /**
     * Announces the new client to the channels already in the group, then adds
     * the new channel to the group. Because the broadcast happens before the
     * add, the newcomer does not receive its own join notice.
     *
     * @param ctx context of the channel this handler was added to
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        SocketAddress socketAddress = channel.remoteAddress();
        // Send the join notice to every channel currently in the group.
        channelGroup.writeAndFlush(timestamp() + " [服务器] - " + socketAddress + " 加入 \n");
        // Start tracking this client connection.
        channelGroup.add(channel);
    }

    /**
     * Announces to the group that this client has left.
     *
     * <p>No explicit {@code channelGroup.remove(...)} is performed here;
     * per the Netty ChannelGroup documentation a closed channel is removed
     * from the group automatically.
     *
     * @param ctx context of the channel this handler was removed from
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush(timestamp() + " [服务器] - " + channel.remoteAddress() + " 离开 \n");
    }

    /**
     * Channel registration callback; delegates to the superclass unchanged.
     *
     * @param ctx context of the registered channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * Channel unregistration callback; delegates to the superclass unchanged.
     *
     * @param ctx context of the unregistered channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }
}

client 端实现

client启动类
/**
 * Entry point of the console group-chat client.
 *
 * <p>Connects to {@code localhost:8899}, then forwards every line typed on
 * standard input to the server until standard input reaches end-of-stream.
 */
public class MyChatClient {

    /**
     * Connects to the chat server and relays console input to it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    // Fixed: the initializer class is MyClientChannelInitializer.
                    .handler(new MyClientChannelInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            Channel channel = channelFuture.channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end of input; stop instead of sending "null" forever.
            while ((line = bufferedReader.readLine()) != null) {
                // The server frames messages by line delimiter, so terminate each one.
                channel.writeAndFlush(line + "\r\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }

    }
}
clientChannelInitializer
/**
 * Builds the client-side pipeline: line-delimited framing, UTF-8 string
 * decoding/encoding, then the handler that prints server messages.
 */
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // One frame per line, at most 4096 bytes per frame.
                .addLast("DelimiterBasedFrameDecoder",
                        new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                .addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyChatClientChannelHandler());
    }
}
clientChannelHandler
/**
 * Client-side handler that writes every message received from the server to
 * standard output.
 */
public class MyChatClientChannelHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the server's message as-is.
     *
     * @param ctx context of the client channel (unused)
     * @param msg the decoded text line sent by the server
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

相关推荐