A look at flink's ScheduledExecutor
Preface
This article takes a look at flink's ScheduledExecutor.
Executor
java.base/java/util/concurrent/Executor.java
```java
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
```
- The JDK Executor interface defines a single execute method, which takes a Runnable as its parameter
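The javadoc above leaves the choice of thread to the implementation. As a minimal sketch of that freedom, the two lambdas below are illustrative executors written for this example (they are not part of the JDK or flink): one runs the task in the calling thread, the other starts a new thread per task.

```java
import java.util.concurrent.Executor;

class ExecutorSketch {

    /** Reports whether the executor ran the task on the thread that called execute. */
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        Thread[] seen = new Thread[1];
        executor.execute(() -> seen[0] = Thread.currentThread());
        return seen[0] == caller;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                     // runs in the calling thread
        Executor threadPerTask = r -> new Thread(r).start(); // runs in a new thread
        System.out.println("direct uses caller thread: " + runsOnCallerThread(direct));
        System.out.println("thread-per-task uses caller thread: " + runsOnCallerThread(threadPerTask));
    }
}
```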
ScheduledExecutor
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
```java
public interface ScheduledExecutor extends Executor {

    /**
     * Executes the given command after the given delay.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Executes the given callable after the given delay. The result of the callable is returned
     * as a {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Executes the given command periodically. The first execution is started after the
     * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
     * the third after {@code initialDelay + 2*period} and so on.
     * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
     * is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     * unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command,
        long initialDelay,
        long period,
        TimeUnit unit);

    /**
     * Executed the given command repeatedly with the given delay between the end of an execution
     * and the start of the next execution.
     * The task is executed repeatedly until either an exception occurs or if the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     * completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command,
        long initialDelay,
        long delay,
        TimeUnit unit);
}
```
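- The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementation classes: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter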
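The method signatures above match those of the JDK's ScheduledExecutorService, so the three scheduling styles can be tried out without flink on the classpath. The following is a rough sketch using only the JDK; the class name, method names, delays and run counts are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleSemanticsDemo {

    /** Schedules a one-shot Callable and returns the value held by its ScheduledFuture. */
    static String oneShot() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future = ses.schedule(() -> "done", 10, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    /** Lets a periodic task fire at least `runs` times, then cancels it through its future. */
    static boolean periodicUntil(int runs, boolean fixedRate) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch latch = new CountDownLatch(runs);
            Runnable task = latch::countDown;
            ScheduledFuture<?> future = fixedRate
                ? ses.scheduleAtFixedRate(task, 0, 5, TimeUnit.MILLISECONDS)     // start-to-start spacing
                : ses.scheduleWithFixedDelay(task, 0, 5, TimeUnit.MILLISECONDS); // end-to-start spacing
            boolean reached = latch.await(5, TimeUnit.SECONDS);
            future.cancel(false); // a periodic future only ends by cancellation or failure
            return reached && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("one-shot result: " + oneShot());
        System.out.println("fixed rate cancelled after 3 runs: " + periodicUntil(3, true));
        System.out.println("fixed delay cancelled after 3 runs: " + periodicUntil(3, false));
    }
}
```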
ScheduledExecutorServiceAdapter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
```java
public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

    private final ScheduledExecutorService scheduledExecutorService;

    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public void execute(Runnable command) {
        scheduledExecutorService.execute(command);
    }
}
```
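- ScheduledExecutorServiceAdapter implements the ScheduledExecutor interface on top of the JDK's ScheduledExecutorService; each method simply delegates to the corresponding schedule, scheduleAtFixedRate, scheduleWithFixedDelay or execute method of the wrapped scheduledExecutorService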
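To show the delegation pattern in a form that runs without flink, here is a small sketch. MiniScheduledExecutor and MiniAdapter are hypothetical, trimmed-down stand-ins for ScheduledExecutor and ScheduledExecutorServiceAdapter (one scheduling method only), not flink classes. One thing the sketch makes visible: the narrow interface exposes no shutdown method, so whoever created the underlying pool keeps responsibility for its lifecycle.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AdapterSketch {

    /** Hypothetical, reduced stand-in for flink's ScheduledExecutor interface. */
    interface MiniScheduledExecutor extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Pure delegation, the same shape as ScheduledExecutorServiceAdapter. */
    static final class MiniAdapter implements MiniScheduledExecutor {
        private final ScheduledExecutorService delegate;

        MiniAdapter(ScheduledExecutorService delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return delegate.schedule(command, delay, unit);
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    /** Runs one immediate and one delayed task through the adapter; returns how many ran. */
    static int runBoth() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            MiniScheduledExecutor executor = new MiniAdapter(pool);
            AtomicInteger counter = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(2);
            Runnable task = () -> {
                counter.incrementAndGet();
                done.countDown();
            };
            executor.execute(task);
            executor.schedule(task, 10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // lifecycle stays with the owner of the pool, not the adapter
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks run through the adapter: " + runBoth());
    }
}
```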
ActorSystemScheduledExecutorAdapter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
```java
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

    private final ActorSystem actorSystem;

    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(period));

        Cancellable cancellable = actorSystem.scheduler().schedule(
            new FiniteDuration(initialDelay, unit),
            new FiniteDuration(period, unit),
            scheduledFutureTask,
            actorSystem.dispatcher());

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(-delay));

        Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        actorSystem.dispatcher().execute(command);
    }

    private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
        return actorSystem.scheduler().scheduleOnce(
            new FiniteDuration(delay, unit),
            runnable,
            actorSystem.dispatcher());
    }

    private long now() {
        return System.nanoTime();
    }

    private long triggerTime(long delay) {
        return now() + delay;
    }

    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private long time;

        private final long period;

        private volatile Cancellable cancellable;

        ScheduledFutureTask(Callable<V> callable, long time, long period) {
            super(callable);
            this.time = time;
            this.period = period;
        }

        ScheduledFutureTask(Runnable runnable, long time, long period) {
            super(runnable, null);
            this.time = time;
            this.period = period;
        }

        public void setCancellable(Cancellable newCancellable) {
            this.cancellable = newCancellable;
        }

        @Override
        public void run() {
            if (!isPeriodic()) {
                super.run();
            } else if (runAndReset()){
                if (period > 0L) {
                    time += period;
                } else {
                    cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

                    // check whether we have been cancelled concurrently
                    if (isCancelled()) {
                        cancellable.cancel();
                    } else {
                        time = triggerTime(-period);
                    }
                }
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);

            return result && cancellable.cancel();
        }

        @Override
        public long getDelay(@Nonnull TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(@Nonnull Delayed o) {
            if (o == this) {
                return 0;
            }

            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
        }

        @Override
        public boolean isPeriodic() {
            return period != 0L;
        }
    }
}
```
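- ActorSystemScheduledExecutorAdapter implements the ScheduledExecutor interface on top of an actorSystem; its execute method calls actorSystem.dispatcher().execute
- schedule and scheduleWithFixedDelay both go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce; they differ only in the ScheduledFutureTask they create. For schedule the task's period is 0, while for scheduleWithFixedDelay it is unit.toNanos(-delay), a negative value. The run method of ScheduledFutureTask inspects period: 0 means a non-periodic task, which is simply run once; for a periodic task whose runAndReset succeeded, a positive period only advances time by period, whereas a negative period calls internalSchedule again with -period as the delay, which is what produces fixed-delay scheduling
- scheduleAtFixedRate uses actorSystem.scheduler().schedule, so the repetition is driven by the actorSystem scheduler itself; the period of its ScheduledFutureTask is the method's period parameter converted with unit.toNanos(period), a positive value, rather than the negated unit.toNanos(-delay) used by scheduleWithFixedDelay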
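The fixed-delay trick described above (a one-shot timer that is re-armed after each successful run, with the delay stored as a negative period) can be sketched with the JDK alone. In this sketch ScheduledExecutorService.schedule stands in for Akka's scheduleOnce and a ScheduledFuture plays the role of Cancellable; SelfReschedulingTask and runsBeforeFailure are hypothetical names for this example, and the code is a simplification, not flink's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelaySketch {

    /** Hypothetical task that re-arms a one-shot timer after every successful run. */
    static final class SelfReschedulingTask implements Runnable {
        private final ScheduledExecutorService scheduler; // stand-in for actorSystem.scheduler()
        private final Runnable body;
        private final long period;                        // nanos; negative marks fixed delay
        private volatile ScheduledFuture<?> handle;       // plays the role of Cancellable
        private volatile boolean cancelled;

        SelfReschedulingTask(ScheduledExecutorService scheduler, Runnable body, long delayNanos) {
            this.scheduler = scheduler;
            this.body = body;
            this.period = -delayNanos;                    // same sign convention as unit.toNanos(-delay)
        }

        void start(long initialDelayNanos) {
            handle = scheduler.schedule(this, initialDelayNanos, TimeUnit.NANOSECONDS);
        }

        @Override
        public void run() {
            if (cancelled) {
                return;
            }
            body.run(); // an exception here skips the re-arm below, so the task stops repeating
            handle = scheduler.schedule(this, -period, TimeUnit.NANOSECONDS);
            if (cancelled) {                              // cancelled concurrently while re-arming
                handle.cancel(false);
            }
        }

        void cancel() {
            cancelled = true;
            ScheduledFuture<?> current = handle;
            if (current != null) {
                current.cancel(false);
            }
        }
    }

    /** Runs a body that throws on its failAt-th call; returns how many times it ran in total. */
    static int runsBeforeFailure(int failAt) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch failed = new CountDownLatch(1);
        SelfReschedulingTask task = new SelfReschedulingTask(scheduler, () -> {
            if (runs.incrementAndGet() == failAt) {
                failed.countDown();
                throw new IllegalStateException("simulated failure");
            }
        }, TimeUnit.MILLISECONDS.toNanos(2));
        try {
            task.start(0);
            failed.await(5, TimeUnit.SECONDS);
            Thread.sleep(50); // leave time for an unwanted extra run to show up
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            task.cancel();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs before failure: " + runsBeforeFailure(3));
    }
}
```

Because the next timer is armed only after the body returns, the spacing is measured from the end of one run to the start of the next, and a failing run ends the repetition, matching the contract stated in the scheduleWithFixedDelay javadoc.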
Summary
- The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementation classes: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter
- ScheduledExecutorServiceAdapter implements ScheduledExecutor by delegating to the schedule, scheduleAtFixedRate, scheduleWithFixedDelay and execute methods of a JDK ScheduledExecutorService
- ActorSystemScheduledExecutorAdapter implements ScheduledExecutor on top of an actorSystem. execute calls actorSystem.dispatcher().execute. schedule and scheduleWithFixedDelay go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce; the ScheduledFutureTask of schedule has a period of 0, while that of scheduleWithFixedDelay has a period of unit.toNanos(-delay). When a periodic task has run successfully and its period is negative, the run method of ScheduledFutureTask calls internalSchedule again, which produces fixed-delay scheduling. scheduleAtFixedRate uses actorSystem.scheduler().schedule, and the period of its ScheduledFutureTask is the positive unit.toNanos(period) rather than unit.toNanos(-delay)