Flink从入门到放弃之源码解析系列-第4章 TaskManager
TaskManager 在 Flink 中也被叫做一个 Instance,统一管理该物理节点上的所有 Flink job 的 task 的运行,它的功能包括了 task 的启动销毁、内存管理、磁盘IO、网络传输管理等,本章将一一介绍这些功能,方面后续章节的开展
MemoryManager
MemoryManager 统一管理了 flink 的内存使用,内存被划分为相同大小的 segment,通过申请不同数量的 segment 来分配不同大小的内存
这里支持两种内存:on-heap 内存和 off-heap 内存,通过参数可以控制分配内存的种类
MemoryManager 管理内存也分两种模式:预分配和按需分配。预分配模式下,内存在启动时就会分好,这就会意味着不会发生 OOM 异常,释放的内存会重新归还 MemoryManager 的内存池;按需模式下,MemoryManager 仅仅追踪内存的使用【做记录】,释放内存不会归还 MemoryManager 的内存池,而是通过托管给 JVM 的垃圾回收来最终释放,这样便可能会发生 OOM
下面我们就来分析下 MemoryManager 的实现细节
MemorySegment
上面已经提到,MemoryManager 以 segment 为单位来实现内存的分配和管理,在 flink 中一个 segment 被抽象为 MemorySegment,MemorySegment 为抽象类,定义了基本的 put/get 方法,以及 swap、compare 等工具方法,同时维护了一个偏移量:BYTE_ARRAY_BASE_OFFSET,这个偏移量为 byte[] 对象在内存中的基本偏移量,后续通过 sun.misc.Unsafe 直接操纵内存就是基于这个偏移量来完成,这个类定义的实现方法屏蔽了内存的种类【堆和非堆】,当其成员变量 heapMemory 不为空时就是堆内存,此时的 address 就是 BYTE_ARRAY_BASE_OFFSET;而 heapMemory 为 null 时代表非堆内存,此时的 address 是内存中的绝对地址。
MemorySegment 有两个实现类:HeapMemorySegment 和 HibridMemorySegment,分别代表堆内存 segment 和 非堆内存 segment,具体的继承关系如下:
HeapMemorySegment 和 HibridMemorySegment 中都分别定义了工厂类来实例化对象实例。
MemoryPool
MemoryPool 是 MemoryManager 用来统一管理资源的组件,具体又分为 HeapMemoryPool 和 HybridOffHeapMemoryPool,前者管理堆内存,后者管理非堆内存。
先来看HeapMemoryPool
//MemoryManager line 677
@Override
MemorySegment allocateNewSegment(Object owner) {
return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
}
@Override
MemorySegment requestSegmentFromPool(Object owner) {
ByteBuffer buf = availableMemory.remove();
return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
}
@Override
void returnSegmentToPool(MemorySegment segment) {
if (segment.getClass() == HybridMemorySegment.class) {
HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
ByteBuffer buf = hybridSegment.getOffHeapBuffer();
availableMemory.add(buf);
hybridSegment.free();
}
else {
throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
}
}
简单总结:
- allocateNewSegment 走的是 on demand 模式,通过 new byte[] 从堆上分配内存
- requestSegmentFromPool 走的是 pre allocate 模式,通过复用已有的堆对象
HybridOffHeapMemoryPool 的接口与其类似,不过分配内存走的是 ByteBuffer.allocateDirect(segmentSize); 直接分配了物理内存,也就是非堆内存
IOManager
flink 通过 IOManager 来控制磁盘 IO 的过程,提供同步和异步两种写模式【其实只有异步】,具体的读写方式又分为 block、buffer、bulk 三种方式;用户可以指定 IO 的文件目录集合,IOManager 会以 round-robin 的方式写不同目录的不同文件。
IOManager 提供两种方式枚举新的 IO 文件:
- 直接 round-robin 文件夹并生成文件,每个新文件的命名 pattern 为 random_hex_string.channel,最终对应的目录结构是:
- path1/random_hex_string1.channel
- path2/random_hex_string2.channel
- path3/random_hex_string3.channel
- 采取 Enumerator 的模式,每个 Enumerator 也是类似如上一种方式进行 round-robin,不过 Enumerator 会维护一个固定的本地命名前缀、一个本地计数器、一个全局计数器,命名前缀用于区分不同的 Enumerator 写的文件,本地计数器用于 Enumerator 自身的文件命名递增,全局计数器用于 round-robin 文件夹,最终的目录结构是:
- path1/prefix.local_counter1.channel
- path2/prefix.local_counter2.channel
- path3/prefix.local_counter3.channel
flink 又进一步将一个文件的 IO 抽象成了 FileIOChannel,通过 FileIOChannel 封装了底层的文件读写,具体的继承关系如下:
IOManager 的唯一实现类:IOManagerAsync 为每个人临时文件加【用户初始化的时候指定】维护了一个读线程和写线程,并且每个读写线程内部会维护一个请求队列: RequestQueue,上面的 FileIOChannel 通过将 读写请求加入到对应的 RequestQueue 中来实现文件读写,具体的线程模型如下:
ps: 默认的临时文件夹目录是 java.io.tmpDir
NetworkEnvironment
NetworkEnvironment 是每个 Instance 的网络 IO 组件,包含了追踪中间结果和数据交换的数据结构。它的构造器会统一将配置的内存先分配出来,抽象成 NetworkBufferPool 统一管理内存的申请和释放。
BufferPool
从 MemoryManager 的介绍中我们讲到 flink 是以 MemorySegment 为单位来管理内存的,而一个 MemorySegment 又被叫做一个 Buffer。BufferPool 是管理 Buffer 的工具。Buffer 的申请统一交给 NetworkBufferPool,具体的管理交给 LocalBufferPool。
LocalBufferPool
我们来看 LocalBufferPool 的关键接口,以了解具体都有哪些方式来管理 Buffer 。
申请buffer
//LocalBufferPool line201
@Override
public Buffer requestBuffer() throws IOException {
try {
return toBuffer(requestMemorySegment(false));
}
catch (InterruptedException e) {
throw new IOException(e);
}
}
//LocalBufferPool line221
private Buffer toBuffer(MemorySegment memorySegment) {
if (memorySegment == null) {
return null;
}
return new NetworkBuffer(memorySegment, this);
}
总结其逻辑:
- 申请 Buffer
- 释放超量申请的 Buffer
- 像 NetworkBufferPool 申请 Buffer
- 如果此 LocalBufferPool 有 owner【ResultPartition】,像 ResultPartition 释放内存,这里又会下发到 ResultPartition 的 subPartition,释放是以 subPartition 的全部内存为单位,会将内存中的数据吐到磁盘上或者不释放【依据配置的不同】
回收 Buffer
//LocalBufferPool line273
public void recycle(MemorySegment segment) {
BufferListener listener;
NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
while (!notificationResult.isBufferUsed()) {
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
return;
} else {
listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
return;
}
}
}
notificationResult = fireBufferAvailableNotification(listener, segment);
}
}
简单的总结:
- 如果此 LocalBuffer 已销毁或超量使用,将 Buffer 归还给 NetworkBufferPool
- 否则如果注册了 EventListener ,通知每个 listener 这个 Buffer 被回收
- 如果没有注册,将这个 Buffer 重新标记为可使用【加入到待申请队列】
调整 Buffer 大小
//LocalBufferPool line237
public void setNumBuffers(int numBuffers) throws IOException {
int numExcessBuffers;
synchronized (availableMemorySegments) {
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
} else {
currentPoolSize = numBuffers;
}
returnExcessMemorySegments();
numExcessBuffers = numberOfRequestedMemorySegments - currentPoolSize;
}
// If there is a registered owner and we have still requested more buffers than our
// size, trigger a recycle via the owner.
if (owner.isPresent() && numExcessBuffers > 0) {
owner.get().releaseMemory(numExcessBuffers);
}
}
简单总结:
- 归还超量使用的内存给 NetworkBufferPool
- 如果还是超量使用,调用 owner 的释放接口【以 ResultSubPartiton 为单位释放】
NetworkBufferPool
上面已经提到,NetworkbufferPool 统一管理了网络栈的内存,LocalBufferPool 只是管理 Buffer 的方式,具体的申请和释放还是要走 NetworkBufferPool 的接口。值得注意的是,NetworkBufferPool 在实例化的时候就将初始的固定大小的内存分配出来了【不管是堆还是非堆】。我们来看它的关键接口:
创建 LocalBufferPool
//NetworkBufferPool line256
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
return createBufferPool(numRequiredBuffers, maxUsedBuffers, Optional.empty());
}
@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, Optional<BufferPoolOwner> owner) throws IOException {
// It is necessary to use a separate lock from the one used for buffer
// requests to ensure deadlock freedom for failure cases.
synchronized (factoryLock) {
if (isDestroyed) {
throw new IllegalStateException("Network buffer pool has already been destroyed.");
}
// Ensure that the number of required buffers can be satisfied.
// With dynamic memory management this should become obsolete.
if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
throw new IOException(String.format("Insufficient number of network buffers: " +
"required %d, but only %d available. The total number of network " +
"buffers is currently set to %d of %d bytes each. You can increase this " +
"number by setting the configuration keys '%s', '%s', and '%s'.",
numRequiredBuffers,
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}
this.numTotalRequiredBuffers += numRequiredBuffers;
// We are good to go, create a new buffer pool and redistribute
// non-fixed size buffers.
LocalBufferPool localBufferPool =
new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, owner);
allBufferPools.add(localBufferPool);
try {
redistributeBuffers();
} catch (IOException e) {
try {
destroyBufferPool(localBufferPool);
} catch (IOException inner) {
e.addSuppressed(inner);
}
ExceptionUtils.rethrowIOException(e);
}
return localBufferPool;
}
}
简单总结:
- 做一些状态备份,包括整体使用的 Buffer 数、可动态调整大小的 BufferPool 等
- 对于可动态调整的 BufferPool,重新调整可用内存,调整方式为 round-robin
销毁 LocalBufferPool
//NetworkBufferPool line310
public void destroyBufferPool(BufferPool bufferPool) throws IOException {
if (!(bufferPool instanceof LocalBufferPool)) {
throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
}
synchronized (factoryLock) {
if (allBufferPools.remove(bufferPool)) {
numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
redistributeBuffers();
}
}
}
简单总结:
- 消除状态记录
- 对于可动态调整的 BufferPool,重新调整可用内存,调整方式为 round-robin轮询调度算法