zookeeper的两种分布式锁的源代码点评

zookeeper的两种分布式锁的源代码点评

自己实现锁的思想就是

所有分布式不好关的东西可以都注册到一个功能的中间件上,然后这个中间件进行统一汇集管理

对需要锁控制先后的线程先在执行前先建立一个标记性的节点,最后根据节点的顺序,决定线程执行的先后顺序(都在同一个zk上)

这也是zk的分布式锁原理

zk锁的源码(时序锁)

每个进程连接好zk之后就安照先后顺序创建节点,然后判断该节点是不是最先序列(最先创建),是的话就先执行,其他节点在其各自等待的节点(比自己小1的节点建立监听

,一旦自己监听的节点移除(锁释放),就会回调自己的process方法,自己多要做的事情(这时不在需要再获得锁,笫一次都依次建立好了时序锁))

 public void connectZookeeper() throws Exception {

        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {

            public void process(WatchedEvent event) {

                try {

                    // 连接建立时, 打开latch, 唤醒wait在该latch上的线程

                    if (event.getState() == KeeperState.SyncConnected) {

                        latch.countDown();////保证连接好了才往下走(倒计时为0开始执行)

                    }

                    // 发生了waitPath的删除事件

                    if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {

                        doSomething();

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        });

        // 等待连接建立

        latch.await();////保证连接好了才往下走

采用回调的方式实现所有锁依次执行

 // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法

                zk.getData(waitPath, true, new Stat());

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是: 

1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT) 

2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。 

3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。 

4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。 

5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。 

实现的分布式锁是严格的按照顺序访问的并发锁。

http://blog.csdn.net/a925907195/article/details/39639357

独占锁

都去创建一个同名锁,用好了后删除,之前为创建成功的开始有创建,未得到的等待(先判断自己是不是最小的),等待超时会包超时异常,但是会不断的迭代直到获得锁

  

private boolean waitForLock(String lower, long waitTime, TimeUnit unit,boolean isBreak) throws InterruptedException, KeeperException {  

        long beginTime = System.currentTimeMillis();  

        Stat stat = zk.exists(root + "/" + lower,true);  

        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听  

        if(stat != null){  

            logger.debug("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);  

            this.latch = new CountDownLatch(1);  

            this.latch.await(waitTime, unit);  

            if(this.latch.getCount() == 1){  

                 this.latch = null;  

                 if(isBreak){  

                     throw new LockException("申请锁资源默认时间是:"+defaultWaitTime+"ms,等待锁超时");  

                 }else{  

                     return false;  

                 }  

            }  

            this.latch = null;  

        }  

          

        //取出所有lockName的锁  

        List<String> lockObjNodes = this.getChildrenListSort();  

        logger.debug(myZnode + "==" + lockObjNodes.get(0));  

        if(myZnode.equals(root+"/"+lockObjNodes.get(0))){  

            //如果是最小的节点,则表示取得锁  

            return true;  

        }else{  

             //如果不是最小的节点,找到比自己小1的节点  

            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);  

            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);  

            long endTime = System.currentTimeMillis();  

            waitTime = waitTime - (endTime - beginTime) ;  

            if(waitTime <= 0) return false;  

            return waitForLock(lower, waitTime, unit, false);//等待锁  (迭代)

        }  

    }

http://blog.csdn.net/qq_18167901/article/details/50681429

相关推荐