中间件 - ZooKeeper应用场景实践

注:该文章用作回顾记录

一、准备工作

预先下载安装 ZooKeeper ,简单配置就能使用了。然后构建 Maven 项目,将下面的代码粘贴到 pom.xml中:

<dependency>  
        <groupId>org.apache.zookeeper</groupId>  
        <artifactId>zookeeper</artifactId>  
        <version>3.4.5</version>  
    </dependency>  
    <dependency>  
        <groupId>com.101tec</groupId>  
        <artifactId>zkclient</artifactId>  
        <version>0.5</version>  
    </dependency>

zkclient 是开源的客户端工具,其中封装了很多功能,比如:删除包含子节点的父节点,连接重试,异步回调,偏向 Java 写法的注册监听等,极大地方便了用户使用。

下面不过多介绍客户端操作,只针对应用场景做介绍,该文章会随着本人的学习持续补充。

二、数据发布/订阅

使用 ZooKeeper 节点监听来实现该功能:

ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 连接集群

zkClient.createPersistent("/xxx/xxx"); // 创建持久节点

// 注册子节点变更监听,当子节点改变(比如创建了"/xxx/xxx/1")或当前节点删除等,会触发异步回调
zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List<String> currentChilds) 
        throws Exception {
    }
});

下面为部分源码:

package org.I0Itec.zkclient;

public class ZkClient implements Watcher {

    public List<String> watchForChilds(final String path) {
        return retryUntilConnected(new Callable<List<String>>() {
            @Override
            public List<String> call() throws Exception {
                exists(path, true);
                try {
                    return getChildren(path, true);
                } catch (ZkNoNodeException e) {
                }
                return null;
            }
        });
    }
    
    public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        final long operationStartTime = System.currentTimeMillis();
        while (true) {
            if (_closed) {
                throw new IllegalStateException("ZkClient already closed!");
            }
            try {
                return callable.call();
            } catch (Exception e) {
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }
}

基于 ZooKeeper 实现的数据发布/订阅很简单吧,快动手试试。

三、分布式锁

这部分是 ZooKeeper 重要功能,在此基础上实现诸如,分布式协调/通知,负载均衡,Master选举等复杂场景。

1、排它锁

排它锁又称为写锁或独占锁。比如事务 T1 对数据对象 O1 加了排它锁,那么在整个加锁期间,只允许 T1 对 O1 进行读取或更新操作,其它事务都不能对 O1 操作。

1)获取锁

所有客户端都创建临时节点 zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 会保证在所有客户端中,最终只有一个客户端能创建成功,那么就认为该客户端获取了锁。同时,所有没获取到锁的客户端需在/xxx/xxx 上注册子节点变更监听,以便实时监听节点变化。如节点发生变化,则未获取到锁的客户端再重新获取锁。

private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
private static final String lockParentPath = "/zk-book/exclusice_lock";

public static void main(String[] args) throws InterruptedException {
    try {
        zkClient.createEphemeral(lockParentPath + "/lock");
        System.out.println("service3 获取锁成功");
    } catch (Exception e) {
        System.out.println("service3获取锁失败");
        zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println("service3再次获取锁");
                main(null);
            }
        });
    }
    Thread.sleep(Integer.MAX_VALUE);
}

2)释放锁

"/xxx/xxx" 是临时节点时,以下俩种情况都会释放锁。

  1. 当已获取锁的客户机宕机,导致连接超时断开,那么 ZooKeeper 会将临时节点删除。
  2. 正常执行完逻辑后,客户端主动将临时节点删除。

中间件 - ZooKeeper应用场景实践

2、共享锁

共享锁又称为读锁。如果事务 T1 对数据对象 O1 加了共享锁,那么 T1 只能对 O1 进行读取操作,其它事务只能对 O1 加共享锁,直到 O1 上所有共享锁都被释放。

1)获取锁

所有客户端都创建临时顺序节点 zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 会生成类似下面的节点,已保证节点的唯一性。

中间件 - ZooKeeper应用场景实践

2)判断读写顺序

  1. 创建完临时顺序节点后,获取 "/xxx" 下的所有子节点,并对该节点注册子节点变更监听。
  2. 确定创建完的临时顺序节点在所有节点中的顺序。
  3. 对于读节点:
    没有比自己序号小的节点,或比自己序号小的节点都是读节点,则成功获取到共享锁。
    如果比自己序号小的节点中存在写节点,则需进入等待。
    对于写节点:
    如果自己不是序号最小的节点,则需进入等待。
  4. 接受到子节点变更通知后,重复步骤1

中间件 - ZooKeeper应用场景实践

