Netty中使用零拷贝方式写大数据
因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。让我们考虑下将一个文件内容写出到网络的情况。
我们知道在传输的过程中,由于NIO的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在Netty的核心中,所以应用程序所有需要做的就是使用一个FileRegion接口的实现,其在Netty的API文档中的定义是:“通过支持零拷贝的文件传输的Channel来发送的文件区域。”
下面的代码清单展示了如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel,从而利用零拷贝特性来传输一个文件的内容。
使用FileRegion传输文件的内容
FileInputStream in = new FileInputStream(file); // 创建一个FileInputStream FileRegion region = new DefaultFileRegion(in.getChannel(), 0, // 以该文件的完整长度创建一个新的DefaultFileRegion file.length());channel.writeAndFlush(region).addListener(// 发送该DefaultFileRegion,并注册一个ChannelFutureListener new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { Throwable cause = future.cause();// 处理失败 // Do something } } });
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
关键是interfaceChunkedInput<B>,其中类型参数B是readChunk()方法返回的类型。Netty预置了该接口的4个实现,如下表中所列出的。每个都代表了一个将由ChunkedWriteHandler处理的不定长度的数据流。
ChunkedInput 的实现
ChunkedFile 从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用
ChunkedNioFile 和ChunkedFile 类似,只是它使用了FileChannel
ChunkedStream 从InputStream 中逐块传输内容
ChunkedNioStream 从ReadableByteChannel 中逐块传输内容
下面代码清单说明了ChunkedStream的用法,它是实践中最常用的实现。所示的类使用了一个File以及一个SslContext进行实例化。当initChannel()方法被调用时,它将使用所示的ChannelHandler 链初始化该Channel。
当Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数据作为ChunkedStream 写入。数据在传输之前将会由SslHandler加密。
使用ChunkedStream 传输文件内容
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> { private final File file; private final SslContext sslCtx; public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) { this.file = file; this.sslCtx = sslCtx; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());// 将SslHandler添加到ChannelPipeline中 pipeline.addLast(new ChunkedWriteHandler());// 添加ChunkedWriteHandler以处理作为ChunkedInput传入的数据 pipeline.addLast(new WriteStreamHandler());// 一旦连接建立,WriteStreamHandler就开始写文件数据 } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {// 当连接建立时,channelActive()方法将使用ChunkedInput写文件数据 super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } } }
逐块输入要使用你自己的ChunkedInput实现,请在ChannelPipeline中安装一个ChunkedWriteHandler。在本节中,我们讨论了如何通过使用零拷贝特性来高效地传输文件,以及如何通过使用ChunkedWriteHandler来写大型数据而又不必冒着导致OutOfMemoryError的风险。