A Look at Flink's AbstractNonHaServices
Preface
This article takes a look at Flink's AbstractNonHaServices.
HighAvailabilityServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                                 a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager.
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager.
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     *
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     *
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     *
     * <p>After this method was called, any job or session that was managed by
     * these high availability services will be unrecoverable.
     *
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
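- HighAvailabilityServices declares getters for the services that high availability needs: leader retrieval and leader election services for the ResourceManager, Dispatcher, JobManager and web monitor, plus the checkpoint recovery factory, the submitted job graph store, the running jobs registry and the BLOB store. It also defines two shutdown methods: close() releases resources but keeps the data in external stores so that another instance can recover the job, while closeAndCleanupAllData() additionally deletes that data. It has two direct implementations: ZooKeeperHaServices and AbstractNonHaServices.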
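To make the close() versus closeAndCleanupAllData() contract concrete, here is a minimal Java sketch. It does not use Flink at all: HaServicesSketch, MapBackedHaServices and the HashMap standing in for ZooKeeper or a distributed file system are hypothetical names chosen for illustration; only the value of DEFAULT_LEADER_ID is taken from the interface above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Demo entry point comes first so the file also runs with `java HaContractDemo.java`.
class HaContractDemo {
    public static void main(String[] args) {
        Map<String, String> external = new HashMap<>(); // hypothetical stand-in for ZooKeeper / a DFS

        MapBackedHaServices first = new MapBackedHaServices(external);
        first.store("job-1", "serialized job graph");
        first.close();
        // close() kept the data, so a new instance could still recover job-1
        System.out.println(external.containsKey("job-1"));

        MapBackedHaServices second = new MapBackedHaServices(external);
        second.closeAndCleanupAllData();
        // closeAndCleanupAllData() removed everything
        System.out.println(external.isEmpty());

        System.out.println(HaServicesSketch.DEFAULT_LEADER_ID);
    }
}

// Hypothetical, heavily trimmed model of the HighAvailabilityServices contract.
interface HaServicesSketch extends AutoCloseable {
    // Same value as HighAvailabilityServices.DEFAULT_LEADER_ID: the all-zero UUID.
    UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    // Stands in for the recoverable state (job graphs, checkpoints, BLOBs).
    void store(String key, String value);

    // Releases resources only; externally stored data must survive.
    @Override
    void close();

    // Releases resources and deletes everything this instance stored externally.
    void closeAndCleanupAllData();
}

// Hypothetical implementation backed by a map that is shared across instances.
class MapBackedHaServices implements HaServicesSketch {
    private final Map<String, String> externalStore;

    MapBackedHaServices(Map<String, String> externalStore) {
        this.externalStore = externalStore;
    }

    @Override
    public void store(String key, String value) {
        externalStore.put(key, value);
    }

    @Override
    public void close() {
        // nothing to release in this sketch, and the external data is left alone
    }

    @Override
    public void closeAndCleanupAllData() {
        close();
        externalStore.clear();
    }
}
```

Running it prints true, true and 00000000-0000-0000-0000-000000000000: the first instance's close() leaves the stored entry for a successor, and only the explicit cleanup call wipes it.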
AbstractNonHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
public abstract class AbstractNonHaServices implements HighAvailabilityServices {
    protected final Object lock = new Object();

    private final RunningJobsRegistry runningJobsRegistry;

    private final VoidBlobStore voidBlobStore;

    private boolean shutdown;

    public AbstractNonHaServices() {
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.voidBlobStore = new VoidBlobStore();

        shutdown = false;
    }

    // ----------------------------------------------------------------------
    // HighAvailabilityServices method implementations
    // ----------------------------------------------------------------------

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneCheckpointRecoveryFactory();
        }
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneSubmittedJobGraphStore();
        }
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return runningJobsRegistry;
        }
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        synchronized (lock) {
            checkNotShutdown();

            return voidBlobStore;
        }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!shutdown) {
                shutdown = true;
            }
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        // this stores no data, so this method is the same as 'close()'
        close();
    }

    // ----------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------

    @GuardedBy("lock")
    protected void checkNotShutdown() {
        checkState(!shutdown, "high availability services are shut down");
    }

    protected boolean isShutDown() {
        return shutdown;
    }
}
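- AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData from HighAvailabilityServices. Nothing is persisted: each call creates a new StandaloneCheckpointRecoveryFactory or StandaloneSubmittedJobGraphStore, the StandaloneRunningJobsRegistry and the VoidBlobStore are single shared instances, and closeAndCleanupAllData() simply delegates to close(). Every getter holds lock and calls checkNotShutdown() first, so it fails with an IllegalStateException once the services are closed. It has two subclasses: EmbeddedHaServices and StandaloneHaServices.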
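The guard pattern that AbstractNonHaServices applies to every getter (take lock, call checkNotShutdown(), then hand out either a shared or a freshly created object) can be sketched in plain Java as follows. NonHaServicesSketch, InMemoryRegistry and InMemoryRecoveryFactory are hypothetical stand-ins, not Flink classes, and the sketch throws IllegalStateException directly instead of using Flink's Preconditions.checkState.

```java
// Demo entry point comes first so the file also runs with `java NonHaGuardDemo.java`.
class NonHaGuardDemo {
    public static void main(String[] args) {
        NonHaServicesSketch services = new NonHaServicesSketch();

        // shared instance vs. a fresh instance per call
        System.out.println(services.getRunningJobsRegistry() == services.getRunningJobsRegistry());
        System.out.println(services.getCheckpointRecoveryFactory() == services.getCheckpointRecoveryFactory());

        services.closeAndCleanupAllData(); // same as close(): nothing is persisted
        services.close();                  // closing twice is harmless
        try {
            services.getRunningJobsRegistry();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-ins for StandaloneRunningJobsRegistry / StandaloneCheckpointRecoveryFactory.
class InMemoryRegistry { }

class InMemoryRecoveryFactory { }

// Hypothetical sketch of the AbstractNonHaServices locking pattern (not the Flink class).
class NonHaServicesSketch implements AutoCloseable {
    protected final Object lock = new Object();
    private final InMemoryRegistry runningJobsRegistry = new InMemoryRegistry();
    private boolean shutdown; // only accessed while holding lock

    // Shared instance, like getRunningJobsRegistry() / createBlobStore().
    InMemoryRegistry getRunningJobsRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return runningJobsRegistry;
        }
    }

    // New object per call, like getCheckpointRecoveryFactory() / getSubmittedJobGraphStore().
    InMemoryRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();
            return new InMemoryRecoveryFactory();
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    // No external data exists, so cleanup is the same as close().
    void closeAndCleanupAllData() {
        close();
    }

    // Callers must hold lock (the Flink code marks this @GuardedBy("lock")).
    protected void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }
}
```

The demo prints true, false and then "high availability services are shut down". Putting the check and the return inside the same synchronized block means a caller can never obtain a service from an instance that another thread has already closed.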
EmbeddedHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
public class EmbeddedHaServices extends AbstractNonHaServices {

    private final Executor executor;

    private final EmbeddedLeaderService resourceManagerLeaderService;

    private final EmbeddedLeaderService dispatcherLeaderService;

    private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;

    private final EmbeddedLeaderService webMonitorLeaderService;

    public EmbeddedHaServices(Executor executor) {
        this.executor = Preconditions.checkNotNull(executor);
        this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
        this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
        this.jobManagerLeaderServices = new HashMap<>();
        this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
    }

    // ------------------------------------------------------------------------
    //  services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return resourceManagerLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return dispatcherLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return resourceManagerLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return dispatcherLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderRetrievalService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return webMonitorLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return webMonitorLeaderService.createLeaderElectionService();
    }

    // ------------------------------------------------------------------------
    // internal
    // ------------------------------------------------------------------------

    @GuardedBy("lock")
    private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
        EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
        if (service == null) {
            service = new EmbeddedLeaderService(executor);
            jobManagerLeaderServices.put(jobID, service);
        }
        return service;
    }

    // ------------------------------------------------------------------------
    // shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!isShutDown()) {
                // stop all job manager leader services
                for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();

                resourceManagerLeaderService.shutdown();

                webMonitorLeaderService.shutdown();
            }

            super.close();
        }
    }
}
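- EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers all run in the same process; FlinkMiniCluster uses EmbeddedHaServices. Leader election and retrieval happen in memory: there is one EmbeddedLeaderService each for the ResourceManager, the Dispatcher and the web monitor, plus one per JobID that getOrCreateJobManagerService creates on first use. close() shuts down the job manager, ResourceManager and web monitor leader services before calling super.close().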
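The per-job bookkeeping in EmbeddedHaServices boils down to a lazily filled map from job ID to leader service, guarded by the same lock as the shutdown flag. The sketch below assumes a java.util.UUID in place of Flink's JobID and uses hypothetical LeaderServiceSketch and EmbeddedHaSketch classes instead of EmbeddedLeaderService; it also swaps the original get-then-put for Map.computeIfAbsent, which has the same effect here because both run under the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Demo entry point comes first so the file also runs with `java EmbeddedHaDemo.java`.
class EmbeddedHaDemo {
    public static void main(String[] args) {
        EmbeddedHaSketch ha = new EmbeddedHaSketch();
        UUID jobA = new UUID(0L, 1L); // UUID stands in for Flink's JobID
        UUID jobB = new UUID(0L, 2L);

        LeaderServiceSketch first = ha.jobManagerLeaderService(jobA);
        System.out.println(first == ha.jobManagerLeaderService(jobA)); // same job, same service
        System.out.println(first == ha.jobManagerLeaderService(jobB)); // other job, other service
        System.out.println(ha.jobCount());

        ha.close();
        System.out.println(first.isShutdown() + " " + ha.jobCount());
    }
}

// Hypothetical stand-in for EmbeddedLeaderService: leader state lives only in this JVM.
class LeaderServiceSketch {
    private volatile boolean shutdown;

    void shutdown() {
        shutdown = true;
    }

    boolean isShutdown() {
        return shutdown;
    }
}

// Hypothetical sketch of the per-job bookkeeping in EmbeddedHaServices.
class EmbeddedHaSketch implements AutoCloseable {
    private final Object lock = new Object();
    private final Map<UUID, LeaderServiceSketch> jobManagerLeaderServices = new HashMap<>();
    private final LeaderServiceSketch resourceManagerLeaderService = new LeaderServiceSketch();
    private boolean shutdown; // only accessed while holding lock

    LeaderServiceSketch jobManagerLeaderService(UUID jobId) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("high availability services are shut down");
            }
            // create on first use, then reuse (what getOrCreateJobManagerService does)
            return jobManagerLeaderServices.computeIfAbsent(jobId, id -> new LeaderServiceSketch());
        }
    }

    int jobCount() {
        synchronized (lock) {
            return jobManagerLeaderServices.size();
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            if (!shutdown) {
                // stop every per-job service, forget them, then stop the shared ones
                jobManagerLeaderServices.values().forEach(LeaderServiceSketch::shutdown);
                jobManagerLeaderServices.clear();
                resourceManagerLeaderService.shutdown();
                shutdown = true;
            }
        }
    }
}
```

The demo prints true, false, 2 and then "true 0": both callers asking about the same job share one in-process leader service, and close() stops and drops all of them.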
StandaloneHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
public class StandaloneHaServices extends AbstractNonHaServices {

    /** The constant name of the ResourceManager RPC endpoint */
    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";

    /** The fixed address of the ResourceManager */
    private final String resourceManagerAddress;

    /** The fixed address of the Dispatcher */
    private final String dispatcherAddress;

    /** The fixed address of the JobManager */
    private final String jobManagerAddress;

    private final String webMonitorAddress;

    /**
     * Creates a new services class for the fixed pre-defined leaders.
     *
     * @param resourceManagerAddress The fixed address of the ResourceManager
     * @param webMonitorAddress The fixed address of the web monitor
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String jobManagerAddress,
            String webMonitorAddress) {
        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
        this.webMonitorAddress = checkNotNull(webMonitorAddress, "webMonitorAddress");
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }
}
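- StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for non-high-availability setups; ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE. No real election takes place: each leader retriever is a StandaloneLeaderRetrievalService that holds a fixed, pre-configured address together with DEFAULT_LEADER_ID, each election service is a plain StandaloneLeaderElectionService, and getJobManagerLeaderRetriever(jobID, defaultJobManagerAddress) uses the address passed in rather than the configured jobManagerAddress.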
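A rough sketch of the standalone idea, assuming we only care about what a retriever hands out: a fixed address paired with the all-zero default leader ID. HaMode, FixedLeader, StandaloneHaSketch and the example addresses are hypothetical; the switch only loosely mirrors picking StandaloneHaServices for mode NONE and is not the actual ClusterEntrypoint or Flink factory code.

```java
import java.util.Objects;
import java.util.UUID;

// Demo entry point comes first so the file also runs with `java StandaloneHaDemo.java`.
class StandaloneHaDemo {
    public static void main(String[] args) {
        StandaloneHaSketch ha = StandaloneHaSketch.forMode(
                HaMode.NONE, "rm@localhost:6123", "jm@localhost:6123");

        FixedLeader rm = ha.resourceManagerLeader();
        System.out.println(rm.address + " " + rm.leaderSessionId);
        System.out.println(ha.jobManagerLeader().address);
        System.out.println(ha.jobManagerLeader("jm@otherhost:6123").address);
    }
}

// Hypothetical subset of the HA modes.
enum HaMode { NONE, ZOOKEEPER }

// What a standalone retriever reports: a fixed address plus the default leader ID.
final class FixedLeader {
    final String address;
    final UUID leaderSessionId;

    FixedLeader(String address, UUID leaderSessionId) {
        this.address = address;
        this.leaderSessionId = leaderSessionId;
    }
}

// Hypothetical sketch of StandaloneHaServices-style retrieval; no election ever happens.
final class StandaloneHaSketch {
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private final String resourceManagerAddress;
    private final String jobManagerAddress;

    private StandaloneHaSketch(String resourceManagerAddress, String jobManagerAddress) {
        // the message argument is the parameter name, not its (possibly null) value
        this.resourceManagerAddress = Objects.requireNonNull(resourceManagerAddress, "resourceManagerAddress");
        this.jobManagerAddress = Objects.requireNonNull(jobManagerAddress, "jobManagerAddress");
    }

    static StandaloneHaSketch forMode(HaMode mode, String rmAddress, String jmAddress) {
        switch (mode) {
            case NONE:
                return new StandaloneHaSketch(rmAddress, jmAddress);
            default:
                // a ZooKeeper-backed implementation would be chosen here; not modeled in this sketch
                throw new UnsupportedOperationException("not modeled: " + mode);
        }
    }

    FixedLeader resourceManagerLeader() {
        return new FixedLeader(resourceManagerAddress, DEFAULT_LEADER_ID);
    }

    FixedLeader jobManagerLeader() {
        return new FixedLeader(jobManagerAddress, DEFAULT_LEADER_ID);
    }

    // Like getJobManagerLeaderRetriever(jobID, defaultJobManagerAddress): the caller's address wins.
    FixedLeader jobManagerLeader(String defaultJobManagerAddress) {
        return new FixedLeader(defaultJobManagerAddress, DEFAULT_LEADER_ID);
    }
}
```

The demo prints "rm@localhost:6123 00000000-0000-0000-0000-000000000000", then "jm@localhost:6123" and "jm@otherhost:6123". Because the leader never changes, the constant DEFAULT_LEADER_ID is enough for every component to agree on the leader session.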
Summary
- HighAvailabilityServices declares getters for the services needed for high availability, plus close() and closeAndCleanupAllData(); its two direct implementations are ZooKeeperHaServices and AbstractNonHaServices.
- AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData from HighAvailabilityServices without persisting anything; its two subclasses are EmbeddedHaServices and StandaloneHaServices.
- EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers run in the same process; FlinkMiniCluster uses it.
- StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for non-high-availability setups with fixed leader addresses; ClusterEntrypoint uses it when highAvailabilityMode is NONE.