Redis结合AQS实现Java版的可重入分布式锁
前言
对于java的单进程应用来说,存在资源竞争的场景可以使用synchronized关键字和Lock来对资源进行加锁,使整个操作具有原子性。但是对于多进程或者分布式的应用来说,上面提到的锁不共享,做不到互相通讯,所以就需要分布式锁来解决问题了。
废话不多说,直接进入正题,下面结合AQS和Redis来实现分布式锁。
代码中大部分都是参考ReentrantLock来实现的,所以读者可以先去了解一下ReentranLock和AQS
参阅:
http://www.importnew.com/27477.html
http://cmsblogs.com/?p=2210
加锁
@Override
protected boolean tryAcquire(int acquires) throws AcquireLockTimeoutException {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, 1)) { // 标注1
setExclusiveOwnerThread(current);
// 如果是线程被中断失败的话,返回false,如果超时失败的话,捕获异常
return tryAcquireRedisLock(TimeUnit.MILLISECONDS.toNanos(redisLockTimeout));
}
//可重入
} else if (current == getExclusiveOwnerThread()) { //标注2
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
下面会把进程内的锁称为进程锁,如果有更专业的描述方法的话,欢迎指出。
对上面的步骤分析:
1. 首先看标注1,通过compareAndSetState获取到进程锁,只有获取到进程锁,才有资格去竞争redis锁, 这样的好处就是对于同一个进程里面的所有加锁请求,在某一个时刻只有一个请求能去请求获取redis锁,有效降低redis的压力,总的来说就是把部分竞争交给进程自己去解决了,也就是先竞争进程锁。
2. 再看标注2,能进行到这一步,首先能确保已经获取了进程锁,但是是否一定获取了redis锁了呢,不一定,所以在tryAcquireRedisLock的过程中如果抛出异常,一定要保证使用finally代码块把进程锁释放掉,避免误以为已经同时获取了进程锁和redis锁。
获取redis锁
private final boolean tryAcquireRedisLock(long nanosTimeout) {
if (nanosTimeout <= 0L) {
return false;
}
final long deadline = System.nanoTime() + nanosTimeout;
int count = 0;
boolean interrupted = false;
Jedis jedis = null;
try {
jedis = redisHelper.getJedisInstance();
while (true) {
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
throw new AcquireLockTimeoutException();
}
String value = String.format(valueFormat, Thread.currentThread().getId());
//避免系统宕机锁不释放,设置过期时间
String response = jedis.set(lockKey, value, NX, PX, redisLockTimeout);
if (OK.equals(response)) {
//如果线程被中断同时也是失败的
return !interrupted;
}
// 超过尝试次数
if (count > RETRY_TIMES && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD && parkAndCheckInterrupt()) {
interrupted = true;
}
count++;
}
} finally {
redisHelper.returnResouce(jedis);
}
}
final boolean parkAndCheckInterrupt() {
LockSupport.parkNanos(TimeUnit.NANOSECONDS.toNanos(PARK_TIME));
return Thread.interrupted();
}
分析:
1. 为了避免获取redis锁的过程无休止的运行下去,使用超时策略,如果超时了,直接返回失败
2. 如果还在有效时间内,则通过自旋不断尝试获取锁,如果超过了尝试次数,暂时挂起,让出时间片,但是不可以挂起太长的时间,几个时间片内为好。
解锁
//RedisDistributedLock.java
@Override
public void unlock() {
sync.unlock();
}
//Sync.java
public void unlock() {
release(1);
}
@Override
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
Jedis jedis = null;
try {
jedis = redisHelper.getJedisInstance();
String value = String.format(valueFormat, Thread.currentThread().getId());
jedis.eval(UNLOCK_SCRIPT, Arrays.asList(lockKey), Arrays.asList(value));
} finally {
redisHelper.returnResouce(jedis);
}
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
分析:
1. 可以注意到value在加锁和解锁的过程都有,这个value是用来标识锁的唯一性的,避免别的进程误删了该锁。
private final UUID uuid = UUID.randomUUID();
private final String valueFormat = "%d:" + uuid.toString();
验证
@Override
public void run() {
SqlSession session = MybatisHelper.instance.openSession(true);
try {
KeyGeneratorMapper generatorMapper = session.getMapper(KeyGeneratorMapper.class);
KeyFetchRecordMapper recordMapper = session.getMapper(KeyFetchRecordMapper.class);
while (true) {
try {
lock.lock();
KeyGenerator keyGenerator = generatorMapper.select(1);
if (keyGenerator.getKey() >= MAX_KEY) {
System.exit(0);
}
recordMapper.insert(new KeyFetchRecord(keyGenerator.getKey(), server));
generatorMapper.increase(1, 1);
session.commit();
} catch (RuntimeException e) {
e.printStackTrace();
continue;
} finally {
lock.unlock();
}
}
} finally {
session.close();
}
}
开启5个进程,每个进程5个线程,进行获取一个key值,获取到后加1,然后记录到数据库,这个过程不要是原子的,因为把没有原子性的过程变成有原子性的过程,才证明了这个锁的有效性。
结果如下
没有重复的key,成功!
详细实现
https://github.com/dhhua/common-util/tree/master/util-lock