中间件 - ZooKeeper应用场景实践
注:该文章用作回顾记录
一、准备工作
预先下载安装 ZooKeeper ,简单配置就能使用了。然后构建 Maven 项目,将下面的代码粘贴到 pom.xml中:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.5</version> </dependency>
zkclient 是开源的客户端工具,其中封装了很多功能,比如:删除包含子节点的父节点,连接重试,异步回调,偏向 Java 写法的注册监听等,极大地方便了用户使用。
下面不过多介绍客户端操作,只针对应用场景做介绍,该文章会随着本人的学习持续补充。
二、数据发布/订阅
使用 ZooKeeper 节点监听来实现该功能:
ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 连接集群 zkClient.createPersistent("/xxx/xxx"); // 创建持久节点 // 注册子节点变更监听,当子节点改变(比如创建了"/xxx/xxx/1")或当前节点删除等,会触发异步回调 zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { } });
下面为部分源码:
package org.I0Itec.zkclient; public class ZkClient implements Watcher { public List<String> watchForChilds(final String path) { return retryUntilConnected(new Callable<List<String>>() { @Override public List<String> call() throws Exception { exists(path, true); try { return getChildren(path, true); } catch (ZkNoNodeException e) { } return null; } }); } public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { final long operationStartTime = System.currentTimeMillis(); while (true) { if (_closed) { throw new IllegalStateException("ZkClient already closed!"); } try { return callable.call(); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } } }
基于 ZooKeeper 实现的数据发布/订阅很简单吧,快动手试试。
三、分布式锁
这部分是 ZooKeeper 重要功能,在此基础上实现诸如,分布式协调/通知,负载均衡,Master选举等复杂场景。
1、排它锁
排它锁又称为写锁或独占锁。比如事务 T1 对数据对象 O1 加了排它锁,那么在整个加锁期间,只允许 T1 对 O1 进行读取或更新操作,其它事务都不能对 O1 操作。
1)获取锁
所有客户端都创建临时节点 zkClient.createEphemeral("/xxx/xxx", null);
,ZooKeeper 会保证在所有客户端中,最终只有一个客户端能创建成功,那么就认为该客户端获取了锁。同时,所有没获取到锁的客户端需在/xxx/xxx
上注册子节点变更监听,以便实时监听节点变化。如节点发生变化,则未获取到锁的客户端再重新获取锁。
private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String lockParentPath = "/zk-book/exclusice_lock"; public static void main(String[] args) throws InterruptedException { try { zkClient.createEphemeral(lockParentPath + "/lock"); System.out.println("service3 获取锁成功"); } catch (Exception e) { System.out.println("service3获取锁失败"); zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("service3再次获取锁"); main(null); } }); } Thread.sleep(Integer.MAX_VALUE); }
2)释放锁
当 "/xxx/xxx"
是临时节点时,以下俩种情况都会释放锁。
- 当已获取锁的客户机宕机,导致连接超时断开,那么 ZooKeeper 会将临时节点删除。
- 正常执行完逻辑后,客户端主动将临时节点删除。
2、共享锁
共享锁又称为读锁。如果事务 T1 对数据对象 O1 加了共享锁,那么 T1 只能对 O1 进行读取操作,其它事务只能对 O1 加共享锁,直到 O1 上所有共享锁都被释放。
1)获取锁
所有客户端都创建临时顺序节点 zkClient.createEphemeralSequential("/xxx/xxx", null);
,ZooKeeper 会生成类似下面的节点,已保证节点的唯一性。
2)判断读写顺序
- 创建完临时顺序节点后,获取
"/xxx"
下的所有子节点,并对该节点注册子节点变更监听。 - 确定创建完的临时顺序节点在所有节点中的顺序。
- 对于读节点:
没有比自己序号小的节点,或比自己序号小的节点都是读节点,则成功获取到共享锁。
如果比自己序号小的节点中存在写节点,则需进入等待。
对于写节点:
如果自己不是序号最小的节点,则需进入等待。 - 接受到子节点变更通知后,重复步骤1
以下为实现代码:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享锁 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLock { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static volatile boolean isExecuted = false; public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null); String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1); List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); if (currentChilds.size() > 0) isExecuted = getLockAndExecute(currentChilds, node); zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if (currentChilds.size() > 0) { currentChilds = sortNodes(currentChilds); isExecuted = getLockAndExecute(currentChilds, node); } } }); while (!isExecuted) { Thread.sleep(Integer.MAX_VALUE); } } /** * 排序节点 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List<String> sortNodes(List<String> nodes) { Collections.sort(nodes, new Comparator<String>() { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号 } }); return nodes; } /** * 获取节点位置 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @return */ private static Integer getNodePosition(List<String> nodes, String node) { for (int i = 0, size = nodes.size(); i < size; i++) { if (nodes.get(i).equals(node)) return i; } return null; // 无此数据 } /** * 是否得到锁 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @param nodePosition * @return */ private static boolean isGetLock(List<String> nodes, String node, int nodePosition) { if (nodePosition == 0) // 没有比此序号更小的节点 return true; if (node.indexOf("r-") > -1) { // 读节点 for (int i = 0; i < nodePosition; i++) { // 遍历小于次序号的节点 String nodeTemp = nodes.get(i); if (nodeTemp.indexOf("w-") > -1) // 存在写节点,则进入等待锁 return false; } return true; } return false; } /** * 获取锁并执行 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLockAndExecute(List<String> currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) // 子节点为空 return false; System.out.println("子节点:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition); boolean isGetLock = isGetLock(currentChilds, node, nodePosition); if (isGetLock) { System.out.println(node + " 成功获取到锁,开始执行耗时任务"); doSomething(); boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node); if (isSuccess) System.out.println(node + " 成功执行完任务并删除节点"); } else { System.out.println(node + " 未获取到锁"); } return isGetLock; } private static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试以上代码会发现,当获取锁的节点过多时,某一节点变更会通知所有节点,会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击,服务器会发送给客户端大量的事件通知。比如有以下节点,当 w-24 节点变更时,会通知给其余节点。
因为当获取共享锁时,要判断比自己序号小的节点,所以应该只给 r-25 节点发送通知。针对此情况,改进后判断读写顺序为:
- 创建完临时顺序节点后,获取
"/xxx"
下的所有子节点。 - 客户端调用 getChildren() 来获取子节点列表,注意,这里不注册任何监听。
- 如果未获取到共享锁,那么找到比自己序号小的节点来注册监听,分为以下俩种情况:
读节点:比自己序号小的最后一个写节点注册监听
写节点:比自己序号小的最后一个节点注册监听 - 等待监听通知,重复步骤2
改进后的共享锁代码实现:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分布式共享锁最优 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLockOptimal { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null); public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1); boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node); System.out.println("当前所有节点:" + currentChilds.toString() + ", 该" + (isReadNode ? "读" : "写") + "节点:" + node); if (isGetLock) { execute(node); System.out.println("退出程序"); System.exit(1); } else { String monitorNode = getMonitorNode(currentChilds, node); System.out.println(node + " 未获取到锁,注册监听节点:" + monitorNode); if (null != monitorNode) { zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { main(null); // 递归调用 } }); } Thread.sleep(Integer.MAX_VALUE); } } /** * 排序节点 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List<String> sortNodes(List<String> nodes) { Collections.sort(nodes, new Comparator<String>() { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号 } }); return nodes; } /** * 获取节点位置 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static Integer getNodePosition(List<String> currentChilds, String node) { for (int i = 0, size = currentChilds.size(); i < size; i++) { if (currentChilds.get(i).equals(node)) return i; } return null; } /** * 获取监听节点 * @author alexnevsky * @date 2018年5月25日 * @param currentChilds * @param node * @return */ private static String getMonitorNode(List<String> currentChilds, String node) { String monitorNode = null; Integer nodePosition = getNodePosition(currentChilds, node); if (0 < nodePosition) { // 非首节点 if (node.indexOf("r-") > -1) { // 读节点 // 获取比当前序号小的最后一个写节点 for (int i = nodePosition - 1; i >= 0; i--) { String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) return tempNode; } } else { // 获取比当前序号小的最后一个节点 return currentChilds.get(nodePosition - 1); } } return monitorNode; } /** * 获取锁 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLock(List<String> currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) return false; if (nodePosition == 0) // 无序号更小的节点 return true; if (node.indexOf("r-") > -1) { // 读节点 for (int i = 0; i < nodePosition; i++) { // 遍历前面序号的节点 String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) // 存在写节点,返回失败 return false; } return true; } return false; } /** * 执行 * @author alexnevsky * @date 2018年5月24日 * @param node * @return */ private static void execute(String node) { System.out.println(node + " 成功获取到锁,开始执行耗时任务"); doSomething(); boolean isDeletedLock = zkClient.delete(nodeFullPath); System.out.println(node + " 成功执行完任务,删除节点" + (isDeletedLock ? "成功" : "失败")); } /** * 模拟耗时任务 * @author alexnevsky * @date 2018年5月25日 */ public static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }