程序员:利用Netty来写一个简单的聊天室、心跳检测

Netty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

Netty 的特点

Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。

Netty的主要特点有:

1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。

2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。

3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

4)安全:完整的 SSL/TLS 和 StartTLS 支持。

5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

Netty 常见使用场景

Netty 常见的使用场景如下:

1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。

非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。

3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

聊天室

很多伙伴问我是Netty怎么用,抽时间给大家写了一个demo级别的入门案例,仅供参考。

Server端代码如下:

package com.netty.groupchat;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.concurrent.GlobalEventExecutor;

import java.net.InetSocketAddress;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

public class GroupChatServer {

private int port;

public GroupChatServer(int port) {

this.port = port;

}

//编写一个run方法,处理客户端的请求

public void run(){

EventLoopGroup boosGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

try {

serverBootstrap.group(boosGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG,128)

.option(ChannelOption.SO_KEEPALIVE,true)

.childHandler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast("decoder", new StringDecoder());

socketChannel.pipeline().addLast("encoder", new StringEncoder());

socketChannel.pipeline().addLast(new GroupChatServerHandler());

}

});

System.out.println("Netty 服务器启动...");

try {

ChannelFuture cf = serverBootstrap.bind(new InetSocketAddress(port)).sync();

cf.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

}

}catch (Exception e){

}finally {

boosGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public static void main(String[] args) {

new GroupChatServer(6677).run();

}

}

/**

* 自定义消息处理

*/

class GroupChatServerHandler extends SimpleChannelInboundHandler<String>{

//Netty提供的ChannelGroup 可以用来保存 channel

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

//保存新来的channels

private List<Channel> channels = new ArrayList<Channel>();

public static Map<User, Channel> channelsMap = new ConcurrentHashMap<>();

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**

* handlerAdded 表示建立链接,一旦链接,第一个被执行

* @param ctx

* @throws Exception

*/

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

System.out.println("handlerAdded...");

//将该客户加入聊天的信息推送给其他在线的客户端

channelGroup.add(ctx.channel());

//用来区分用户

channelsMap.put(new User(100,"张三"), ctx.channel());

channelGroup.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + "加入聊天\n");

}

/**

* 用来提示上线通知,表示channel处于活动的状态,提示 xxxx 上线

* @param ctx

* @throws Exception

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println(ctx.channel().remoteAddress() + "上线了...\n");

}

/**

* 当Channel处于非活动状态,提示xxx下线了

* @param ctx

* @throws Exception

*/

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.out.println(ctx.channel().remoteAddress() + "离线了...");

}

/**

* 断开链接会被出发,将xx客户离开信息推送给当前在线的用户

* @param ctx

* @throws Exception

*/

@Override

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

Channel channel = ctx.channel();

channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离线了\n");

System.out.println("channelGroup size "+channelGroup.size());

}

/**

* 处理逻辑

* @param s

* @throws Exception

*/

protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {

Channel channel = ctx.channel();

channelGroup.forEach(ch->{

//不是当前的channel

if (ch != channel){

ch.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息:" + s + "\n");

}else{

ch.writeAndFlush("[自己]发送了消息" + s + "\n");

}

});

}

}

Client端代码如下:

package com.netty.groupchat;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

import java.util.Scanner;

public class GroupChatClient {

private String host;

private int port;

public GroupChatClient(String host, int port) {

this.host = host;

this.port = port;

}

public void run(){

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

try{

bootstrap.group(eventLoopGroup)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast("decoder", new StringDecoder());

socketChannel.pipeline().addLast("encoder", new StringEncoder());

socketChannel.pipeline().addLast(new GroupChatClientHandler());

}

});

ChannelFuture sync = bootstrap.connect(new InetSocketAddress(host, port)).sync();

System.out.println("------"+sync.channel().remoteAddress()+"------");

// sync.channel().closeFuture().sync();

//客户端要是收入信息

Scanner scanner = new Scanner(System.in);

while (scanner.hasNextLine()) {

String msg = scanner.nextLine();

sync.channel().writeAndFlush(msg+"\n");

}

}catch (Exception e){

e.printStackTrace();

}finally {

eventLoopGroup.shutdownGracefully();

}

}

public static void main(String[] args) {

new GroupChatClient("127.0.0.1", 6677).run();

}

}

class GroupChatClientHandler extends SimpleChannelInboundHandler<String>{

@Override

protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {

System.out.println(s.trim());

}

}

心跳检测server端代码,可以用上面的client代码来进行测试哦

package com.netty.xtjc;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;

import java.util.concurrent.TimeUnit;

import static io.netty.handler.timeout.IdleState.*;

import static io.netty.handler.timeout.IdleState.READER_IDLE;

/**

* 心跳检测

*/

public class MyServer {

public static void main(String[] args) {

EventLoopGroup boosGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(boosGroup,workerGroup)

.channel(NioServerSocketChannel.class)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

//加入一个Neety 提供的IdleStatehandler

/**

* 1 readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包,检测是否有链接

* 2 writerIdletime 表示多长时间没有写

* 3 allIdleTime 表示多长时间没有读也没有写

* 当IdleStateEvent触发后,就会传给管道的下一个Handler去处理,通过调用(回调)的 userEventTriggerd

*/

pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));

//加入一个对空闲检测进一步处理自定义的Handler

pipeline.addLast(new MyServerHandler());

}

});

ChannelFuture cf = null;

try {

cf = serverBootstrap.bind(new InetSocketAddress(6677)).sync();

cf.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

class MyServerHandler extends ChannelInboundHandlerAdapter {

/**

*

* @param ctx

* @param evt

* @throws Exception

*/

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

IdleStateEvent event = (IdleStateEvent) evt;

String eventType = null;

switch (event.state()){

case READER_IDLE:

eventType = "读空闲";

break;

case WRITER_IDLE:

eventType = "写空闲";

break;

case ALL_IDLE:

eventType = "读写空闲";

break;

}

System.out.println(ctx.channel().remoteAddress() + "---超时时间---"+eventType);

// ctx.channel().close();

}

}

程序员:利用Netty来写一个简单的聊天室、心跳检测

相关推荐