并发编程之Semaphore源码解析
一 什么是Semaphore、Semaphore用来做什么
semaphore是计数信号量,可用于多线程并发执行时,限制获取资源的线程数量。常用场景为:限流。
二 Semaphore用法
public class SemaphoreTest { public static void main(String[] args) { // 声明5个窗口 state: 资源数 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 3; i++) { new Thread(new Runnable() { @Override public void run() { try { // 占用窗口 semaphore.acquire(2); System.out.println(Thread.currentThread().getName() + ": 开始买票"); //模拟买票流程 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + ": 购票成功"); // 释放窗口 semaphore.release(2); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } }
执行结果为:
为什么会出现这种情况呢?
三 Semaphore源码解析
看上图执行结果和执行过程,我们会提出三个疑问:
1、Semaphore如何获取信号量
2、Semaphore获取信号量失败,又如何进入等待队列
3、等待队列线程如何阻塞的
4、release信号量时,又如何唤醒下一个线程的。
看上图可以解答如何获取信号量、获取信号量失败时如何添加到等待队列
1、Semaphore如何获取信号量
核心逻辑在Sync.nonfairTryAcquireShared()方法,首先getState(),state为volatitle变量,remaining>0且cas更新成功,代表获取信号量成功
abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
2、Semaphore获取信号量失败,又如何进入等待队列
首先等待队列是一个双向链表。
thread2插入的时候,链表为空,需要初始化head节点,然后把thread2放到head节点的next
thread3,可以快速插入,放到thread2的next节点。
private Node addWaiter(Node mode) { // 以给定的模式来构建节点, mode有两种模式 // 共享式SHARED, 独占式EXCLUSIVE; Node node = new Node(Thread.currentThread(), mode); // 尝试快速将该节点加入到队列的尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果快速加入失败,则通过 anq方式入列 enq(node); return node; } private Node enq(final Node node) { // CAS自旋,直到加入队尾成功 for (;;) { Node t = tail; if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为 Hader节点,并将tail 指向它 if (compareAndSetHead(new Node())) tail = head; } else {// 正常流程,加入队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } 出队操作 同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒 它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点 Condition队列 Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些 对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。 } } }
3、等待队列线程如何阻塞的
加入等待队列成功后,调用shouldParkAfterFailedAcquire方法把等待队列waitStatus置为-1(等待被通知执行状态),然后调用parkAndCheckInterrupt方法阻塞当前线程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
4、release信号量时,又如何唤醒下一个线程的
1、自旋为aqs的state值cas添加释放的信号量
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
2、当前线程出队,next节点为head节点,并且唤醒next线程
private void doReleaseShared() { //移除当前节点,设置next接单为head for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒next线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
unparkSuccessor---->LockSupport.unpark(s.thread);