Netty WebSocket 协议

HTTP 协议的弊端

  1. HTTP 协议为半双工协议. 半双工协议指数据可以在客户端和服务端两个方向上传输, 但是不能同时传输. 它意味这同一时刻, 只有一个方向上的数据传输; 客户端发送请求, 服务器等待, 直到收到完整的请求. 然后发送回应, 客户端和服务器无法同时发送.
  2. HTTP 消息冗长而繁琐. HTTP 消息包含消息头、消息头、换行符等, 通常情况下采用文本方式传输, 相比于其他的二进制通信协议, 冗长而繁琐;
  3. 针对服务器推送的黑客攻击. 例如长时间轮询.

WebSocket 入门

webSocket 是 HTML5 开始提供的一种浏览器于服务器间进行全双工通信的技术.

在 WebSocket API 中, 浏览器和服务器只需要做一个握手的动作, 然后, 浏览器和服务器之间就形成了一条快速通道, 两者就可以直接相互传送数据了. WebSocket 基于 TCP 双向全双工进行消息传递, 在同一时刻, 既可以发送消息, 也可以接收消息, 相比 HTTP 的半双工协议, 性能得到很大提升.

WebSocket 的特点:

  • 单一的 TCP 连接, 采用全双工模式通信;
  • 对代理、防火墙和路由器透明;
  • 无头部信息、Cookie和身份验证;
  • 无安全开销;
  • 通过 ping/pong 帧保持链路激活;
  • 服务器可以主动传递消息给客户端, 不再需要客户端轮询.

WebSocket 连接建立

建立 webSocket 连接时, 需要通过客户端或浏览器发出握手请求, 类似下面的 http 报文.

Netty WebSocket 协议

这个请求和通常的 HTTP 请求不同, 包含了一些附加头信息, 其中附加头信息 Upgrade:WebSocket 表明这是一个申请协议升级的 HTTP 请求.

服务器解析这些附加的头信息, 然后生成应答信息返回给客户端, 客户端和服务端的 WebSocket 连接就建立起来了, 双方可以通过这个连接通道自由的传递信息, 并且这个连接会持续存在直到客户端或服务端的某一方主动关闭连接.

服务端返回给客户端的应答消息, 类似如下报文

Netty WebSocket 协议

请求消息中的 Sec-WebSocket-Key 是随机的, 服务端会用这些数据来构造出一个 SHA-1 的信息摘要, 把 Sec-WebSocket-Key 加上一个魔幻字符串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11. 使用 SHA-1 加密, 然后进行 BASE-64 编码, 将结果做为 Sec-WebSocket-Accept 头的值, 返回给客户端.

WebSocket 生命周期

握手成功之后, 服务端和客户端就可以通过 messages 的方式进行通讯, 一个消息由一个或多个帧组成.

帧都有自己对应的类型, 属于同一个消息的多个帧具有相同类型的数据. 从广义上讲, 数据类型可以是文本数据(UTF-8文字)、二进制数据和控制帧(协议级信令, 例如信号).

WebSocket 连接生命周期如下:

Netty WebSocket 协议

Netty WebSocket 协议开发

示例代码

public class TimeServer {

    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChildChannelHandler());

            // 绑定端口, 同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            System.out.println("shutdownGracefully");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast("http-codec", new HttpServerCodec());

            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));

            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
            ch.pipeline().addLast("handler", new WebSOcketServerHandler());
        }
    }

    private class WebSOcketServerHandler extends SimpleChannelInboundHandler<Object> {

        private WebSocketServerHandshaker handshaker;

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 传统的 HTTP 接入
            if (msg instanceof FullHttpRequest) {
                System.out.println("传统的 HTTP 接入");
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            }
            // WebSocket 接入
            else if (msg instanceof WebSocketFrame) {
                System.out.println("WebSocket 接入");
                handleWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }

        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
            // 如果 HTTP 解码失败, 返回HTTP异常
            if (!req.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }

            // 构造握手响应返回, 本机测试
            WebSocketServerHandshakerFactory wsFactory =
                    new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);

            handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
        }

        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            // 判断是否是关闭链路的指令
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }

            // 判断是否是 ping 信息
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }

            // 本例程仅支持文本消息, 不支持二进制消息
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
            }

            // 返回应答信息
            String request = ((TextWebSocketFrame) frame).text();
            ctx.channel().write(new TextWebSocketFrame(request + " , 欢迎使用 Netty WebSocket 服务, 现在时刻: "
                    + new java.util.Date().toString()));

        }

        private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
            if (res.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                setContentLength(res, res.content().readableBytes());
            }

            // 如果是非 Keep-Alive, 关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    }

}

HttpServerCodec: 将请求和应答消息编码或解码为 HTTP 消息.
HttpObjectAggregator: 它的目的是将 HTTP 消息的多个部分组合成一条完整的 HTTP 消息. Netty 可以聚合 HTTP 消息, 使用 FullHttpResponseFullHttpRequestChannelPipeline 中的下一个 ChannelHandler, 这就消除了断裂消息, 保证了消息的完整.
ChunkedWriteHandler: 来向客户端发送 HTML5 文件, 主要用于支持浏览器和服务端进行 WebSocket 通信.

第一次握手请求消息由 HTTP 协议承载, 所以它是一个 HTTP 消息, 执行 handleHttpRequest 方法来处理 WebSocket 握手请求. 通过判断请求消息判断是否包含 Upgrade 字段或它的值不是 websocket, 则返回 HTTP 400 响应.

握手请求校验通过之后, 开始构造握手工厂, 创建握手处理类 WebSocketServerHandshaker, 通过它构造握手响应消息返回给客户端.

添加 WebSocket Encoder 和 WebSocket Decoder 之后, 服务端就可以自动对 WebSocket 消息进行编解码了, 后面的 handler 可以直接对 WebSocket 对象进行操作.

handleWebSocketFrame 对消息进行判断, 首先判断是否是控制帧, 如果是就关闭链路. 如果是维持链路的 Ping 消息, 则构造 Pong 消息返回. 由于本例程的 WebSocket 通信双方使用的都是文本消息, 所以对请求新消息的类型进行判断, 而不是文本的抛出异常.

最后, 从 TextWebSocketFrame 中获取请求消息字符串, 对它处理后通过构造新的 TextWebSocketFrame 消息返回给客户端, 由于握手应答时, 动态增加了 TextWebSocketFrame 的编码类, 所以可以直接发送 TextWebSocketFrame 对象.

相关推荐