Netty 中队列的使用
任务队列中的Task有3种典型使用场景
用户程序自定义的普通任务
NettyServerHandler代码有改动:
package com.ronnie.netty.sample; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil; /** * 1. 自定义一个Handler需要继承 netty 规定好的某个 HandlerAdapter(适配器模式) * 2. 这时我们自定义一个Handler, 才能称为一个handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取数据事件(这里我们可以读取客户端发送的消息) * ChannelHandlerContext ctx: 上下文对象, 含有管道 pipeline, 通道 channel, 地址 address * Object msg: 就是客户端发送的数据 默认Object * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /* 比如这里我们有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的 NioEventLoop 的 taskQueue中 */ // 解决方案1: 用户程序自定义的普通任务 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yang ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }); // Thread.sleep(10 * 1000); // ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8)); System.out.println("go on ..."); // System.out.println("The server is reading thread: " + Thread.currentThread().getName()); // System.out.println("server ctx = " + ctx); // System.out.println("Check the relationship between channel and pipeline"); // Channel channel = ctx.channel(); // ChannelPipeline pipeline = ctx.pipeline(); // 本质是一个双向链表, 涉及到出栈入栈问题 // // 将 msg转成一个 ByteBuf(是netty提供的, 不是NIO的 ByteBuffer, 性能更高) // ByteBuf buf = (ByteBuf) msg; // System.out.println("The message that client send: " + buf.toString(CharsetUtil.UTF_8)); // System.out.println("The address of client: " + ctx.channel().remoteAddress()); } /** * 数据读取完毕 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // write + flush, 将数据写入到缓冲并刷新 // 一般来说, 我们对发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, dear client, Kappa", CharsetUtil.UTF_8)); } /** * 处理异常, 一般需要关闭通道 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
打上断点,debug启动
左键点击ctx
pipeline -> channel -> eventLoop -> taskQueue
可以看到两个线程任务存入了任务队列中
用户自定义定时任务
在NettyServerHandler中之前添加的任务线程代码之下, 打印go on之前添加以下代码:
// 用户自定义定时任务 -> 该任务是提交到 scheduleTaskQueue中的 ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yyf ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }, 5, TimeUnit.SECONDS);
打上断点, debug启动
左键点击ctx
pipeline -> channel -> eventLoop -> taskQueue
你会发现taskQueue中只有2个线程任务
我们刚刚写的那个任务在scheduledTaskQueue中(pipeline -> channel -> eventLoop -> scheduledTaskQueue)
非当前Reactor 线程调用Channel的各种方法
- 例如在推送系统的业务线程中, 根据用户的标识, 找到对应的Channel引用, 然后调用 Write 类方法向该用户推送消息, 就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。
Netty 方案再说明
- Netty 抽象出两组线程池, BossGroup 专门负责接收客户端连接, WorkerGroup 专门负责网络读写操作。
- NioEventLoop表示一个不断循环执行处理任务的线程, 每个 NioEventLoop都有一个selector, 用于监听绑定在其上的socket网络通道。
- NioEventLoop内部采用串行化设计, 从消息的读取 -> 解码 -> 处理 -> 编码 -> 发送, 始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup下包含多个NioEventLoop
- 每个NioEventLoop 中包含有一个Selector, 一个 taskQueue
- 每个NioEventLoop 中的 Selector 上可以注册监听多个 NioChannel
- 每个NioChannel 只会绑定在唯一的NioEventLoop上
- 每个NioChannel 都绑定有一个自己的 ChannelPipline
相关推荐
fengshantao 2020-10-29
arctan0 2020-10-14
爱传文档 2020-07-28
gzx0 2020-07-05
fengshantao 2020-07-04
fengshantao 2020-07-02
jannal 2020-06-21
arctan0 2020-06-19
arctan0 2020-06-16
gzx0 2020-06-14
fengshantao 2020-06-13
gzx0 2020-06-12
arctan0 2020-06-11
fengshantao 2020-06-11
mbcsdn 2020-05-19
arctan0 2020-05-16
爱传文档 2020-05-08
爱传文档 2020-05-04