Zookeeper动态更新服务器地址

        利用Zookeeper可实现服务发现功能,通过在节点下面创建数据来动态的更新服务地址

当服务地址发生变化时,服务提供者更改子节点下面的数据来更改url,服务消费者Watcher检测节点的变化,就获取节点下面的数据并且更新服务器地址

当服务器中的一个节点宕机或者leader宕机,zookeeper会自动选取领导,然后提供服务,只要宕机的数量小于zookeeper 服务器数量的一半仍然能提供良好的服务

   Zookeeper动态更新服务器地址

  基于Zookeeper的服务注册与发现架构

<dependency>  
    <groupId>org.apache.zookeeper</groupId>  
    <artifactId>zookeeper</artifactId>  
    <version>3.4.7</version>  
</dependency>  
<dependency>  
    <groupId>com.github.sgroschupf</groupId>  
    <artifactId>zkclient</artifactId>  
    <version>0.1</version>  
</dependency>

 服务消费者代码

package com.zookeerper.find;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AppClient{
    private String groupNode = "sgroup";
    private ZooKeeper zk;
    private Stat stat = new Stat();
    private volatile List<String> serverList;
    private String zookeeperServer = "127.0.0.1:2181,127.0.0.1:2182";
    
    /**
     * 连接zookeeper
     */
    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper(zookeeperServer, 5000, new Watcher() {

            public void process(WatchedEvent event) {
                // 如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听
                if (event.getType() == EventType.NodeChildrenChanged
                        && ("/" + groupNode).equals(event.getPath())) {
                    try {
                        updateServerList();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        updateServerList();
    }

    /**
     * 更新server列表
     */
    private void updateServerList() throws Exception {
        List<String> newServerList = new ArrayList<String>();

        // 获取并监听groupNode的子节点变化
        // watch参数为true, 表示监听子节点变化事件.
        // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
        List<String> subList = zk.getChildren("/" + groupNode, true);
        for (String subNode : subList) {
            // 获取每个子节点下关联的server地址
            byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
            newServerList.add(new String(data, "utf-8"));
        }

        // 替换server列表
        serverList = newServerList;

        System.out.println("server list updated: " + serverList);
    }

    /**
     * client的工作逻辑写在这个方法中 此处不做任何处理, 只让client sleep
     */
    public void handle() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        AppClient ac = new AppClient();
        ac.connectZookeeper();

        ac.handle();
    }
}
服务提供者代码:
package com.zookeerper.find;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class AppServer {  
    
    private String groupNode = "sgroup";  
    private String subNode = "sub";
    
    /** 
     * 连接zookeeper 
     * @param address server的地址 
     */  
    public void connectZookeeper(String address) throws Exception {  
        
    ZooKeeper zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2182", 5000, new Watcher() {  
            public void process(WatchedEvent event) {  
               // 不做处理
            }
        });  
    
        // 在"/sgroup"下创建子节点  
        // 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀  
        // 将server的地址数据关联到新创建的子节点上  
        
        String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),   
            Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
        System.out.println("create: " + createdPath);  
    }  
      
    /** 
     * server的工作逻辑写在这个方法中 
     * 此处不做任何处理, 只让server sleep 
     */
    public void handle() throws InterruptedException {  
        Thread.sleep(Long.MAX_VALUE);  
    }  
      
    public static void main(String[] args) throws Exception {    
        AppServer as = new AppServer();  
        as.connectZookeeper("asdfs3:8080");  
          
        as.handle();  
    }  
}

相关推荐