基于ZooKeeper的分布式锁 (二)

前一篇文章介绍了ZooKeeper实现简单的分布式锁算法,但是当并发量很高,锁冲突机率大的情况下这种算法会导致羊群效应。解决羊群问题的算法,其实Zookeeper官网就有:

  • Call create( ) with a pathname of “locknode/lock-“ and the sequence and ephemeral flags set.
  • Call getChildren( ) on the lock node without setting the watch flag (this is important to avoid the herd effect).
  • If the pathname created in step 1 has the lowest sequence number suffix, the client has the lock and the client exits the protocol.
  • The client calls exists( ) with the watch flag set on the path in the lock directory with the next lowest sequence number.
  • if exists( ) returns false, go to step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2.

基本思想是利用ZooKeeper的临时节点和有序节点,每个客户端在znode节点下面创建一个唯一子节点,每个子节点都有一个唯一的序列号,然后约定具有最小序列号的客户端持有锁;未获得锁的客户端,只需要监听比它自己的序列号小的前一个子节点即可。这样当锁释放的时候只会有一个客户端被唤醒,避免了羊群效应的发生。

private static final String SEPARATOR = "/";
private static final String NODE_NAME = "lock-";
public String lock(String clientId, String resource) throws KeeperException, InterruptedException {
 this.ensureResource(clientId, resource);
 try{
 String path = zk.create(root + resource + SEPARATOR + NODE_NAME, clientId.getBytes(),
 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 this.waitUntilLocked(path, clientId, resource);
 return path;
 } catch(KeeperException e){
 if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
 return this.lockOnConnectionLoss(clientId, resource);
 } else {
 throw e;
 }
 }
}
private String lockOnConnectionLoss(String clientId, String resource) throws KeeperException, InterruptedException {
 List<String> pathes = this.getChildren(resource);
 String myPath = findMyNodeOnConnectionLoss(pathes, clientId);
 if (null == myPath) {
 return this.lock(clientId, resource);
 }
 this.waitUntilLocked(myPath, clientId, resource);
 return myPath;
}

resource参数是要加锁的资源标识,客户端会在它下面创建子节点。因为ZooKeeper不允许临时节点下面有子节点,所以ensureResource方法用于确保resource参数对应的znode节点必须存在。

lock方法里面创建的是一个EPHEMERAL_SEQUENTIAL的znode节点,它总是会成功的,所以不会发生NodeExistsException异常。不过链接丢失的情况依然需要单独处理,lockOnConnectionLoss方法会取出所有的子节点,然后判断前一次请求是否成功,如果没有成功的话,就进行重试;否则就进入下一步,判断自己是持有锁、还是等待。

private void waitUntilLocked(String myPath, String clientId, String resource)
 throws KeeperException, InterruptedException {
 while (true) {
 List<String> pathes = this.getChildren(resource);
 if (this.isLocked(myPath, pathes)) {
 return;
 } else {
 String target = this.findWatchPath(myPath, pathes, resource);
 this.watchAndWait(target);
 }
 }
}

waitUntilLocked方法的作用就是判断自己是加锁成功,还是等待。getChildren方法返回所有的子节点,isLocked方法判断当前客户端创建的子节点myPath是否具有最小的序列号,如果是的话表示当前客户端加锁成功,直接返回。否则的话就通过findWatchPath方法找到需要监视的子节点,即找到比当前客户端创建的子节点的序列号小的前一个子节点,然后阻塞并监听该节点的删除事件,watchAndWait方法与简单锁的listenLock完全一样,只是方法命名不同而已。

锁的释放相对于简单锁的实现更容易些,因为每个客户端创建的都是唯一的子节点,所以链接丢失时只需要进行重试即可,不需要额外的判断了。

public void release(String path) throws InterruptedException, KeeperException {
 try{
 zk.delete(path, -1);
 }
 catch(KeeperException e){
 if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
 this.release(path);
 } else {
 throw e;
 }
 }
}

下面给出其它几个方法的实现,方便大家查看:

