追踪解析 ThreadPoolExecutor 源码

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadPoolExecutor 简介

ThreadPoolExecutor 是 jdk4 中加入的工具,被封装在 jdk 自带的 Executors 框架中,是 java 中最经典的线程池技术。

ThreadPoolExecutor 类在 concurrent 包下,和其它线程工具类一样都由 Doug Lea 大神操刀完成。

[ 在看完 Spring ioc 和 Gson 之后有点乏了,换换口味看一些 jdk 的源码 ]

3 Demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

    public static void main(String[] args){
        //创建线程池
        //这里使用固定线程数的线程池,线程数为 5
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 0 ; i < 100 ; i ++){
            final int ii = i;
            //创建 Runnable 作为线程池的任务
            Runnable r = () -> System.out.println(ii);
            //执行
            executorService.execute(r);
        }
    }
}

一 线程池的初始化

线程池的初始化调用的 Executors 框架的静态方法:

//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

继续追踪这个构造方法:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

继续追踪:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {

    //验证参数的有效性                        
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    //本例中不涉及权限
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();

    //线程数
    this.corePoolSize = corePoolSize;
    //最大线程数
    //本例中使用固定线程数的线程池,所以线程数和最大线程数相等
    this.maximumPoolSize = maximumPoolSize;
    //用于存储任务的队列
    //此处使用 LinkedBlockingQueue 来储存任务,其线程安全
    this.workQueue = workQueue;
    //keepAliveTime 参数用于表示:
    //对于超出线程和队列缓存总和的任务,是否要临时增加线程来处理
    //超出的线程的存在时间是多少
    //这里使用的是定长线程池,所以 keepAliveTime = 0,即不增加线程
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    //用于创建线程的工厂类
    this.threadFactory = threadFactory;
    //handler 用来处理 task 太多时候的拒绝策略
    //此例中使用的是默认的,即定义在 ThreadPoolExecutor 中的 defaultHandler 对象
    this.handler = handler;
}

二 Worker

Worker 是 ThreadPoolExecutor 的内部类,可以看做是 Runnable 的代理类:

//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    //完成 task 数量的计数器
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //这个方法是 AbstractQueuedSynchronizer 中的方法,功能相当于加锁
        //-1 的意思是后续的任务会处于阻塞状态,即为已经加锁
        setState(-1);
        //在创建的时候存入一个要处理的 task
        //需要注意的是每个 worker 对象被创建出来之后是可以重复利用来处理多个 task 的
        this.firstTask = firstTask;
        //worker 会用自身作为 Runnable 对象去创建一个线程
        //这里调用线程工厂进行线程创建
        this.thread = getThreadFactory().newThread(this);
    }

    //对于线程变量来说,其启动的就是 worker 的 run() 方法
    public void run() {
        //runWorker(...) 方法在 ThreadPoolExecutor 里
        runWorker(this);
    }

    //获取锁的状态
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //重写了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法
    //尝试加锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //重写了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法
    //尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //真正的加锁方法
    public void lock() { 
        acquire(1); 
    }
    //尝试加锁
    public boolean tryLock() { 
        return tryAcquire(1); 
    }
    //真正的释放锁方法
    public void unlock() { 
        release(1); 
    }
    //判断是否在锁中
    public boolean isLocked() { 
        return isHeldExclusively(); 
    }
    //中断线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

追踪一下 runWorker(...) 方法:

