A look at Flink's slot.request.timeout configuration
Preface
This article takes a look at Flink's slot.request.timeout configuration.
JobManagerOptions
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for requesting a slot from Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L)
        .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");

    //......
}
- slot.request.timeout is specified in milliseconds and defaults to 5 minutes (5L * 60L * 1000L = 300000 ms)
SlotManagerConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
public class SlotManagerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

    private final Time taskManagerRequestTimeout;
    private final Time slotRequestTimeout;
    private final Time taskManagerTimeout;

    public SlotManagerConfiguration(
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    }

    public Time getTaskManagerRequestTimeout() {
        return taskManagerRequestTimeout;
    }

    public Time getSlotRequestTimeout() {
        return slotRequestTimeout;
    }

    public Time getTaskManagerTimeout() {
        return taskManagerTimeout;
    }

    public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
        final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
        final Time rpcTimeout;

        try {
            rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
        } catch (NumberFormatException e) {
            throw new ConfigurationException("Could not parse the resource manager's timeout " +
                "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
        }

        final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
        final Time taskManagerTimeout = Time.milliseconds(
            configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

        return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
    }

    private static Time getSlotRequestTimeout(final Configuration configuration) {
        final long slotRequestTimeoutMs;
        if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
            LOGGER.warn("Config key {} is deprecated; use {} instead.",
                ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
                JobManagerOptions.SLOT_REQUEST_TIMEOUT);
            slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
        } else {
            slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
        }
        return Time.milliseconds(slotRequestTimeoutMs);
    }
}
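- SlotManagerConfiguration's getSlotRequestTimeout method first checks whether the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT is set; if so, it logs a deprecation warning and uses that value, otherwise it reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration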
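The lookup order described above can be sketched without any Flink dependency. This is only an illustration: the class name, the plain Map standing in for Flink's Configuration, and the string value of the deprecated key are assumptions (the quoted source only shows the constant ResourceManagerOptions.SLOT_REQUEST_TIMEOUT, not its key).

```java
import java.util.HashMap;
import java.util.Map;

public class SlotRequestTimeoutLookup {

    // Key of JobManagerOptions.SLOT_REQUEST_TIMEOUT, as quoted in the article.
    public static final String CURRENT_KEY = "slot.request.timeout";

    // ASSUMPTION: the key behind ResourceManagerOptions.SLOT_REQUEST_TIMEOUT.
    // The article does not show it; verify against your Flink version.
    public static final String DEPRECATED_KEY = "slotmanager.request-timeout";

    // Default from JobManagerOptions: 5 minutes in milliseconds.
    public static final long DEFAULT_MS = 5L * 60L * 1000L;

    /** Resolves the timeout: the deprecated key wins if present, else the current key, else the default. */
    public static long resolveTimeoutMs(Map<String, String> conf) {
        if (conf.containsKey(DEPRECATED_KEY)) {
            System.err.println("Config key " + DEPRECATED_KEY
                + " is deprecated; use " + CURRENT_KEY + " instead.");
            return Long.parseLong(conf.get(DEPRECATED_KEY));
        }
        String value = conf.get(CURRENT_KEY);
        return value == null ? DEFAULT_MS : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveTimeoutMs(conf)); // prints 300000 (default)

        conf.put(CURRENT_KEY, "600000");
        System.out.println(resolveTimeoutMs(conf)); // prints 600000

        conf.put(DEPRECATED_KEY, "120000");
        System.out.println(resolveTimeoutMs(conf)); // prints 120000 (deprecated key takes precedence)
    }
}
```

One consequence worth noting: if both keys are present, the deprecated one silently takes precedence, so a stale entry in an old configuration file can override a newer slot.request.timeout value.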
SlotManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
public class SlotManager implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    public SlotManager(
            ScheduledExecutor scheduledExecutor,
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkTaskManagerTimeouts()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);
    }

    /**
     * Suspends the component. This clears the internal state of the slot manager.
     */
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutCheck != null) {
            taskManagerTimeoutCheck.cancel(false);
            taskManagerTimeoutCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(registeredTaskManager);
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

            return false;
        } else {
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    private void checkSlotRequestTimeouts() {
        if (!pendingSlotRequests.isEmpty()) {
            long currentTime = System.currentTimeMillis();

            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

            while (slotRequestIterator.hasNext()) {
                PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

                if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                    slotRequestIterator.remove();

                    if (slotRequest.isAssigned()) {
                        cancelPendingSlotRequest(slotRequest);
                    }

                    resourceActions.notifyAllocationFailure(
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId(),
                        new TimeoutException("The allocation could not be fulfilled in time."));
                }
            }
        }
    }

    //......
}
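- The SlotManager constructor takes a slotRequestTimeout parameter, and the class keeps a pendingSlotRequests map. The start method registers slotRequestTimeoutCheck via scheduleWithFixedDelay, with an initial delay of 0 and a delay of slotRequestTimeout, so that checkSlotRequestTimeouts runs on the main thread executor. The suspend method cancels both timeout checks, cancels every pending slot request, and then clears the pendingSlotRequests map
- registerSlotRequest first calls checkDuplicateRequest; for a duplicate allocation id it returns false. Otherwise it wraps the request in a PendingSlotRequest, puts it into pendingSlotRequests, and calls internalRequestSlot to allocate a slot. If that throws a ResourceManagerException, the request is removed from pendingSlotRequests and a SlotManagerException is thrown
- checkSlotRequestTimeouts iterates over pendingSlotRequests and compares the current time with slotRequest.getCreationTimestamp(). If the difference is greater than or equal to slotRequestTimeout, it removes the request from pendingSlotRequests, cancels it if it has already been assigned (slotRequest.isAssigned()), and calls resourceActions.notifyAllocationFailure with a TimeoutException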
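The register and timeout-check logic can be modelled in a few lines of plain Java. This is a simplified sketch, not Flink code: the class and method names are hypothetical, a String stands in for AllocationID, the stored Long stands in for PendingSlotRequest's creation timestamp, the duplicate check only looks at pending requests, and time is passed in explicitly instead of calling System.currentTimeMillis() so the behaviour is reproducible.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PendingRequestTimeoutSketch {

    private final long timeoutMs;

    // Hypothetical stand-in for pendingSlotRequests: allocation id -> creation timestamp.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    public PendingRequestTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Modelled on registerSlotRequest: a duplicate allocation id is ignored. */
    public boolean register(String allocationId, long nowMs) {
        if (pending.containsKey(allocationId)) {
            return false;
        }
        pending.put(allocationId, nowMs);
        return true;
    }

    /** Modelled on checkSlotRequestTimeouts: removes expired requests and returns their ids. */
    public List<String> checkTimeouts(long nowMs) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String allocationId = entry.getKey();
            long createdAt = entry.getValue();
            if (nowMs - createdAt >= timeoutMs) {
                // Remove through the iterator to avoid ConcurrentModificationException.
                it.remove();
                failed.add(allocationId);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Timeout of 1000 ms; the check is assumed to run at t = 0, 1000, 2000, ...
        PendingRequestTimeoutSketch sketch = new PendingRequestTimeoutSketch(1000L);

        System.out.println(sketch.register("a", 1L));   // prints true
        System.out.println(sketch.register("a", 5L));   // prints false (duplicate)

        System.out.println(sketch.checkTimeouts(1000L)); // prints [] (age is 999 ms)
        System.out.println(sketch.checkTimeouts(2000L)); // prints [a] (age is 1999 ms)
    }
}
```

The run in main also illustrates a property that appears to follow from the quoted start method: because the check is scheduled with a delay equal to slotRequestTimeout, a request created just after one check is not failed until the next-but-one check, so it can stay pending for almost twice the configured timeout.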
Summary
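- SlotManagerConfiguration's getSlotRequestTimeout method reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration (unless the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT is set); slot.request.timeout defaults to 5 minutes
- The SlotManager constructor takes a slotRequestTimeout parameter, and the class keeps a pendingSlotRequests map. The start method registers slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts with a fixed delay of slotRequestTimeout. The suspend method cancels the pending slot requests and then clears the pendingSlotRequests map
- registerSlotRequest first calls checkDuplicateRequest; if the request is not a duplicate, it is stored in pendingSlotRequests and internalRequestSlot is called to allocate a slot. On failure the request is removed from pendingSlotRequests and a SlotManagerException is thrown. checkSlotRequestTimeouts iterates over pendingSlotRequests; any request whose age (current time minus slotRequest.getCreationTimestamp()) is greater than or equal to slotRequestTimeout is removed, cancelled if already assigned, and reported through resourceActions.notifyAllocationFailure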