A look at Flink's FsCheckpointStreamFactory
Preface
This article takes a look at Flink's FsCheckpointStreamFactory.
CheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java
```java
/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
```
- CheckpointStreamFactory is a factory for checkpoint output streams, which are the streams used to persist checkpoint data. It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close
- CheckpointStreamFactory has two implementations whose names end in "Factory". The first is MemCheckpointStreamFactory, which has two subclasses: NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation. The second is FsCheckpointStreamFactory, which has one subclass: FsCheckpointStorageLocation
- The CheckpointStorageLocation interface extends the CheckpointStreamFactory interface and has three implementations: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation and FsCheckpointStorageLocation
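Here is a minimal sketch of the same contract that does not depend on Flink. A factory hands out output streams, and each stream becomes a state handle when it is closed. SimpleStateHandle, SimpleStateOutputStream, SimpleStreamFactory and InMemoryStreamFactory are hypothetical names invented for this illustration. They simplify Flink's classes rather than reproduce them: there is no scope parameter and no FSDataOutputStream base class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical stand-in for StreamStateHandle: it only exposes the persisted bytes.
interface SimpleStateHandle {
    byte[] readAll();
}

// Same shape as CheckpointStateOutputStream: an OutputStream that reports its position
// and turns into a handle when it is closed successfully.
abstract class SimpleStateOutputStream extends OutputStream {
    abstract long getPos();

    // Returns null if nothing was written, mirroring the @Nullable contract above.
    abstract SimpleStateHandle closeAndGetHandle() throws IOException;
}

// Same shape as CheckpointStreamFactory: hands out fresh state output streams.
interface SimpleStreamFactory {
    SimpleStateOutputStream createStream();
}

// Keeps everything in memory, loosely in the spirit of MemCheckpointStreamFactory.
class InMemoryStreamFactory implements SimpleStreamFactory {
    @Override
    public SimpleStateOutputStream createStream() {
        return new SimpleStateOutputStream() {
            private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            private boolean closed;

            @Override
            public void write(int b) throws IOException {
                if (closed) {
                    throw new IOException("closed");
                }
                buffer.write(b);
            }

            @Override
            long getPos() {
                return buffer.size();
            }

            @Override
            SimpleStateHandle closeAndGetHandle() throws IOException {
                if (closed) {
                    throw new IOException("Stream has already been closed");
                }
                closed = true;
                if (buffer.size() == 0) {
                    return null;
                }
                byte[] bytes = buffer.toByteArray();
                // Hand out a copy so callers cannot mutate the stored state.
                return () -> Arrays.copyOf(bytes, bytes.length);
            }
        };
    }
}
```

Callers only see the factory and the stream type. Where the bytes end up (memory here, files in FsCheckpointStreamFactory) is decided entirely by the implementation.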
FSDataOutputStream
flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java
```java
@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
```
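- FSDataOutputStream extends Java's OutputStream. It adds two abstract methods, getPos and sync. It also redeclares flush and close as abstract; OutputStream itself already provides both as concrete no-op methods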
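The extra contract that FSDataOutputStream adds can be pictured as a small wrapper that counts bytes in order to answer getPos(). PositionTrackingOutputStream is a hypothetical helper built on java.io.FilterOutputStream and is not part of Flink. Its sync() only flushes, because a generic OutputStream gives no way to force data to durable storage.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper: tracks the number of bytes written so far, like getPos().
class PositionTrackingOutputStream extends FilterOutputStream {
    private long pos;

    PositionTrackingOutputStream(OutputStream target) {
        super(target);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        pos++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward the whole range at once instead of FilterOutputStream's byte-by-byte default.
        out.write(b, off, len);
        pos += len;
    }

    long getPos() {
        return pos;
    }

    // A file-backed implementation would typically force data to disk here
    // (for example via FileChannel.force); this sketch can only flush.
    void sync() throws IOException {
        out.flush();
    }
}
```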
CheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java
```java
/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
```
- CheckpointStorageLocation extends the CheckpointStreamFactory interface. Instances are usually created and initialized by CheckpointStorage, and they provide data persistence, metadata persistence and lifecycle/cleanup methods. createMetadataOutputStream creates a CheckpointMetadataOutputStream, disposeOnFailure disposes of the checkpoint location when the checkpoint fails, and getLocationReference returns a CheckpointStorageLocationReference
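Below is a sketch of how such a lifecycle might be used. It assumes a caller that writes the metadata through the location and falls back to disposeOnFailure when anything goes wrong. ToyStorageLocation and ToyCheckpointRunner are hypothetical, and the location reference is modeled as a plain String. This is not the code that Flink's checkpoint coordinator actually runs.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, trimmed-down stand-in for CheckpointStorageLocation.
interface ToyStorageLocation {
    OutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    // Simplified: Flink returns a CheckpointStorageLocationReference here.
    String getLocationReference();
}

final class ToyCheckpointRunner {
    private ToyCheckpointRunner() {}

    // Writes the metadata through the location. On any I/O error the whole location is
    // disposed and the original error is rethrown, with cleanup errors attached as suppressed.
    static void writeMetadata(ToyStorageLocation location, byte[] metadata) throws IOException {
        try (OutputStream out = location.createMetadataOutputStream()) {
            out.write(metadata);
        } catch (IOException e) {
            try {
                location.disposeOnFailure();
            } catch (IOException cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw e;
        }
    }
}
```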
FsCheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
```java
public class FsCheckpointStreamFactory implements CheckpointStreamFactory {

    private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);

    /** Maximum size of state that is stored with the metadata, rather than in files. */
    public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    /** Default size for the write buffer. */
    public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    /** State below this size will be stored as part of the metadata, rather than in files. */
    private final int fileStateThreshold;

    /** The directory for checkpoint exclusive state data. */
    private final Path checkpointDirectory;

    /** The directory for shared checkpoint data. */
    private final Path sharedStateDirectory;

    /** Cached handle to the file system for file operations. */
    private final FileSystem filesystem;

    /**
     * Creates a new stream factory that stores its checkpoint data in the file system and location
     * defined by the given Path.
     *
     * <p><b>Important:</b> The given checkpoint directory must already exist. Refer to the class-level
     * JavaDocs for an explanation why this factory must not try and create the checkpoints.
     *
     * @param fileSystem The filesystem to write to.
     * @param checkpointDirectory The directory for checkpoint exclusive state data.
     * @param sharedStateDirectory The directory for shared checkpoint data.
     * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
     *                             rather than in files
     */
    public FsCheckpointStreamFactory(
            FileSystem fileSystem,
            Path checkpointDirectory,
            Path sharedStateDirectory,
            int fileStateSizeThreshold) {

        if (fileStateSizeThreshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
                MAX_FILE_STATE_THRESHOLD);
        }

        this.filesystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDirectory);
        this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
        this.fileStateThreshold = fileStateSizeThreshold;
    }

    // ------------------------------------------------------------------------

    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
        Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
        int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);

        return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "File Stream Factory @ " + checkpointDirectory;
    }

    //......
}
```
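- FsCheckpointStreamFactory implements the CheckpointStreamFactory interface. Its constructor requires fileStateSizeThreshold to be between 0 and MAX_FILE_STATE_THRESHOLD (1024 * 1024 bytes). Its createCheckpointStateOutputStream method writes EXCLUSIVE-scoped state to checkpointDirectory and all other state to sharedStateDirectory. It uses a write buffer of Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold) bytes and returns an FsCheckpointStateOutputStream. FsCheckpointStreamFactory has one subclass, FsCheckpointStorageLocation, which implements the CheckpointStorageLocation interface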
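The constructor and createCheckpointStateOutputStream make two decisions: they validate the threshold, and they pick the target directory and buffer size. Both can be modeled without Flink. Scope and FsFactoryModel are hypothetical names, and directories are plain Strings instead of Flink's Path. The constants are copied from the source quoted above.

```java
// Local stand-in for CheckpointedStateScope.
enum Scope { EXCLUSIVE, SHARED }

// Hypothetical model of FsCheckpointStreamFactory's configuration logic only.
final class FsFactoryModel {
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    private final String checkpointDirectory;
    private final String sharedStateDirectory;
    private final int fileStateThreshold;

    FsFactoryModel(String checkpointDirectory, String sharedStateDirectory, int fileStateThreshold) {
        // Same bounds as the real constructor: 0 <= threshold <= 1 MiB.
        if (fileStateThreshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (fileStateThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException(
                "The threshold for file state size cannot be larger than " + MAX_FILE_STATE_THRESHOLD);
        }
        this.checkpointDirectory = checkpointDirectory;
        this.sharedStateDirectory = sharedStateDirectory;
        this.fileStateThreshold = fileStateThreshold;
    }

    // EXCLUSIVE state goes to the per-checkpoint directory, everything else to the shared one.
    String targetDirectory(Scope scope) {
        return scope == Scope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
    }

    // The buffer is at least 4 KiB and never smaller than the inline-state threshold.
    int bufferSize() {
        return Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    }
}
```

Taking the maximum guarantees the buffer is never smaller than the threshold. The FsCheckpointStateOutputStream constructor enforces this with its bufferSize < localStateThreshold check.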
FsCheckpointStateOutputStream
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
```java
/**
 * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
 * returns a {@link StreamStateHandle} upon closing.
 */
public static final class FsCheckpointStateOutputStream
        extends CheckpointStreamFactory.CheckpointStateOutputStream {

    private final byte[] writeBuffer;

    private int pos;

    private FSDataOutputStream outStream;

    private final int localStateThreshold;

    private final Path basePath;

    private final FileSystem fs;

    private Path statePath;

    private volatile boolean closed;

    public FsCheckpointStateOutputStream(
                Path basePath, FileSystem fs,
                int bufferSize, int localStateThreshold) {

        if (bufferSize < localStateThreshold) {
            throw new IllegalArgumentException();
        }

        this.basePath = basePath;
        this.fs = fs;
        this.writeBuffer = new byte[bufferSize];
        this.localStateThreshold = localStateThreshold;
    }

    @Override
    public void write(int b) throws IOException {
        if (pos >= writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len < writeBuffer.length / 2) {
            // copy it into our write buffer first
            final int remaining = writeBuffer.length - pos;
            if (len > remaining) {
                // copy as much as fits
                System.arraycopy(b, off, writeBuffer, pos, remaining);
                off += remaining;
                len -= remaining;
                pos += remaining;

                // flush the write buffer to make it clear again
                flush();
            }

            // copy what is in the buffer
            System.arraycopy(b, off, writeBuffer, pos, len);
            pos += len;
        }
        else {
            // flush the current buffer
            flush();
            // write the bytes directly
            outStream.write(b, off, len);
        }
    }

    @Override
    public long getPos() throws IOException {
        return pos + (outStream == null ? 0 : outStream.getPos());
    }

    @Override
    public void flush() throws IOException {
        if (!closed) {
            // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
            if (outStream == null) {
                createStream();
            }

            // now flush
            if (pos > 0) {
                outStream.write(writeBuffer, 0, pos);
                pos = 0;
            }
        }
        else {
            throw new IOException("closed");
        }
    }

    @Override
    public void sync() throws IOException {
        outStream.sync();
    }

    /**
     * Checks whether the stream is closed.
     * @return True if the stream was closed, false if it is still open.
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * If the stream is only closed, we remove the produced file (cleanup through the auto close
     * feature, for example). This method throws no exception if the deletion fails, but only
     * logs the error.
     */
    @Override
    public void close() {
        if (!closed) {
            closed = true;

            // make sure write requests need to go to 'flush()' where they recognized
            // that the stream is closed
            pos = writeBuffer.length;

            if (outStream != null) {
                try {
                    outStream.close();
                } catch (Throwable throwable) {
                    LOG.warn("Could not close the state stream for {}.", statePath, throwable);
                } finally {
                    try {
                        fs.delete(statePath, false);
                    } catch (Exception e) {
                        LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
                    }
                }
            }
        }
    }

    @Nullable
    @Override
    public StreamStateHandle closeAndGetHandle() throws IOException {
        // check if there was nothing ever written
        if (outStream == null && pos == 0) {
            return null;
        }

        synchronized (this) {
            if (!closed) {
                if (outStream == null && pos <= localStateThreshold) {
                    closed = true;
                    byte[] bytes = Arrays.copyOf(writeBuffer, pos);
                    pos = writeBuffer.length;
                    return new ByteStreamStateHandle(createStatePath().toString(), bytes);
                }
                else {
                    try {
                        flush();

                        pos = writeBuffer.length;

                        long size = -1L;

                        // make a best effort attempt to figure out the size
                        try {
                            size = outStream.getPos();
                        } catch (Exception ignored) {}

                        outStream.close();

                        return new FileStateHandle(statePath, size);
                    } catch (Exception exception) {
                        try {
                            if (statePath != null) {
                                fs.delete(statePath, false);
                            }

                        } catch (Exception deleteException) {
                            LOG.warn("Could not delete the checkpoint stream file {}.",
                                statePath, deleteException);
                        }

                        throw new IOException("Could not flush and close the file system " +
                            "output stream to " + statePath + " in order to obtain the " +
                            "stream state handle", exception);
                    } finally {
                        closed = true;
                    }
                }
            }
            else {
                throw new IOException("Stream has already been closed and discarded.");
            }
        }
    }

    private Path createStatePath() {
        return new Path(basePath, UUID.randomUUID().toString());
    }

    private void createStream() throws IOException {
        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
                        fs, createStatePath(), WriteMode.NO_OVERWRITE);
                this.outStream = streamAndPath.stream();
                this.statePath = streamAndPath.path();
                return;
            }
            catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Could not open output stream for state backend", latestException);
    }
}
```
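- FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream. Its constructor takes four parameters: basePath, fs, bufferSize and localStateThreshold. It rejects a bufferSize smaller than localStateThreshold
- bufferSize sets the size of writeBuffer. write(int b) flushes first if pos is greater than or equal to the buffer size. In write(byte[] b, int off, int len), when len is at least writeBuffer.length / 2, the method flushes the buffer and then writes the bytes directly to outStream. Otherwise it copies the bytes into writeBuffer. If len exceeds the remaining space, it first copies as many bytes as fit, flushes, and then copies the rest. The underlying file is only created on the first flush, under a random UUID file name, with up to 10 attempts
- closeAndGetHandle returns null if nothing was ever written. If nothing has been flushed yet (outStream is null) and pos is at most localStateThreshold, it returns a ByteStreamStateHandle that holds the bytes inline. Otherwise it flushes, closes the file and returns a FileStateHandle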
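The buffering and handle-selection rules can be exercised in memory with a simplified model. BufferedStateStream and its Handle class are hypothetical. A ByteArrayOutputStream, created lazily on the first flush, stands in for the file. Handle.inline distinguishes the ByteStreamStateHandle case from the FileStateHandle case. Retries, UUID file names and cleanup on failure are deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical in-memory model of FsCheckpointStateOutputStream's write path.
class BufferedStateStream {
    // inline == true models ByteStreamStateHandle, false models FileStateHandle.
    static final class Handle {
        final boolean inline;
        final byte[] data;

        Handle(boolean inline, byte[] data) {
            this.inline = inline;
            this.data = data;
        }
    }

    private final byte[] writeBuffer;
    private final int localStateThreshold;
    private int pos;
    private ByteArrayOutputStream file; // null until the first flush, like outStream
    private boolean closed;

    BufferedStateStream(int bufferSize, int localStateThreshold) {
        if (bufferSize < localStateThreshold) {
            throw new IllegalArgumentException();
        }
        this.writeBuffer = new byte[bufferSize];
        this.localStateThreshold = localStateThreshold;
    }

    void write(int b) throws IOException {
        if (pos >= writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    void write(byte[] b, int off, int len) throws IOException {
        if (len < writeBuffer.length / 2) {
            int remaining = writeBuffer.length - pos;
            if (len > remaining) { // fill the buffer, flush, then buffer the rest
                System.arraycopy(b, off, writeBuffer, pos, remaining);
                off += remaining;
                len -= remaining;
                pos += remaining;
                flush();
            }
            System.arraycopy(b, off, writeBuffer, pos, len);
            pos += len;
        } else {
            flush(); // large writes bypass the buffer
            file.write(b, off, len);
        }
    }

    void flush() throws IOException {
        if (closed) {
            throw new IOException("closed");
        }
        if (file == null) {
            file = new ByteArrayOutputStream(); // "create the file" lazily
        }
        if (pos > 0) {
            file.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }

    Handle closeAndGetHandle() throws IOException {
        if (file == null && pos == 0) {
            return null; // nothing was ever written
        }
        if (closed) {
            throw new IOException("Stream has already been closed and discarded.");
        }
        if (file == null && pos <= localStateThreshold) {
            closed = true;
            byte[] bytes = Arrays.copyOf(writeBuffer, pos);
            pos = writeBuffer.length; // later writes hit flush() and fail
            return new Handle(true, bytes);
        }
        flush();
        pos = writeBuffer.length;
        closed = true;
        return new Handle(false, file.toByteArray());
    }
}
```

Note that whether state is inlined depends on whether a flush has already happened, not only on the total size. A state that fits under the threshold but was written with one large write(byte[]) call has already gone to the "file".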
FsCheckpointStorageLocation
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
```java
/**
 * A storage location for checkpoints on a file system.
 */
public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path sharedStateDirectory;

    private final Path taskOwnedStateDirectory;

    private final Path metadataFilePath;

    private final CheckpointStorageLocationReference reference;

    private final int fileStateSizeThreshold;

    public FsCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            Path sharedStateDir,
            Path taskOwnedStateDir,
            CheckpointStorageLocationReference reference,
            int fileStateSizeThreshold) {

        super(fileSystem, checkpointDir, sharedStateDir, fileStateSizeThreshold);

        checkArgument(fileStateSizeThreshold >= 0);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.sharedStateDirectory = checkNotNull(sharedStateDir);
        this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
        this.reference = checkNotNull(reference);

        // the metadata file should not have entropy in its path
        Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);

        this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
        this.fileStateSizeThreshold = fileStateSizeThreshold;
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    public Path getCheckpointDirectory() {
        return checkpointDirectory;
    }

    public Path getSharedStateDirectory() {
        return sharedStateDirectory;
    }

    public Path getTaskOwnedStateDirectory() {
        return taskOwnedStateDirectory;
    }

    public Path getMetadataFilePath() {
        return metadataFilePath;
    }

    // ------------------------------------------------------------------------
    //  checkpoint metadata
    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return reference;
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "FsCheckpointStorageLocation {" +
                "fileSystem=" + fileSystem +
                ", checkpointDirectory=" + checkpointDirectory +
                ", sharedStateDirectory=" + sharedStateDirectory +
                ", taskOwnedStateDirectory=" + taskOwnedStateDirectory +
                ", metadataFilePath=" + metadataFilePath +
                ", reference=" + reference +
                ", fileStateSizeThreshold=" + fileStateSizeThreshold +
                '}';
    }

    @VisibleForTesting
    FileSystem getFileSystem() {
        return fileSystem;
    }
}
```
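- FsCheckpointStorageLocation implements three methods of the CheckpointStorageLocation interface: createMetadataOutputStream, disposeOnFailure and getLocationReference. Its metadata file path is checkpointDir, with any entropy marker removed, joined with AbstractFsCheckpointStorage.METADATA_FILE_NAME
- createMetadataOutputStream creates an FsCheckpointMetadataOutputStream. disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true), which recursively deletes the checkpoint directory. getLocationReference returns the CheckpointStorageLocationReference that was passed to the constructor
- Because FsCheckpointStorageLocation extends FsCheckpointStreamFactory, it inherits the createCheckpointStateOutputStream method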
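On a local disk, the recursive delete behind disposeOnFailure amounts to roughly the following sketch. It uses java.nio.file instead of Flink's FileSystem abstraction, and CheckpointDirCleaner is a hypothetical helper. Treating a missing directory as already cleaned is a choice made by this sketch, not necessarily how every Flink FileSystem behaves.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper mirroring fileSystem.delete(checkpointDirectory, true).
final class CheckpointDirCleaner {
    private CheckpointDirCleaner() {}

    // Recursively deletes checkpointDirectory; a missing directory counts as already cleaned.
    static void disposeOnFailure(Path checkpointDirectory) throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDirectory)) {
            // A parent path is a prefix of its children, so reverse order visits children first.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```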
FsCheckpointMetadataOutputStream
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
```java
/**
 * A {@link CheckpointMetadataOutputStream} that writes a specified file and directory, and
 * returns a {@link FsCompletedCheckpointStorageLocation} upon closing.
 */
public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream {

    private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointMetadataOutputStream.class);

    // ------------------------------------------------------------------------

    private final FSDataOutputStream out;

    private final Path metadataFilePath;

    private final Path exclusiveCheckpointDir;

    private final FileSystem fileSystem;

    private volatile boolean closed;

    public FsCheckpointMetadataOutputStream(
            FileSystem fileSystem,
            Path metadataFilePath,
            Path exclusiveCheckpointDir) throws IOException {

        this.fileSystem = checkNotNull(fileSystem);
        this.metadataFilePath = checkNotNull(metadataFilePath);
        this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);

        this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
    }

    // ------------------------------------------------------------------------
    //  I/O
    // ------------------------------------------------------------------------

    @Override
    public final void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    @Override
    public long getPos() throws IOException {
        return out.getPos();
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void sync() throws IOException {
        out.sync();
    }

    // ------------------------------------------------------------------------
    //  Closing
    // ------------------------------------------------------------------------

    public boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;

            try {
                out.close();
                fileSystem.delete(metadataFilePath, false);
            }
            catch (Throwable t) {
                LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
            }
        }
    }

    @Override
    public FsCompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
        synchronized (this) {
            if (!closed) {
                try {
                    // make a best effort attempt to figure out the size
                    long size = 0;
                    try {
                        size = out.getPos();
                    } catch (Exception ignored) {}

                    out.close();

                    FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);

                    return new FsCompletedCheckpointStorageLocation(
                            fileSystem, exclusiveCheckpointDir, metaDataHandle,
                            metaDataHandle.getFilePath().getParent().toString());
                }
                catch (Exception e) {
                    try {
                        fileSystem.delete(metadataFilePath, false);
                    }
                    catch (Exception deleteException) {
                        LOG.warn("Could not delete the checkpoint stream file {}.", metadataFilePath, deleteException);
                    }

                    throw new IOException("Could not flush and close the file system " +
                            "output stream to " + metadataFilePath + " in order to obtain the " +
                            "stream state handle", e);
                }
                finally {
                    closed = true;
                }
            }
            else {
                throw new IOException("Stream has already been closed and discarded.");
            }
        }
    }
}
```
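- FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream, which in turn extends FSDataOutputStream. Its close method closes the stream and deletes the metadata file, which discards it. Its closeAndFinalizeCheckpoint method closes the stream, keeps the file and returns an FsCompletedCheckpointStorageLocation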
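The difference between the two ways of closing the metadata stream can be sketched with java.nio.file, assuming a local file system. MetadataFileStream and its Finalized result are hypothetical stand-ins for FsCheckpointMetadataOutputStream and FsCompletedCheckpointStorageLocation. StandardOpenOption.CREATE_NEW is used as a rough analogue of WriteMode.NO_OVERWRITE. The parent-directory string mirrors what the quoted code passes as the last constructor argument.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical local-file model of the discard vs. finalize behaviour.
final class MetadataFileStream {
    // Where the metadata lives, how many bytes were written, and the parent-directory pointer.
    static final class Finalized {
        final Path metadataFile;
        final long size;
        final String externalPointer;

        Finalized(Path metadataFile, long size, String externalPointer) {
            this.metadataFile = metadataFile;
            this.size = size;
            this.externalPointer = externalPointer;
        }
    }

    private final Path metadataFile;
    private final OutputStream out;
    private long pos;
    private boolean closed;

    MetadataFileStream(Path metadataFile) throws IOException {
        this.metadataFile = metadataFile;
        // CREATE_NEW fails if the file already exists, similar in spirit to NO_OVERWRITE.
        this.out = Files.newOutputStream(metadataFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    void write(byte[] bytes) throws IOException {
        out.write(bytes);
        pos += bytes.length;
    }

    // Discard path: close and delete the file; errors are swallowed (the real class logs them).
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            out.close();
            Files.deleteIfExists(metadataFile);
        } catch (IOException ignored) {
            // best effort only
        }
    }

    // Success path: close but keep the file, and report its size and parent directory.
    Finalized closeAndFinalize() throws IOException {
        if (closed) {
            throw new IOException("Stream has already been closed and discarded.");
        }
        closed = true;
        try {
            out.close();
            return new Finalized(metadataFile, pos, metadataFile.getParent().toString());
        } catch (IOException e) {
            try {
                Files.deleteIfExists(metadataFile);
            } catch (IOException deleteError) {
                e.addSuppressed(deleteError);
            }
            throw e;
        }
    }
}
```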
Summary
- In FsCheckpointStorage, the initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createSavepointLocation methods create FsCheckpointStorageLocation instances. Its createTaskOwnedStateStream method creates an FsCheckpointStateOutputStream
- FsCheckpointStorageLocation extends FsCheckpointStreamFactory and implements the createMetadataOutputStream, disposeOnFailure and getLocationReference methods of the CheckpointStorageLocation interface. createMetadataOutputStream creates an FsCheckpointMetadataOutputStream. That class extends CheckpointMetadataOutputStream, itself a subclass of FSDataOutputStream, and its closeAndFinalizeCheckpoint method returns an FsCompletedCheckpointStorageLocation. disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true), and getLocationReference returns the CheckpointStorageLocationReference
- FsCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns an FsCheckpointStateOutputStream. FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream, and its constructor takes basePath, fs, bufferSize and localStateThreshold. closeAndGetHandle returns a ByteStreamStateHandle when nothing has been flushed yet and pos is at most localStateThreshold, and a FileStateHandle otherwise
doc
- The FsStateBackend
A look at Flink's MemCheckpointStreamFactory