并发编程之Semaphore源码解析

一  什么是Semaphore、Semaphore用来做什么

    semaphore是计数信号量,可用于多线程并发执行时,限制获取资源的线程数量。常用场景为:限流。

二  Semaphore用法

public class SemaphoreTest {

    public static void main(String[] args) {
        // 声明5个窗口  state:  资源数
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 占用窗口
                        semaphore.acquire(2);
                        System.out.println(Thread.currentThread().getName() + ": 开始买票");
                        //模拟买票流程
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + ": 购票成功");
                        // 释放窗口
                        semaphore.release(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

执行结果为:

 并发编程之Semaphore源码解析

为什么会出现这种情况呢?

并发编程之Semaphore源码解析

三 Semaphore源码解析

看上图执行结果和执行过程,我们会提出三个疑问:

1、Semaphore如何获取信号量

2、Semaphore获取信号量失败,又如何进入等待队列

3、等待队列线程如何阻塞的

4、release信号量时,又如何唤醒下一个线程的。

并发编程之Semaphore源码解析

看上图可以解答如何获取信号量、获取信号量失败时如何添加到等待队列

1、Semaphore如何获取信号量

   核心逻辑在Sync.nonfairTryAcquireShared()方法,首先getState(),state为volatitle变量,remaining>0且cas更新成功,代表获取信号量成功

abstract static class Sync extends AbstractQueuedSynchronizer {
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
}

2、Semaphore获取信号量失败,又如何进入等待队列

  首先等待队列是一个双向链表。

thread2插入的时候,链表为空,需要初始化head节点,然后把thread2放到head节点的next

thread3,可以快速插入,放到thread2的next节点。

private Node addWaiter(Node mode) {
// 以给定的模式来构建节点, mode有两种模式
// 共享式SHARED, 独占式EXCLUSIVE;
 Node node = new Node(Thread.currentThread(), mode);
  // 尝试快速将该节点加入到队列的尾部
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
      if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
     }
   }
    // 如果快速加入失败,则通过 anq方式入列
    enq(node);
    return node;
 }
private Node enq(final Node node) {
// CAS自旋,直到加入队尾成功    
for (;;) {
  Node t = tail;
    if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为
Hader节点,并将tail 指向它
      if (compareAndSetHead(new Node()))
        tail = head;
     } else {// 正常流程,加入队列尾部
        node.prev = t;
          if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
       }
出队操作
同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒
它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点
Condition队列
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些
对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了
synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
     }
   }
 }

并发编程之Semaphore源码解析

3、等待队列线程如何阻塞的

  加入等待队列成功后,调用shouldParkAfterFailedAcquire方法把等待队列waitStatus置为-1(等待被通知执行状态),然后调用parkAndCheckInterrupt方法阻塞当前线程。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

4、release信号量时,又如何唤醒下一个线程的

  1、自旋为aqs的state值cas添加释放的信号量

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

  2、当前线程出队,next节点为head节点,并且唤醒next线程

private void doReleaseShared() {
//移除当前节点,设置next接单为head
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                   //唤醒next线程
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
unparkSuccessor---->LockSupport.unpark(s.thread);

并发编程之Semaphore源码解析