【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理
前言
前面已经写了Redisson大多的内容,我们再看看Redisson官网共有哪些组件:
image.png
剩下还有Semaphore和CountDownLatch两块,我们就趁热打铁,赶紧看看Redisson是如何实现的吧。
我们在JDK中都知道Semaphore和CountDownLatch两兄弟,这里就不多赘述,不了解的可以再回头看看。
Semaphore使用示例
先看下Semaphore原理图如下:
image.png
接着我们看下Redisson中使用的案例:
RSemaphore semaphore = redisson.getSemaphore("semaphore");// 同时最多允许3个线程获取锁semaphore.trySetPermits(3);for(int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]尝试获取Semaphore锁"); semaphore.acquire(); System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]成功获取到了Semaphore锁,开始工作"); Thread.sleep(3000); semaphore.release(); System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]释放Semaphore锁"); } catch (Exception e) { e.printStackTrace(); } } }).start();}
Semaphore源码解析
接着我们根据上面的示例,看看源码是如何实现的:
第一步:
semaphore.trySetPermits(3);
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public boolean trySetPermits(int permits) { return get(trySetPermitsAsync(permits)); } @Override public RFuture<Boolean> trySetPermitsAsync(int permits) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call(‘get‘, KEYS[1]); " + "if (value == false or value == 0) then " + "redis.call(‘set‘, KEYS[1], ARGV[1]); " + "redis.call(‘publish‘, KEYS[2], ARGV[1]); " + "return 1;" + "end;" + "return 0;", Arrays.<Object>asList(getName(), getChannelName()), permits); }}
执行流程为:
- get semaphore,获取到一个当前的值
- 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3
- 然后发布一些消息,返回1
接着看看semaphore.acquire();
和semaphore.release();
逻辑:
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can‘t be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(true); } return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call(‘get‘, KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + "local val = redis.call(‘decrby‘, KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), permits); } @Override public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can‘t be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(null); } return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call(‘incrby‘, KEYS[1], ARGV[1]); " + "redis.call(‘publish‘, KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }}
先看看加锁的逻辑tryAcquireAsync()
:
- get semaphore,获取到一个当前的值,比如说是3,3 > 1
- decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
- decrby semaphore 1
- decrby semaphore 1
- 执行3次加锁后,semaphore值为0
此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁,如下图:
image.png
接着看看解锁逻辑releaseAsync()
:
- incrby semaphore 1,每次一个客户端释放掉这个锁的话,就会将信号量的值累加1,信号量的值就不是0了
看到这里大家就明白了了,Redisson实现Semaphore其实是很简单了
CountDownLatch使用示例
使用案例:
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.trySetCount(3);System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。"); for(int i = 0; i < 3; i++) { new Thread(new Runnable() { @Override public void run() { try { System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。"); Thread.sleep(3000); RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch"); localLatch.countDown(); System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作"); } catch (Exception e) { e.printStackTrace(); } } }).start();}latch.await();System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走");
CountDownLatch 源码解析
源码如下:
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch { @Override public RFuture<Boolean> trySetCountAsync(long count) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call(‘exists‘, KEYS[1]) == 0 then " + "redis.call(‘set‘, KEYS[1], ARGV[2]); " + "redis.call(‘publish‘, KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count); } @Override public RFuture<Void> countDownAsync() { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local v = redis.call(‘decr‘, KEYS[1]);" + "if v <= 0 then redis.call(‘del‘, KEYS[1]) end;" + "if v == 0 then redis.call(‘publish‘, KEYS[2], ARGV[1]) end;", Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage); }}
先分析trySetCount()
方法逻辑:
- exists anyCountDownLatch,第一次肯定是不存在的
- set redisson_countdownlatch__channel__anyCountDownLatch 3
- 返回1
接着分析latch.await();
方法,如下图:
image.png
这个方法其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环
最后分析localLatch.countDown();
方法:
- decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1
await()
方面已经分析过,死循环去判断anyCountDownLatch对应存储的值是否为0,如果为0则接着执行自己的逻辑
总结
看到了这里 这两个组件是不是很简单?
到了这里,Redisson部分的学习都已经结束了,后面还会学习ZK实现分布式锁的原理。
申明
本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!
感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