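Using Netflix Curator with ZooKeeper
Introduction to Curator
Netflix Curator is a ZooKeeper client library open-sourced by Netflix that simplifies ZooKeeper client programming. It contains the following modules:
- curator-client - a wrapper around the ZooKeeper client that replaces the native client and adds a number of useful client-side features
- curator-framework - a high-level wrapper around the ZooKeeper API that greatly simplifies client programming and adds features such as connection management and a retry mechanism
- curator-recipes - implementations of the ZooKeeper recipes built on curator-framework (all except two-phase commit)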
maven dependency:
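Sample code for curator-framework:
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;

String path = "/test_path";
// the namespace is given without a leading slash; every path below is resolved under /test1
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("test:2181")
    .namespace("test1")
    .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
    .connectionTimeoutMs(5000)
    .build();
// the client must be started before any operation is issued
client.start();
// create a node
client.create().forPath("/head", new byte[0]);
// delete a node in the background
client.delete().inBackground().forPath("/head");
// create an EPHEMERAL_SEQUENTIAL node (the parent /head must exist)
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
// get the data in the background and set a watch
client.getData().watched().inBackground().forPath("/test");
// check whether the path exists
client.checkExists().forPath(path);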
curator-framework uses the builder pattern and a chained (fluent) API similar to NIO's, so the code is very concise.
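The RetryNTimes(n, sleepMs) policy used above retries an operation up to n times, sleeping a fixed interval between attempts. The following is a minimal plain-Java sketch of that idea only; RetrySketch and callWithRetry are hypothetical names, not part of Curator, and unlike Curator (which retries only recoverable errors such as connection loss) this sketch retries on any exception.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper illustrating "retry N times with a fixed sleep"; not Curator code. */
final class RetrySketch {
    /** Runs op; on failure sleeps sleepMs and tries again, allowing up to maxRetries retries. */
    static <T> T callWithRetry(Callable<T> op, int maxRetries, long sleepMs) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                // give up once the retry budget is used, rethrowing the last failure
                if (retries++ >= maxRetries) {
                    throw e;
                }
                Thread.sleep(sleepMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // the simulated operation fails twice, then succeeds on the third attempt
        String result = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("simulated connection loss");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With Integer.MAX_VALUE as the retry count, as in the sample above, the client effectively keeps retrying once per second until the server becomes reachable again.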
Using Curator recipes
InterProcessMutex
Purpose: a mutual-exclusion lock shared between processes
Sample code:
String lockName = "/lock1";
InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
lock1.acquire();
// lock1 holds the lock, so lock2 times out after one second
boolean result = lock2.acquire(1, TimeUnit.SECONDS);
assertFalse(result);
lock1.release();
// once lock1 is released, lock2 can acquire
result = lock2.acquire(1, TimeUnit.SECONDS);
assertTrue(result);
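How it works: each call to acquire creates a new ephemeral node under /lock1 using CreateMode.EPHEMERAL_SEQUENTIAL, then calls getChildren to fetch all children and checks whether the node just created is the first one. If it is, the lock is acquired. If it is not and the acquire fails, the node just created is deleted.
Note: a successful acquire sends 2 requests to the ZooKeeper server (one write and one getChildren); a failed acquire sends 3 (one write, one getChildren, and one delete).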
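The steps and request counts described above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: SequentialLockSketch, tryAcquire, and the "lock-" node prefix are hypothetical, a plain list stands in for the children of /lock1, and no ZooKeeper or Curator call is made.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** In-memory stand-in for a lock parent node such as /lock1 (hypothetical, no ZooKeeper involved). */
final class SequentialLockSketch {
    private final List<String> children = new ArrayList<>(); // child node names under the lock path
    private int nextSequence = 0;                            // mimics the server-side sequence counter
    int requests = 0;                                        // counts simulated server round trips

    /** Tries once to take the lock; returns the held node name, or null if another node is first. */
    synchronized String tryAcquire() {
        // request 1: create a sequential child with a zero-padded suffix
        String node = String.format("lock-%010d", nextSequence++);
        children.add(node);
        requests++;

        // request 2: list the children and sort them by sequence suffix
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        requests++;

        if (sorted.get(0).equals(node)) {
            return node; // our node is first: lock acquired after 2 requests
        }
        // request 3: not first, so delete the node we just created
        children.remove(node);
        requests++;
        return null;
    }

    /** Releases the lock by deleting the held node (one more request). */
    synchronized void release(String node) {
        children.remove(node);
        requests++;
    }

    public static void main(String[] args) {
        SequentialLockSketch lock = new SequentialLockSketch();
        String first = lock.tryAcquire();
        System.out.println("first acquire: " + first + ", requests so far: " + lock.requests);
        String second = lock.tryAcquire();
        System.out.println("second acquire: " + second + ", requests so far: " + lock.requests);
        lock.release(first);
        System.out.println("after release: " + lock.tryAcquire());
    }
}
```

The zero-padded suffix matters: it lets a plain lexicographic sort reproduce creation order, which is what makes "first child wins" a fair rule.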
InterProcessReadWriteLock
Sample code:
@Test
public void testReadWriteLock() throws Exception {
    String readWriteLockPath = "/RWLock";
    InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock1 = readWriteLock1.writeLock();
    InterProcessMutex readLock1 = readWriteLock1.readLock();
    InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock2 = readWriteLock2.writeLock();
    InterProcessMutex readLock2 = readWriteLock2.readLock();
    writeLock1.acquire();
    // same lock object as the held write lock: reading is allowed
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
    // different lock object: cannot read while another holder is writing
    assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));
    // different lock object: cannot write either
    assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));
    // release the write lock
    writeLock1.release();
    // now both read locks can be acquired
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
    assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
}
How it works: the same as InterProcessMutex, with a trick in how the ephemeral nodes are ordered: write locks sort ahead.
Note: if a given InterProcessReadWriteLock already holds the write lock, acquiring its read lock also succeeds.
LeaderSelector
Sample code:
@Test
public void testLeader() throws Exception {
    LeaderSelectorListener listener = new LeaderSelectorListener() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("i'm leader");
        }

        @Override
        public void handleException(CuratorFramework client, Exception exception) {
        }

        @Override
        public void notifyClientClosing(CuratorFramework client) {
        }
    };
    String leaderPath = "/leader";
    LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
    selector1.start();
    LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
    selector2.start();
    assertFalse(selector2.hasLeadership());
}
How it works: it is implemented internally on top of InterProcessMutex; for details, see the shared lock section.
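To make "leader election on top of a mutex" concrete, here is a minimal single-JVM sketch: whoever holds the lock is the leader, and leadership ends when the callback returns and the lock is released. LeaderElectionSketch, Listener, and runForLeader are hypothetical names, and a java.util.concurrent ReentrantLock stands in for the inter-process lock; this is not how Curator's LeaderSelector is actually coded.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: the holder of a shared lock is the leader. Not Curator code. */
final class LeaderElectionSketch {
    /** Hypothetical callback, loosely modelled on takeLeadership in the listener above. */
    interface Listener {
        void takeLeadership(String candidate);
    }

    // stand-in for the shared inter-process lock (assumption: one JVM, threads as candidates)
    private final ReentrantLock mutex = new ReentrantLock(true);

    /** Blocks until this candidate holds the lock, runs the callback as leader, then steps down. */
    void runForLeader(String candidate, Listener listener) {
        mutex.lock();
        try {
            listener.takeLeadership(candidate);
        } finally {
            mutex.unlock(); // returning from the callback relinquishes leadership
        }
    }

    /** Runs two candidates; returns {number of leadership terms, max simultaneous leaders}. */
    static int[] demo() throws InterruptedException {
        LeaderElectionSketch election = new LeaderElectionSketch();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        AtomicInteger terms = new AtomicInteger();
        Listener listener = candidate -> {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            terms.incrementAndGet();
            try {
                Thread.sleep(20); // pretend to do leader-only work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
        };
        Thread a = new Thread(() -> election.runForLeader("selector1", listener));
        Thread b = new Thread(() -> election.runForLeader("selector2", listener));
        a.start();
        b.start();
        a.join();
        b.join();
        return new int[] {terms.get(), maxActive.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = demo();
        System.out.println("terms=" + result[0] + ", max simultaneous leaders=" + result[1]);
    }
}
```

Each candidate gets exactly one term and the two terms never overlap, which is the property the mutex provides to the election.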
Summary
Curator provides many other recipes as well; for details, see https://github.com/Netflix/curator/wiki/Recipes