J.U.C|AQS共享式源码分析
一、写在前面
上篇给大家聊了独占式的源码,具体参见《J.U.C|AQS独占式源码分析》
这一章我们继续在AQS的源码世界中遨游,解读共享式同步状态的获取和释放。
二、什么是共享式
共享式与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。
我们以读写锁为例来看两者,一个线程在对一个资源文件进行读操作时,那么这一时刻对于文件的写操作均被阻塞,而其它线程的读操作可以同时进行。
当写操作要求对资源独占操作,而读操作可以是共享的,两种不同的操作对同一资源进行操作会是什么样的?看下图
共享式访问资源,其他共享时均被允许,而独占式被阻塞。
独占式访问资源时,其它访问均被阻塞。
通过读写锁给大家一起温故下独占式和共享式概念,上一节我们已经聊过独占式,本章我们主要聊共享式。
主要讲解方法
- protected int tryAcquireShared(int arg);共享式获取同步状态,返回值 >= 0 表示获取成功,反之则失败。
- protected boolean tryReleaseShared(int arg): 共享式释放同步状态。
三、核心方法分析
3.1 同步状态的获取
public final void acquireShared(int arg)
共享式获取同步状态的顶级入口,如果当前线程未获取到同步状态,将会加入到同步队列中等待,与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。
方法源码
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
方法函数解析
- tryAcquireShared(arg):获取同步状态,返回值大于等于0表示获取成功,否则失败。
- doAcquireShared(arg):共享式获取共享状态,包含构建节点,加入队列等待,唤醒节点等操作。
源码分析
同步器的 acquireShared 和 doAcquireShared 方法
//请求共享锁的入口 public final void acquireShared(int arg) { // 当state != 0 并且tryAcquireShared(arg) < 0 时才去才获取资源 if (tryAcquireShared(arg) < 0) // 获取锁 doAcquireShared(arg); }
// 以共享不可中断模式获取锁 private void doAcquireShared(int arg) { // 将当前线程一共享方式构建成 node 节点并将其加入到同步队列的尾部。这里addWaiter(Node.SHARED)操作和独占式基本一样, final Node node = addWaiter(Node.SHARED); // 是否成功标记 boolean failed = true; try { // 等待过程是否被中断标记 boolean interrupted = false; 自旋 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 判断前驱节点是否是head节点,也就是看自己是不是老二节点 if (p == head) { // 如果自己是老二节点,尝试获取资源锁,返回三种状态 // state < 0 : 表示获取资源失败 // state = 0: 表示当前正好线程获取到资源, 此时不需要进行向后继节点传播。 // state > 0: 表示当前线程获取资源锁后,还有多余的资源,需要向后继节点继续传播,获取资源。 int r = tryAcquireShared(arg); // 获取资源成功 if (r >= 0) { // 当前节点线程获取资源成功后,对后继节点进行逻辑操作 setHeadAndPropagate(node, r); // setHeadAndPropagate(node, r) 已经对node.prev = null,在这有对p.next = null; 等待GC进行垃圾收集。 p.next = null; // help GC // 如果等待过程被中断了, 将中断给补上。 if (interrupted) selfInterrupt(); failed = false; return; } } // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(arg)方法获取同步状态,返回同步状态有两种。
当同步状态大于等于0时: 表示可以获取到同步状态,退出自旋,在doAcquireShared(int arg)方法中可以看到节点获取资源退出自旋的条件就是大于等于0
小于0会加入同步队列中等待被唤醒。
addWaiter和enq方法
// 创建节点,并将节点加入到同步队列尾部中。 private Node addWaiter(Node mode) { // 以共享方式为线程构建Node节点 Node node = new Node(Thread.currentThread(), mode); // 尝试快速加入到队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; // CAS保证原子操作,将node节点加入到队列尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速加入失败,走 enq(node)方法 enq(node); return node; }
//以自旋的方式,将node节点加入到队列的尾部 private Node enq(final Node node) { // 自旋 for (;;) { // 获取尾部节点 Node t = tail; // 如果tail节点为空, 说明同步队列还没初始化,必须先进行初始化 if (t == null) { // Must initialize // CAS保证原子操作, 新建一个空 node 节点并将其设置为head节点 if (compareAndSetHead(new Node())) // 设置成功并将tail也指向该节点 tail = head; } else { // 将node节点加入到队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这两个方法和独占式的基本相同,注释中都标明了,在这就不多做解释了。
获取资源成功后对后继节点的操作setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) { // 记录老的head节点,以便核对 Node h = head; // Record old head for check below // 将node 设置成head节点 setHead(node); // 这里表示: 如果资源足够(propagate > 0)或者旧头节点为空(h == null)或者旧节点的waitStatus为 SIGNAL(-1) 或者 PROPAGATE(-3)(h.waitStatus < 0) // 或者当前head节点不为空或者waitStatus为SIGNAL(-1) 或者 PROPAGATE(-3),此时需要继续唤醒后继节点来尝试获取资源。 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 当前node节点的后继节点 Node s = node.next; //如果后节点为空或者属于共享节点 if (s == null || s.isShared()) // 继续尝试获取资源 doReleaseShared(); } }
首先将当前节点设置为head节点 setHead(node), 其次根据条件看是否对后继节点继续唤醒。
获取资源失败进行阻塞等待unpark
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的等待状态 int ws = pred.waitStatus; // 如果等待状态已经为SIGNAL(表示当前当前节点的后继节点处于等待状态,如果当前节点释放了同步状态或者被中断, 则会唤醒后继节点) if (ws == Node.SIGNAL) // 直接返回,表示可以安心的去休息了 return true; // 如果前驱的节点的状态 ws > 0(表示该节点已经被取消或者中断,也就是成无效节点,需要从同步队列中取消的) if (ws > 0) { // 循环往前需寻找,知道寻找到一个有效的安全点(一个等待状态<= 0 的节点,排在它后面) do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 注意这一波操作后,获奖取消的节点全部变成GC可回收的废弃链。 pred.next = node; } else { //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它获取资源后通知自己一下。有可能失败,人家说不定刚刚释放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 调用park方法使当前节点的线程进入waiting LockSupport.park(this); //返回线程中断状态 return Thread.interrupted(); }
这两个方法和独占式基本相同。
接着看doReleaseShared 这个比较复杂
private void doReleaseShared() { //注意,这里的头结点已经是上面新设定的头结点了,从这里可以看出,如果propagate=0, //不会进入doReleaseShared方法里面,那就有共享式变成了独占式 for (;;) { // 死循环以防在执行此操作时添加新节点:退出条件 h == head Node h = head; // 前提条件,当前的头节点不为空,并且不是尾节点 if (h != null && h != tail) { // 当前头节点的等待状态 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 如果当前节点的状态为SIGNAL,则利用CAS将其状态设置为0(也就是初始状态) //这里不直接设为Node.PROPAGATE,是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases 设置失败,重新循环 // 唤醒后继节点 unparkSuccessor(h); } // 如果等待状态不为0 则利用CAS将其状态设置为PROPAGATE ,以确保在释放资源时能够继续通知后继节点。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed 如果head 期间发生了改变,则需要从新循坏 break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 在此再次判断当前头节点的的状态,如果小于0 将设置为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { //如果后继节点为空或者等待状态大于0 直接放弃。 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 循环从尾部往前寻找下一个等待状态不大于0的节点 if (t.waitStatus <= 0) s = t; } // 唤醒该节点的线程 if (s != null) LockSupport.unpark(s.thread); }
3.2 共享状态释放
最后一步释放资源就比较简单了。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
四、总结
在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程会加入到队列中并进行自旋,出列的(或者停止自旋)的条件时前驱节点为头节点并且成功获取了同步状态。在释放同步状态时,调用Release方法释放同步状态,然后唤醒头节点的后继节点。
共享式方式在唤醒后继节点获得资源后会判断当前资源是否还有多余的,如果有会继续唤醒下一个节点。