以下为实现代码:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLock {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static volatile boolean isExecuted = false;
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null);
        String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1);
        
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        if (currentChilds.size() > 0)
            isExecuted = getLockAndExecute(currentChilds, node);
        
        zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                if (currentChilds.size() > 0) {
                    currentChilds = sortNodes(currentChilds);
                    isExecuted = getLockAndExecute(currentChilds, node);
                }
            }
        });
        
        while (!isExecuted) {
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> nodes, String node) {
        for (int i = 0, size = nodes.size(); i < size; i++) {
            if (nodes.get(i).equals(node))
                return i;
        }
        return null; // 无此数据
    }
    
    /**
     * 是否得到锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @param nodePosition
     * @return
     */
    private static boolean isGetLock(List<String> nodes, String node, int nodePosition) {
        if (nodePosition == 0) // 没有比此序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历小于次序号的节点
                String nodeTemp = nodes.get(i);
                if (nodeTemp.indexOf("w-") > -1)  // 存在写节点,则进入等待锁
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 获取锁并执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLockAndExecute(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null) // 子节点为空
            return false;
        System.out.println("子节点:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition);
        boolean isGetLock = isGetLock(currentChilds, node, nodePosition);
        if (isGetLock) {
            System.out.println(node + " 成功获取到锁,开始执行耗时任务");
            doSomething();
            boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node);
            if (isSuccess)
                System.out.println(node + " 成功执行完任务并删除节点");
        } else {
            System.out.println(node + " 未获取到锁");
        }
        return isGetLock;
    }
    
    private static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

测试以上代码会发现,当获取锁的节点过多时,某一节点变更会通知所有节点,会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击,服务器会发送给客户端大量的事件通知。比如有以下节点,当 w-24 节点变更时,会通知给其余节点。

中间件 - ZooKeeper应用场景实践

因为当获取共享锁时,要判断比自己序号小的节点,所以应该只给 r-25 节点发送通知。针对此情况,改进后判断读写顺序为:

  1. 创建完临时顺序节点后,获取 "/xxx" 下的所有子节点。
  2. 客户端调用 getChildren() 来获取子节点列表,注意,这里不注册任何监听。
  3. 如果未获取到共享锁,那么找到比自己序号小的节点来注册监听,分为以下俩种情况:
    读节点:比自己序号小的最后一个写节点注册监听
    写节点:比自己序号小的最后一个节点注册监听
  4. 等待监听通知,重复步骤2

中间件 - ZooKeeper应用场景实践

改进后的共享锁代码实现:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁最优
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLockOptimal {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null);
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1);
        
        boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node);
        System.out.println("当前所有节点:" + currentChilds.toString() + ", 该" + (isReadNode ? "读" : "写") + "节点:" + node);
        
        if (isGetLock) {
            execute(node);
            System.out.println("退出程序");
            System.exit(1);
        } else {
            String monitorNode = getMonitorNode(currentChilds, node);
            System.out.println(node + " 未获取到锁,注册监听节点:" + monitorNode);
            if (null != monitorNode) {
                zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String parentPath, List<String> currentChilds)
                            throws Exception {
                        main(null); // 递归调用
                    }
                });
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> currentChilds, String node) {
        for (int i = 0, size = currentChilds.size(); i < size; i++) {
            if (currentChilds.get(i).equals(node))
                return i;
        }
        return null;
    }
    
    /**
     * 获取监听节点
     * @author alexnevsky
     * @date 2018年5月25日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static String getMonitorNode(List<String> currentChilds, String node) {
        String monitorNode = null;
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (0 < nodePosition) { // 非首节点
            if (node.indexOf("r-") > -1) { // 读节点
                // 获取比当前序号小的最后一个写节点
                for (int i = nodePosition - 1; i >= 0; i--) {
                    String tempNode = currentChilds.get(i);
                    if (tempNode.indexOf("w-") > -1) 
                        return tempNode;
                }
            } else {
                // 获取比当前序号小的最后一个节点
                return currentChilds.get(nodePosition - 1);
            }
        }
        return monitorNode;
    }
    
    /**
     * 获取锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLock(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null)
            return false;
        if (nodePosition == 0) // 无序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历前面序号的节点
                String tempNode = currentChilds.get(i);
                if (tempNode.indexOf("w-") > -1)  // 存在写节点,返回失败
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param node
     * @return
     */
    private static void execute(String node) {
        System.out.println(node + " 成功获取到锁,开始执行耗时任务");
        doSomething();
        boolean isDeletedLock = zkClient.delete(nodeFullPath);
        System.out.println(node + " 成功执行完任务,删除节点" + (isDeletedLock ? "成功" : "失败"));
    }
    
    /**
     * 模拟耗时任务
     * @author alexnevsky
     * @date 2018年5月25日
     */
    public static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

相关推荐