//ThreadPoolExecutor.class
final void runWorker(Worker w) {
    //获取当前所在的线程的实例对象
    Thread wt = Thread.currentThread();
    //获取 task
    Runnable task = w.firstTask;
    //取出来之后把 task 置空
    w.firstTask = null;
    //此处释放锁
    w.unlock();
    //指示器,此变量为 true 的时候确认该方法已经执行完毕
    boolean completedAbruptly = true;
    try {
        //此处为一个 while 循环,用于不断的执行 task
        //getTask() 方法会从队列里不断抓取 task 并进行执行
        //当 task 为 null,且队列里已经没有更多 task 的时候,就会终止循环
        while (task != null || (task = getTask()) != null) {
            //加锁,独占线程
            w.lock();
            //在这里会判断线程的状态,如果存在符合中断的情况,就会直接中断掉
            if ((runStateAtLeast(ctl.get(), STOP) 
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();

            try {
                //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并没有实现
                //是预留出来给使用者重写,以达到业务需求的方法
                beforeExecute(wt, task);
                try {
                    //此处执行 task
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                //将执行的 task 置空
                task = null;
                //每完成一个 task 就会加 1
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //这个方法会销毁掉 worker
        //同时如果检测到有新的 task 又会重新创建 Worker
        processWorkerExit(w, completedAbruptly);
    }
}

Worker 是线程池中真正起完成业务逻辑的组件,是任务和线程的封装。

三 线程池的状态控制

线程池的状态主要由 ctl 变量来进行控制:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 是一个 AtomicInteger 类型的变量,其实可以简单理解为一个 int 值,AtomicInteger 只是能够适应高并发的原子化操作的需要。

ctl 的前 29 位数用来表示线程(Worker)的数量,后面三位用来表示线程池的状态。

线程池的状态有五种,分别是 Running、Shutdown、Stop、Tidying、Terminate,根据单词就能猜出大概。

注意的是,这五种状态在线程池中都以 int 变量的形式存在,从前到后依次变大,对状态的比较有一系列方法:

//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
    //c 的状态值要小于 s
    return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
    //c 的状态值要大于或等于 s
    return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
    //状态里只有 RUNNING 是小于 SHUTDOWN 的
    return c < SHUTDOWN;
}

在这些方法里,传入的参数 c 一般指的是当前线程池状态,s 是用来对比的参照状态。

四 线程池的执行

该 part 的起点:

executorService.execute(r);

来追踪 execute(...) 方法:

public void execute(Runnable command) {
    //有效性验证
    if (command == null)
        throw new NullPointerException();
    
    //ctl 是一个 AtomicInteger 类型的变量,用来记录线程池的状态
    int c = ctl.get();
    
    //workerCountOf(...) 方法会返回当前运行的 Worker 的数量
    if (workerCountOf(c) < corePoolSize) {
        //Worker 的数量小于线程池容量的情况下
        //直接增加 Worker 并取出 task 去运行
        if (addWorker(command, true))
            return;
        //如果 Worker 已经顺利执行了 task,应该会直接返回掉
        //如果执行中出现了其它情况,则会继续往下走
        //此处刷新状态
        c = ctl.get();
    }
    //当 Worker 数量已经达到线程池的指定数量,或者添加 Worker 的时候出问题的时候,会进入此判断语句
    //先判断线程池是否处于活跃状态,且 task 是否已经被成功添加到队列中
    //如果不满足,会进入 else 语句中,先最后尝试一次 addWorker(...) 方法,如果不成功就拒绝 task
    //reject(...) 方法会调用 handler 的拒绝策略
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }else if (!addWorker(command, false))
        reject(command);
}

1 reject

这里先提及一下 reject(...) 方法:

//ThreadPoolExecutor.class
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

本质是调用了 handler 对象的相关方法。在本例中,handler 对象指向了 defaultHandler:

//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

defaultHandler 是一个 AbortPolicy 类型的对象,而 AbortPolicy 是 ThreadPoolExecutor 的静态内部类。

AbortPolicy 起作用的方法为 rejectedExecution(...) 方法:

//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                        " rejected from " + e.toString());
}

也就是说,在 task 过多的情况下,AbortPolicy 的应对策略是抛出异常。

2 addWorker

来看一下核心方法 addWorker(...):

//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
    //先标记这个 for 循环,方便退出循环
    retry:
    //在每一次循环开始之前会刷新一次状态标识
    for (int c = ctl.get();;) {
        //这里先进行判断,如果线程池已经关闭了,或者没有 task 了,就会返回 false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            //如果 Worker 数量已经超出了最大值就会直接返回 false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            //将 ctl 变量的值加 1,如果成功了就会跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //在状态值比 SHUTDOWN 大的时候会直接跳到最外头的循环里
            //需要注意的是最外面的 for 循环会判断状态值是否大于 SHUTDOWN
            //如果大于 SHUTDOWN 的话就返回 false 了
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个 Worker
        w = new Worker(firstTask);
        //获取线程对象
        final Thread t = w.thread;
        if (t != null) {
            //加锁,此处加的是一把全局的锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                //如果状态值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就会进入这个判断语句
                //
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    //如果这个线程已经处于运作状态,会抛出异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //workers 是一个列表,用于存储 Worker 对象
                    workers.add(w);
                    //获取 Worker 的数量
                    int s = workers.size();
                    //largestPoolSize 用来记录线程池达到过的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //标记 Worker 已经被添加
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //先判断 Worker 是否已经被添加到 workers 内了
            if (workerAdded) {
                //这是该方法核心的启动线程方法
                t.start();
                //标记 Worker 已经开始运行了
                workerStarted = true;
            }
        }
    } finally {
        //如果没有标记 Worker 已经开始工作,会在这里销毁掉 Worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

五 一点唠叨

先总结一下线程池的业务逻辑:

1 接收到 task (即实现了 Runnable 接口的实例对象) [execute(...) 方法]

2 用 task 去尝试创建一个 Worker 实例 [execute(...) 方法]
    2.1 如果 Worker 数量没有达到线程池的指定最大值 -> 新建
    2.2 如果 Worker 数量达到了线程池的指定最大值 -> 不会再创建,而是把 task 储存起来等待空闲的 Worker 去提取
    2.3 如果 task 队列也已经满了,无法再添加 -> 触发拒绝机制(handler)

3 Worker 在执行的时候调用其内部的 Thread 实例对象的 start() 方法 [addWorker(...) 方法]

4 该 start() 方法会调用到 Worker 的 run() 方法 [Worker.class 内的 run() 方法]

5 Worker 的 run() 方法本质上是封装了 task 的 run() 方法 [runWorker(...) 方法]

主线业务逻辑不算复杂,比较艰难的是为了保证数据的一致性,线程池代码中充斥着大量的状态判断和锁机制。

并且为了考虑性能问题,线程池的设计没有使用悲观锁(synchronized 关键字),而是大量使用了 ASQ 和 ReetrentLock 机制。


本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

相关推荐