A look at Flink's ScheduledExecutor

This article takes a look at Flink's ScheduledExecutor.

Executor

java.base/java/util/concurrent/Executor.java

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • The JDK's Executor interface defines a single execute method, which takes a Runnable as its parameter
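
The Javadoc's remark that a command "may execute in a new thread, in a pooled thread, or in the calling thread" can be tried out with a small sketch. The class name ExecutorSketch and the helper threadUsedBy are made up for this illustration; only the JDK types are real.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ExecutorSketch {

    /** An Executor that runs every command synchronously in the calling thread. */
    static final Executor DIRECT = Runnable::run;

    /** Submits one task and returns the name of the thread that ran it. */
    static String threadUsedBy(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct executor ran on: " + threadUsedBy(DIRECT));
            System.out.println("pooled executor ran on: " + threadUsedBy(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Both executors satisfy the same interface; the caller cannot tell from the type which thread will end up running the task.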

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java

public interface ScheduledExecutor extends Executor {

    /**
     * Executes the given command after the given delay.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Executes the given callable after the given delay. The result of the callable is returned
     * as a {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Executes the given command periodically. The first execution is started after the
     * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
     * the third after {@code initialDelay + 2*period} and so on.
     * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
     * is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     * unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command,
        long initialDelay,
        long period,
        TimeUnit unit);

    /**
     * Executed the given command repeatedly with the given delay between the end of an execution
     * and the start of the next execution.
     * The task is executed repeatedly until either an exception occurs or if the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     * completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command,
        long initialDelay,
        long delay,
        TimeUnit unit);
}
  • The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate, and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementations: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter
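
The four scheduling methods have the same signatures as the corresponding methods of the JDK's ScheduledExecutorService, so their behaviour can be sketched without Flink on the classpath. The example below is only an illustration against the JDK service: it schedules a Callable once and reads its result from the returned ScheduledFuture, then starts a periodic task and cancels it through its future. The class and method names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SchedulingSemanticsSketch {

    /** Schedules a Callable once and waits for the value held by its ScheduledFuture. */
    static int delayedAnswer(ScheduledExecutorService scheduler) {
        ScheduledFuture<Integer> future = scheduler.schedule(() -> 42, 10, TimeUnit.MILLISECONDS);
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Lets a periodic task fire {@code runs} times, cancels it, and reports whether it is cancelled. */
    static boolean runPeriodicallyThenCancel(ScheduledExecutorService scheduler, int runs) {
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> periodic =
            scheduler.scheduleAtFixedRate(latch::countDown, 0, 5, TimeUnit.MILLISECONDS);
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("periodic task did not fire in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // A periodic future never completes on its own; cancelling is the normal way to stop it.
        periodic.cancel(false);
        return periodic.isCancelled();
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            System.out.println("delayed result: " + delayedAnswer(scheduler));
            System.out.println("periodic cancelled: " + runPeriodicallyThenCancel(scheduler, 3));
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```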

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

    private final ScheduledExecutorService scheduledExecutorService;

    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public void execute(Runnable command) {
        scheduledExecutorService.execute(command);
    }
}
  • ScheduledExecutorServiceAdapter implements the ScheduledExecutor interface on top of the JDK's ScheduledExecutorService: each method delegates directly to the wrapped service's schedule, scheduleAtFixedRate, scheduleWithFixedDelay, or execute method
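
To show what such a delegating adapter buys the caller, here is a minimal sketch that does not depend on Flink. MiniScheduledExecutor and MiniAdapter are hypothetical stand-ins for a subset of ScheduledExecutor and for ScheduledExecutorServiceAdapter; they are not Flink classes.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class AdapterUsageSketch {

    /** Hypothetical stand-in for a subset of Flink's ScheduledExecutor interface. */
    interface MiniScheduledExecutor extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Hypothetical stand-in for the adapter: every call is forwarded to the JDK service. */
    static final class MiniAdapter implements MiniScheduledExecutor {
        private final ScheduledExecutorService delegate;

        MiniAdapter(ScheduledExecutorService delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return delegate.schedule(command, delay, unit);
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    /** Client code that only sees the narrow interface; returns how many of its two tasks ran. */
    static int runTwoTasks(MiniScheduledExecutor executor) {
        CountDownLatch done = new CountDownLatch(2);
        executor.execute(done::countDown);
        executor.schedule(done::countDown, 10, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return 2 - (int) done.getCount();
    }

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            System.out.println("tasks run: " + runTwoTasks(new MiniAdapter(service)));
        } finally {
            // The adapter exposes no shutdown method, so whoever created the service shuts it down.
            service.shutdownNow();
        }
    }
}
```

As in the Flink class quoted above, the adapter exposes neither shutdown nor awaitTermination, so the lifecycle of the underlying service stays with its owner.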

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

    private final ActorSystem actorSystem;

    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(period));

        Cancellable cancellable = actorSystem.scheduler().schedule(
            new FiniteDuration(initialDelay, unit),
            new FiniteDuration(period, unit),
            scheduledFutureTask,
            actorSystem.dispatcher());

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(-delay));

        Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        actorSystem.dispatcher().execute(command);
    }

    private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
        return actorSystem.scheduler().scheduleOnce(
            new FiniteDuration(delay, unit),
            runnable,
            actorSystem.dispatcher());
    }

    private long now() {
        return System.nanoTime();
    }

    private long triggerTime(long delay) {
        return now() + delay;
    }

    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private long time;

        private final long period;

        private volatile Cancellable cancellable;

        ScheduledFutureTask(Callable<V> callable, long time, long period) {
            super(callable);
            this.time = time;
            this.period = period;
        }

        ScheduledFutureTask(Runnable runnable, long time, long period) {
            super(runnable, null);
            this.time = time;
            this.period = period;
        }

        public void setCancellable(Cancellable newCancellable) {
            this.cancellable = newCancellable;
        }

        @Override
        public void run() {
            if (!isPeriodic()) {
                super.run();
            } else if (runAndReset()){
                if (period > 0L) {
                    time += period;
                } else {
                    cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

                    // check whether we have been cancelled concurrently
                    if (isCancelled()) {
                        cancellable.cancel();
                    } else {
                        time = triggerTime(-period);
                    }
                }
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);

            return result && cancellable.cancel();
        }

        @Override
        public long getDelay(@Nonnull  TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(@Nonnull Delayed o) {
            if (o == this) {
                return 0;
            }

            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
        }

        @Override
        public boolean isPeriodic() {
            return period != 0L;
        }
    }
}
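  • ActorSystemScheduledExecutorAdapter implements the ScheduledExecutor interface on top of an Akka ActorSystem; its execute method calls actorSystem.dispatcher().execute
  • Both schedule and scheduleWithFixedDelay go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce. They differ only in the ScheduledFutureTask they create: schedule uses a period of 0, while scheduleWithFixedDelay uses unit.toNanos(-delay), a negative value. The run method of ScheduledFutureTask branches on the period: a period of 0 means a one-shot task that simply calls super.run(), whereas a negative period makes the task call internalSchedule again after each successful run, which is what produces fixed-delay scheduling
  • scheduleAtFixedRate uses actorSystem.scheduler().schedule instead. The period of its ScheduledFutureTask is the method's period argument converted to nanoseconds (a positive value), not unit.toNanos(-delay) as in scheduleWithFixedDelay, so run only advances time by the period and leaves the repetition to Akka's scheduler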
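
The fixed-delay trick, where a task re-arms itself through a one-shot timer after each successful run, can be sketched with JDK classes alone. In this sketch the one-shot ScheduledExecutorService.schedule call stands in for Akka's scheduleOnce, and a plain Future stands in for Akka's Cancellable; FixedDelayTask and collectStartTimes are hypothetical names, and the code is a simplified illustration rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SelfReschedulingSketch {

    /** A task that re-arms itself through a one-shot timer after each successful run. */
    static final class FixedDelayTask extends FutureTask<Void> {
        private final ScheduledExecutorService timer; // stand-in for Akka's scheduler
        private final long period;                    // negative: fixed delay in nanos, as in the quoted code
        private volatile Future<?> handle;            // stand-in for Akka's Cancellable

        FixedDelayTask(Runnable body, long delayNanos, ScheduledExecutorService timer) {
            super(body, null);
            this.timer = timer;
            this.period = -delayNanos;
        }

        void start(long initialDelayNanos) {
            handle = timer.schedule(this, initialDelayNanos, TimeUnit.NANOSECONDS);
        }

        @Override
        public void run() {
            // runAndReset() returns false if the body threw or the task was cancelled,
            // which ends the cycle because nothing is scheduled again.
            if (runAndReset()) {
                handle = timer.schedule(this, -period, TimeUnit.NANOSECONDS);
                // Check whether we were cancelled concurrently while re-arming.
                if (isCancelled()) {
                    handle.cancel(false);
                }
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);
            Future<?> pending = handle;
            if (pending != null) {
                pending.cancel(false);
            }
            return result;
        }
    }

    /** Runs a body at least {@code runs} times, cancels it, and returns the recorded start times. */
    static List<Long> collectStartTimes(int runs, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(runs);
        FixedDelayTask task = new FixedDelayTask(() -> {
            starts.add(System.nanoTime());
            latch.countDown();
        }, TimeUnit.MILLISECONDS.toNanos(delayMillis), timer);
        task.start(0L);
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run often enough in time");
            }
            task.cancel(false);
            timer.shutdown();
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(starts);
    }

    public static void main(String[] args) {
        List<Long> starts = collectStartTimes(3, 20);
        for (int i = 1; i < starts.size(); i++) {
            System.out.println("gap " + i + ": "
                + TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)) + " ms");
        }
    }
}
```

Because the next run is only scheduled after the current one has finished, a slow body pushes all later runs back, which is the defining difference from fixed-rate scheduling.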

Summary

  • The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate, and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementations: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter
  • ScheduledExecutorServiceAdapter implements ScheduledExecutor on top of the JDK's ScheduledExecutorService, delegating to its schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute methods
  • ActorSystemScheduledExecutorAdapter implements ScheduledExecutor on top of an Akka ActorSystem. execute calls actorSystem.dispatcher().execute. schedule and scheduleWithFixedDelay go through internalSchedule, which calls actorSystem.scheduler().scheduleOnce; they differ in the ScheduledFutureTask they create, with a period of 0 for schedule and unit.toNanos(-delay) for scheduleWithFixedDelay. The run method of ScheduledFutureTask treats a period of 0 as a one-shot task and, for a negative period, calls internalSchedule again after each successful run to get fixed-delay scheduling. scheduleAtFixedRate uses actorSystem.scheduler().schedule, and the period of its ScheduledFutureTask is the method's period argument converted to nanoseconds rather than unit.toNanos(-delay)

