程序员:利用Netty来写一个简单的聊天室、心跳检测
Netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
Netty 的特点
Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。
Netty的主要特点有:
1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
4)安全:完整的 SSL/TLS 和 StartTLS 支持。
5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
Netty 常见使用场景
Netty 常见的使用场景如下:
1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
聊天室
很多伙伴问我是Netty怎么用,抽时间给大家写了一个demo级别的入门案例,仅供参考。
Server端代码如下:
package com.netty.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
//编写一个run方法,处理客户端的请求
public void run(){
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(new GroupChatServerHandler());
}
});
System.out.println("Netty 服务器启动...");
try {
ChannelFuture cf = serverBootstrap.bind(new InetSocketAddress(port)).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}catch (Exception e){
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new GroupChatServer(6677).run();
}
}
/**
* 自定义消息处理
*/
class GroupChatServerHandler extends SimpleChannelInboundHandler<String>{
//Netty提供的ChannelGroup 可以用来保存 channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//保存新来的channels
private List<Channel> channels = new ArrayList<Channel>();
public static Map<User, Channel> channelsMap = new ConcurrentHashMap<>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* handlerAdded 表示建立链接,一旦链接,第一个被执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded...");
//将该客户加入聊天的信息推送给其他在线的客户端
channelGroup.add(ctx.channel());
//用来区分用户
channelsMap.put(new User(100,"张三"), ctx.channel());
channelGroup.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + "加入聊天\n");
}
/**
* 用来提示上线通知,表示channel处于活动的状态,提示 xxxx 上线
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "上线了...\n");
}
/**
* 当Channel处于非活动状态,提示xxx下线了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "离线了...");
}
/**
* 断开链接会被出发,将xx客户离开信息推送给当前在线的用户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离线了\n");
System.out.println("channelGroup size "+channelGroup.size());
}
/**
* 处理逻辑
* @param s
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch->{
//不是当前的channel
if (ch != channel){
ch.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息:" + s + "\n");
}else{
ch.writeAndFlush("[自己]发送了消息" + s + "\n");
}
});
}
}
Client端代码如下:
package com.netty.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
public class GroupChatClient {
private String host;
private int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run(){
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try{
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(new GroupChatClientHandler());
}
});
ChannelFuture sync = bootstrap.connect(new InetSocketAddress(host, port)).sync();
System.out.println("------"+sync.channel().remoteAddress()+"------");
// sync.channel().closeFuture().sync();
//客户端要是收入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
sync.channel().writeAndFlush(msg+"\n");
}
}catch (Exception e){
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new GroupChatClient("127.0.0.1", 6677).run();
}
}
class GroupChatClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s.trim());
}
}
心跳检测server端代码,可以用上面的client代码来进行测试哦
package com.netty.xtjc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.timeout.IdleState.*;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
/**
* 心跳检测
*/
public class MyServer {
public static void main(String[] args) {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一个Neety 提供的IdleStatehandler
/**
* 1 readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包,检测是否有链接
* 2 writerIdletime 表示多长时间没有写
* 3 allIdleTime 表示多长时间没有读也没有写
* 当IdleStateEvent触发后,就会传给管道的下一个Handler去处理,通过调用(回调)的 userEventTriggerd
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
//加入一个对空闲检测进一步处理自定义的Handler
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture cf = null;
try {
cf = serverBootstrap.bind(new InetSocketAddress(6677)).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "---超时时间---"+eventType);
// ctx.channel().close();
}
}