Redis结合AQS实现Java版的可重入分布式锁

Redis结合AQS实现Java版的可重入分布式锁

前言

对于java的单进程应用来说,存在资源竞争的场景可以使用synchronized关键字和Lock来对资源进行加锁,使整个操作具有原子性。但是对于多进程或者分布式的应用来说,上面提到的锁不共享,做不到互相通讯,所以就需要分布式锁来解决问题了。

废话不多说,直接进入正题,下面结合AQS和Redis来实现分布式锁。

代码中大部分都是参考ReentrantLock来实现的,所以读者可以先去了解一下ReentranLock和AQS

参阅:

http://www.importnew.com/27477.html

http://cmsblogs.com/?p=2210

加锁

@Override

protected boolean tryAcquire(int acquires) throws AcquireLockTimeoutException {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (!hasQueuedPredecessors() &&

compareAndSetState(0, 1)) { // 标注1

setExclusiveOwnerThread(current);

// 如果是线程被中断失败的话,返回false,如果超时失败的话,捕获异常

return tryAcquireRedisLock(TimeUnit.MILLISECONDS.toNanos(redisLockTimeout));

}

//可重入

} else if (current == getExclusiveOwnerThread()) { //标注2

int nextc = c + acquires;

if (nextc < 0) {

throw new Error("Maximum lock count exceeded");

}

setState(nextc);

return true;

}

return false;

}

下面会把进程内的锁称为进程锁,如果有更专业的描述方法的话,欢迎指出。

对上面的步骤分析:

1. 首先看标注1,通过compareAndSetState获取到进程锁,只有获取到进程锁,才有资格去竞争redis锁, 这样的好处就是对于同一个进程里面的所有加锁请求,在某一个时刻只有一个请求能去请求获取redis锁,有效降低redis的压力,总的来说就是把部分竞争交给进程自己去解决了,也就是先竞争进程锁。

2. 再看标注2,能进行到这一步,首先能确保已经获取了进程锁,但是是否一定获取了redis锁了呢,不一定,所以在tryAcquireRedisLock的过程中如果抛出异常,一定要保证使用finally代码块把进程锁释放掉,避免误以为已经同时获取了进程锁和redis锁。

获取redis锁

private final boolean tryAcquireRedisLock(long nanosTimeout) {

if (nanosTimeout <= 0L) {

return false;

}

final long deadline = System.nanoTime() + nanosTimeout;

int count = 0;

boolean interrupted = false;

Jedis jedis = null;

try {

jedis = redisHelper.getJedisInstance();

while (true) {

nanosTimeout = deadline - System.nanoTime();

if (nanosTimeout <= 0L) {

throw new AcquireLockTimeoutException();

}

String value = String.format(valueFormat, Thread.currentThread().getId());

//避免系统宕机锁不释放,设置过期时间

String response = jedis.set(lockKey, value, NX, PX, redisLockTimeout);

if (OK.equals(response)) {

//如果线程被中断同时也是失败的

return !interrupted;

}

// 超过尝试次数

if (count > RETRY_TIMES && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD && parkAndCheckInterrupt()) {

interrupted = true;

}

count++;

}

} finally {

redisHelper.returnResouce(jedis);

}

}

final boolean parkAndCheckInterrupt() {

LockSupport.parkNanos(TimeUnit.NANOSECONDS.toNanos(PARK_TIME));

return Thread.interrupted();

}

分析:

1. 为了避免获取redis锁的过程无休止的运行下去,使用超时策略,如果超时了,直接返回失败

2. 如果还在有效时间内,则通过自旋不断尝试获取锁,如果超过了尝试次数,暂时挂起,让出时间片,但是不可以挂起太长的时间,几个时间片内为好。

解锁

//RedisDistributedLock.java

@Override

public void unlock() {

sync.unlock();

}

//Sync.java

public void unlock() {

release(1);

}

@Override

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

if (c == 0) {

Jedis jedis = null;

try {

jedis = redisHelper.getJedisInstance();

String value = String.format(valueFormat, Thread.currentThread().getId());

jedis.eval(UNLOCK_SCRIPT, Arrays.asList(lockKey), Arrays.asList(value));

} finally {

redisHelper.returnResouce(jedis);

}

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}

分析:

1. 可以注意到value在加锁和解锁的过程都有,这个value是用来标识锁的唯一性的,避免别的进程误删了该锁。

private final UUID uuid = UUID.randomUUID();

private final String valueFormat = "%d:" + uuid.toString();

验证

@Override

public void run() {

SqlSession session = MybatisHelper.instance.openSession(true);

try {

KeyGeneratorMapper generatorMapper = session.getMapper(KeyGeneratorMapper.class);

KeyFetchRecordMapper recordMapper = session.getMapper(KeyFetchRecordMapper.class);

while (true) {

try {

lock.lock();

KeyGenerator keyGenerator = generatorMapper.select(1);

if (keyGenerator.getKey() >= MAX_KEY) {

System.exit(0);

}

recordMapper.insert(new KeyFetchRecord(keyGenerator.getKey(), server));

generatorMapper.increase(1, 1);

session.commit();

} catch (RuntimeException e) {

e.printStackTrace();

continue;

} finally {

lock.unlock();

}

}

} finally {

session.close();

}

}

开启5个进程,每个进程5个线程,进行获取一个key值,获取到后加1,然后记录到数据库,这个过程不要是原子的,因为把没有原子性的过程变成有原子性的过程,才证明了这个锁的有效性。

结果如下

Redis结合AQS实现Java版的可重入分布式锁
Redis结合AQS实现Java版的可重入分布式锁

没有重复的key,成功!

详细实现

https://github.com/dhhua/common-util/tree/master/util-lock

相关推荐