聊聊Elasticsearch的EsThreadPoolExecutor

本文主要研究一下Elasticsearch的EsThreadPoolExecutor

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;

    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;

    final String getName() {
        return name;
    }

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    public void shutdown(ShutdownListener listener) {
        synchronized (monitor) {
            if (this.listener != null) {
                throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {
                listener.onTerminated();
            } else {
                this.listener = listener;
            }
        }
        shutdown();
    }

    @Override
    protected synchronized void terminated() {
        super.terminated();
        synchronized (monitor) {
            if (listener != null) {
                try {
                    listener.onTerminated();
                } finally {
                    listener = null;
                }
            }
        }
    }

    public interface ShutdownListener {
        void onTerminated();
    }

    @Override
    public void execute(Runnable command) {
        command = wrapRunnable(command);
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }

    private boolean assertDefaultContext(Runnable r) {
        try {
            assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {
                throw ex;
            }
        }
        return true;
    }

    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {
        return this.getQueue().stream().map(this::unwrap);
    }

    @Override
    public final String toString() {
        StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name = ").append(name).append(", ");
        if (getQueue() instanceof SizeBlockingQueue) {
            @SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity = ").append(queue.capacity()).append(", ");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();
    }

    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s, "
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {

    }

    protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }

    protected Runnable unwrap(Runnable runnable) {
        return contextHolder.unwrap(runnable);
    }
}
  • EsThreadPoolExecutor继承了ThreadPoolExecutor,它提供了两个构造器,它们要求RejectedExecutionHandler为XRejectedExecutionHandler类型,其中一个构造器默认为EsAbortPolicy,它们还要求传入ThreadContext
  • 它覆盖了terminated、execute、afterExecute方法,其中terminated方法会回调listener.onTerminated();execute方法会捕获EsRejectedExecutionException异常,在command为AbstractRunnable类型时回调其onRejection及onAfter方法;afterExecute方法会执行EsExecutors.rethrowErrors(unwrap(r))方法
  • 它提供了wrapRunnable及unwrap方法,分别会调用contextHolder.preserveContext及contextHolder.unwrap方法

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();
}
  • XRejectedExecutionHandler接口继承了RejectedExecutionHandler接口,它定义了rejected方法返回rejected的数量;它有两个实现类分别为EsAbortPolicy及ForceQueuePolicy

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}
  • EsAbortPolicy实现了XRejectedExecutionHandler接口,其内部使用CounterMetric类维护rejected数量,而rejected方法直接返回该值;rejectedExecution方法对AbstractRunnable类型的runnable会判断是否isForceExecution,且是SizeBlockingQueue,则调用SizeBlockingQueue的forcePut方法重新force执行该runnable,之后就是递增rejected计数

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }

    }
  • ForceQueuePolicy实现了XRejectedExecutionHandler接口,它的rejectedExecution方法仅仅对ExecutorScalingQueue进行重新入队操作,而rejected方法返回0

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  • AbstractRunnable声明实现Runnable接口,它的run方法分别会回调doRun、onFailure、onAfter方法;另外它还定义了isForceExecution方法用于确定当rejected的时候是否force execution

小结

  • EsThreadPoolExecutor继承了ThreadPoolExecutor,它提供了两个构造器,它们要求RejectedExecutionHandler为XRejectedExecutionHandler类型,其中一个构造器默认为EsAbortPolicy,它们还要求传入ThreadContext
  • 它覆盖了terminated、execute、afterExecute方法,其中terminated方法会回调listener.onTerminated();execute方法会捕获EsRejectedExecutionException异常,在command为AbstractRunnable类型时回调其onRejection及onAfter方法;afterExecute方法会执行EsExecutors.rethrowErrors(unwrap(r))方法
  • XRejectedExecutionHandler接口继承了RejectedExecutionHandler接口,它定义了rejected方法返回rejected的数量;它有两个实现类分别为EsAbortPolicy及ForceQueuePolicy
  • EsAbortPolicy实现了XRejectedExecutionHandler接口,其内部使用CounterMetric类维护rejected数量,而rejected方法直接返回该值;rejectedExecution方法对AbstractRunnable类型的runnable会判断是否isForceExecution,且是SizeBlockingQueue,则调用SizeBlockingQueue的forcePut方法重新force执行该runnable,之后就是递增rejected计数
  • ForceQueuePolicy实现了XRejectedExecutionHandler接口,它的rejectedExecution方法仅仅对ExecutorScalingQueue进行重新入队操作,而rejected方法返回0
  • AbstractRunnable声明实现Runnable接口,它的run方法分别会回调doRun、onFailure、onAfter方法;另外它还定义了isForceExecution方法用于确定当rejected的时候是否force execution

doc

相关推荐