netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统
结构
- netty 作为服务端
- protobuf 作为序列化数据的协议
- websocket 前端通讯
演示
GitHub 地址
netty 服务端实现
Server.java
启动类
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; //websocket长连接示例 public class Server { public static void main(String[] args) throws Exception{ // 主线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 从线程组 EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerChannelInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } }
ServerChannelInitializer.java
import com.example.nettydemo.protobuf.MessageData; import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.stream.ChunkedWriteHandler; import java.util.List; import static io.netty.buffer.Unpooled.wrappedBuffer; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // HTTP请求的解码和编码 pipeline.addLast(new HttpServerCodec()); // 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse, // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent pipeline.addLast(new HttpObjectAggregator(65536)); // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了 pipeline.addLast(new ChunkedWriteHandler()); // WebSocket数据压缩 pipeline.addLast(new WebSocketServerCompressionHandler()); // 协议包长度限制 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true)); // 协议包解码 pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() { @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception { ByteBuf buf = ((BinaryWebSocketFrame) frame).content(); objs.add(buf); buf.retain(); } }); // 协议包编码 pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() { @Override protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception { ByteBuf result = null; if (msg instanceof MessageLite) { result = wrappedBuffer(((MessageLite) msg).toByteArray()); } if (msg instanceof MessageLite.Builder) { result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()); } // ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ==== // 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的 WebSocketFrame frame = new BinaryWebSocketFrame(result); out.add(frame); } }); // 协议包解码时指定Protobuf字节数实例化为CommonProtocol类型 pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance())); // websocket定义了传递数据的6中frame类型 pipeline.addLast(new ServerFrameHandler()); } }
ServerFrameHandler.java
import com.example.nettydemo.protobuf.MessageData; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.List; //处理文本协议数据,处理TextWebSocketFrame类型的数据,websocket专门处理文本的frame就是TextWebSocketFrame public class ServerFrameHandler extends SimpleChannelInboundHandler<MessageData.RequestUser> { private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //读到客户端的内容并且向客户端去写内容 @Override protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception { // channelGroup.add(); Channel channel = ctx.channel(); System.out.println(msg.getUserName()); System.out.println(msg.getAge()); System.out.println(msg.getPassword()); MessageData.ResponseUser bank = MessageData .ResponseUser.newBuilder() .setUserName("你好,请问有什么可以帮助你!") .setAge(18).setPassword("11111").build(); channel.writeAndFlush(bank); } //每个channel都有一个唯一的id值 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //打印出channel唯一值,asLongText方法是channel的id的全名 // System.out.println("handlerAdded:"+ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // System.out.println("handlerRemoved:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生"); ctx.close(); } }
protobuf 文件的使用
proto 文件
syntax ="proto2"; package com.example.nettydemo.protobuf; //optimize_for 加快解析的速度 option optimize_for = SPEED; option java_package = "com.example.nettydemo.protobuf"; option java_outer_classname="MessageData"; // 客户端发送过来的消息实体 message RequestUser{ optional string user_name = 1; optional int32 age = 2; optional string password = 3; } // 返回给客户端的消息实体 message ResponseUser{ optional string user_name = 1; optional int32 age = 2; optional string password = 3; }
生成 proto 的Java 类
批量生成工具,直接找到这个 bat 或者 sh 文件,在对应的平台执行就可以了具体可以自行百度 protobuf 怎么使用
Windows 版本
set outPath=../../java set fileArray=(MessageDataProto ATestProto) # 将.proto文件生成java类 for %%i in %fileArray% do ( echo generate cli protocol java code: %%i.proto protoc --java_out=%outPath% ./%%i.proto ) pause
sh 版本
地址: https://github.com/lmxdawn/ne...
#!/bin/bash outPath=../../java fileArray=(MessageDataProto ATestProto) for i in ${fileArray[@]}; do echo "generate cli protocol java code: ${i}.proto" protoc --java_out=$outPath ./$i.proto done
websocket 实现
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSocket客户端</title> </head> <body> <script src="protobuf.min.js"></script> <script type="text/javascript"> var socket; //如果浏览器支持WebSocket if (window.WebSocket) { //参数就是与服务器连接的地址 socket = new WebSocket("ws://localhost:8899/ws"); //客户端收到服务器消息的时候就会执行这个回调方法 socket.onmessage = function (event) { var ta = document.getElementById("responseText"); // 解码 responseUserDecoder({ data: event.data, success: function (responseUser) { var content = "客服小姐姐: " + responseUser.userName + ", 小姐姐年龄: " + responseUser.age + ", 密码: " + responseUser.password; ta.value = ta.value + "\n" + content; }, fail: function (err) { console.log(err); }, complete: function () { console.log("解码全部完成") } }) } //连接建立的回调函数 socket.onopen = function (event) { var ta = document.getElementById("responseText"); ta.value = "连接开启"; } //连接断掉的回调函数 socket.onclose = function (event) { var ta = document.getElementById("responseText"); ta.value = ta.value + "\n" + "连接关闭"; } } else { alert("浏览器不支持WebSocket!"); } //发送数据 function send(message) { if (!window.WebSocket) { return; } // socket.binaryType = "arraybuffer"; // 判断是否开启 if (socket.readyState !== WebSocket.OPEN) { alert("连接没有开启"); return; } var data = { userName: message, age: 18, password: "11111" }; requestUserEncoder({ data: data, success: function (buffer) { console.log("编码成功"); socket.send(buffer); }, fail: function (err) { console.log(err); }, complete: function () { console.log("编码全部完成") } }); } /** * 发送的消息编码成 protobuf */ function requestUserEncoder(obj) { var data = obj.data; var success = obj.success; // 成功的回调 var fail = obj.fail; // 失败的回调 var complete = obj.complete; // 成功或者失败都会回调 protobuf.load("../proto/MessageDataProto.proto", function (err, root) { if (err) { if (typeof fail === "function") { fail(err) } if (typeof complete === "function") { complete() } return; } // Obtain a message type var RequestUser = root.lookupType("com.example.nettydemo.protobuf.RequestUser"); // Exemplary payload var payload = data; // Verify the payload if necessary (i.e. when possibly incomplete or invalid) var errMsg = RequestUser.verify(payload); if (errMsg) { if (typeof fail === "function") { fail(errMsg) } if (typeof complete === "function") { complete() } return; } // Create a new message var message = RequestUser.create(payload); // or use .fromObject if conversion is necessary // Encode a message to an Uint8Array (browser) or Buffer (node) var buffer = RequestUser.encode(message).finish(); if (typeof success === "function") { success(buffer) } if (typeof complete === "function") { complete() } }); } /** * 接收到服务器二进制流的消息进行解码 */ function responseUserDecoder(obj) { var data = obj.data; var success = obj.success; // 成功的回调 var fail = obj.fail; // 失败的回调 var complete = obj.complete; // 成功或者失败都会回调 protobuf.load("../proto/MessageDataProto.proto", function (err, root) { if (err) { if (typeof fail === "function") { fail(err) } if (typeof complete === "function") { complete() } return; } // Obtain a message type var ResponseUser = root.lookupType("com.example.nettydemo.protobuf.ResponseUser"); var reader = new FileReader(); reader.readAsArrayBuffer(data); reader.onload = function (e) { var buf = new Uint8Array(reader.result); var responseUser = ResponseUser.decode(buf); if (typeof success === "function") { success(responseUser) } if (typeof complete === "function") { complete() } } }); } </script> <h1>欢迎访问客服系统</h1> <form onsubmit="return false"> <textarea name="message" style="width: 400px;height: 200px"></textarea> <input type="button" value="发送数据" onclick="send(this.form.message.value);"> <h3>回复消息:</h3> <textarea id="responseText" style="width: 400px;height: 300px;"></textarea> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据"> </form> </body> </html>
扩展阅读
spring boot 实现的后台管理系统
vue + element-ui 实现的后台管理界面,接入 spring boot API接口
相关推荐
fengshantao 2020-10-29
arctan0 2020-10-14
爱传文档 2020-07-28
gzx0 2020-07-05
fengshantao 2020-07-04
fengshantao 2020-07-02
jannal 2020-06-21
arctan0 2020-06-19
arctan0 2020-06-16
gzx0 2020-06-14
fengshantao 2020-06-13
gzx0 2020-06-12
arctan0 2020-06-11
fengshantao 2020-06-11
mbcsdn 2020-05-19
arctan0 2020-05-16
爱传文档 2020-05-08
爱传文档 2020-05-04