Zookeeper是分布式环境下一个重要的组件

Zookeeper是分布式环境下一个重要的组件,因为它能在分布式环境下,给我带来很多便利,大大简化了分布式编程的复杂性,本篇散仙将给出一个模拟例子,来演示下如何使用Zookeeper的API编程,来完成分布式环境下配置的同步。大家都知道在一个中大型的规模的集群中,配置文件通常是必不可少的的东西,很多时候,我都需要将在Master上配置好的配置文件,给分发到各个Slave上,以确保整体配置的一致性,在集群规模小的时候我们可能简单的使用远程拷贝或复制即可完成,但是,当集群规模越来越大的时候,我们发现这种方式不仅繁琐,而且容易出错,最要命的是,以后如果改动配置文件的很少一部分的东西,都得需要把所有的配置文件,给重新远程拷贝覆盖一次,那么,怎样才能避免这种牵一发而动全身的事情呢?


事实上,利用Zookeeper,就能够很容易的,高可靠的帮我们完成这件事,我们只需要把配置文件保存在Zookeeper的znode里,然后通过Watch来监听数据变化,进而帮我们实现同步。一个简单的工作图如下所示:

Zookeeper是分布式环境下一个重要的组件

总结流程如下:

序号实现1启动ZK集群2客户端在ZK创建一个znode,并写入数据3启动各个Server上的Watcher,无限休眠4客户端更新znode里数据5Watcher的read方法发现数据更新,下拉至本地,更新本地数据


代码如下:

package com.sanjiesanxian;  
  
import java.util.concurrent.CountDownLatch;  
  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  
import org.apache.zookeeper.ZooDefs.Ids;  
import org.apache.zookeeper.data.Stat;  
  
  
/*** 
 * Zookeeper实现分布式配置同步 
 *  
 * @author 秦东亮 
 *  
 * ***/  
public class SyscConfig   implements Watcher{  
      
    //Zookeeper实例  
    private ZooKeeper zk;  
    private CountDownLatch countDown=new CountDownLatch(1);//同步工具  
    private static final int TIMIOUT=5000;//超时时间  
    private static final String PATH="/sanxian";  
    public SyscConfig(String hosts) {  
           
    try{  
        zk=new ZooKeeper(hosts, TIMIOUT, new Watcher() {  
              
            @Override  
            public void process(WatchedEvent event) {  
                   
                if(event.getState().SyncConnected==Event.KeeperState.SyncConnected){  
                    //防止在未连接Zookeeper服务器前,执行相关的CURD操作  
                    countDown.countDown();//连接初始化,完成,清空计数器  
                }  
                  
            }  
        });  
          
    }catch(Exception e){  
        e.printStackTrace();  
    }  
    }  
      
      
      
    /*** 
     * 写入或更新 
     * 数据 
     * @param path 写入路径 
     * @param value 写入的值 
     * **/  
  public void addOrUpdateData(String path,String data)throws Exception {  
        
        
      Stat stat=zk.exists(path, false);  
      if(stat==null){  
            //没有就创建,并写入       
          zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
      System.out.println("新建,并写入数据成功.. ");  
      }else{    
          //存在,就更新  
          zk.setData(path, data.getBytes(), -1);  
          System.out.println("更新成功!");  
      }  
  }  
    
  /** 
   * 读取数据 
   * @param path 读取的路径 
   * @return 读取数据的内容 
   *  
   * **/  
  public String readData()throws Exception{  
        
      String s=new String(zk.getData(PATH, this, null));  
        
    return s;    
  }  
      
      
    /** 
     * 关闭zookeeper连接 
     * 释放资源 
     *  
     * **/  
    public void close(){  
          
        try{  
              
            zk.close();  
        }catch(Exception e){  
            e.printStackTrace();  
        }  
          
    }  
  
   
public static void main(String[] args)throws Exception {  
      
    SyscConfig conf=new SyscConfig("10.2.143.5:2181");  
       
      conf.addOrUpdateData(PATH, "修真天劫,九死一生。");  
      conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.");  
     conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");  
      
    //System.out.println("监听器开始监听........");  
    // conf.readData();  
    // Thread.sleep(Long.MAX_VALUE);  
    //conf.readData();  
    conf.close();  
      
}  
  
    @Override  
    public void process(WatchedEvent event){  
         try{  
        if(event.getType()==Event.EventType.NodeDataChanged){  
            System.out.println("变化数据:  "+readData());  
        }  
         }catch(Exception e){  
             e.printStackTrace();  
         }  
          
    }  
}  
package com.sanjiesanxian;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;


/***
 * Zookeeper实现分布式配置同步
 * 
 * @author 秦东亮
 * 
 * ***/
public class SyscConfig   implements Watcher{
	
