多线程之美7一ReentrantReadWriteLock源码分析
目录
前言
在多线程环境下,为了保证线程安全, 我们通常会对共享资源加锁操作,我们常用Synchronized关键字或者ReentrantLock 来实现,这两者加锁方式都是排他锁,即同一时刻最多允许一个线程操作,然而大多数场景中对共享资源读多于写,那么存在线程安全问题的是写操作(修改,添加,删除),我们是否应该考虑将读和写两个分开,只要运用合理,并发性能是不是可以提高,吞吐量增大呢? ReentrantReadWriteLock已经为我们实现了这种机制,我们一起来看它是怎样实现的吧!
1、读写锁的一些概念
在查看可重入读写锁的源码前,有几个概念需要先知道,对于后面理解源码很有帮助。
1、ReentrantReadWriteLock 内部 Sync类依然是继承AQS实现的,因此同步状态字段 state,依然表示对锁资源的占用情况。那么如何实现一个 int类型的state 同时来表示读写锁两种状态的占用情况呢? 这里实现非常巧妙,将4个字节的int类型, 32位拆分为2部分,高16位表示读锁的占用情况,低16位表示写锁的占用情况,这样读写锁互不影响,相互独立;也因此读写锁的最大值是2^16-1 = 65535,不能超过16位,下面源码有体现。
state值表示如图所示:
2、读锁是共享锁,只要不超过最大值,可多个线程同时获取; 写锁是排他锁,同一时刻最多允许一个线程获取。
写锁与其他锁都互斥,含写写互斥,写读互斥,读写互斥。
3、state可同时表示读写锁的状态,state的高16位表示获取读锁的线程数,读锁支持可重入,即一个线程也可多次获取读锁,怎么维护每个读锁线程的重入次数的? 每个线程有一个计数器 HoldCounter,用ThreadLocal来存放每个线程的计数器;state的低16位表示写锁的同步状态,因为写锁是排他锁,这里就不能表示获取写锁的线程数了,只能表示写锁的重入次数,获取写锁的线程可多次重复获取写锁(支持重入)。
读锁的计数器的实现原理如下:
可见ThreadLocalHoldCounter继承 ThreadLocal,每个获取读锁的线程是通过其本地变量来存储自己的计数器,来统计获取读锁的重入次数。ThreadLocal原理解析
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重写了ThreadLocal的initialValue方法 public HoldCounter initialValue() { return new HoldCounter(); } }
4、state的高16位需要记录获取读锁的线程数,每增加一个线程获取读锁,在state的高16执行加1操作,即state+2^16,写锁增加重入次数,直接 state+1即可。
5、锁降级:获取写锁的线程,可以再次获取到读锁,即写锁降级为读锁。
? 读锁可以升级为写锁吗? 不可以,因为存在线程安全问题,试想获取读锁的线程有多个,其中一个线程升级为写锁,对临界区资源进行操作,比如修改了某个值,对其他已经获取读锁的线程不可见,出现线程安全问题。
代码演示:
1、读写状态
AQS(AbstractQueuedSynchronizer的简称)中同步状态字段 private volatile int state, int类型,4个字节,32位,拆分为高16位表示读状态,低16位表示写状态,如下定义了一些常量,实现获取读写锁的数量。
ReentrantReadWriteLock部分代码如下:
//分隔位数,16位 static final int SHARED_SHIFT = 16; //读锁加1的数量,1左位移16位, (16)0x10000 = (2)1000000000000000= (10) 65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读写锁的最大数量, (16)0xFFFFFFFF =(2)1111111111111111 =(10)65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //写锁的掩码,用于计算写锁重入次数时,将state的高16全部置为0, 等于(2)1111111111111111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //获取读锁数,表示当前有多少个线程获取到读锁 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //获取写锁重入次数(不等于0表示有线程持有独占锁,大于1,表示写锁有重入) static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
分别看一下获取读写锁数量的方法。
获取占用读锁的线程数,代码如下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
传入的c为 state,state 无符号右移16位,抹去低16位值,左边补0
示例图如下:
获取写锁的值的方法
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
与运算,将高16全部置为0,低16值代表写锁的值,&运算,相同为1,不同为0,得到低16位写锁值。
示例图如下:
2、三个锁概念
- int c =getState() ,获取state的值,代表同步锁状态,该值包含读写两个锁的同步状态
- int w = exclusiveCount(c); w代表写锁的同步状态,通过c获取到写锁的状态值
- int r = sharedCount(c); r 代表读锁的同步状态,通过c获取到读锁的状态值
以下分析三种情况下state,r, w 的值及代表的含义:
- 1、一个线程获取到写锁:
state =1, w =1, r =0
获取写锁加1操作就比较简单了,因为写锁是独占锁,与正常的ReentrantLock获取锁实现一样,占用state的低16位表示,不用看state的高16,左边补16位0。获取写锁一次,直接 c+1;
- 2、一个线程获取到读锁:
state =65536, w= 0, r=1
c初始为0 ,获取读锁,则读锁数量+1,执行 c + SHARED_UNIT, SHARED_UNIT = (2)1000000000000000 = (10)65536,括号内表示进制,SHARED_UNIT是每次读锁加1的数值。
如下图所示: 在获取读锁数量 r时,将state的低16位抹去,r=1,而state此时的值= 2^16 =65536,state的实际值可能会很大,但其实分别拆分读写锁的值不一定大,只是读锁值表示在高位,会造成state值很大。
- 3、一个线程获取到写锁,又获取到读锁情况(锁降级):
state = 65537,w=1, r=1
state二进制表示: 00000000 00000001 00000000 00000001
锁降级代码演示如下:
package readwritelock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author zdd * 2019/12/30 上午 * Description: 锁降级测试 */ public class ReadWriteLockTest { static Integer shareVar = 0; public static void main(String[] args) { ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); //1,首先获取写锁 rw.writeLock().lock(); //2.修改共享变量值 shareVar = 10 ; //3.再获取读锁 rw.readLock().lock(); System.out.println("读取变量值 shareVar:"+ shareVar); //4.释放写锁 rw.writeLock().unlock(); //5.释放读锁 rw.readLock().unlock(); } }
2、类结构和构造方法
ReentrantReadWriteLock 类中有ReadLock和WriteLock,分别对应读锁和写锁,而读写锁又分为公平方式和非公平方式获取锁。
简略类图结构如下:
构造方法如下:根据传入参数设置公平或者非公平获取锁方式,默认是非公平方式
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
3、写锁
由于写锁是独占锁,由于写锁是独占锁,获取写锁的方式在AQS中已经说过了,详见AQS源代码分析, 只是每个子类的尝试获取锁方式不同,所以ReentrantReadWriteLock类获取写锁过程就看一下尝试获取锁方法的源码。
3.1、尝试获取锁
tryAcquire(int acquires),获取锁失败则加入同步队列中等待获取锁,源代码如下:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //1,获取同步状态state的值,注意该值可表示读写锁的同步状态 int c = getState(); //2,获取写锁状态,低16位的值 int w = exclusiveCount(c); //3,如果同步锁状态不为0,有线程已经获取到了锁 if (c != 0) { //4,w==0则表示写锁为0,那么一定有线程获取了读锁,需要等待,读写互斥 //current != getExclusiveOwnerThread() 当前线程不等于已经获取到写锁的线程,则也需等待其释放,写写互斥 if (w == 0 || current != getExclusiveOwnerThread()) return false; //5,此时再次获取锁,判断锁重入次数是否超过最大限定次数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //更新写锁重入次数 setState(c + acquires); return true; } //6,代码执行这,一定是c==0,同步锁空闲情况 //writerShouldBlock该方法是基于公平锁和非公平锁2种方式的体现 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //获取到锁,设置独占锁为当前写锁线程 setExclusiveOwnerThread(current); return true; }
写锁是否应该阻塞等待
- 1、 非公平锁方式
final boolean writerShouldBlock() { //直接返回false return false; // writers can always barge }
- 2、公平锁方式
需要判断同步队列中是否还有其他线程在挂起等待,如存在应该按照入队顺序获取锁
final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
public final boolean hasQueuedPredecessors() { //1.获取同步队列头,尾节点 Node t = tail; Node h = head; Node s; // h !=t 同步队列不为空 // 队列中还有其他线程在等待锁,则返回true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
3.2、释放写锁
unlock方法释放锁
public void unlock() { sync.release(1); }
可见,调用内部类Sync的release方法,Sync继承AQS
public final boolean release(int arg) { if (tryRelease(arg)) { //1,释放锁成功 Node h = head; if (h != null && h.waitStatus != 0) //2.唤醒同步队列中等待线程 unparkSuccessor(h); return true; } return false; }
核心在尝试释放锁方法上,看看写锁的释放锁方法tryRelease
protected final boolean tryRelease(int releases) { //1,判断当前线程是否持有当前锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2,同步状态 - 需要释放的写锁同步值 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) //3,free ==true,完全释放写锁,将当前获取独占锁线程置空 setExclusiveOwnerThread(null); //4,更新state值 setState(nextc); return free; }
注: 在释放写锁占用次数时, state的高16的读锁有值也不影响,减去releases,首先减去的state低位的数,而且在释放写锁时,state的低16位的值一定>=1,不存在减少读锁的值情况。
int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0;
也可改写为如下面代码
//1,获取state值 int c = getState(); //2,获取写锁的值 int w= exclusiveCount(c); int remain = w- releases; boolean free = remain== 0;
4、读锁
4.1、获取读锁
读锁调用lock方法加锁,实际调用Sync的acquireShared方法
public void lock() { sync.acquireShared(1); }
走进acquireShared,获取共享锁方法
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
尝试获取锁tryAcquireShared,如果返回值<0, 表示获取读锁失败
主要执行步骤:
1、首先判断是否存在其他线程在占用写锁,有需要挂起等待;
2、在不用阻塞等待,且读锁值没有超过最大值,cas更新成功了state的值,可以获取到读锁,还会做以下事:
? a. 第一个获取读锁的,直接记录线程对象和其重入获取读锁的次数
? b. 非第一个获取读锁的,则获取缓存计数器(cachedHoldCounter),其记录上一次获取读锁的线程,如果是同一个线程,则直接更新其计数器的重入次数,如果缓存计数器为空或缓存计数器的线程不是当前获取读锁的线程,则从当前线程本地变量中获取自己的计数器,更新计数器的值
protected final int tryAcquireShared(int unused) { //1,获取当前线程对象 Thread current = Thread.currentThread(); //2,获取同步锁的值 int c = getState(); /*3,exclusiveCount(c) != 0 计算写锁的同步状态,不等于0,说明有写锁已经获取到同步锁, *需要判断当前线程是否等于获取写锁线程, *是,可以允许再次获取读锁,这里涉及到锁降级问题,写锁可以降为读锁 *否则不让获取,写读互斥 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //4,获取读锁同步状态 int r = sharedCount(c); /** *此处3个判断条件 * 1.是否应该阻塞等待,这里也是基于公平锁和非公平获取锁实现 * 2.读锁同步状态值是超过最大值,即限制获取读锁的最大线程数 * 3.cas更新读锁同步状态是否成功 */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //可以获取到读锁 //r==0表示是第一个获取读锁的线程 if (r == 0) { firstReader = current; //记录第一个线程读锁的重入次数 firstReaderHoldCount = 1; } else if (firstReader == current) { //是第一个获取读锁线程,锁重入,锁重入次数+1 firstReaderHoldCount++; } else { // 已有其他线程获取到读锁 /* *1,获取缓存记录的计数器,计数器是用来统计每一个获取读锁线程的重入次数的, *由每个线程的ThreadLocal,即线程内的副本存储,相互独立; *此处也不是放入缓存,在有多个线程同时获取读锁情况, *用一个变量记录上一个获取读锁的线程的计数器,可能考虑多次获取读锁线程大概率是同一个线程情况, *这样做是可提高执行效率 */ HoldCounter rh = cachedHoldCounter; // rh==null,第一个获取读锁,rh没有值 // 或者计数器存储的上一次线程的id与当前线程不等, 即不是相同一个线程, //那么就获取当前线程内部的计数器,并赋值给cachedHoldCounter变量,这样可以让下一次获取读锁线程获取比较了 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) /*进入该条件,我理解是在线程获取读锁再释放后,同一线程再次获取读锁情况, * 缓存计数器会记录上一个线程计数器,因为线程释放读锁后,count=0, * 这里重新将计数器放入线程内部中, * 因为线程在使用完线程内部变量后会防止内存泄漏,会执行remove,释放本地存储的计数器。 */ readHolds.set(rh); //计数器+1 rh.count++; } return 1; } //上面3个条件没有同时满足,没有成功获取到读锁,开始无限循环尝试去获取读锁 return fullTryAcquireShared(current); }
无限循环尝试获取共享锁 fullTryAcquireShared方法
主要执行步骤:
1、 如果有其他线程获取到了写锁,写读互斥,应该去挂起等待;
2、如果可以获取读锁,判断是否应该阻塞等待,在公平获取锁方式中,同步队列中有其他线程在等待,则应该去排队按照FIFO顺序获取锁,非公平获取锁方式,可以直接去竞争获取锁。
3、可以获取锁,则尝试cas更新state的值,更新成功,获取到锁。
final int fullTryAcquireShared(Thread current){ HoldCounter rh = null; //无限循环 for (;;) { //获取同步锁状态 int c = getState(); //判断写锁值不为0,且不是当前线程,不可获取读锁 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { //没有线程获取到写锁情况,公平获取锁情况, //同步队列中有其他线程等待锁,该方法主要是在需要排队等待,计数器重入次数==0情况,清除计数器 if (firstReader == current) { //此处firstReader !=null, 则第1个获取读锁的线程还没释放锁,可允许该线程继续重入获取锁 //计数器count一定>0 } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) //清除计数器 readHolds.remove(); } } // 为什么rh.count == 0就不让线程获取到锁了,基于公平获取锁方式,去同步队列中等待 if (rh.count == 0) return -1; } } //获取读锁线程超过最大限制值 65535 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // cas执行读锁值+1 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { //1,第一个获取读锁 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //2,第一个获取读锁重入 firstReaderHoldCount++; } else { //3,非第一个线程获取读锁,存在多个线程获取读锁 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; //缓存计数器变量记录此次获取读锁线程的计数器 cachedHoldCounter = rh; // cache for release } return 1; } } }
tryAcquireShared 返回< 0, 获取锁失败,执行 doAcquireShared
在获取读锁失败后,执行以下步骤:
1、将节点加入同步队列中
2、如果前置节点是头节点,将再次尝试获取锁,如果成功,设置当前节点为head节点,并根据tryAcquireShared方法的返回值r判断是否需要继续唤醒后继节点,如果 r大于0,需要继续唤醒后继节点,r=0不需要唤醒后继节点。
3、如果前置节点不是头节点,则在队列中找到安全位置,设置前置节点 ws=SIGNAL, 挂起等待。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //如果前继节点是头节点,再次尝试获取共享锁 int r = tryAcquireShared(arg); //r>=0,表示获取到锁, //r=0,表示不需要唤醒后继节点 //r>0,需要继续唤醒后继节点 if (r >= 0) { //该方法实现2个步骤 //1,设置当前节点为头节点 //2,r>0情况会继续唤醒后继节点 setHeadAndPropagate(node, r); //旧的头节点移出队列 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate 该方法是与独占锁获取锁的区别之处,获取到锁后,设置为头结点还需要继续传播下去。
private void setHeadAndPropagate(Node node, int propagate) { //记录是的旧的头节点 Node h = head; // Record old head for check //设置当前获取到锁节点为头节点 setHead(node); //propagate >0,表示还需要继续唤醒后继节点 //旧的头节点和新头节点为空,或者ws<0,满足条件之一,尝试去唤醒后继节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //后继节点为空或者是共享节点(获取读锁的线程) if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared 方法较难理解,在释放锁中也有调用,留着后面一起分析。
4.2、释放读锁
public void unlock() { sync.releaseShared(1); }
AQS中释放共锁方法releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
看一下读写锁具体实现tryReleaseShared 的方法
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //1,更新或者移出线程内部计数器的值 if (firstReader == current) { //当前线程是第一个获取读锁的线程 if (firstReaderHoldCount == 1) //直接置空 firstReader = null; else //该线程获取读锁重入多次,计数器-1 firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //非第一个获取读锁线程,避免ThreadLocal内存泄漏,移出计数器 readHolds.remove(); if (count <= 0) //此处是调用释放锁次数比获取锁次数还多情况,直接抛异常 throw unmatchedUnlockException(); } --rh.count; } //2,循环cas更新同步锁的值 for (;;) { int c = getState(); //读锁同步状态-1 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //返回完全释放读锁,读锁值是否==0,完全释放,等待写锁线程可获取 return nextc == 0; } }
tryReleaseShared 返回true情况,表示完全释放读锁,执行doReleaseShared,那就需要唤醒同步队列中等待的其他线程
在读写锁中存在几种情况
情况一、如果当前获取锁的线程占用的是写锁,则后来无论是获取读锁还写锁的线程都会被阻塞在同步队列中,
同步队列是FIFO队列,在占用写锁的释放后,node1获取读锁,因读锁是共享的,继续唤醒后一个共享节点。
如上图,在node1获取到读锁时,会调用doReleaseShared方法,继续唤醒下一个共享节点node2,可以持续将唤醒动作传递下去,如果node2后面还存在几个等待获取读锁的线程,这些线程是由谁唤醒的?是其前置节点,还是第一个获取读锁的节点? 应该是第1个获取锁的节点,这里即node1, 由下代码可见,在无限循环中,只有头节点没有变化时,即再没其他节点获取到锁后,才会跳出循环。
private void doReleaseShared() { for (;;) { //获取同步队列中头节点 Node h = head; //同步队列中节点不为空,且节点数至少2个 if (h != null && h != tail) { int ws = h.waitStatus; //1,表示后继节点需要被唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继节点 unparkSuccessor(h); } //2,后继节点暂时不需要唤醒,设置节点 ws = -3, 确保后面可以继续传递下去 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果头节点发生变化,表示已经有其他线程获取到锁了,需要重新循环,确保可以将唤醒动作传递下去。 if (h == head) // loop if head changed break; } }
5、思考
1、在非公平获取锁方式下,是否存在等待获取写锁的线程始终获取不到锁,每次都被后来获取读锁的线程抢先,造成饥饿现象?
存在这种情况,从获取读锁源码中看出,如果第一个线程获取到读锁正在执行情况下,第二个等待获取写锁的线程在同步队列中挂起等待,在第一个线程没有释放读锁情况下,又陆续来了线程获取读锁,因为读锁是共享的,线程都可以获取到读锁,始终是在读锁没有释放完毕加入获取读锁的线程,那么等待获取写锁的线程是始终拿不到写锁,导致饥饿。为什么默认还是非公平模式?因为减少线程的上下文切换,保证更大的吞吐量。
6、总结
1、读写锁可支持公平和非公平两种方式获取锁。
2、支持锁降级,写锁可降级为读锁,但读锁不可升级为写锁。
3、大多数场景是读多于写的,所以ReentrantReadWriteLock 比 ReentrantLock(排他锁)有更好的并发性能和吞吐量。
4、读写锁中读锁和写锁都支持锁重入。
5、在获取Condition对象实现阻塞唤醒机制,ReentrantReadWriteLock.WriteLock 重写了 newCondition方法,ReadLock不支持,即读锁不支持与Condition配合使用,使用阻塞唤醒机制。