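A Look at Flink's MemoryPool
Preface
This article examines MemoryPool, the nested class in Flink's MemoryManager that hands out and takes back memory segments.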
MemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
    abstract static class MemoryPool {

        abstract int getNumberOfAvailableMemorySegments();

        abstract MemorySegment allocateNewSegment(Object owner);

        abstract MemorySegment requestSegmentFromPool(Object owner);

        abstract void returnSegmentToPool(MemorySegment segment);

        abstract void clear();
    }
- MemoryPool defines five abstract methods: getNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, and clear. It has two subclasses, HybridHeapMemoryPool and HybridOffHeapMemoryPool.
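To make the contract concrete, here is a minimal sketch that mirrors the five methods with a plain byte[] standing in for MemorySegment. PoolContractSketch, ToyPool, and ToyHeapPool are hypothetical names invented for this illustration; none of them are Flink classes, and the owner parameter is left out for brevity.

```java
import java.util.ArrayDeque;

// Toy model of the pool contract. All names here are hypothetical, not Flink's.
class PoolContractSketch {

    // Mirrors the five abstract methods; byte[] stands in for MemorySegment.
    abstract static class ToyPool {
        abstract int getNumberOfAvailableSegments();
        abstract byte[] allocateNewSegment();      // unpooled: always a fresh buffer
        abstract byte[] requestSegmentFromPool();  // pooled: taken from the free list
        abstract void returnSegmentToPool(byte[] segment);
        abstract void clear();
    }

    static final class ToyHeapPool extends ToyPool {
        private final ArrayDeque<byte[]> available;
        private final int segmentSize;

        ToyHeapPool(int numInitialSegments, int segmentSize) {
            this.available = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;
            // Pre-allocate every pooled segment up front.
            for (int i = 0; i < numInitialSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }

        @Override int getNumberOfAvailableSegments() { return available.size(); }
        @Override byte[] allocateNewSegment() { return new byte[segmentSize]; }
        @Override byte[] requestSegmentFromPool() { return available.remove(); }
        @Override void returnSegmentToPool(byte[] segment) { available.add(segment); }
        @Override void clear() { available.clear(); }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyHeapPool(2, 1024);

        byte[] pooled = pool.requestSegmentFromPool();
        System.out.println(pool.getNumberOfAvailableSegments()); // 1

        byte[] fresh = pool.allocateNewSegment();                // does not touch the pool
        System.out.println(pool.getNumberOfAvailableSegments()); // 1

        pool.returnSegmentToPool(pooled);
        System.out.println(pool.getNumberOfAvailableSegments()); // 2

        pool.clear();
        System.out.println(pool.getNumberOfAvailableSegments()); // 0
    }
}
```

The point of the sketch is the split between the two acquisition paths: requestSegmentFromPool shrinks the free list, while allocateNewSegment leaves it untouched.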
HybridHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
    static final class HybridHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<byte[]> availableMemory;

        private final int segmentSize;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(new byte[segmentSize]);
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            byte[] buf = availableMemory.remove();
            return MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
                availableMemory.add(heapSegment.getArray());
                heapSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
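- HybridHeapMemoryPool extends MemoryPool and is backed by JVM heap memory. Its constructor takes numInitialSegments and segmentSize and uses them to pre-fill availableMemory, an ArrayDeque whose elements are byte[] arrays of segmentSize bytes.
- allocateNewSegment calls MemorySegmentFactory.allocateUnpooledSegment to allocate unpooled memory. requestSegmentFromPool calls availableMemory.remove() and wraps the returned array with MemorySegmentFactory.wrapPooledHeapMemory. Note that it calls remove() without checking whether the deque is empty: ArrayDeque.remove() throws NoSuchElementException on an empty deque, so the caller has to make sure a segment is available first.
- returnSegmentToPool only accepts segments whose class is exactly HybridMemorySegment and throws IllegalArgumentException for anything else. It first puts the segment's byte[] back into availableMemory and then calls heapSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().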
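The unchecked remove() is worth seeing in isolation. The sketch below uses only java.util.ArrayDeque and shows the difference between remove(), which throws on an empty deque, and poll(), which returns null. EmptyDequeSketch and its two helper methods are hypothetical names for this illustration; the guarded variant is one possible alternative, not something the Flink source does.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;

// Isolates the empty-deque behaviour that requestSegmentFromPool relies on the caller to avoid.
class EmptyDequeSketch {

    // Same call the pool makes: returns true if remove() threw because the deque was empty.
    // On a non-empty deque this removes one element and returns false.
    static boolean removeThrows(ArrayDeque<byte[]> availableMemory) {
        try {
            availableMemory.remove();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    // Hypothetical guarded variant: poll() returns null instead of throwing.
    static byte[] requestOrNull(ArrayDeque<byte[]> availableMemory) {
        return availableMemory.poll();
    }

    public static void main(String[] args) {
        ArrayDeque<byte[]> availableMemory = new ArrayDeque<>();
        availableMemory.add(new byte[32]);

        // One segment is available, so this succeeds and drains the deque.
        System.out.println(removeThrows(availableMemory));          // false

        // The deque is now empty: remove() throws, poll() returns null.
        System.out.println(removeThrows(availableMemory));          // true
        System.out.println(requestOrNull(availableMemory) == null); // true
    }
}
```

Since the pool itself does no such check, a caller would presumably compare the requested count against getNumberOfAvailableMemorySegments() before requesting; the excerpt above does not show that calling code.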
HybridOffHeapMemoryPool
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java
    static final class HybridOffHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<ByteBuffer> availableMemory;

        private final int segmentSize;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            ByteBuffer buf = availableMemory.remove();
            return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
                ByteBuffer buf = hybridSegment.getOffHeapBuffer();
                availableMemory.add(buf);
                hybridSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
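- HybridOffHeapMemoryPool extends MemoryPool and is backed by off-heap memory. Its constructor takes numInitialSegments and segmentSize and uses them to pre-fill availableMemory, an ArrayDeque whose elements are direct ByteBuffers created with ByteBuffer.allocateDirect(segmentSize).
- allocateNewSegment calls MemorySegmentFactory.allocateUnpooledOffHeapMemory to allocate unpooled off-heap memory. requestSegmentFromPool calls availableMemory.remove() and wraps the returned buffer with MemorySegmentFactory.wrapPooledOffHeapMemory. As in the heap pool, remove() is called without checking whether the deque is empty, so it throws NoSuchElementException if no segment is left.
- returnSegmentToPool only accepts segments whose class is exactly HybridMemorySegment and throws IllegalArgumentException for anything else. It fetches the segment's ByteBuffer with getOffHeapBuffer(), puts it back into availableMemory, and then calls hybridSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().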
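The request/return cycle of the off-heap pool can be sketched with nothing but java.nio.ByteBuffer. DirectBufferPoolSketch and its method names are hypothetical and not part of Flink. Two details are assumptions made for this toy version only: the isDirect() check stands in for the HybridMemorySegment class check, and the buf.clear() call on return does not appear in the Flink code shown above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Toy off-heap pool: pre-allocates direct buffers and recycles them. Not a Flink class.
class DirectBufferPoolSketch {

    private final ArrayDeque<ByteBuffer> availableMemory;

    DirectBufferPoolSketch(int numInitialSegments, int segmentSize) {
        this.availableMemory = new ArrayDeque<>(numInitialSegments);
        for (int i = 0; i < numInitialSegments; i++) {
            // Direct buffers live outside the JVM heap.
            this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    // Like requestSegmentFromPool: throws NoSuchElementException if the pool is empty.
    ByteBuffer request() {
        return availableMemory.remove();
    }

    // Like returnSegmentToPool: rejects foreign buffers, then puts the buffer back.
    void giveBack(ByteBuffer buf) {
        if (!buf.isDirect()) {
            throw new IllegalArgumentException("Buffer is not off-heap");
        }
        buf.clear(); // assumption of this sketch: resets position and limit, does not zero the bytes
        availableMemory.add(buf);
    }

    int available() {
        return availableMemory.size();
    }

    public static void main(String[] args) {
        DirectBufferPoolSketch pool = new DirectBufferPoolSketch(1, 64);

        ByteBuffer first = pool.request();
        first.putInt(42);
        pool.giveBack(first);

        // The very same buffer object comes back on the next request.
        ByteBuffer second = pool.request();
        System.out.println(first == second);   // true
        System.out.println(second.position()); // 0
        System.out.println(second.getInt(0));  // 42, old contents are still there
    }
}
```

Recycling the same buffers is the reason for pooling here: the direct allocations happen once in the constructor, and a returned buffer still holds whatever the previous user wrote into it.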
Summary
- MemoryPool defines five abstract methods: getNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, and clear. It has two subclasses, HybridHeapMemoryPool and HybridOffHeapMemoryPool.
- HybridHeapMemoryPool is backed by JVM heap memory and keeps its free segments in an ArrayDeque of byte[], pre-filled by the constructor from numInitialSegments and segmentSize. allocateNewSegment calls MemorySegmentFactory.allocateUnpooledSegment to allocate unpooled memory; requestSegmentFromPool calls availableMemory.remove() and wraps the result with MemorySegmentFactory.wrapPooledHeapMemory, without checking whether the deque is empty; returnSegmentToPool only accepts HybridMemorySegment, puts its byte[] back into availableMemory, and then calls heapSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().
- HybridOffHeapMemoryPool is backed by off-heap memory and keeps its free segments in an ArrayDeque of ByteBuffer, pre-filled by the constructor from numInitialSegments and segmentSize. allocateNewSegment calls MemorySegmentFactory.allocateUnpooledOffHeapMemory to allocate unpooled off-heap memory; requestSegmentFromPool calls availableMemory.remove() and wraps the result with MemorySegmentFactory.wrapPooledOffHeapMemory, again without checking whether the deque is empty; returnSegmentToPool only accepts HybridMemorySegment, puts its ByteBuffer back into availableMemory, and then calls hybridSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().