A Look at dubbo's EagerThreadPool
Preface
This article takes a look at dubbo's EagerThreadPool.
EagerThreadPool
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPool.java
    public class EagerThreadPool implements ThreadPool {

        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

            // init queue and executor
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    }
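- EagerThreadPool implements the ThreadPool interface. Its getExecutor method creates an EagerThreadPoolExecutor that uses TaskQueue as its queue, NamedInternalThreadFactory as its threadFactory, and AbortPolicyWithReport as its rejectedExecutionHandler. Note that threads defaults to Integer.MAX_VALUE when it is not configured, and a queues value of 0 or less is turned into a queue capacity of 1.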
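The parameter handling in getExecutor can be sketched without dubbo on the classpath. In this minimal sketch a plain Map stands in for dubbo's URL, and the class name, the key strings (`corethreads`, `threads`, `queues`, `alive`) and the defaults for cores and alive are assumptions made for illustration. Only the two rules visible in the source above are reproduced as-is: threads falls back to Integer.MAX_VALUE, and a non-positive queues value becomes a capacity of 1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the settings that getExecutor reads from the URL.
class EagerPoolSettings {
    final int cores;
    final int threads;
    final int queueCapacity;
    final int aliveMillis;

    EagerPoolSettings(int cores, int threads, int queueCapacity, int aliveMillis) {
        this.cores = cores;
        this.threads = threads;
        this.queueCapacity = queueCapacity;
        this.aliveMillis = aliveMillis;
    }

    // Key names and the defaults for cores/alive are assumptions, not dubbo constants.
    static EagerPoolSettings from(Map<String, String> params) {
        int cores = intParam(params, "corethreads", 0);
        int threads = intParam(params, "threads", Integer.MAX_VALUE);
        int queues = intParam(params, "queues", 0);
        int alive = intParam(params, "alive", 60_000);
        // Same normalization as the source: a non-positive value becomes capacity 1.
        int capacity = queues <= 0 ? 1 : queues;
        return new EagerPoolSettings(cores, threads, capacity, alive);
    }

    private static int intParam(Map<String, String> params, String key, int defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("corethreads", "2");
        params.put("queues", "0");
        EagerPoolSettings s = from(params);
        System.out.println("cores=" + s.cores + ", threads=" + s.threads
                + ", queueCapacity=" + s.queueCapacity + ", aliveMillis=" + s.aliveMillis);
    }
}
```

With only corethreads and queues set, the maximum pool size is effectively unbounded and the queue holds a single task, which is worth keeping in mind when relying on the defaults.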
EagerThreadPoolExecutor
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
    public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

        /**
         * task count
         */
        private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

        public EagerThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit, TaskQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        /**
         * @return current tasks which are executed
         */
        public int getSubmittedTaskCount() {
            return submittedTaskCount.get();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submittedTaskCount.decrementAndGet();
        }

        @Override
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            submittedTaskCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                // retry to offer the task into queue.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    }
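- EagerThreadPoolExecutor extends ThreadPoolExecutor and maintains submittedTaskCount, which is incremented in execute before the task is handed to super.execute and decremented in afterExecute. Its execute method catches RejectedExecutionException and retries by calling TaskQueue's retryOffer to put the task into the queue; only if that also fails does it decrement the counter and throw RejectedExecutionException.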
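The counter bookkeeping described above can be tried out with the JDK alone. The following is a minimal sketch, not dubbo's class: CountingExecutor and its demo method are hypothetical names, the queue is a plain unbounded LinkedBlockingQueue, and the rejection retry is left out so that only the increment-in-execute and decrement-in-afterExecute pattern remains.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration: counts tasks that were accepted but have not finished yet.
class CountingExecutor extends ThreadPoolExecutor {
    private final AtomicInteger submitted = new AtomicInteger();

    CountingExecutor(int core, int max) {
        super(core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    int getSubmittedTaskCount() {
        return submitted.get();
    }

    @Override
    public void execute(Runnable command) {
        // Count the task before it is handed over, so queued tasks are included.
        submitted.incrementAndGet();
        try {
            super.execute(command);
        } catch (RuntimeException | Error e) {
            // The task never entered the pool, so undo the increment.
            submitted.decrementAndGet();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submitted.decrementAndGet();
    }

    // Returns {count while three tasks are blocked, count after they have all finished}.
    static int[] demo() {
        CountingExecutor pool = new CountingExecutor(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int inFlight = pool.getSubmittedTaskCount(); // one running, two queued
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {inFlight, pool.getSubmittedTaskCount()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("in flight: " + counts[0] + ", after completion: " + counts[1]);
    }
}
```

Because the increment happens in execute rather than in beforeExecute, the counter covers tasks that are still waiting in the queue, which is what the queue's offer logic needs in order to tell whether a worker is idle.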
TaskQueue
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
    public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

        private static final long serialVersionUID = -2635853580887179627L;

        private EagerThreadPoolExecutor executor;

        public TaskQueue(int capacity) {
            super(capacity);
        }

        public void setExecutor(EagerThreadPoolExecutor exec) {
            executor = exec;
        }

        @Override
        public boolean offer(Runnable runnable) {
            if (executor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }

            int currentPoolThreadSize = executor.getPoolSize();
            // have free worker. put task into queue to let the worker deal with task.
            if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
                return super.offer(runnable);
            }

            // return false to let executor create new worker.
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }

            // currentPoolThreadSize >= max
            return super.offer(runnable);
        }

        /**
         * retry offer task
         *
         * @param o task
         * @return offer success or not
         * @throws RejectedExecutionException if executor is terminated.
         */
        public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor is shutdown!");
            }
            return super.offer(o, timeout, unit);
        }
    }
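- TaskQueue extends LinkedBlockingQueue and overrides the offer method. When submittedTaskCount is less than the current pool size (some worker is idle), the task is enqueued. Otherwise, if currentPoolThreadSize is less than maximumPoolSize, offer returns false so that the thread pool creates a new thread. Only once currentPoolThreadSize is greater than or equal to maximumPoolSize is the task enqueued. retryOffer skips these checks and calls the timed super.offer directly, throwing RejectedExecutionException if the executor has been shut down.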
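The effect of this offer logic can be observed with a JDK-only sketch. MiniEagerPool and MiniTaskQueue are hypothetical names for a cut-down imitation of the two classes above (no rejection retry, default thread factory and handler); the three branches in offer follow the source. The demo uses 1 core thread, a maximum of 3 threads and a queue capacity of 10, then submits four tasks that block on a latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only imitation of the "grow first, queue later" rule; not dubbo's implementation.
class MiniEagerPool extends ThreadPoolExecutor {
    private final AtomicInteger submitted = new AtomicInteger();

    MiniEagerPool(int core, int max, int capacity) {
        super(core, max, 60, TimeUnit.SECONDS, new MiniTaskQueue(capacity));
        ((MiniTaskQueue) getQueue()).pool = this;
    }

    int submittedCount() {
        return submitted.get();
    }

    @Override
    public void execute(Runnable command) {
        submitted.incrementAndGet();
        try {
            super.execute(command);
        } catch (RuntimeException | Error e) {
            submitted.decrementAndGet();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submitted.decrementAndGet();
    }

    static class MiniTaskQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        transient volatile MiniEagerPool pool;

        MiniTaskQueue(int capacity) {
            super(capacity);
        }

        @Override
        public boolean offer(Runnable task) {
            int poolSize = pool.getPoolSize();
            // An idle worker exists: enqueue and let it pick the task up.
            if (pool.submittedCount() < poolSize) {
                return super.offer(task);
            }
            // All workers are busy and the pool may still grow: refuse, so a thread is created.
            if (poolSize < pool.getMaximumPoolSize()) {
                return false;
            }
            // The pool is at its maximum size: fall back to queueing.
            return super.offer(task);
        }
    }

    // Returns {pool size, queue size} after submitting four blocking tasks.
    static int[] demo() {
        MiniEagerPool pool = new MiniEagerPool(1, 3, 10);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("pool size: " + r[0] + ", queued: " + r[1]); // pool size: 3, queued: 1
    }
}
```

The first three tasks each get their own thread and only the fourth is queued. A stock ThreadPoolExecutor with the same settings and a plain LinkedBlockingQueue would instead stay at one thread and queue the other three tasks, since it only grows beyond the core size when the queue refuses a task.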
NamedInternalThreadFactory
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadlocal/NamedInternalThreadFactory.java
    public class NamedInternalThreadFactory extends NamedThreadFactory {

        public NamedInternalThreadFactory() {
            super();
        }

        public NamedInternalThreadFactory(String prefix) {
            super(prefix, false);
        }

        public NamedInternalThreadFactory(String prefix, boolean daemon) {
            super(prefix, daemon);
        }

        @Override
        public Thread newThread(Runnable runnable) {
            String name = mPrefix + mThreadNum.getAndIncrement();
            InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
            ret.setDaemon(mDaemon);
            return ret;
        }
    }
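- NamedInternalThreadFactory extends NamedThreadFactory. Its newThread method names each thread from the prefix plus an incrementing number, applies the daemon flag, and creates an InternalThread rather than a plain Thread.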
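The naming and daemon handling can be illustrated with a JDK-only factory. This is a rough sketch under assumptions: PrefixedThreadFactory is a hypothetical name, it creates a plain Thread instead of dubbo's InternalThread, and starting the counter at 1 is a choice made here rather than something shown in the source above.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only factory: prefix + running number as the name, plus a daemon flag.
class PrefixedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    // Starting at 1 is an assumption made for this sketch.
    private final AtomicInteger threadNum = new AtomicInteger(1);

    PrefixedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = prefix + threadNum.getAndIncrement();
        // A plain Thread stands in for InternalThread here.
        Thread thread = new Thread(runnable, name);
        thread.setDaemon(daemon);
        return thread;
    }

    public static void main(String[] args) {
        PrefixedThreadFactory factory = new PrefixedThreadFactory("demo-", true);
        Thread first = factory.newThread(() -> { });
        Thread second = factory.newThread(() -> { });
        System.out.println(first.getName() + " " + second.getName() + " daemon=" + first.isDaemon());
    }
}
```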
AbortPolicyWithReport
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReport.java
    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

        protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

        private final String threadName;

        private final URL url;

        private static volatile long lastPrintTime = 0;

        private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

        private static final String OS_WIN_PREFIX = "win";

        private static final String OS_NAME_KEY = "os.name";

        private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

        private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

        private static Semaphore guard = new Semaphore(1);

        public AbortPolicyWithReport(String threadName, URL url) {
            this.threadName = threadName;
            this.url = url;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("Thread pool is EXHAUSTED!" +
                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                    + "%d)," +
                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
                e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
            logger.warn(msg);
            dumpJStack();
            throw new RejectedExecutionException(msg);
        }

        private void dumpJStack() {
            long now = System.currentTimeMillis();

            //dump every 10 minutes
            if (now - lastPrintTime < TEN_MINUTES_MILLS) {
                return;
            }

            if (!guard.tryAcquire()) {
                return;
            }

            ExecutorService pool = Executors.newSingleThreadExecutor();
            pool.execute(() -> {
                String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

                SimpleDateFormat sdf;

                String os = System.getProperty(OS_NAME_KEY).toLowerCase();

                // window system don't support ":" in file name
                if (os.contains(OS_WIN_PREFIX)) {
                    sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
                } else {
                    sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
                }

                String dateStr = sdf.format(new Date());
                //try-with-resources
                try (FileOutputStream jStackStream = new FileOutputStream(
                    new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                    JVMUtil.jstack(jStackStream);
                } catch (Throwable t) {
                    logger.error("dump jStack error", t);
                } finally {
                    guard.release();
                }
                lastPrintTime = System.currentTimeMillis();
            });
            //must shutdown thread pool ,if not will lead to OOM
            pool.shutdown();
        }
    }
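- AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy. Its rejectedExecution method builds a msg containing the thread pool's state, logs it at warn level, calls dumpJStack, and finally throws RejectedExecutionException. dumpJStack writes a stack dump at most once every 10 minutes and uses a Semaphore so that only one dump runs at a time.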
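The throttling in dumpJStack combines a time window with a single-permit Semaphore. Below is a minimal sketch of that guard on its own, assuming a hypothetical DumpThrottle class that takes timestamps as arguments so it can be exercised deterministically. Unlike the source, it records the timestamp before releasing the permit; that ordering is a choice made for this sketch.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper isolating the "at most one dump per interval, one at a time" guard.
class DumpThrottle {
    private final long intervalMillis;
    private final Semaphore guard = new Semaphore(1);
    private volatile long lastDumpTime = 0;

    DumpThrottle(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true if the caller may start a dump; the caller must then call finish.
    boolean tryStart(long nowMillis) {
        // Too soon after the previous dump: skip.
        if (nowMillis - lastDumpTime < intervalMillis) {
            return false;
        }
        // Another dump is still running: skip without blocking.
        return guard.tryAcquire();
    }

    // Records the completion time, then lets the next dump through.
    void finish(long nowMillis) {
        lastDumpTime = nowMillis;
        guard.release();
    }

    public static void main(String[] args) {
        DumpThrottle throttle = new DumpThrottle(10 * 60 * 1000L);
        long now = System.currentTimeMillis();
        System.out.println("first attempt allowed: " + throttle.tryStart(now));
        System.out.println("second attempt while running: " + throttle.tryStart(now + 1));
        throttle.finish(now + 500);
        System.out.println("attempt one minute later: " + throttle.tryStart(now + 60_000));
    }
}
```

Using tryAcquire rather than acquire means a rejected task never waits for a running dump; it simply skips the dump and goes on to throw the exception.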
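Summary
EagerThreadPool implements the ThreadPool interface. Its getExecutor method creates an EagerThreadPoolExecutor that uses TaskQueue as its queue, NamedInternalThreadFactory as its threadFactory, and AbortPolicyWithReport as its rejectedExecutionHandler. TaskQueue's offer method is what makes the pool eager: while the pool is below maximumPoolSize and has no idle worker, it returns false so that a new thread is created instead of the task being queued.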