A Look at Flink's RestartStrategies
Preface
This article takes a look at Flink's RestartStrategies.
RestartStrategies
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    @PublicEvolving
    public class RestartStrategies {

        /**
         * Generates NoRestartStrategyConfiguration.
         *
         * @return NoRestartStrategyConfiguration
         */
        public static RestartStrategyConfiguration noRestart() {
            return new NoRestartStrategyConfiguration();
        }

        public static RestartStrategyConfiguration fallBackRestart() {
            return new FallbackRestartStrategyConfiguration();
        }

        /**
         * Generates a FixedDelayRestartStrategyConfiguration.
         *
         * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
         * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
         * @return FixedDelayRestartStrategy
         */
        public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
            return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
        }

        /**
         * Generates a FixedDelayRestartStrategyConfiguration.
         *
         * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
         * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
         * @return FixedDelayRestartStrategy
         */
        public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
            return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
        }

        /**
         * Generates a FailureRateRestartStrategyConfiguration.
         *
         * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
         * @param failureInterval Time interval for failures
         * @param delayInterval Delay in-between restart attempts
         */
        public static FailureRateRestartStrategyConfiguration failureRateRestart(
                int failureRate, Time failureInterval, Time delayInterval) {
            return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
        }

        //......
    }
- RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration
RestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    public abstract static class RestartStrategyConfiguration implements Serializable {
        private static final long serialVersionUID = 6285853591578313960L;

        private RestartStrategyConfiguration() {}

        /**
         * Returns a description which is shown in the web interface.
         *
         * @return Description of the restart strategy
         */
        public abstract String getDescription();
    }
- RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription; its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration
NoRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -5894362702943349962L;

        @Override
        public String getDescription() {
            return "Restart deactivated.";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof NoRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
- NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy
FixedDelayRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 4149870149673363190L;

        private final int restartAttempts;
        private final Time delayBetweenAttemptsInterval;

        FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
            this.restartAttempts = restartAttempts;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getRestartAttempts() {
            return restartAttempts;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public int hashCode() {
            int result = restartAttempts;
            result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof FixedDelayRestartStrategyConfiguration) {
                FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;

                return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
            } else {
                return false;
            }
        }

        @Override
        public String getDescription() {
            return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
                + restartAttempts + " restart attempts.";
        }
    }
- FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy; it has two properties, restartAttempts and delayBetweenAttemptsInterval
FailureRateRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 1195028697539661739L;

        private final int maxFailureRate;
        private final Time failureInterval;
        private final Time delayBetweenAttemptsInterval;

        public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
            this.maxFailureRate = maxFailureRate;
            this.failureInterval = failureInterval;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getMaxFailureRate() {
            return maxFailureRate;
        }

        public Time getFailureInterval() {
            return failureInterval;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public String getDescription() {
            return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
                + " and fixed delay " + delayBetweenAttemptsInterval.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
            return maxFailureRate == that.maxFailureRate
                && Objects.equals(failureInterval, that.failureInterval)
                && Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
        }

        @Override
        public int hashCode() {
            return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
        }
    }
- FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy; it has three properties: maxFailureRate, failureInterval, and delayBetweenAttemptsInterval
FallbackRestartStrategyConfiguration
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
    public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -4441787204284085544L;

        @Override
        public String getDescription() {
            return "Cluster level default restart strategy";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof FallbackRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
- FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy
RestartStrategyResolving
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
    public final class RestartStrategyResolving {

        /**
         * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
         * The resolving strategy is as follows:
         * <ol>
         * <li>Strategy set within job graph.</li>
         * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
         * is enabled.</li>
         * <li>If no strategy was set on client and server side and checkpointing was enabled then
         * {@link FixedDelayRestartStrategy} is used</li>
         * </ol>
         *
         * @param clientConfiguration restart configuration given within the job graph
         * @param serverStrategyFactory default server side strategy factory
         * @param isCheckpointingEnabled if checkpointing was enabled for the job
         * @return resolved strategy
         */
        public static RestartStrategy resolve(
                RestartStrategies.RestartStrategyConfiguration clientConfiguration,
                RestartStrategyFactory serverStrategyFactory,
                boolean isCheckpointingEnabled) {

            final RestartStrategy clientSideRestartStrategy =
                RestartStrategyFactory.createRestartStrategy(clientConfiguration);

            if (clientSideRestartStrategy != null) {
                return clientSideRestartStrategy;
            } else {
                if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
                    return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
                        .createRestartStrategy(isCheckpointingEnabled);
                } else {
                    return serverStrategyFactory.createRestartStrategy();
                }
            }
        }

        private RestartStrategyResolving() {
        }
    }
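- RestartStrategyResolving provides a single static method, resolve. It first tries to create a RestartStrategy from the client-side RestartStrategies.RestartStrategyConfiguration; if that yields null, it falls back to the server-side RestartStrategyFactory, passing isCheckpointingEnabled when the factory is a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory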
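The resolution order can be illustrated with a small standalone sketch. It does not use any Flink classes: RestartResolutionSketch, its Strategy enum, and the use of Optional are hypothetical stand-ins. An empty clientStrategy models the case where the client-side factory returns null (assumed here to be what happens when the job asks for the cluster default; that branch is not shown in the excerpt above), and an empty serverStrategy models a server factory of type NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.

```java
import java.util.Optional;

/** Standalone sketch of the resolution order; none of these names are Flink classes. */
final class RestartResolutionSketch {

    /** Hypothetical labels for the three runtime strategies. */
    enum Strategy { NO_RESTART, FIXED_DELAY, FAILURE_RATE }

    /**
     * @param clientStrategy strategy set in the job; empty when the job defers to the cluster
     * @param serverStrategy strategy configured on the cluster; empty when nothing explicit is configured
     * @param checkpointingEnabled whether checkpointing is enabled for the job
     */
    static Strategy resolve(
            Optional<Strategy> clientStrategy,
            Optional<Strategy> serverStrategy,
            boolean checkpointingEnabled) {
        // 1. A strategy set on the job always wins.
        if (clientStrategy.isPresent()) {
            return clientStrategy.get();
        }
        // 2. Otherwise an explicitly configured server-side strategy is used.
        if (serverStrategy.isPresent()) {
            return serverStrategy.get();
        }
        // 3. With nothing configured anywhere, checkpointing decides.
        return checkpointingEnabled ? Strategy.FIXED_DELAY : Strategy.NO_RESTART;
    }
}
```

For instance, a job that sets no strategy on a cluster with no explicit configuration resolves to FIXED_DELAY in this sketch once checkpointing is enabled, and to NO_RESTART otherwise.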
RestartStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
    public interface RestartStrategy {

        /**
         * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
         *
         * @return true if restart is possible, otherwise false
         */
        boolean canRestart();

        /**
         * Called by the ExecutionGraph to eventually trigger a full recovery.
         * The recovery must be triggered on the given callback object, and may be delayed
         * with the help of the given scheduled executor.
         *
         * <p>The thread that calls this method is not supposed to block/sleep.
         *
         * @param restarter The hook to restart the ExecutionGraph
         * @param executor An scheduled executor to delay the restart
         */
        void restart(RestartCallback restarter, ScheduledExecutor executor);
    }
- RestartStrategy is an interface that declares two methods, canRestart and restart; its implementations are NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy
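The two methods form a check-then-act contract: a caller asks canRestart first and only then calls restart. The sketch below shows one way a caller might use that contract. It is an assumption-laden simplification, not the ExecutionGraph code: RestartDecisionSketch, SimpleRestartStrategy, never(), immediate(), and onFailure are hypothetical names, the callback is a plain Runnable, and immediate() runs the recovery synchronously, which the real interface discourages.

```java
/** Standalone sketch of the canRestart/restart contract; not Flink code. */
final class RestartDecisionSketch {

    /** Hypothetical, simplified counterpart of the RestartStrategy interface. */
    interface SimpleRestartStrategy {
        boolean canRestart();
        void restart(Runnable recovery);
    }

    /** Behaves like a no-restart strategy: never restarts, restart() is unsupported. */
    static SimpleRestartStrategy never() {
        return new SimpleRestartStrategy() {
            @Override
            public boolean canRestart() {
                return false;
            }

            @Override
            public void restart(Runnable recovery) {
                throw new UnsupportedOperationException("restart is not supported");
            }
        };
    }

    /** Hypothetical strategy that always allows a restart and runs it right away. */
    static SimpleRestartStrategy immediate() {
        return new SimpleRestartStrategy() {
            @Override
            public boolean canRestart() {
                return true;
            }

            @Override
            public void restart(Runnable recovery) {
                recovery.run();
            }
        };
    }

    /** Asks the strategy first, then either triggers the recovery or gives up. */
    static String onFailure(SimpleRestartStrategy strategy, Runnable recovery) {
        if (strategy.canRestart()) {
            strategy.restart(recovery);
            return "RESTARTING";
        }
        return "FAILED";
    }
}
```

Because the caller checks canRestart before calling restart, a strategy that never allows restarts can safely throw from restart.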
NoRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
    public class NoRestartStrategy implements RestartStrategy {

        @Override
        public boolean canRestart() {
            return false;
        }

        @Override
        public void restart(RestartCallback restarter, ScheduledExecutor executor) {
            throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
        }

        /**
         * Creates a NoRestartStrategyFactory instance.
         *
         * @param configuration Configuration object which is ignored
         * @return NoRestartStrategyFactory instance
         */
        public static NoRestartStrategyFactory createFactory(Configuration configuration) {
            return new NoRestartStrategyFactory();
        }

        @Override
        public String toString() {
            return "NoRestartStrategy";
        }

        public static class NoRestartStrategyFactory extends RestartStrategyFactory {

            private static final long serialVersionUID = -1809462525812787862L;

            @Override
            public RestartStrategy createRestartStrategy() {
                return new NoRestartStrategy();
            }
        }
    }
- NoRestartStrategy implements the RestartStrategy interface; its canRestart method returns false, and its restart method throws UnsupportedOperationException
FixedDelayRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
    public class FixedDelayRestartStrategy implements RestartStrategy {

        private final int maxNumberRestartAttempts;
        private final long delayBetweenRestartAttempts;
        private int currentRestartAttempt;

        public FixedDelayRestartStrategy(
                int maxNumberRestartAttempts,
                long delayBetweenRestartAttempts) {

            Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
            Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

            this.maxNumberRestartAttempts = maxNumberRestartAttempts;
            this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
            currentRestartAttempt = 0;
        }

        public int getCurrentRestartAttempt() {
            return currentRestartAttempt;
        }

        @Override
        public boolean canRestart() {
            return currentRestartAttempt < maxNumberRestartAttempts;
        }

        @Override
        public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
            currentRestartAttempt++;

            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    restarter.triggerFullRecovery();
                }
            }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
        }

        /**
         * Creates a FixedDelayRestartStrategy from the given Configuration.
         *
         * @param configuration Configuration containing the parameter values for the restart strategy
         * @return Initialized instance of FixedDelayRestartStrategy
         * @throws Exception
         */
        public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
            int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

            String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

            long delay;

            try {
                delay = Duration.apply(delayString).toMillis();
            } catch (NumberFormatException nfe) {
                throw new Exception("Invalid config value for " +
                        ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
                        ". Value must be a valid duration (such as '100 milli' or '10 s')");
            }

            return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
        }

        @Override
        public String toString() {
            return "FixedDelayRestartStrategy(" +
                    "maxNumberRestartAttempts=" + maxNumberRestartAttempts +
                    ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
                    ')';
        }

        public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

            private static final long serialVersionUID = 6642934067762271950L;

            private final int maxAttempts;
            private final long delay;

            public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
                this.maxAttempts = maxAttempts;
                this.delay = delay;
            }

            @Override
            public RestartStrategy createRestartStrategy() {
                return new FixedDelayRestartStrategy(maxAttempts, delay);
            }
        }
    }
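- FixedDelayRestartStrategy implements the RestartStrategy interface; its canRestart method returns true while currentRestartAttempt is less than maxNumberRestartAttempts; its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after a delay of delayBetweenRestartAttempts milliseconds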
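To see the attempt counter and the delayed scheduling work together, here is a minimal standalone sketch of the same logic built only on the JDK. FixedDelaySketch and its simulate helper are hypothetical names, a plain Runnable stands in for RestartCallback, and java.util.concurrent.ScheduledExecutorService stands in for Flink's ScheduledExecutor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone sketch of the fixed-delay logic; not a Flink class. */
final class FixedDelaySketch {
    private final int maxAttempts;
    private final long delayMillis;
    private int currentAttempt = 0;

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        if (maxAttempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxAttempts and delayMillis must be >= 0");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** A restart is allowed until the attempt budget is used up. */
    boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    /** Counts the attempt, then schedules the recovery instead of blocking the caller. */
    void restart(Runnable recovery, ScheduledExecutorService executor) {
        currentAttempt++;
        executor.schedule(recovery, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Simulates a number of consecutive failures and returns how many recoveries ran. */
    static int simulate(int maxAttempts, long delayMillis, int failures) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger recoveries = new AtomicInteger();
        FixedDelaySketch strategy = new FixedDelaySketch(maxAttempts, delayMillis);
        for (int i = 0; i < failures && strategy.canRestart(); i++) {
            strategy.restart(recoveries::incrementAndGet, executor);
        }
        // By default, delayed tasks that are already scheduled still run after shutdown().
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return recoveries.get();
    }
}
```

Calling FixedDelaySketch.simulate(3, 10, 5) runs three recoveries: the fourth failure finds canRestart() returning false, so the loop stops.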
FailureRateRestartStrategy
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
    public class FailureRateRestartStrategy implements RestartStrategy {

        private final Time failuresInterval;
        private final Time delayInterval;
        private final int maxFailuresPerInterval;
        private final ArrayDeque<Long> restartTimestampsDeque;

        public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
            Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
            Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
            Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
            Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
            Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

            this.failuresInterval = failuresInterval;
            this.delayInterval = delayInterval;
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
        }

        @Override
        public boolean canRestart() {
            if (isRestartTimestampsQueueFull()) {
                Long now = System.currentTimeMillis();
                Long earliestFailure = restartTimestampsDeque.peek();

                return (now - earliestFailure) > failuresInterval.toMilliseconds();
            } else {
                return true;
            }
        }

        @Override
        public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
            if (isRestartTimestampsQueueFull()) {
                restartTimestampsDeque.remove();
            }
            restartTimestampsDeque.add(System.currentTimeMillis());

            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    restarter.triggerFullRecovery();
                }
            }, delayInterval.getSize(), delayInterval.getUnit());
        }

        private boolean isRestartTimestampsQueueFull() {
            return restartTimestampsDeque.size() >= maxFailuresPerInterval;
        }

        @Override
        public String toString() {
            return "FailureRateRestartStrategy(" +
                "failuresInterval=" + failuresInterval +
                "delayInterval=" + delayInterval +
                "maxFailuresPerInterval=" + maxFailuresPerInterval +
                ")";
        }

        public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
            int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
            String failuresIntervalString = configuration.getString(
                    ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
            );
            String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
            String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

            Duration failuresInterval = Duration.apply(failuresIntervalString);
            Duration delay = Duration.apply(delayString);

            return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
        }

        public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
            private static final long serialVersionUID = -373724639430960480L;

            private final int maxFailuresPerInterval;
            private final Time failuresInterval;
            private final Time delayInterval;

            public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
                this.maxFailuresPerInterval = maxFailuresPerInterval;
                this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
                this.delayInterval = Preconditions.checkNotNull(delayInterval);
            }

            @Override
            public RestartStrategy createRestartStrategy() {
                return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
            }
        }
    }
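- FailureRateRestartStrategy implements the RestartStrategy interface; its canRestart method returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval entries, and once the deque is full it checks whether the time elapsed since earliestFailure exceeds failuresInterval; its restart method evicts the oldest timestamp if the deque is full, appends the current time to restartTimestampsDeque, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval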
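The sliding-window bookkeeping is easier to follow with explicit timestamps. The following standalone sketch reproduces only the deque logic: FailureRateSketch and recordRestart are hypothetical names, and the current time is passed in as a parameter (an assumption made for determinism) instead of being read from System.currentTimeMillis(). Scheduling of the recovery is left out.

```java
import java.util.ArrayDeque;

/** Standalone sketch of the failure-rate window; not a Flink class. */
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long intervalMillis) {
        if (maxFailuresPerInterval <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("both arguments must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    private boolean isFull() {
        return restartTimestamps.size() >= maxFailuresPerInterval;
    }

    /** While the window is full, a restart is allowed only if the oldest entry has aged out. */
    boolean canRestart(long nowMillis) {
        if (isFull()) {
            long earliest = restartTimestamps.peek();
            return nowMillis - earliest > intervalMillis;
        }
        return true;
    }

    /** Records a restart, evicting the oldest timestamp when the window is full. */
    void recordRestart(long nowMillis) {
        if (isFull()) {
            restartTimestamps.remove();
        }
        restartTimestamps.add(nowMillis);
    }
}
```

With a limit of 2 restarts per 1000 ms and restarts recorded at 0 ms and 100 ms, canRestart(200) is false, while canRestart(1001) is true because the entry from 0 ms is then more than 1000 ms old.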
Summary
- RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration
- RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription; its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration
- RestartStrategyResolving provides a static method, resolve, which turns a RestartStrategies.RestartStrategyConfiguration into a RestartStrategy, falling back to the server-side RestartStrategyFactory when the client configuration yields none; RestartStrategy declares two methods, canRestart and restart, and its implementations are NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy