Semaphore源码分析
Semaphore介绍
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。
Semaphore的简单示例
使用Semaphore来简单模拟数据库连接池
public class Pool {//可同时访问资源的最大线程数 private static final int MAX_AVAILABLE = 100; //信号量 表示:可获取的对象通行证 private final Semaphore available = new Semaphore(MAX_AVAILABLE,true); //共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是连接池 protected Object[] items = new Object[MAX_AVAILABLE]; //共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false protected boolean[] used = new boolean[MAX_AVAILABLE]; /** * 获取一个空闲对象 * 如若无空闲对象则等待,直到有空闲对象为止 */ public Object getItem() throws InterruptedException{available.acquire(); return getNextAvailableItem(); }/** * 获取池中的一个空闲对象,获取成功后返回Object,失败返回null * 成功后将对应的used[i] 设置为true */ private synchronized Object getNextAvailableItem() {for (int i = 0; i < MAX_AVAILABLE; i++) {if(!used[i]) {used[i] = true; return items[i]; } }return null; }/** * 归还对象到池中 */ public void putItem(Object x) {if (markAsUnused(x))available.release(); }/** * 归还对象到池中,归还成功返回true * 归还失败: * 1、池中不存在该对象的引用,返回false * 2、池中含有该对象的引用,但该对象目前状态为空闲状态,也返回false */ private synchronized boolean markAsUnused(Object item) {for (int i = 0; i < MAX_AVAILABLE; i++) {if(item == items[i]) {if(used[i]) {used[i] = false; return true; }else {return false; } } }return false; }}
Semaphore的源码分析
1、Semaphore的构造方法
/** * permits:通行证的个数 * fair: 是否公平的获取锁 */public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);} /** * 默认是非公平锁 */public Semaphore(int permits) {sync = new NonfairSync(permits);}
2、Semaphore公平锁模式下的acquire()方法
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits);} /** * 共享式地获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待, * 在同一时刻可以有多个线程获取到同步状态,该方法可以响应中断 */public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//条件成立:说明当前调用acquire方法的线程 已经是 中断状态了,直接抛出异常.. if (Thread.interrupted())throw new InterruptedException(); //如果获取同步状态失败,返回值小于0 if (tryAcquireShared(arg) < 0) //可共享的中断模式下尝试获取锁 doAcquireSharedInterruptibly(arg);} /** * 尝试获取通行证,获取成功返回 >= 0的值; * 获取失败 返回 < 0 值 */protected int tryAcquireShared(int acquires) {for (;;) {// //判断当前 AQS 阻塞队列内 是否有等待者线程,如果有直接返回-1,表示当前aquire操作的线程需要进入到队列等待.. if (hasQueuedPredecessors())return -1; //执行到这里,有哪几种情况? //1.调用aquire时 AQS阻塞队列内没有其它等待者 //2.当前节点 在阻塞队列内是headNext节点 //获取state ,state这里表示 通行证 int available = getState(); //remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量 int remaining = available - acquires; //条件一:remaining < 0 成立,说明线程获取通行证失败.. //条件二:前置条件,remaning >= 0, CAS更新state 成功,说明线程获取通行证成功,CAS失败,则自旋。 if (remaining < 0 || compareAndSetState(available, remaining))return remaining; }} /** * Acquires in shared interruptible mode. * @param arg the acquire argument */private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//将调用semaphore.await()方法的线程 包装成node加入到 AQS的阻塞队列当中。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try {for (;;) {//获取当前线程节点的前驱节点 final Node p = node.predecessor(); //条件成立,说明当前线程对应的节点 为 head.next节点 if (p == head) {int r = tryAcquireShared(arg); //说明还有剩余的通行证 if (r >= 0) {//设置当前节点为 head节点,并且向后传播!(依次唤醒!) setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } }//shouldParkAfterFailedAcquire 会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 signal(-1),返回true //parkAndCheckInterrupt 挂起当前节点对应的线程... if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())throw new InterruptedException(); } } finally {//如果发生了中断,取消该节点竞争共享锁if (failed) cancelAcquire(node); }} /** * 设置当前节点为 head节点,并且向后传播!(依次唤醒!) */private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //将当前节点设置为 新的 head节点。 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {//获取当前节点的后继节点.. Node s = node.next; //条件一:s == null 什么时候成立呢? 当前node节点已经是 tail了,条件一会成立。 doReleaseShared() 里面会处理这种情况.. //条件二:前置条件,s != null , 要求s节点的模式必须是 共享模式。 latch.await() -> addWaiter(Node.SHARED) if (s == null || s.isShared())//基本上所有情况都会执行到 doReleasseShared() 方法。 doReleaseShared(); }} /** * 唤醒获取资源失败的线程 * * Semaphore版本 * 都有哪几种路径会调用到doReleaseShared方法呢? * 1、semaphore.release() -> sync.releaseShared(1) -> tryReleaseShared() -> doReleaseShared * 2、被唤醒的线程 ->doAcquireSharedInterruptibly() parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared() */private void doReleaseShared() {for (;;) {//获取当前AQS 内的 头结点 Node h = head; //条件一:h != null 成立,说明阻塞队列不为空.. //条件二:h != tail 成立,说明当前阻塞队列内,除了head节点以外 还有其他节点。 if (h != null && h != tail) {//执行到if里面,说明当前head 一定有 后继节点! int ws = h.waitStatus; //当前head状态 为 signal 说明 后继节点并没有被唤醒过呢... if (ws == Node.SIGNAL) {//唤醒后继节点前 将head节点的状态改为 0 //这里为什么,使用CAS呢? 回头说... //当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时, //CAS 可能会失败... if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases //唤醒后继节点 unparkSuccessor(h); }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS } //false 没有人执行setHead(node)方法,有可能是没有线程需要唤醒了,或者唤醒线程已经更新过了head节点,那么执行自旋,在唤醒接下来的节点 //true 唤醒线程还没有执行