private List<String> getChildren(String resource) throws KeeperException, InterruptedException {
 try{
 List<String> rlt = zk.getChildren(root + resource, false);
 Collections.sort(rlt, new Comparator<String>() {
 @Override
 public int compare(String o1, String o2) {
 int i1 = DistributedLock.this.parseSequence(o1);
 int i2 = DistributedLock.this.parseSequence(o2);
 return i1 - i2;
 }
 });
 return rlt;
 } catch(KeeperException e){
 if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
 return this.getChildren(resource);
 } else {
 throw e;
 }
 }
}
private boolean isLocked(String myPath, List<String> pathes) {
 int mySeq = this.parseSequence(myPath);
 int firstSeq = this.parseSequence(pathes.get(0));
 return mySeq == firstSeq;
}
private String findWatchPath(String myPath, List<String> pathes, String resource) {
 String prefix = root + resource + SEPARATOR;
 int start = prefix.length();
 String path;
 for (int i = pathes.size() -1; i >= 0; i--) {
 path = pathes.get(i);
 if (path.equals(myPath.substring(start))) {
 return prefix + pathes.get(i-1);
 }
 }
 throw new IllegalStateException();
}
private void watchAndWait(String target)
 throws KeeperException, InterruptedException {
 Semaphore s = new Semaphore(0);
 try{
 Stat stat = zk.exists(target, new Watcher() {
 @Override
 public void process(WatchedEvent event) {
 if (event.getType().equals(EventType.NodeDeleted)) {
 s.release();
 }
 }
 });
 if (null != stat) {
 s.acquire();
 }
 } catch(KeeperException e){
 if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
 this.watchAndWait(target);
 return;
 } else {
 throw e;
 }
 }
}
private String findMyNodeOnConnectionLoss(List<String> pathes, String clientId)
 throws KeeperException, InterruptedException {
 if (null == pathes || pathes.isEmpty()) {
 return null;
 }
 String path;
 for (int i = pathes.size() -1; i >= 0; i--) {
 path = pathes.get(i);
 if (this.isClientIdMatch(path, clientId)) {
 return path;
 }
 }
 return null;
}
private boolean isClientIdMatch(String path, String clientId) throws KeeperException, InterruptedException {
 try{
 Stat stat = new Stat();
 byte[] data = zk.getData(path, false, stat);
 if (clientId.equals(new String(data))) {
 return true;
 }
 return false;
 } catch(KeeperException e){
 if (e.code().equals(KeeperException.Code.NONODE)) {
 return false;
 } else if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
 return this.isClientIdMatch(path, clientId);
 } else {
 throw e;
 }
 }
}

读写锁

将上面的实现稍作修改,就可以实现读写锁,这里引用ZooKeeper官网的描述。

加读锁的算法:

  • Call create( ) to create a node with pathname “locknode/read-“. This is the lock node use later in the protocol. Make sure to set both the sequence and ephemeral flags.
  • Call getChildren( ) on the lock node without setting the watch flag - this is important, as it avoids the herd effect.
  • If there are no children with a pathname starting with “write-“ and having a lower sequence number than the node created in step 1, the client has the lock and can exit the protocol.
  • Otherwise, call exists( ), with watch flag, set on the node in lock directory with pathname staring with “write-“ having the next lowest sequence number.
  • If exists( ) returns false, goto step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2

加写锁的算法:

  • Call create( ) to create a node with pathname “locknode/write-“. This is the lock node spoken of later in the protocol. Make sure to set both sequence and ephemeral flags.
  • Call getChildren( ) on the lock node without setting the watch flag - this is important, as it avoids the herd effect.
  • If there are no children with a lower sequence number than the node created in step 1, the client has the lock and the client exits the protocol.
  • Call exists( ), with watch flag set, on the node with the pathname that has the next lowest sequence number.
  • If exists( ) returns false, goto step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2.

附上源码地址https://github.com/OuYangLiang/ZK-lock

相关推荐