Java并发编程序列之JUC底层AQS(二)
Java并发编程序列之JUC底层AQS(二)
Hello,大家好,在上一篇文章中,作者简单的把Lock接口和AQS的API,以及关系大致说了一下,本文还是围绕AQS为话题(AQS是重中之重,这个搞明白了。后面JUC自己看源码都很Easy可以看懂),先具体到API大致说下API的对应关系,然后作者自己写俩自定义Lock说明问题,最后再详细讲解AQS提供的一些模板API具体怎么实现的.文章结构:
- AQS模板API的对应关系。
- 自定义俩Lock.
- AQS模板API实现.
1. AQS模板API的对应关系。
在上文的讲解Lock和AQS关系时,有一张图大致讲解了这些API的调用关系,本文作者决定再画一张,具体到AQS中具体API的对应关系:
图中可以看到绿色箭头表示独占性锁的实现逻辑,红色箭头表示共享式锁实现的逻辑,所谓的独占性锁意思就是,只要有一个线程拿到锁,其他线程全部T出去到队列等待。共享性锁就好理解了,一部分个性化(根据tryAcquire返回值决定)的线程可以拿到锁,没拿到的到队列。2. 自定义俩Lock.
好了,根据上面的知识,结合上一节讲解的各个模块API调用关系,作者不废话了,来,自定义一个独占性锁:任何情况下,只允许一个线程拿到锁!即使是自己也只能拿到一次!(不可重入!)
/** * Created by zdy on 17/12/22. */ public class OnlyOneLock implements Lock { private static class OnlyOneLockAQS extends AbstractQueuedSynchronizer{ //在该类内部调用State相关API,维护是否获取到锁的逻辑 @Override protected boolean tryAcquire(int arg) { //状态为0时,设置为1,表示当前线程获取到锁 if(compareAndSetState(0,1)){ //设置当前线程为获取到锁的线程 setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false; } @Override protected boolean tryRelease(int arg) { //如果状态为0,表示还没有线程获取到锁,你释放什么释放 if(getState()==0) {throw new IllegalMonitorStateException();} setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { //判断该线程是否被占有 return getState()==1; } } //讲所有锁的语音,直接调用aqs的api来实现 private final OnlyOneLockAQS aqs=new OnlyOneLockAQS(); @Override public void lock() { aqs.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { aqs.acquireInterruptibly(1); } @Override public boolean tryLock() { return aqs.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return aqs.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { //记住一定不能直接调用tyrRelease那一套API,因为release方法帮我们维护释放后的通知逻辑. aqs.release(1); } @Override public Condition newCondition() { return null; } }
好了,大家好好理解理解,其实也没那么难。可以看到。Lock中的实现API都是调用了同步器AQS的模板方法和我们实现的方法来实现锁的逻辑的。我们实现的try开头的那几个API根本不用管什么队列,什么通知逻辑。只需要管是否获取到锁的逻辑。是不是很神奇?这里强调一下,其实在Lock接口中只有tryLock()这个API会直接调用tryAcquire()这个我们实现的API之外,其他的API其实都是调用的AQS的模板方法,应为模板方法封装了很多复杂的队列通知等逻辑。
OK,废话少说,上面可以说是自己写了一个独占性的锁,永远只有一个线程可以获取到锁。下面再定义一个共享性的锁。来个简单点的。大家应该都知道限流,现在假如有个需求,一个应用级的限流(业务接口层面的),要求一个接口最后只能被5个线程并发访问,后续的线程再访问,直接返回,不做业务逻辑处理.(常用语秒杀业务中某个商品只有5个,那么有必要放很多请求进来吗?)
package com.zdy; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Created by zdy on 17/12/22. */ public class ShareLock implements Lock { private class ShareLockAQS extends AbstractQueuedSynchronizer{ protected ShareLockAQS(Integer count) { super(); setState(count); } @Override protected int tryAcquireShared(int arg) { for (; ; ) { Integer state = getState(); Integer newCount = state - arg; if (newCount < 0 || compareAndSetState(state, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { //注意这里不能直接setState了,因为可能多个线程同时release Integer state = getState(); Integer newCount = state + arg; if (compareAndSetState(state,newCount)) { return true; } } } @Override protected boolean isHeldExclusively() { return getState()==0; } } private ShareLockAQS aqs=new ShareLockAQS(5); @Override public void lock() { aqs.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { aqs.acquireInterruptibly(1); } @Override public boolean tryLock() { return aqs.tryAcquireShared(1)>=0; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return aqs.tryAcquireSharedNanos(1,unit.toNanos(time)); } @Override public void unlock() { aqs.releaseShared(1); } @Override public Condition newCondition() { return null; } }
大致逻辑:内部维护了一个state,初始化为5,获取一个锁,减1,释放一个锁,+1。注意获取和释放的并发性!尤其是在释放时,释放失败了,一定要for(;;),切记!一句话说,就是释放必须是成功的.
下面来验证一下这个并发Lock:
其实思路还是比较清晰的。在web请求接口中每进来就去取锁。由于我们的锁最多只能取5次,所有当第6次请求进来后直接return "活动已经结束...",结果我就不演示了,这就是我们的限流Lock了。看,限流没想的那么高大上。当然了,我这里说的只是应用限流的一种,其实粗暴一点,直接用个Integer变量,加锁每次减一就可以了。后面JUC里面还有许多内置的工具类来提供给我们使用。说到限流这一块,其实应用层面接口的限流是最Low的,效果最不好旳,为什么呢?因为接口已经进入到容器了,对资源的消耗也是存在的。其实最标准的限流都是在网关或者接入层,比如Nginx层面的限流,效果比较显著。好了好了,又扯这么多。回到主题...3. AQS模板API实现.
通过上面的例子可以知道,其实我们想要实现自己的同步组件还是比较简单的。只需要写好AQS,而AQS中需要我们覆盖的几个方法只需要处理好拿到锁和没有拿到锁的逻辑,至于线程怎么维护,我们还是不清楚的,这一小节就带大家揭开这个神秘的面纱。先说简单独占Lock的,比如在我们重写了tryAcquire时,AQS的模板方法acquire内部会调用这个方法,然后维护队列逻辑。废话不多说直接上源码:(代码全为源码,注释为小弟所加)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先调用tryAcquire();如果拿到,代码直接跑完,不阻塞。如果没有拿到。调用addWaiter();把当前线程构建成一个Node加入到队列尾部。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; //Cas算法设置进去。 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
把当前线程加入到队列(FIFO)尾部后,调用acquireQueued();
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //parkAndCheckInterrupt这个API会Park当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这个API贼有意思,先检查当前线程在队列中的前一个线程是否为头结点,如果是就在尝试拿一次,没拿到的话会在parkAndCheckInterrupt时把自己给Park掉,Park大致可以理解为休眠(Sleep),这个时候,线程就老老实实的在队列里面等着,等什么呢?等获取锁的线程释放锁后通知它,它好继续在for循环里面获取锁。。我贴一下流程图。然后大家根据源码对着看一看。因为源码细节比较多。不可能一一讲解。
大家要明白,这个流程是独占式的队列流程。首节点永远为获取到锁的那个节点。
然后说下共享式的队列如何维护,还拿上面那个例子,支持并发5个线程获取到锁:
- 入队列:线程获取锁失败后,创建一个节点,并将节点添加到等待队列尾,然后将线程阻塞,等待唤醒;
- 唤醒:另一个线程释放锁,取队列的第一个节点,将节点对应线程唤醒;
- 出队列:唤醒后的线程将尝试获取锁,成功后将自己移出队列,同时判断是否任然存在空闲的锁,如果存在则继续唤醒下一个节点。
- 每次只会唤醒第一个节点,如果同时释放多个锁,后续的节点将由前面被唤醒的节点来唤醒,尽量减少数据竞争。
这一块的逻辑和独占Lock还是很大差别的,我就不和大家Show源码了。还是比较麻烦的。大致和独占Lock一样。只是队列的维护不一样,大家感兴趣自己看一看。顺便提一个小知识点:Sleep()和Object.wait()遇到Interrupt出异常。
LockSupport.park()遇到则会唤醒继续运行。
结语
好了,其实AQS这一块源码翻起来远不止这么多,我这里只是大致说了个主线,队列如何入队,如何唤醒,稍微说了下,感兴趣的同学可以再仔细琢磨,因为文字确实不太好跟源码。Have a good day .