	//Zookeeper实例
	private ZooKeeper zk;
	private CountDownLatch countDown=new CountDownLatch(1);//同步工具
	private static final int TIMIOUT=5000;//超时时间
	private static final String PATH="/sanxian";
	public SyscConfig(String hosts) {
		 
	try{
		zk=new ZooKeeper(hosts, TIMIOUT, new Watcher() {
			
			@Override
			public void process(WatchedEvent event) {
				 
				if(event.getState().SyncConnected==Event.KeeperState.SyncConnected){
					//防止在未连接Zookeeper服务器前,执行相关的CURD操作
					countDown.countDown();//连接初始化,完成,清空计数器
				}
				
			}
		});
		
	}catch(Exception e){
		e.printStackTrace();
	}
	}
	
	
	
	/***
	 * 写入或更新
	 * 数据
	 * @param path 写入路径
	 * @param value 写入的值
	 * **/
  public void addOrUpdateData(String path,String data)throws Exception {
	  
	  
	  Stat stat=zk.exists(path, false);
	  if(stat==null){
            //没有就创建,并写入		
		  zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
	  System.out.println("新建,并写入数据成功.. ");
	  }else{  
		  //存在,就更新
		  zk.setData(path, data.getBytes(), -1);
		  System.out.println("更新成功!");
	  }
  }
  
  /**
   * 读取数据
   * @param path 读取的路径
   * @return 读取数据的内容
   * 
   * **/
  public String readData()throws Exception{
	  
	  String s=new String(zk.getData(PATH, this, null));
	  
	return s;  
  }
	
	
	/**
	 * 关闭zookeeper连接
	 * 释放资源
	 * 
	 * **/
	public void close(){
		
		try{
			
			zk.close();
		}catch(Exception e){
			e.printStackTrace();
		}
		
	}

 
public static void main(String[] args)throws Exception {
	
	SyscConfig conf=new SyscConfig("10.2.143.5:2181");
	 
	  conf.addOrUpdateData(PATH, "修真天劫,九死一生。");
	  conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.");
	 conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");
	
	//System.out.println("监听器开始监听........");
	// conf.readData();
	// Thread.sleep(Long.MAX_VALUE);
	//conf.readData();
	conf.close();
	
}

	@Override
	public void process(WatchedEvent event){
		 try{
		if(event.getType()==Event.EventType.NodeDataChanged){
			System.out.println("变化数据:  "+readData());
		}
		 }catch(Exception e){
			 e.printStackTrace();
		 }
		
	}
}


模拟客户端输出如下:

      
//客户端监听代码  
SyscConfig conf=new SyscConfig("10.2.143.5:2181");  
       
      conf.addOrUpdateData(PATH, "修真天劫,九死一生。");  
      conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.");  
     conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");  
      
    //System.out.println("监听器开始监听........");  
    // conf.readData();  
    // Thread.sleep(Long.MAX_VALUE);  
    //conf.readData();  
    conf.close();  
//客户端监听代码
SyscConfig conf=new SyscConfig("10.2.143.5:2181");
	 
	  conf.addOrUpdateData(PATH, "修真天劫,九死一生。");
	  conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.");
	 conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");
	
	//System.out.println("监听器开始监听........");
	// conf.readData();
	// Thread.sleep(Long.MAX_VALUE);
	//conf.readData();
	conf.close();
更新成功!  
更新成功!  
更新成功!  
更新成功!
更新成功!
更新成功!


模拟服务端输出如下:

public static void main(String[] args)throws Exception {  
    //服务端监听代码  
    SyscConfig conf=new SyscConfig("10.2.143.36:2181");  
    //conf.addOrUpdateData(PATH, "");  
    System.out.println("模拟服务监听器开始监听........");  
     conf.readData();  
     Thread.sleep(Long.MAX_VALUE);  
    conf.close();  
      
}  
public static void main(String[] args)throws Exception {
	//服务端监听代码
	SyscConfig conf=new SyscConfig("10.2.143.36:2181");
	//conf.addOrUpdateData(PATH, "");
	System.out.println("模拟服务监听器开始监听........");
	 conf.readData();
	 Thread.sleep(Long.MAX_VALUE);
	conf.close();
	
}
模拟服务监听器开始监听........  
数据更新了:  修真天劫,九死一生。  
数据更新了:  圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.  
数据更新了:  努力奋斗,实力才是王道!   
模拟服务监听器开始监听........
数据更新了:  修真天劫,九死一生。
数据更新了:  圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.
数据更新了:  努力奋斗,实力才是王道!




至此,使用zookeeper来完成配置同步的服务就完成了,我们可以发现,使用zookeeper来编写分布式程序是非常简单可靠的。