A Look at Flink's KvStateRegistryGateway
Preface
This article takes a look at Flink's KvStateRegistryGateway.
KvStateRegistryGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
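- The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered, both of which return a CompletableFuture<Acknowledge>; JobMaster provides the implementation of both.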
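Because both methods return a future rather than a plain value, the caller has to handle the acknowledge and the failure case asynchronously. The following is only a minimal sketch of that calling pattern using the JDK's CompletableFuture. SimpleKvStateGateway, GatewayCallerSketch, and the String-based parameters are hypothetical stand-ins introduced for illustration; they are not Flink types, and this is not how Flink's own callers are written.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical, simplified stand-in for KvStateRegistryGateway (not Flink's API). */
interface SimpleKvStateGateway {
    CompletableFuture<String> notifyRegistered(String jobId, String registrationName);
}

final class GatewayCallerSketch {

    /** Calls the gateway and turns the asynchronous outcome into a status string. */
    static String describeOutcome(SimpleKvStateGateway gateway, String jobId, String name) {
        return gateway.notifyRegistered(jobId, name)
            .thenApply(ack -> "registered " + name + " (" + ack + ")")
            .exceptionally(t -> "failed to register " + name + ": " + unwrap(t).getMessage())
            .join();
    }

    /** Dependent stages wrap the original failure in a CompletionException. */
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        // A gateway that always acknowledges.
        SimpleKvStateGateway ok = (jobId, name) -> CompletableFuture.completedFuture("ACK");
        // A gateway that always fails, e.g. because the job is unknown.
        SimpleKvStateGateway failing = (jobId, name) -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("unknown job " + jobId));
            return f;
        };
        System.out.println(describeOutcome(ok, "job-1", "word-count"));
        System.out.println(describeOutcome(failing, "job-2", "word-count"));
    }
}
```

The join() call blocks and is used here only to keep the sketch short; in an RPC endpoint one would normally keep chaining on the future instead.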
JobMaster
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    //......

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    //......
}
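- JobMaster's notifyKvStateRegistered first checks that the given jobId matches its own job, then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered does the same check and delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. On success both return a completed future holding Acknowledge; a jobId mismatch yields a future completed exceptionally with FlinkJobNotFoundException, and any exception thrown by the registry is likewise returned through an exceptionally completed future.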
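The check-then-delegate shape of these two methods can be modelled with nothing but the JDK. The sketch below is a simplified illustration under stated assumptions: JobScopedNotifierSketch, its nested Registry interface, the String identifiers, and the "ACK" value are all hypothetical stand-ins, and the failed helper only imitates what FutureUtils.completedExceptionally is used for in the listing above.

```java
import java.util.concurrent.CompletableFuture;

/** Simplified model of the check-then-delegate pattern; all names are hypothetical. */
final class JobScopedNotifierSketch {

    /** Stand-in for the location registry that the real method delegates to. */
    interface Registry {
        void register(String registrationName) throws Exception;
    }

    private final String ownJobId;
    private final Registry registry;

    JobScopedNotifierSketch(String ownJobId, Registry registry) {
        this.ownJobId = ownJobId;
        this.registry = registry;
    }

    CompletableFuture<String> notifyRegistered(String jobId, String registrationName) {
        if (!ownJobId.equals(jobId)) {
            // Corresponds to the FlinkJobNotFoundException branch in the listing.
            return failed(new IllegalArgumentException("Unknown job " + jobId));
        }
        try {
            registry.register(registrationName);
            return CompletableFuture.completedFuture("ACK");
        } catch (Exception e) {
            // Registry failures are reported through the future, not thrown to the caller.
            return failed(e);
        }
    }

    /** Java 8 compatible way to build an already-failed future. */
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    public static void main(String[] args) {
        JobScopedNotifierSketch notifier = new JobScopedNotifierSketch("job-1", name -> { });
        System.out.println(notifier.notifyRegistered("job-1", "word-count").join());
        System.out.println(notifier.notifyRegistered("job-2", "word-count").isCompletedExceptionally());
    }
}
```

Returning a failed future instead of throwing keeps the method's contract uniform: the caller always receives a future and inspects it for the outcome.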
KvStateLocationRegistry
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                "Registration name clash. KvState with name '" + registrationName +
                "' has already been registered by another operator (" +
                location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                    location.getJobVertexId() + ") registered the KvState " +
                    "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }
}
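- The KvStateLocationRegistry constructor requires a jobId and jobVertices; the class keeps a lookupTable field that maps each registrationName to its KvStateLocation.
- When notifyKvStateRegistered finds no KvStateLocation for the registrationName in lookupTable, it creates one (passing the vertex's getMaxParallelism() value) and puts it into lookupTable; an unknown JobVertexID raises IllegalArgumentException. If an existing location belongs to a different JobVertexID, the method fails that vertex with a SuppressRestartsException and throws IllegalStateException. Otherwise it finishes by calling location.registerKvState.
- notifyKvStateUnregistered looks up the KvStateLocation in lookupTable; if it exists and its JobVertexID matches, the method calls location.unregisterKvState and removes the location from lookupTable only once location.getNumRegisteredKeyGroups() has dropped to 0. An unknown registration name or a mismatched JobVertexID raises IllegalArgumentException.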
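The lifecycle of a lookupTable entry described above can be sketched with a plain HashMap. This is a deliberately reduced model, not Flink's implementation: LookupTableSketch and its Location class are hypothetical, identifiers are Strings, and a simple counter stands in for whatever KvStateLocation tracks behind getNumRegisteredKeyGroups(). The sketch also assumes each key-group range is registered and unregistered exactly once, and it omits the check against unknown vertex IDs.

```java
import java.util.HashMap;
import java.util.Map;

/** Reduced model of the registry's lookup table; all names are hypothetical. */
final class LookupTableSketch {

    /** Simplified location entry: owning vertex plus a count of registered key groups. */
    private static final class Location {
        final String vertexId;
        int registeredKeyGroups;

        Location(String vertexId) {
            this.vertexId = vertexId;
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();

    /** Registers the inclusive key-group range [start, end] under the given name. */
    void register(String vertexId, String name, int start, int end) {
        // The first registration for a name creates the entry.
        Location location = lookupTable.computeIfAbsent(name, n -> new Location(vertexId));
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalStateException(
                "Name '" + name + "' already registered by " + location.vertexId);
        }
        location.registeredKeyGroups += end - start + 1;
    }

    /** Unregisters the inclusive range; the entry disappears when nothing is left. */
    void unregister(String vertexId, String name, int start, int end) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException(
                "Another operator (" + location.vertexId + ") registered '" + name + "'");
        }
        location.registeredKeyGroups -= end - start + 1;
        if (location.registeredKeyGroups == 0) {
            lookupTable.remove(name);
        }
    }

    boolean contains(String name) {
        return lookupTable.containsKey(name);
    }

    public static void main(String[] args) {
        LookupTableSketch table = new LookupTableSketch();
        table.register("vertex-1", "word-count", 0, 3);
        table.register("vertex-1", "word-count", 4, 7);
        table.unregister("vertex-1", "word-count", 0, 3);
        System.out.println(table.contains("word-count")); // entry kept: key groups 4..7 remain
        table.unregister("vertex-1", "word-count", 4, 7);
        System.out.println(table.contains("word-count")); // entry removed: nothing registered
    }
}
```

The point of the model is that the entry for a name outlives individual unregistrations: several subtasks of the same vertex register different key-group ranges under one name, and the entry is dropped only after the last of them has unregistered.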
Summary
- The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster provides the implementation of both.
- JobMaster's notifyKvStateRegistered checks the jobId and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered likewise delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered.
- The KvStateLocationRegistry constructor requires a jobId and jobVertices, and its lookupTable field maps each registrationName to a KvStateLocation. notifyKvStateRegistered creates a KvStateLocation and stores it in lookupTable when none exists for the name, rejects a name already registered by a different JobVertexID, and finally calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState on the matching KvStateLocation and removes it from lookupTable once it has no registered key groups left.