A Look at Flink's AbstractNonHaServices

This article takes a look at Flink's AbstractNonHaServices.

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
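  • HighAvailabilityServices defines accessor methods for the services that high availability relies on (leader retrieval and leader election for each component, the checkpoint recovery factory, the submitted job graph store, the running jobs registry and the BLOB store), plus close and cleanup methods. Within flink-runtime it has two direct implementations: ZooKeeperHaServices and AbstractNonHaServices.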
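The DEFAULT_LEADER_ID constant is simply the all-zero UUID. The sketch below reproduces only that one expression with plain java.util.UUID (no Flink classes are involved; the class name is made up for illustration) to show what the value looks like and why two components can agree on it without any coordination.

```java
import java.util.UUID;

// Reproduces only the construction of HighAvailabilityServices.DEFAULT_LEADER_ID;
// DefaultLeaderIdDemo is a hypothetical name, not a Flink class.
class DefaultLeaderIdDemo {

    // Same expression as in the interface: both 64-bit halves are zero
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    public static void main(String[] args) {
        // prints 00000000-0000-0000-0000-000000000000
        System.out.println(DEFAULT_LEADER_ID);

        // Any independently constructed all-zero UUID equals it, so non-HA
        // components can share a leader ID without an election.
        System.out.println(DEFAULT_LEADER_ID.equals(new UUID(0L, 0L))); // prints true
    }
}
```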

AbstractNonHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

public abstract class AbstractNonHaServices implements HighAvailabilityServices {
    protected final Object lock = new Object();

    private final RunningJobsRegistry runningJobsRegistry;

    private final VoidBlobStore voidBlobStore;

    private boolean shutdown;

    public AbstractNonHaServices() {
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.voidBlobStore = new VoidBlobStore();

        shutdown = false;
    }

    // ----------------------------------------------------------------------
    // HighAvailabilityServices method implementations
    // ----------------------------------------------------------------------

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneCheckpointRecoveryFactory();
        }
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneSubmittedJobGraphStore();
        }
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return runningJobsRegistry;
        }
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        synchronized (lock) {
            checkNotShutdown();

            return voidBlobStore;
        }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!shutdown) {
                shutdown = true;
            }
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        // this stores no data, so this method is the same as 'close()'
        close();
    }

    // ----------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------

    @GuardedBy("lock")
    protected void checkNotShutdown() {
        checkState(!shutdown, "high availability services are shut down");
    }

    protected boolean isShutDown() {
        return shutdown;
    }
}
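  • AbstractNonHaServices implements the getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData methods of HighAvailabilityServices. Every accessor first checks, while holding `lock`, that the services have not been shut down; getCheckpointRecoveryFactory and getSubmittedJobGraphStore return a new standalone instance on each call, whereas the StandaloneRunningJobsRegistry and the VoidBlobStore are created once and shared. Because nothing is stored externally, closeAndCleanupAllData simply calls close. It has two subclasses: EmbeddedHaServices and StandaloneHaServices.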
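The shutdown handling described above boils down to a lock-guarded flag. The following is a minimal, self-contained model of that pattern, assuming plain `Object` placeholders instead of Flink's registry and blob store types; `NonHaServicesModel` and its members are hypothetical simplifications, not Flink's API. As with Flink's `Preconditions.checkState`, a violated check throws `IllegalStateException`.

```java
// Hypothetical model of the AbstractNonHaServices shutdown pattern.
class NonHaServicesModel implements AutoCloseable {

    protected final Object lock = new Object();

    // Created once and shared, like runningJobsRegistry / voidBlobStore
    private final Object runningJobsRegistry = new Object();

    private boolean shutdown = false;

    Object getRunningJobsRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return runningJobsRegistry;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            // Setting the flag again is harmless, so close() is idempotent
            shutdown = true;
        }
    }

    void closeAndCleanupAllData() {
        // No external data exists, so cleanup is the same as close
        close();
    }

    boolean isShutDown() {
        synchronized (lock) {
            return shutdown;
        }
    }

    // Throws IllegalStateException, like Preconditions.checkState
    private void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }

    public static void main(String[] args) {
        NonHaServicesModel services = new NonHaServicesModel();
        System.out.println(services.getRunningJobsRegistry() == services.getRunningJobsRegistry()); // prints true

        services.closeAndCleanupAllData();
        services.close(); // second close has no further effect
        try {
            services.getRunningJobsRegistry();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints high availability services are shut down
        }
    }
}
```

One difference worth noting: this model reads the flag under the lock in `isShutDown()`, while the original `isShutDown()` reads it unsynchronized and relies on callers such as `EmbeddedHaServices.close()` already holding `lock`.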

EmbeddedHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

public class EmbeddedHaServices extends AbstractNonHaServices {

    private final Executor executor;

    private final EmbeddedLeaderService resourceManagerLeaderService;

    private final EmbeddedLeaderService dispatcherLeaderService;

    private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;

    private final EmbeddedLeaderService webMonitorLeaderService;

    public EmbeddedHaServices(Executor executor) {
        this.executor = Preconditions.checkNotNull(executor);
        this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
        this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
        this.jobManagerLeaderServices = new HashMap<>();
        this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
    }

    // ------------------------------------------------------------------------
    //  services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return resourceManagerLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return dispatcherLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return resourceManagerLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return dispatcherLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderRetrievalService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return webMonitorLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return webMonitorLeaderService.createLeaderElectionService();
    }

    // ------------------------------------------------------------------------
    // internal
    // ------------------------------------------------------------------------

    @GuardedBy("lock")
    private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
        EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
        if (service == null) {
            service = new EmbeddedLeaderService(executor);
            jobManagerLeaderServices.put(jobID, service);
        }
        return service;
    }

    // ------------------------------------------------------------------------
    //  shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!isShutDown()) {
                // stop all job manager leader services
                for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();

                resourceManagerLeaderService.shutdown();

                webMonitorLeaderService.shutdown();
            }

            super.close();
        }
    }
}
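  • EmbeddedHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers all run in the same process. It keeps one in-memory EmbeddedLeaderService each for the ResourceManager, the Dispatcher and the web monitor, plus one per JobID created lazily by getOrCreateJobManagerService; both the leader election service and the leader retrieval service for a component are obtained from the same EmbeddedLeaderService. FlinkMiniCluster uses EmbeddedHaServices.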
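The per-job bookkeeping matters because the election side and the retrieval side of a job must talk to the same in-memory service. The sketch below models only that get-or-create map and the shutdown in `close()`. `LeaderServiceStub` is a hypothetical stand-in for EmbeddedLeaderService, JobIDs are modelled as UUIDs, and `computeIfAbsent` replaces the explicit get / null check / put sequence of the original with equivalent behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of EmbeddedHaServices' per-job leader services.
class EmbeddedJobLeadersModel {

    // Stand-in for EmbeddedLeaderService: only tracks whether it is running
    static final class LeaderServiceStub {
        private boolean running = true;
        void shutdown() { running = false; }
        boolean isRunning() { return running; }
    }

    private final Object lock = new Object();
    private final Map<UUID, LeaderServiceStub> jobManagerLeaderServices = new HashMap<>();
    private boolean shutdown;

    LeaderServiceStub getOrCreateJobManagerService(UUID jobId) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("high availability services are shut down");
            }
            // One service per job: later calls for the same job reuse it
            return jobManagerLeaderServices.computeIfAbsent(jobId, id -> new LeaderServiceStub());
        }
    }

    int jobCount() {
        synchronized (lock) {
            return jobManagerLeaderServices.size();
        }
    }

    void close() {
        synchronized (lock) {
            if (!shutdown) {
                // Stop every per-job service, then forget them
                for (LeaderServiceStub service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();
                shutdown = true;
            }
        }
    }

    public static void main(String[] args) {
        EmbeddedJobLeadersModel ha = new EmbeddedJobLeadersModel();
        UUID job = UUID.randomUUID();
        LeaderServiceStub a = ha.getOrCreateJobManagerService(job);
        LeaderServiceStub b = ha.getOrCreateJobManagerService(job);
        System.out.println(a == b);        // prints true
        ha.close();
        System.out.println(a.isRunning()); // prints false
        System.out.println(ha.jobCount()); // prints 0
    }
}
```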

StandaloneHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

public class StandaloneHaServices extends AbstractNonHaServices {

    /** The constant name of the ResourceManager RPC endpoint */
    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";

    /** The fix address of the ResourceManager */
    private final String resourceManagerAddress;

    /** The fix address of the Dispatcher */
    private final String dispatcherAddress;

    /** The fix address of the JobManager */
    private final String jobManagerAddress;

    private final String webMonitorAddress;

    /**
     * Creates a new services class for the fix pre-defined leaders.
     *
     * @param resourceManagerAddress    The fix address of the ResourceManager
     * @param webMonitorAddress
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String jobManagerAddress,
            String webMonitorAddress) {
        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
        this.webMonitorAddress = checkNotNull(webMonitorAddress, "webMonitorAddress");
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }

    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

}
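  • StandaloneHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured leaders: every leader retrieval service is a StandaloneLeaderRetrievalService that reports a fixed address together with DEFAULT_LEADER_ID (for getJobManagerLeaderRetriever(jobID, defaultJobManagerAddress) the address passed in is used), and every leader election service is a new StandaloneLeaderElectionService. ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.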
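With fixed leaders there is nothing to elect and nothing to discover: retrieval just reports the configured address, and election grants leadership straight away with the default ID. The sketch below models that behavior. The `RetrievalListener` and `Contender` interfaces are hypothetical reductions of Flink's LeaderRetrievalListener and LeaderContender, and `FixedLeaderRetrieval` / `AlwaysLeaderElection` are illustrative names, not Flink classes.

```java
import java.util.UUID;

// Hypothetical model of the fixed-leader services behind StandaloneHaServices.
class StandaloneLeaderModel {

    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    interface RetrievalListener {
        void notifyLeaderAddress(String address, UUID leaderSessionId);
    }

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Retrieval: the address is fixed at construction and reported on start
    static final class FixedLeaderRetrieval {
        private final String address;

        FixedLeaderRetrieval(String address) {
            this.address = address;
        }

        void start(RetrievalListener listener) {
            listener.notifyLeaderAddress(address, DEFAULT_LEADER_ID);
        }
    }

    // Election: there is no competitor, so the contender is granted leadership immediately
    static final class AlwaysLeaderElection {
        void start(Contender contender) {
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        // Placeholder address, for illustration only
        new FixedLeaderRetrieval("rm-host:6123").start((address, id) ->
                System.out.println("leader " + address + " with id " + id));
        new AlwaysLeaderElection().start(id ->
                System.out.println("granted leadership with id " + id));
    }
}
```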

Summary

  • HighAvailabilityServices defines accessor methods for the services that high availability relies on; within flink-runtime it has two direct implementations, ZooKeeperHaServices and AbstractNonHaServices.
  • AbstractNonHaServices implements the getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData methods of HighAvailabilityServices; it has two subclasses, EmbeddedHaServices and StandaloneHaServices.
  • EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers run in the same process; FlinkMiniCluster uses EmbeddedHaServices.
  • StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured leader addresses; ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.
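Put together, the choice between the three implementations depends on the configured high-availability mode and on whether all components share one process. The sketch below condenses that decision into a single method. The `Mode` enum only mirrors the NONE and ZOOKEEPER values, and `chooseImplementation` is a hypothetical helper that returns class names as strings; it is not how Flink itself is structured.

```java
// Hypothetical condensation of which HighAvailabilityServices implementation applies.
class HaServicesSelection {

    enum Mode { NONE, ZOOKEEPER }

    static String chooseImplementation(Mode mode, boolean allInOneProcess) {
        if (mode == Mode.ZOOKEEPER) {
            return "ZooKeeperHaServices";
        }
        // Non-HA: in-memory leader services when everything shares a JVM,
        // otherwise fixed, pre-configured leader addresses
        return allInOneProcess ? "EmbeddedHaServices" : "StandaloneHaServices";
    }

    public static void main(String[] args) {
        System.out.println(chooseImplementation(Mode.NONE, false));      // prints StandaloneHaServices
        System.out.println(chooseImplementation(Mode.NONE, true));       // prints EmbeddedHaServices
        System.out.println(chooseImplementation(Mode.ZOOKEEPER, false)); // prints ZooKeeperHaServices
    }
}
```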
