A Look at Elasticsearch's EsThreadPoolExecutor
Preface
This article takes a look at Elasticsearch's EsThreadPoolExecutor.
EsThreadPoolExecutor
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

    public class EsThreadPoolExecutor extends ThreadPoolExecutor {

        private final ThreadContext contextHolder;
        private volatile ShutdownListener listener;

        private final Object monitor = new Object();
        /**
         * Name used in error reporting.
         */
        private final String name;

        final String getName() {
            return name;
        }

        EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
            this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
        }

        @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
        EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
                ThreadContext contextHolder) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.name = name;
            this.contextHolder = contextHolder;
        }

        public void shutdown(ShutdownListener listener) {
            synchronized (monitor) {
                if (this.listener != null) {
                    throw new IllegalStateException("Shutdown was already called on this thread pool");
                }
                if (isTerminated()) {
                    listener.onTerminated();
                } else {
                    this.listener = listener;
                }
            }
            shutdown();
        }

        @Override
        protected synchronized void terminated() {
            super.terminated();
            synchronized (monitor) {
                if (listener != null) {
                    try {
                        listener.onTerminated();
                    } finally {
                        listener = null;
                    }
                }
            }
        }

        public interface ShutdownListener {
            void onTerminated();
        }

        @Override
        public void execute(Runnable command) {
            command = wrapRunnable(command);
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
                    }
                } else {
                    throw ex;
                }
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            EsExecutors.rethrowErrors(unwrap(r));
            assert assertDefaultContext(r);
        }

        private boolean assertDefaultContext(Runnable r) {
            try {
                assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                    Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
            } catch (IllegalStateException ex) {
                // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
                // this must not trigger an exception here since we only assert if the default is restored and
                // we don't really care if we are closed
                if (contextHolder.isClosed() == false) {
                    throw ex;
                }
            }
            return true;
        }

        /**
         * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
         * {@link Runnable} instances rather than potentially wrapped ones.
         */
        public Stream<Runnable> getTasks() {
            return this.getQueue().stream().map(this::unwrap);
        }

        @Override
        public final String toString() {
            StringBuilder b = new StringBuilder();
            b.append(getClass().getSimpleName()).append('[');
            b.append("name = ").append(name).append(", ");
            if (getQueue() instanceof SizeBlockingQueue) {
                @SuppressWarnings("rawtypes")
                SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
                b.append("queue capacity = ").append(queue.capacity()).append(", ");
            }
            appendThreadPoolExecutorDetails(b);
            /*
             * ThreadPoolExecutor has some nice information in its toString but we
             * can't get at it easily without just getting the toString.
             */
            b.append(super.toString()).append(']');
            return b.toString();
        }

        /**
         * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
         * the form "%s = %s, "
         *
         * @param sb the {@link StringBuilder} to append to
         */
        protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {

        }

        protected Runnable wrapRunnable(Runnable command) {
            return contextHolder.preserveContext(command);
        }

        protected Runnable unwrap(Runnable runnable) {
            return contextHolder.unwrap(runnable);
        }
    }

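- EsThreadPoolExecutor extends ThreadPoolExecutor and provides two package-private constructors. Both require a ThreadContext; one takes the rejection handler as an XRejectedExecutionHandler, while the other omits that parameter and defaults to EsAbortPolicy.
- It overrides terminated, execute, and afterExecute. terminated invokes listener.onTerminated() if a listener was registered through shutdown(ShutdownListener), then clears the listener. execute wraps the command with wrapRunnable and catches EsRejectedExecutionException: if the command is an AbstractRunnable it calls onRejection and then onAfter instead of rethrowing, otherwise the exception is rethrown. afterExecute calls EsExecutors.rethrowErrors(unwrap(r)) and asserts that the thread context is back to its default.
- It provides wrapRunnable and unwrap, which delegate to contextHolder.preserveContext and contextHolder.unwrap respectively.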
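
The rejection handling in execute can be tried out without an Elasticsearch dependency. The following is a minimal sketch using only the JDK: RejectionAware and RoutingExecutor are hypothetical stand-ins for AbstractRunnable and EsThreadPoolExecutor, and the JDK's RejectedExecutionException stands in for EsRejectedExecutionException. It is not the Elasticsearch implementation, only an illustration of the same routing idea.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionRoutingSketch {

    /** Hypothetical stand-in for AbstractRunnable: a task that can be told it was rejected. */
    interface RejectionAware extends Runnable {
        void onRejection(Exception e);

        default void onAfter() {
        }
    }

    /** Rejection-aware tasks are notified through their callbacks; all other tasks see the exception. */
    static class RoutingExecutor extends ThreadPoolExecutor {
        RoutingExecutor(int threads, BlockingQueue<Runnable> queue) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.AbortPolicy());
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (RejectedExecutionException ex) {
                if (command instanceof RejectionAware) {
                    RejectionAware task = (RejectionAware) command;
                    try {
                        task.onRejection(ex);
                    } finally {
                        task.onAfter();
                    }
                } else {
                    throw ex;
                }
            }
        }
    }

    /** Submits to a pool that is already shut down, so the task is certain to be rejected. */
    static String demo() {
        RoutingExecutor executor = new RoutingExecutor(1, new ArrayBlockingQueue<>(1));
        executor.shutdown();
        StringBuilder events = new StringBuilder();
        executor.execute(new RejectionAware() {
            @Override
            public void run() {
                events.append("run;");
            }

            @Override
            public void onRejection(Exception e) {
                events.append("rejected;");
            }

            @Override
            public void onAfter() {
                events.append("after;");
            }
        });
        return events.toString();
    }

    /** A plain Runnable has no callbacks, so the rejection surfaces as an exception. */
    static boolean plainRunnableIsRethrown() {
        RoutingExecutor executor = new RoutingExecutor(1, new ArrayBlockingQueue<>(1));
        executor.shutdown();
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(plainRunnableIsRethrown());
    }
}
```

The caller of execute never sees an exception for a rejection-aware task; the task learns about the rejection through its own callbacks, and onAfter still runs as cleanup.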
XRejectedExecutionHandler
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

    public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

        /**
         * The number of rejected executions.
         */
        long rejected();
    }

- XRejectedExecutionHandler extends RejectedExecutionHandler and adds a rejected method that returns the number of rejected executions. It has two implementations: EsAbortPolicy and ForceQueuePolicy.
EsAbortPolicy
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

    public class EsAbortPolicy implements XRejectedExecutionHandler {
        private final CounterMetric rejected = new CounterMetric();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof AbstractRunnable) {
                if (((AbstractRunnable) r).isForceExecution()) {
                    BlockingQueue<Runnable> queue = executor.getQueue();
                    if (!(queue instanceof SizeBlockingQueue)) {
                        throw new IllegalStateException("forced execution, but expected a size queue");
                    }
                    try {
                        ((SizeBlockingQueue) queue).forcePut(r);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("forced execution, but got interrupted", e);
                    }
                    return;
                }
            }
            rejected.inc();
            throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
        }

        @Override
        public long rejected() {
            return rejected.count();
        }
    }

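- EsAbortPolicy implements XRejectedExecutionHandler and tracks the rejection count in a CounterMetric, whose value the rejected method returns. In rejectedExecution, if the runnable is an AbstractRunnable whose isForceExecution() returns true, the queue must be a SizeBlockingQueue (otherwise an IllegalStateException is thrown); the runnable is then enqueued through forcePut and the method returns without counting a rejection. In every other case it increments the rejected counter and throws EsRejectedExecutionException.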
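
To make the two branches concrete, here is a small JDK-only sketch. Forceable, CappedQueue, and AbortUnlessForced are hypothetical stand-ins for AbstractRunnable, SizeBlockingQueue, and EsAbortPolicy; an AtomicLong replaces CounterMetric, and the capacity check is deliberately simplistic. Treat it as an illustration of the control flow under those assumptions, not as the Elasticsearch code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ForceExecutionSketch {

    /** Hypothetical stand-in for AbstractRunnable#isForceExecution. */
    interface Forceable extends Runnable {
        boolean isForceExecution();
    }

    /** Hypothetical stand-in for SizeBlockingQueue: offer honours the capacity, forcePut ignores it. */
    static class CappedQueue extends LinkedBlockingQueue<Runnable> {
        private final int capacity;

        CappedQueue(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public synchronized boolean offer(Runnable r) {
            return size() < capacity && super.offer(r);
        }

        void forcePut(Runnable r) {
            super.offer(r); // the backing queue is unbounded, so this always succeeds
        }
    }

    static class AbortUnlessForced implements RejectedExecutionHandler {
        private final AtomicLong rejected = new AtomicLong();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof Forceable && ((Forceable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof CappedQueue)) {
                    throw new IllegalStateException("forced execution, but expected a capped queue");
                }
                ((CappedQueue) queue).forcePut(r);
                return; // forced tasks are queued and never counted as rejected
            }
            rejected.incrementAndGet();
            throw new RejectedExecutionException("rejected execution of " + r);
        }

        long rejected() {
            return rejected.get();
        }
    }

    /** One worker, queue capacity one: A runs, B is queued, C is rejected, D is forced into the queue. */
    static String demo() {
        CountDownLatch gate = new CountDownLatch(1);
        List<String> ran = new CopyOnWriteArrayList<>();
        AbortUnlessForced handler = new AbortUnlessForced();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new CappedQueue(1), handler);
        try {
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B"));
            try {
                pool.execute(() -> ran.add("C"));
            } catch (RejectedExecutionException expected) {
                // the queue is full and C does not ask for forced execution
            }
            pool.execute(new Forceable() {
                @Override
                public boolean isForceExecution() {
                    return true;
                }

                @Override
                public void run() {
                    ran.add("D");
                }
            });
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return ran + " rejected=" + handler.rejected();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[A, B, D] rejected=1"
    }
}
```

Only the non-forced task C bumps the counter; the forced task D lands in the queue even though the queue is already at capacity.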
ForceQueuePolicy
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }
    }

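- ForceQueuePolicy implements XRejectedExecutionHandler. Its rejectedExecution method asserts that the executor's queue is an ExecutorScalingQueue and puts the rejected runnable back onto that queue; nothing is ever actually rejected, so rejected always returns 0.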
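
Why a handler that simply re-enqueues is useful becomes clearer next to a queue that refuses offers on purpose. The sketch below is a JDK-only illustration under stated assumptions: ScalingQueue and ForceQueueHandler are hypothetical names, the offer logic is a simplified guess at the scaling idea rather than a copy of ExecutorScalingQueue, and the handler calls a dedicated forceEnqueue helper instead of put so that the overridden offer is never involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScalingQueueSketch {

    /** Hypothetical scaling queue: refuses offers so that the executor tries to add a worker first. */
    static class ScalingQueue extends LinkedTransferQueue<Runnable> {
        volatile ThreadPoolExecutor executor;

        @Override
        public boolean offer(Runnable r) {
            if (tryTransfer(r)) {
                return true; // an idle worker took the task directly
            }
            if (executor.getMaximumPoolSize() > executor.getCorePoolSize()) {
                return false; // the pool is allowed to grow, so let the executor try to add a worker
            }
            return super.offer(r);
        }

        void forceEnqueue(Runnable r) {
            super.offer(r); // unbounded queue: always succeeds
        }
    }

    /** Hypothetical force-queue handler: a "rejected" task is queued instead of dropped. */
    static class ForceQueueHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            ((ScalingQueue) executor.getQueue()).forceEnqueue(r);
        }
    }

    /** Four blocking tasks on a 0..2 pool: two start workers, two are force-queued, all four finish. */
    static String demo() {
        ScalingQueue queue = new ScalingQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 2, 1L, TimeUnit.SECONDS, queue, new ForceQueueHandler());
        queue.executor = pool;

        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable task = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        };
        for (int i = 0; i < 4; i++) {
            pool.execute(task);
        }
        int threads = pool.getPoolSize();
        int queued = pool.getQueue().size();

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "threads=" + threads + " queued=" + queued + " completed=" + completed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "threads=2 queued=2 completed=4"
    }
}
```

With a plain unbounded queue, a ThreadPoolExecutor never grows beyond its core size because every offer succeeds. Refusing the offer makes the pool scale up to its maximum first, and the handler then catches the tasks that arrive once no more workers can be added.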
AbstractRunnable
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

    public abstract class AbstractRunnable implements Runnable {

        /**
         * Should the runnable force its execution in case it gets rejected?
         */
        public boolean isForceExecution() {
            return false;
        }

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception t) {
                onFailure(t);
            } finally {
                onAfter();
            }
        }

        /**
         * This method is called in a finally block after successful execution
         * or on a rejection.
         */
        public void onAfter() {
            // nothing by default
        }

        /**
         * This method is invoked for all exception thrown by {@link #doRun()}
         */
        public abstract void onFailure(Exception e);

        /**
         * This should be executed if the thread-pool executing this action rejected the execution.
         * The default implementation forwards to {@link #onFailure(Exception)}
         */
        public void onRejection(Exception e) {
            onFailure(e);
        }

        /**
         * This method has the same semantics as {@link Runnable#run()}
         * @throws InterruptedException if the run method throws an InterruptedException
         */
        protected abstract void doRun() throws Exception;
    }

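- AbstractRunnable implements Runnable. Its final run method calls doRun, passes any Exception thrown by it to onFailure, and always calls onAfter in a finally block. onRejection forwards to onFailure by default, and isForceExecution (false by default) decides whether execution should be forced when the runnable is rejected.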
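
The callback order is easy to verify with a stripped-down copy of the same template-method shape. TemplateRunnable below is a hypothetical, JDK-only reduction of AbstractRunnable (without isForceExecution and onRejection), used here only to trace which hooks fire on success and on failure.

```java
import java.io.IOException;

class TemplateRunnableSketch {

    /** Hypothetical reduction of AbstractRunnable: run() is final and drives the three hooks. */
    abstract static class TemplateRunnable implements Runnable {
        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            } finally {
                onAfter();
            }
        }

        protected abstract void doRun() throws Exception;

        public abstract void onFailure(Exception e);

        public void onAfter() {
            // nothing by default
        }
    }

    /** Runs one task synchronously and records the order in which the hooks are called. */
    static String trace(boolean fail) {
        StringBuilder events = new StringBuilder();
        new TemplateRunnable() {
            @Override
            protected void doRun() throws Exception {
                events.append("doRun;");
                if (fail) {
                    throw new IOException("boom");
                }
            }

            @Override
            public void onFailure(Exception e) {
                events.append("onFailure(").append(e.getMessage()).append(");");
            }

            @Override
            public void onAfter() {
                events.append("onAfter;");
            }
        }.run();
        return events.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // prints "doRun;onAfter;"
        System.out.println(trace(true));  // prints "doRun;onFailure(boom);onAfter;"
    }
}
```

Because doRun may throw checked exceptions and run is final, subclasses cannot skip the failure and cleanup hooks by accident.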
Summary
- EsThreadPoolExecutor extends ThreadPoolExecutor and provides two constructors. Both require a ThreadContext; one takes an XRejectedExecutionHandler, the other defaults to EsAbortPolicy.
- It overrides terminated, execute, and afterExecute: terminated invokes listener.onTerminated() when a listener is registered; execute catches EsRejectedExecutionException and, for an AbstractRunnable command, calls onRejection and onAfter instead of rethrowing; afterExecute calls EsExecutors.rethrowErrors(unwrap(r)).
- XRejectedExecutionHandler extends RejectedExecutionHandler and adds a rejected method that returns the number of rejected executions. Its two implementations are EsAbortPolicy and ForceQueuePolicy.
- EsAbortPolicy keeps the rejection count in a CounterMetric. For an AbstractRunnable whose isForceExecution() returns true, it enqueues the runnable through SizeBlockingQueue's forcePut and returns (throwing IllegalStateException if the queue is not a SizeBlockingQueue); otherwise it increments the counter and throws EsRejectedExecutionException.
- ForceQueuePolicy puts the rejected runnable back onto the ExecutorScalingQueue, and its rejected method always returns 0.
- AbstractRunnable implements Runnable; its run method calls doRun, onFailure on an exception, and onAfter in a finally block. Its isForceExecution method decides whether execution is forced on rejection.