Semaphore源码分析

Semaphore介绍

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。

Semaphore的简单示例

使用Semaphore来简单模拟数据库连接池

public class Pool {//可同时访问资源的最大线程数    private static final int MAX_AVAILABLE = 100;    //信号量 表示:可获取的对象通行证    private final Semaphore available = new Semaphore(MAX_AVAILABLE,true);    //共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是连接池    protected Object[] items = new Object[MAX_AVAILABLE];    //共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false    protected boolean[] used =  new boolean[MAX_AVAILABLE];    /**     * 获取一个空闲对象     * 如若无空闲对象则等待,直到有空闲对象为止     */    public Object getItem() throws InterruptedException{available.acquire();        return getNextAvailableItem();    }/**     * 获取池中的一个空闲对象,获取成功后返回Object,失败返回null     * 成功后将对应的used[i] 设置为true     */    private synchronized Object getNextAvailableItem() {for (int i = 0; i < MAX_AVAILABLE; i++) {if(!used[i]) {used[i] = true;                return items[i];            }        }return null;    }/**     * 归还对象到池中     */    public void putItem(Object x) {if (markAsUnused(x))available.release();    }/**     *  归还对象到池中,归还成功返回true     *  归还失败:     *  1、池中不存在该对象的引用,返回false     *  2、池中含有该对象的引用,但该对象目前状态为空闲状态,也返回false     */    private synchronized boolean markAsUnused(Object item) {for (int i = 0; i < MAX_AVAILABLE; i++) {if(item == items[i]) {if(used[i]) {used[i] = false;                    return true;                }else {return false;                }            }        }return false;    }}

 Semaphore的源码分析

 1、Semaphore的构造方法

/** *  permits:通行证的个数 *  fair: 是否公平的获取锁 */public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

/** *  默认是非公平锁 */public Semaphore(int permits) {sync = new NonfairSync(permits);}
 

 2、Semaphore公平锁模式下的acquire()方法

public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();    sync.acquireSharedInterruptibly(permits);}
/** *  共享式地获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待, *  在同一时刻可以有多个线程获取到同步状态,该方法可以响应中断 */public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//条件成立:说明当前调用acquire方法的线程 已经是 中断状态了,直接抛出异常..    if (Thread.interrupted())throw new InterruptedException();    //如果获取同步状态失败,返回值小于0    if (tryAcquireShared(arg) < 0)     //可共享的中断模式下尝试获取锁        doAcquireSharedInterruptibly(arg);}
/** * 尝试获取通行证,获取成功返回 >= 0的值; * 获取失败 返回 < 0 值 */protected int tryAcquireShared(int acquires) {for (;;) {//  //判断当前 AQS 阻塞队列内 是否有等待者线程,如果有直接返回-1,表示当前aquire操作的线程需要进入到队列等待..        if (hasQueuedPredecessors())return -1;        //执行到这里,有哪几种情况?        //1.调用aquire时 AQS阻塞队列内没有其它等待者        //2.当前节点 在阻塞队列内是headNext节点        //获取state ,state这里表示 通行证        int available = getState();        //remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量        int remaining = available - acquires;        //条件一:remaining < 0 成立,说明线程获取通行证失败..        //条件二:前置条件,remaning >= 0, CAS更新state 成功,说明线程获取通行证成功,CAS失败,则自旋。        if (remaining < 0 ||            compareAndSetState(available, remaining))return remaining;    }}
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将调用semaphore.await()方法的线程 包装成node加入到 AQS的阻塞队列当中。    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {for (;;) {//获取当前线程节点的前驱节点            final Node p = node.predecessor();            //条件成立,说明当前线程对应的节点 为 head.next节点            if (p == head) {int r = tryAcquireShared(arg);                //说明还有剩余的通行证                if (r >= 0) {//设置当前节点为 head节点,并且向后传播!(依次唤醒!)                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return;                }            }//shouldParkAfterFailedAcquire  会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 signal(-1),返回true            //parkAndCheckInterrupt 挂起当前节点对应的线程...            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())throw new InterruptedException();        }    } finally {//如果发生了中断,取消该节点竞争共享锁if (failed)            cancelAcquire(node);    }}
/** * 设置当前节点为 head节点,并且向后传播!(依次唤醒!) */private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below    //将当前节点设置为 新的 head节点。    setHead(node);    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {//获取当前节点的后继节点..        Node s = node.next;        //条件一:s == null  什么时候成立呢?  当前node节点已经是 tail了,条件一会成立。 doReleaseShared() 里面会处理这种情况..        //条件二:前置条件,s != null , 要求s节点的模式必须是 共享模式。 latch.await() -> addWaiter(Node.SHARED)        if (s == null || s.isShared())//基本上所有情况都会执行到 doReleasseShared() 方法。            doReleaseShared();    }}
 /** * 唤醒获取资源失败的线程 * * Semaphore版本 * 都有哪几种路径会调用到doReleaseShared方法呢? * 1、semaphore.release() -> sync.releaseShared(1) -> tryReleaseShared() -> doReleaseShared * 2、被唤醒的线程 ->doAcquireSharedInterruptibly() parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared() */private void doReleaseShared() {for (;;) {//获取当前AQS 内的 头结点        Node h = head;        //条件一:h != null 成立,说明阻塞队列不为空..        //条件二:h != tail 成立,说明当前阻塞队列内,除了head节点以外  还有其他节点。        if (h != null && h != tail) {//执行到if里面,说明当前head 一定有 后继节点!            int ws = h.waitStatus;            //当前head状态 为 signal 说明 后继节点并没有被唤醒过呢...            if (ws == Node.SIGNAL) {//唤醒后继节点前 将head节点的状态改为 0                //这里为什么,使用CAS呢? 回头说...                //当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时,                //CAS 可能会失败...                                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases                //唤醒后继节点                unparkSuccessor(h);            }else if (ws == 0 &&                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS        }     //false 没有人执行setHead(node)方法,有可能是没有线程需要唤醒了,或者唤醒线程已经更新过了head节点,那么执行自旋,在唤醒接下来的节点     //true  唤醒线程还没有执行