【源起Netty 正传】升级版卡车——ByteBuf
卡车
卡车指的是java原生类ByteBuffer
,这兄弟在NIO界大名鼎鼎,与Channel、Selector的铁三角组合构筑了NIO的核心。之所以称它为卡车,只因《编程思想》中有段比喻:
我们可以把它想象成一个煤矿,通道(Channel)是一个包含煤层(数据)的矿藏,而缓冲器(ByteBuffer)则是派送到矿藏中的卡车。卡车满载煤炭而归,我们再从卡车上获得煤炭。也就是说,我们并没有直接和通道交互;我们只是和缓冲器交互,并把缓冲器派送到通道。
那么升级版卡车,自然指的就是ByteBuf
。
结构和功能
Netty之所以再次打造了升级版的缓冲器,显然是不满ByteBuffer中的某些弊端。
- ByteBuffer长度固定
- 使用者经常需要调用flip()、rewind()方法调整
position
的位置,不方便 - API功能有限
ByteBuffer中有三个重要的位置属性:position、limit、capacity,一个写操作之后大概是这样的
如若想进行读操作,那么flip()的调用是少不了的,从图中不难看出,目前position到limit啥也没有。
调用flip()之后则不一样了(我们不一样~):
而ByteBuf的人设则不相同,它的两个位置属性readIndex
、writeIndex
,分别和读操作、写操作相对应。“写”不操作readIndex,“读”不操作writeIndex,两者不会相互干扰。这里盗几张图说明下好了:
- 初始状态
- 写入N个字节
- 读取M个(M<N)字节
- 释放已读缓存discardReadBytes
重点在于ByteBuf的read和write相关方法,已经封装好了对readIndex、writeIndex位置索引的操作,不需要使用者繁琐的flip()。且write()方法中,ByteBuf设计了自动扩容,这一点后续章节会进行详细说明。
功能方面,主要关注两点:
Derived buffers,类似于数据库视图。ByteBuf提供了多个接口用于创建某ByteBuf的视图或复制ByteBuf:
- duplicate:返回当前ByteBuf的复制对象,缓冲区内容共享(修改复制的ByteBuf,原来的ByteBuf内容也随之改变),索引独立维护。
- copy:内容和索引都独立。
- slice:返回当前ByteBuf的可读子缓冲区,内容共享,索引独立。
转换成
ByteBuffer
nio的SocketChanel进行网络操作,还是操作的java原生的ByteBuffer,所以ByteBuf转换成ByteBuffer的需求还是有市场的。- ByteBuffer nioBuffer():当前ByteBuf的可读缓冲区转换成ByteBuffer,缓冲区内容共享,索引独立。需要指出的是,返回后的ByteBuffer无法感知原ByteBuf的动态扩展操作。
ByteBuf星系
称之为“星系”,是因为ByteBuf一脉涉及到的类实在太多了,但多而不乱,归功于类关系结构的设计。
类关系结构
依然盗图:
从内存分配角度,ByteBuf可分为两类
- 堆内存HeapByteBuf字节缓冲区
- 直接内存DirectByteBuf字节缓冲区
从内存回收角度,ByteBuf也可分为两类:
- 普通缓冲区UnpooledByteBuf
- 池化缓冲区PooledByteBuf
纵观该关继承节构,给我留下的印象就是每层各司其职:读操作以及其它的一些公共功能由父类实现,差异化功能由子类实现。
下面聊下笔者感兴趣的几个点……
AbstractByteBuf的写操作簇
AbstractByteBuf的写操作有很多,这里以writeBytes(byte[] src, int srcIndex, int length)
方法为例
@Override public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureWritable(length); //一、确保可写,对边界进行验证 setBytes(writerIndex, src, srcIndex, length); //二、写入操作,不同类型的子类实现方式不同 writerIndex += length; return this; }
注释部分分别展开看下。
注释一、确保可写,对边界进行验证
跟调用栈ensureWritable -> ensureWritable0
,观察ensureWritable0
方法
final void ensureWritable0(int minWritableBytes) { ensureAccessible(); //确保对象可用 if (minWritableBytes <= writableBytes()) { return; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. // 三、计算扩容量 int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // Adjust to the new capacity. capacity(newCapacity); //四、内存分配 }
- 比较
先对要写入的字节数minWritableBytes
进行判断:如果minWritableBytes < capacity - writeIndex,那么很好,不需要扩容;如果minWritableBytes > maxCapacity - writerIndex,也就是要写入字节数超过了允许的最大字节数,直接抛出越界异常IndexOutOfBoundsException。
眼尖的朋友可能发现了,两次用来判断的上界并不相同——capacity
/ maxCapacity
。maxCapacity是AbstractByteBuf的属性,而capacity设定在其子类中。简单看下UnpooledDirectByteBuf
的构造函数:
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); //为AbstractByteBuf的maxCapacity属性赋值 /** * …… * 省略无关部分 */ setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); //capacity赋值 }
也就是说,ByteBuf的结构,可看成这样:
- 扩容计算
@Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } /** * 设置阀值为4MB * 1.如果扩展的容量大于阀值,对扩张后的内存和最大内存进行比较:大于最大长度使用最大长度,否则步进4M * 2.如果需要扩展的容量小于阀值,以64进行计数倍增:64->128->256;为防止倍增过猛,最后与最大值再次进行比较 */ final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
具体的扩容策略,已拍入注释中,尽可查看!
注释二、写入操作,不同类型的子类实现方式不同
对比下UnpooledDirectByteBuf
和UnpooledHeapByteBuf
的实现
- UnpooledDirectByteBuf
@Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); ByteBuffer tmpBuf = internalNioBuffer(); //分配 tmpBuf.clear().position(index).limit(index + length); tmpBuf.put(src, srcIndex, length); return this; }
- UnpooledHeapByteBuf
@Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); //分配 return this; }
篇幅有限,不展开说了,结论就是:
UnpooledDirectByteBuf的底层实现为ByteBuffer.allocateDirect
,分配时复制体通过buffer.duplicate()
获取复制体;而UnpooledHeapByteBuf的底层实现为byte[]
,分配时通过System.arraycopy
方法拷贝副本。
AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf
的名字就挺有意思——“引用计数”,一副JVM垃圾回收的即视感。而事实上,也差不多一个意思。
看下类属性:
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); private volatile int refCnt;
以原子方式更新属性的AtomicIntegerFieldUpdater
起了关键作用,将会对volatile
修饰的refCnt
进行更新,见retain方法(下面展示的是retain的关键部分retain0
):
private ByteBuf retain0(final int increment) { int oldRef = refCntUpdater.getAndAdd(this, increment); if (oldRef <= 0 || oldRef + increment < oldRef) { // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow. refCntUpdater.getAndAdd(this, -increment); throw new IllegalReferenceCountException(oldRef, increment); } return this; }
源码阅读很有意思的一点就是能看到些自己不熟悉的类,比如AtomicIntegerFieldUpdater
我以前就没接触过!
内存池
内存池可有效的提升效率,道理和线程池、数据库连接池相通,即省去了重复创建销毁的过程。
到目前为止,看到的都是ByteBuf中的各Unpooled实现,而池化版的ByteBuf没怎么提过。为何如此?因为池化的实现较复杂,以我现在的功力尚不能完全掌握透彻。
先聊下内存池的设计思路,涨涨姿势:
为了集中集中管理内存的分配和释放,同事提高分配和释放内存时候的性能,很多框架和应用都会通过预先申请一大块内存,然后通过提供相应的分配和释放接口来使用内存。这样一来,堆内存的管理就被集中到几个类或函数中,由于不再频繁使用系统调用来申请和释放内存,应用或系统的性能也会大大提高。 ——节选自《Netty权威指南》
Netty的ByteBuf内存池也是按照这个思路搞的。首先,看下官方注释:
/** * Notation: The following terms are important to understand the code * > page - a page is the smallest unit of memory chunk that can be allocated * > chunk - a chunk is a collection of pages * > in this code chunkSize = 2^{maxOrder} * pageSize */
这里面有两个重要的概念page
(页)和chunk
(块),chunk管理多个page组成二叉树结构,大概就是这个样子:
选择二叉树是有原因的:
/** * To search for the first offset in chunk that has at least requested size available we construct a * complete balanced binary tree and store it in an array (just like heaps) - memoryMap */
为了在chunk中找到至少可用的size的偏移量offset。
继线性结构后,人们又发明了树形结构的意义在于“提升查询效率”,也同样是这里选择二叉树的原因。
小于一个page的内存,直接在PoolSubpage
中分配完成。
某块内存是否分配,将通过状态位进行标识。
后记
一如既往的啰嗦几句,最近工作忙,更新文章较慢,希望自己能坚持,如发现问题望大家指正!
thanks..