A Look at Flink's MemCheckpointStreamFactory
Preface
This article takes a look at Flink's MemCheckpointStreamFactory.
CheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java
```java
/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
```
- CheckpointStreamFactory is the factory for checkpoint output streams (the streams used to persist checkpoint data). It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream; CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close.
- CheckpointStreamFactory has two implementation classes whose names end in Factory: MemCheckpointStreamFactory (which has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (which has one subclass, FsCheckpointStorageLocation).
- The CheckpointStorageLocation interface extends the CheckpointStreamFactory interface and has three implementation classes: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation, and FsCheckpointStorageLocation.
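To make the type relationships above concrete, here is a minimal standalone sketch that reproduces only their shape. All names in it (StreamFactory, StorageLocation, MemFactory, MemLocation) are hypothetical stand-ins rather than Flink classes, and the method bodies are simplified assumptions. The point it illustrates: the location interface extends the factory interface, and a location class inherits its stream creation from a factory base class, so a location can be passed wherever a factory is expected.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, heavily simplified mirror of the hierarchy above (not Flink's API).
interface StreamFactory {
    OutputStream createStateStream() throws IOException;
}

// Like CheckpointStorageLocation, the location interface extends the factory interface.
interface StorageLocation extends StreamFactory {
    void disposeOnFailure() throws IOException;
}

// Like MemCheckpointStreamFactory: a plain factory that hands out in-memory streams.
class MemFactory implements StreamFactory {
    @Override
    public OutputStream createStateStream() {
        return new ByteArrayOutputStream();
    }
}

// Like NonPersistentMetadataCheckpointStorageLocation: reuses createStateStream()
// from MemFactory and only adds the location-specific methods.
class MemLocation extends MemFactory implements StorageLocation {
    @Override
    public void disposeOnFailure() {
        // nothing to clean up for a purely in-memory location
    }
}
```

Code that only knows about StreamFactory can be handed a MemLocation directly, which is the same reason Flink can use a CheckpointStorageLocation as the stream factory for a checkpoint.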
FSDataOutputStream
flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java
```java
@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
```
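- FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and re-declares flush and close (which OutputStream already implements as no-ops) as abstract, so every subclass has to implement all four.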
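The contract above can be illustrated with a small in-memory stand-in. This is a sketch, not Flink code: PositionedOutputStream and InMemoryPositionedStream are hypothetical names, and the buffer growth strategy is an arbitrary choice. It shows getPos() reporting the number of bytes written so far, and sync() being a no-op when there is no durable medium behind the stream (MemoryCheckpointOutputStream makes the same choice).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical mirror of the FSDataOutputStream contract (not the Flink class).
abstract class PositionedOutputStream extends OutputStream {
    // New in this contract: the current write position and a durability barrier.
    public abstract long getPos() throws IOException;
    public abstract void sync() throws IOException;

    // Re-declared abstract, so each subclass must decide what they mean.
    @Override public abstract void flush() throws IOException;
    @Override public abstract void close() throws IOException;
}

// In-memory implementation: the position is simply the number of bytes written.
class InMemoryPositionedStream extends PositionedOutputStream {
    private byte[] buf = new byte[16];
    private int count;

    @Override
    public void write(int b) {
        ensureCapacity(count + 1);
        buf[count++] = (byte) b;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        ensureCapacity(count + len);
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

    @Override public long getPos() { return count; }

    // Nothing durable to sync or flush for a heap buffer.
    @Override public void sync() { }
    @Override public void flush() { }
    @Override public void close() { }

    byte[] toByteArray() {
        return Arrays.copyOf(buf, count);
    }

    private void ensureCapacity(int minCapacity) {
        if (minCapacity > buf.length) {
            buf = Arrays.copyOf(buf, Math.max(minCapacity, buf.length * 2));
        }
    }
}
```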
CheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java
```java
/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
```
- CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is usually created and initialized by CheckpointStorage, and provides methods for data persistence, metadata persistence, and lifecycle/cleanup. It defines createMetadataOutputStream, which creates a CheckpointMetadataOutputStream; disposeOnFailure, which disposes of the checkpoint location when the checkpoint fails; and getLocationReference, which returns a CheckpointStorageLocationReference.
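A caller of this interface typically writes the metadata through the location and asks the location to clean up if anything goes wrong. The following sketch assumes a simplified, hypothetical LocationSketch interface (a plain OutputStream instead of CheckpointMetadataOutputStream, and no location reference) plus a hypothetical RecordingLocation test double. It only illustrates the lifecycle and is not taken from Flink's checkpointing code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical simplification of CheckpointStorageLocation (not Flink's API).
interface LocationSketch {
    OutputStream createMetadataOutputStream() throws IOException;
    void disposeOnFailure() throws IOException;
}

class CheckpointDriverSketch {
    // Write the metadata into the location; on failure, let the location
    // clean up after itself and rethrow the original error.
    static void writeMetadata(LocationSketch location, byte[] metadata) throws IOException {
        try (OutputStream out = location.createMetadataOutputStream()) {
            out.write(metadata);
        } catch (IOException e) {
            try {
                location.disposeOnFailure();
            } catch (IOException cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw e;
        }
    }
}

// Test double: keeps metadata in memory and records whether cleanup ran.
class RecordingLocation implements LocationSketch {
    final ByteArrayOutputStream written = new ByteArrayOutputStream();
    private final boolean failWrites;
    boolean disposed;

    RecordingLocation(boolean failWrites) {
        this.failWrites = failWrites;
    }

    @Override
    public OutputStream createMetadataOutputStream() {
        if (!failWrites) {
            return written;
        }
        // A stream that fails on the first byte, to simulate a broken write.
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("simulated write failure");
            }
        };
    }

    @Override
    public void disposeOnFailure() {
        disposed = true;
    }
}
```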
MemCheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
```java
/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {

    /** The maximal size that the snapshotted memory state may have */
    private final int maxStateSize;

    /**
     * Creates a new in-memory stream factory that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemCheckpointStreamFactory(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            CheckpointedStateScope scope) throws IOException {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    public String toString() {
        return "In-Memory Stream Factory";
    }

    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                    "Size of the state is larger than the maximum permitted memory-backed state. Size="
                            + size + " , maxSize=" + maxSize
                            + " . Consider using a different state backend, like the File System State backend.");
        }
    }

    /**
     * A {@code CheckpointStateOutputStream} that writes into a byte array.
     */
    public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private final int maxSize;

        private AtomicBoolean closed;

        boolean isEmpty = true;

        public MemoryCheckpointOutputStream(int maxSize) {
            this.maxSize = maxSize;
            this.closed = new AtomicBoolean(false);
        }

        @Override
        public void write(int b) throws IOException {
            os.write(b);
            isEmpty = false;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
            isEmpty = false;
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public void sync() throws IOException { }

        // --------------------------------------------------------------------

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                closeInternal();
            }
        }

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (isEmpty) {
                return null;
            }
            return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        public boolean isClosed() {
            return closed.get();
        }

        /**
         * Closes the stream and returns the byte array containing the stream's data.
         * @return The byte array containing the stream's data.
         * @throws IOException Thrown if the size of the data exceeds the maximal
         */
        public byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);
                byte[] bytes = os.toByteArray();
                closeInternal();
                return bytes;
            } else {
                throw new IOException("stream has already been closed");
            }
        }

        private void closeInternal() {
            os.reset();
        }
    }
}
```
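- MemCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream.
- MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers everything in a ByteArrayOutputStreamWithPos. The size limit is enforced only when the data is handed out: closeAndGetHandle returns null if nothing was written, and otherwise calls closeAndGetBytes, which checks the buffered size against maxSize and throws an IOException if the limit is exceeded. This means the whole state is buffered in memory before the check runs.
- MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation, both of which also implement the CheckpointStorageLocation interface.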
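The size check can be reproduced in plain Java. The sketch below mirrors the logic of MemoryCheckpointOutputStream but uses java.io.ByteArrayOutputStream instead of Flink's ByteArrayOutputStreamWithPos; SizeLimitedMemStream and BytesHandleSketch are hypothetical names, and a name-plus-bytes object stands in for ByteStreamStateHandle. As in the original, writes are accepted without any limit, maxSize is only checked in closeAndGetBytes, and an empty stream yields a null handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ByteStreamStateHandle: a name plus the raw bytes.
final class BytesHandleSketch {
    final String name;
    final byte[] data;

    BytesHandleSketch(String name, byte[] data) {
        this.name = name;
        this.data = data;
    }
}

// Sketch of the MemoryCheckpointOutputStream logic (not the Flink class):
// accept every write, enforce maxSize only when the bytes are handed out.
class SizeLimitedMemStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int maxSize;
    private boolean isEmpty = true;

    SizeLimitedMemStream(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) {
        os.write(b);
        isEmpty = false;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
        isEmpty = false;
    }

    // Returns null when nothing was written, like closeAndGetHandle().
    BytesHandleSketch closeAndGetHandle() throws IOException {
        if (isEmpty) {
            return null;
        }
        return new BytesHandleSketch(UUID.randomUUID().toString(), closeAndGetBytes());
    }

    byte[] closeAndGetBytes() throws IOException {
        if (!closed.compareAndSet(false, true)) {
            throw new IOException("stream has already been closed");
        }
        // Same rule as MemCheckpointStreamFactory.checkSize: reject oversized state.
        if (os.size() > maxSize) {
            throw new IOException("Size of the state is larger than the maximum permitted "
                    + "memory-backed state. Size=" + os.size() + " , maxSize=" + maxSize);
        }
        byte[] bytes = os.toByteArray();
        os.reset();
        return bytes;
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            os.reset();
        }
    }
}
```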
NonPersistentMetadataCheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
```java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    /** The external pointer returned for checkpoints that are not externally addressable. */
    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
        super(maxStateSize);
    }

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new MetadataOutputStream();
    }

    @Override
    public void disposeOnFailure() {}

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }

    // ------------------------------------------------------------------------
    //  CompletedCheckpointStorageLocation
    // ------------------------------------------------------------------------

    /**
     * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
     * metadata in an internal byte array.
     */
    private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

        private static final long serialVersionUID = 1L;

        private final ByteStreamStateHandle metaDataHandle;

        NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
            this.metaDataHandle = metaDataHandle;
        }

        @Override
        public String getExternalPointer() {
            return EXTERNAL_POINTER;
        }

        @Override
        public StreamStateHandle getMetadataHandle() {
            return metaDataHandle;
        }

        @Override
        public void disposeStorageLocation() {}
    }

    // ------------------------------------------------------------------------
    //  CheckpointMetadataOutputStream
    // ------------------------------------------------------------------------

    private static class MetadataOutputStream extends CheckpointMetadataOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            os.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        @Override
        public void sync() throws IOException { }

        @Override
        public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
            synchronized (this) {
                if (!closed) {
                    closed = true;

                    byte[] bytes = os.toByteArray();
                    ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
                    return new NonPersistentCompletedCheckpointStorageLocation(handle);
                } else {
                    throw new IOException("Already closed");
                }
            }
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();
            }
        }
    }
}
```
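- MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured; its createMetadataOutputStream method creates a MetadataOutputStream.
- MetadataOutputStream extends CheckpointMetadataOutputStream and also buffers into a ByteArrayOutputStreamWithPos; its closeAndFinalizeCheckpoint method returns a NonPersistentCompletedCheckpointStorageLocation, and throws an IOException if the stream has already been closed.
- NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface. Its getMetadataHandle method returns a ByteStreamStateHandle that holds the metadata bytes in memory, and its getExternalPointer method returns the placeholder EXTERNAL_POINTER, because such a checkpoint is not externally addressable.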
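The close-once behavior of MetadataOutputStream can be sketched as follows. InMemoryMetadataStream and InMemoryCompletedLocation are hypothetical names, and a plain byte array stands in for ByteStreamStateHandle. As in the original, the first call to closeAndFinalizeCheckpoint returns a completed location that holds the buffered bytes, and any later call throws an IOException; unlike the original, close() is also synchronized here for simplicity.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for NonPersistentCompletedCheckpointStorageLocation:
// the metadata exists only in this object's byte array.
final class InMemoryCompletedLocation {
    static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    private final byte[] metadata;

    InMemoryCompletedLocation(byte[] metadata) {
        this.metadata = metadata;
    }

    String getExternalPointer() {
        return EXTERNAL_POINTER;
    }

    byte[] getMetadata() {
        return metadata.clone();
    }
}

// Sketch of MetadataOutputStream's close-once behavior (not the Flink class).
class InMemoryMetadataStream extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) {
        os.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
    }

    synchronized InMemoryCompletedLocation closeAndFinalizeCheckpoint() throws IOException {
        if (closed) {
            throw new IOException("Already closed");
        }
        closed = true;
        return new InMemoryCompletedLocation(os.toByteArray());
    }

    // Discards the buffered bytes without producing a completed location.
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            os.reset();
        }
    }
}
```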
PersistentMetadataCheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
```java
/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path metadataFilePath;

    /**
     * Creates a checkpoint storage persists metadata to a file system and stores state
     * in line in state handles with the metadata.
     *
     * @param fileSystem The file system to which the metadata will be written.
     * @param checkpointDir The directory where the checkpoint metadata will be written.
     */
    public PersistentMetadataCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            int maxStateSize) {

        super(maxStateSize);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
    }

    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
}
```
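- MemoryBackendCheckpointStorage creates a PersistentMetadataCheckpointStorageLocation when checkpointsDirectory is configured. Its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream, whose constructor takes three arguments: fileSystem, metadataFilePath, and exclusiveCheckpointDir. fileSystem is used to create an FSDataOutputStream for metadataFilePath, while exclusiveCheckpointDir is used when returning the FsCompletedCheckpointStorageLocation. Its disposeOnFailure method recursively deletes the whole checkpoint directory, since nothing in it needs to be kept after a failure.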
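To illustrate what the persistent variant does on the file system side, here is a sketch that uses java.nio.file in place of Flink's FileSystem and FsCheckpointMetadataOutputStream. LocalPersistentLocationSketch is a hypothetical name, and the file name `_metadata` is an assumption about the value of AbstractFsCheckpointStorage.METADATA_FILE_NAME. The sketch writes the metadata file into the checkpoint directory and, like disposeOnFailure, deletes the whole directory recursively on failure.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the PersistentMetadataCheckpointStorageLocation idea on the local
// file system via java.nio (hypothetical class, not Flink's FileSystem API).
class LocalPersistentLocationSketch {
    // Assumption: mirrors AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";

    private final Path checkpointDirectory;
    private final Path metadataFilePath;

    LocalPersistentLocationSketch(Path checkpointDir) {
        this.checkpointDirectory = checkpointDir;
        this.metadataFilePath = checkpointDir.resolve(METADATA_FILE_NAME);
    }

    Path metadataFile() {
        return metadataFilePath;
    }

    OutputStream createMetadataOutputStream() throws IOException {
        Files.createDirectories(checkpointDirectory);
        // CREATE_NEW refuses to overwrite an existing metadata file (a choice of this sketch).
        return Files.newOutputStream(metadataFilePath, StandardOpenOption.CREATE_NEW);
    }

    // Like disposeOnFailure(): nothing in the directory is worth keeping, so drop it all.
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDirectory)) {
            // Reverse order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```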
Summary
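- MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured, and a PersistentMetadataCheckpointStorageLocation when one is.
- NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend the MemCheckpointStreamFactory class and implement the CheckpointStorageLocation interface; their createMetadataOutputStream methods return a MetadataOutputStream and an FsCheckpointMetadataOutputStream respectively, both subtypes of CheckpointMetadataOutputStream. In both cases the checkpointed state itself is written through MemoryCheckpointOutputStream into in-memory byte arrays; the two differ only in where the checkpoint metadata goes.
- MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream. CheckpointStorageLocation extends the CheckpointStreamFactory interface; it is usually created and initialized by CheckpointStorage and provides methods for data persistence, metadata persistence, and lifecycle/cleanup.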
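The selection rule in the first point can be written down as a small function. This is a hypothetical sketch: LocationChooserSketch, chooseLocation, and the `chk-<id>` subdirectory naming are illustrative assumptions, and the real decision is made inside MemoryBackendCheckpointStorage. A null checkpointsDirectory stands for "not configured".

```java
import java.nio.file.Path;

// Hypothetical sketch of the choice MemoryBackendCheckpointStorage makes;
// these names are illustrative, not Flink's API.
class LocationChooserSketch {
    enum Kind { NON_PERSISTENT_METADATA, PERSISTENT_METADATA }

    static final class Choice {
        final Kind kind;
        final Path checkpointDir; // null when metadata is not persisted
        final int maxStateSize;

        Choice(Kind kind, Path checkpointDir, int maxStateSize) {
            this.kind = kind;
            this.checkpointDir = checkpointDir;
            this.maxStateSize = maxStateSize;
        }
    }

    // checkpointsDirectory == null stands for "no checkpointsDirectory configured".
    static Choice chooseLocation(Path checkpointsDirectory, long checkpointId, int maxStateSize) {
        if (checkpointsDirectory == null) {
            // Metadata stays in memory, like NonPersistentMetadataCheckpointStorageLocation.
            return new Choice(Kind.NON_PERSISTENT_METADATA, null, maxStateSize);
        }
        // Metadata goes to a per-checkpoint directory, like
        // PersistentMetadataCheckpointStorageLocation (directory naming assumed).
        Path dir = checkpointsDirectory.resolve("chk-" + checkpointId);
        return new Choice(Kind.PERSISTENT_METADATA, dir, maxStateSize);
    }
}
```

Both branches carry the same maxStateSize, reflecting that both location types inherit the size-limited in-memory state streams from MemCheckpointStreamFactory.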