zookeeper实战:ConfigServer代码样例
从业务需求上来说,ConfigServer作为一个全局配置管理中心,负责保存公用服务中的服务接口/server配置/通讯网关/全局权限等;对服务消费方而言,它可以很好地解耦对硬编码配置的依赖,并可以实现服务的动态管理等。
系统配置数据普遍具有“类别多”,“数据小”,“非关系型”,并要求存储系统具备高效的存取能力和健壮性,本例使用zookeeper来简单实现此功能。如下代码仅作参考。
1. TestMain.java:测试引导类。
2. ConfigManager.java: 配置管理类,负责管理“服务类型”(serverType).可以通过向configManager提交“服务类型”,configManager将会把此类型交付给zookeeper持久存储,服务类型创建成功后,configServer实例就可以在此类型下挂载数据。
3. ConfigServer.java: 配置生成类,负责向zookeeper提交数据,不过只能在其所属的serverType下生成子节点,那么此子节点就可以挂载当前server的信息,此例中假定数据为 ip + port字符串。
4. ConfigClient.java:配置消费类,负责从zookeeper中获得其感兴趣的serverType的数据集合。每个client可以获得多种serverType数据。
因为zookeeper系统本身提供了watch机制,因此为数据的“异步获取”提供了有利的条件;同时,因为zk对网络IO有较强的敏感性,且watch事件本身是“即发即失”的(一次性触发,触发后需重新注册),因此需要考虑client/manager/server三种角色在网络失效情况下的补救措施,或者“容忍”。
设计思路已经在代码注释中标明,如有不妥,请多多指教。
TestMain.java
package com.sample.zk.cs; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; /** * @author qing * 假设一种场景:服务A向ZK注册自己的服务信息,比如IP + Port;客户端B向ZK获取服务的列表,并使用服务。 * 比如CacheServer向zk注册ip和客户端port;其他client端可以向zk获取cacheserver的ip + port,以便此后建立链接。 */ public class TestMain { /** * @param args */ public static void main(String[] args) { String serverType = "cache-server"; //测试 try{ ConfigManager manager = new ConfigManager(true); manager.add(serverType); ConfigServer s1 = new ConfigServer(serverType); ConfigServer s2 = new ConfigServer(serverType,true); ConfigServer s3 = new ConfigServer(serverType); ConfigClient c1 = new ConfigClient(serverType); ConfigClient c2 = new ConfigClient(serverType); ConfigClient c3 = new ConfigClient(serverType); Thread.sleep(3000); System.out.println("+++++++++++++++++++++++++"); System.out.println("S1" + s1.getPath());//注意zk链接,是异步的,有可能此处为 null System.out.println("S2" + s2.getPath()); System.out.println("S3" + s3.getPath()); while(true){ System.out.println("-------------------------"); Set<String> l1 = c1.getServers(serverType);//结果可能为null if(l1 == null){ System.out.println("l1 is null..."); }else{ for(String path : l1){ System.out.println("l1:" + path); } } Set<String> l2 = c2.getServers(serverType); if(l2 == null){ System.out.println("l2 is 
null..."); }else{ for(String path : l2){ System.out.println("l2:" + path); } } Set<String> l3 = c3.getServers(serverType); if(l3 == null){ System.out.println("l3 is null..."); }else{ for(String path : l3){ System.out.println("l3:" + path); } } Thread.sleep(2000); } }catch(Exception e){ e.printStackTrace(); } } }
ConfigManager.java
package com.sample.zk.cs; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantLock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; /** * 负责管理所有的“serverType”,对于zk而言,负责创建/删除一级节点。每个一级节点表示一个“serverType”。 * 每个serverType都有多个子节点,子节点由configServer实例负责注册。 * @author qing */ public class ConfigManager { Set<String> serverTypes = new CopyOnWriteArraySet<String>(); private ZooKeeper zkClient; private ReentrantLock lock = new ReentrantLock();//同步锁,事实上本例可以不用。。仅供参考 // 当zk环境故障时,是否自动重连,自动重连就意味着开启守护线程检测zk环境, // 此方式适用于zk client不关心session过期,“session重建”带来的数据变更(例如临时节点)不会造成系统异常情况下 private boolean autoReconnected = false; private Thread thread = null; private Watcher dw = new InnerZK();// default watcher private boolean outdate = false; //数据是否过期,在autoReconnected情况下使用,如果没有采用“自动重连”,在session过期后,将不会重建session, //并把outdate标记为true public ConfigManager() { this(false); } /** * 首次链接必须正常,自动重连,将不会对“首次链接”起作用 * * @param autoReconneted */ public ConfigManager(boolean autoReconneted) { this.autoReconnected = autoReconneted; if (this.autoReconnected) { thread = new Thread(new FailureHandler()); thread.setDaemon(true); thread.start(); }else{ try { // 回话重建等异常行为 zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } } public void add(String path) { serverTypes.add("/" + path); rebuild(); } public void remote(String path) { serverTypes.remove(path); } public Set<String> getServerTypes(){ return serverTypes; } public boolean isOutdate() { return outdate; } 
////////////////////////////////////////////////inner work////////////////////////////////// /** * 创建所有serverType的跟节点,比如/cache-server,所有一级节点由此类统一负责创建。 */ private void rebuild() { lock.lock(); if (zkClient == null || !zkClient.getState().isConnected()) { return; } for (String path : serverTypes) { try { Stat stat = zkClient.exists(path, false); if (stat == null) { try { zkClient.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (NodeExistsException ne) { // 如果多个manager同时创建节点,可能会导致此异常,此处忽略它。 } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } /** * watcher,负责处理事件或者异步操作(本代码实例,未展示异步操作) * * @author qing * */ class InnerZK implements Watcher { public void process(WatchedEvent event) { // 如果是“数据变更”事件,忽略 if (event.getType() != EventType.None) { return; } // 如果是链接状态迁移 // 参见keeperState switch (event.getState()) { case SyncConnected: System.out.println("Connected..."); rebuild();// 每次重连,都检测一下数据状态。 outdate = false; break; case Expired: System.out.println("Expired..."); // session重建 outdate = true; break; // session过期 case Disconnected: // 链接断开,或session迁移 System.out.println("Connecting...."); break; case AuthFailed: if (autoReconnected && thread.isAlive()) { thread.interrupt(); } throw new RuntimeException("ZK Connection auth failed..."); default: break; } } } class FailureHandler implements Runnable { /** * zk故障担保线程,如果需要故障检测或者容错,请将此实例交付给单独线程执行 * 比如:因为网络问题,zk实例将可能长时间处于无法链接状态,或者其它异常,导致zk实例化出错等 */ public void run() { try { int i = 0; int l = 100;// 每次重建,将时间延迟100ms while (true) { System.out.println("Manager handler,running...tid: " + Thread.currentThread().getId()); if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) { lock.lock(); try { // 回话重建等异常行为 zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); i++; 
Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次 } finally { lock.unlock(); } continue; } if(zkClient.getState().isConnected()){ Thread.sleep(3000);// 如果被“中断”,直接退出 i = 0; } } } catch (InterruptedException e) { System.out.println("Exit..."); if(zkClient != null){ try{ zkClient.close(); }catch(Exception ze){ ze.printStackTrace(); } } } } } }
ConfigServer.java
package com.sample.zk.cs; import java.util.Random; import java.util.concurrent.locks.ReentrantLock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; /** * configServer,负责向zk注册当前server的信息,可被configClient获得信息。 * @author qing * */ public class ConfigServer { private ZooKeeper zkClient; private String path; private String serverType;// 当前configServer的类型,我们假设一个configServer实例持有一种“serverType” private ReentrantLock lock = new ReentrantLock(); private boolean autoReconnected = false; private Thread thread = null; private Watcher dw = new InnerZK();// default watcher private boolean outdate = false; // 数据是否过期,在autoReconnected情况下使用,如果没有采用“自动重连”,在session过期后,将不会重建session,并把outdate标记为true // 控制首次访问 private Object tag = new Object(); private boolean init = false; public ConfigServer(String serverType) { this(serverType, false); } public ConfigServer(String serverType, boolean autoReconnected) { this.serverType = serverType; this.autoReconnected = autoReconnected; if (this.autoReconnected) { thread = new Thread(new FailureHandler()); thread.setDaemon(true);// thread.start(); } else { try { // 回话重建等异常行为 zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } } public boolean isOutdate() { return outdate; } /** * 注册server信息,从zk角度来说,就是创建serverType的一个子节点。 * * @return */ private boolean register() { lock.lock(); init = false; try { Stat stat = zkClient.exists("/" + serverType, true);// 注册“父节点”watch,跟踪父节点的创建/删除 // 创建跟节点:/cache-server // 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。 if (stat == 
null) { return false; } // 创建临时子节点:/cache-server/cs01; Random r = new Random(); String data = "127.0.0.1:" + r.nextInt(65535);// tmp data,模拟一个ip + // port参数 path = zkClient.create("/" + serverType + "/id_", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Register path:" + path); init = true; synchronized (tag) { tag.notifyAll(); } } catch (NodeExistsException ne) { // ignore. } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return true; } public String getPath() { synchronized (tag) { while (!init) { try { tag.wait(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } return path; } class InnerZK implements Watcher { public void process(WatchedEvent event) { // 如果是“数据变更”事件 if (event.getType() != EventType.None) { switch (event.getType()) { // 如果其父节点(/serverType)被创建, // 此时configServer也开始注册其子节点信息,watcher在下文中SyncConnected中注册。 case NodeCreated: register(); break; case NodeDeleted: // 如果父节点被删除,那么此后子节点也将不复存在 path = null; register();// 注册watch,检测父节点/serverType再次创建。 break; default: break; } return; } // 如果是链接状态迁移 // 参见keeperState switch (event.getState()) { case SyncConnected: System.out.println("Connected..."); // 如果path == null,则表明是首次链接或者session重建。 if (path == null) { try { register();// 创建子节点,并对其父节点注册watch。 outdate = false; } catch (Exception e) { e.printStackTrace(); } } break; case Expired: System.out.println("Expired..."); outdate = true; init = true; synchronized (tag) { tag.notifyAll(); } break; // session过期 case Disconnected: // 链接断开,或session迁移 System.out.println("Connecting...."); break; case AuthFailed: init = true; synchronized (tag) { tag.notifyAll(); } if (autoReconnected && thread.isAlive()) { thread.interrupt(); } throw new RuntimeException("ZK Connection auth failed..."); default: break; } } } class FailureHandler implements Runnable { public void run() { try { int i = 0; int l = 10; while (true) { System.out.println("Server handler,running...tid: " + 
Thread.currentThread().getId()); if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) { lock.lock(); try { // 回话重建等异常行为 zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); i++; Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次 } finally { lock.unlock(); } continue; } if (zkClient.getState().isConnected()) { Thread.sleep(3000);// 如果被“中断”,直接退出 i = 0; } } } catch (InterruptedException e) { System.out.println("Exit..."); if (zkClient != null) { try { zkClient.close(); } catch (Exception ze) { ze.printStackTrace(); } } } } } }
ConfigClient.java
package com.sample.zk.cs; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; /** * configServer注册的数据,configClient消费。设计思路和configServer一致。 * 针对client获取数据的方式很多,如下是2中思路 1) 使用zk watch,当数据变更时即使获取 2) 开启守护线程,间歇性读取 * 这两中方式各有优缺点,使用watch,间接的增加了zk环境的事件push的压力和“波及面”,当客户端为N,每个客户端wath的节点数为M * 那么在极端情况下,zk需要分发的watch个数为M*N,而且可能因为configServer的数据变更较多,导致watch处罚次数较多。 * 同时因为网络问题,client可能丢失某些事件而导致无法及时获取数据。 * * 如果使用2),直接避免了1)所带来的问题,但是因为间歇性的读取,可能导致zk数据变更无法被即使获得。同时还有其他的问题,比如如果当前 * client所关注的serverType集合较大,而且数据尺寸较大,可能会导致每次全量读取都会消耗较长的时间和网络IO,如果“间歇时间”较短 + * 数据较大, 也会对整个环境有很大影响。 * * 不过,此实例假设configServer所注册的数据较小,configClient与zk之间的网络情况较佳。因此我决定采取2) * * @author qing * */ public class ConfigClient { private ZooKeeper zkClient; // inner cache;key:serverType,value:serverList private Map<String, Set<String>> servers = new ConcurrentHashMap<String, Set<String>>(); // 当前configClient需要获取的数据分类。即当前client对何种serverType感兴趣 private Set<String> serverTypes = new HashSet<String>(); private Watcher dw = new InnerZK();// 只关注链接状态迁移事件,区别于configServer private ReentrantLock lock = new ReentrantLock(); // 对于首次链接,或者网络异常进行一次阻塞方式的数据同步,将阻塞其他线程对client的操作。 private Object tag = new Object(); private boolean init = false;// 是否已经初始化 private Thread thread = new DumpThread();// 数据同步线程 public ConfigClient(String... 
types) { if (types == null || types.length == 0) { throw new RuntimeException("ConfigClient,serverTypes cant be empty..please check!"); } for (String type : types) { if (type == null || type.isEmpty() || type.contains("/")) { System.out.println("ConfigClient,ignore :" + type); continue; } serverTypes.add(type); } thread.setDaemon(true); thread.start(); } /** * 获得指定serverType的节点数据 * * @param serverType * @return */ public Set<String> getServers(String serverType) { synchronized (tag) { while (!init) { try { // 阻塞直到成功,在链接异常时的dump期间,所有客户端访问需要阻塞;在dumpThread中dump,不会阻塞。 // 当然你可以设计为不阻塞。 // 不过需要注意“首次实例化一定要阻塞”,因为configClient实例化zk是在dumpThread中, // 如果执行new ConfigClient()之后,立即调用getServers方法,可能导致一个调用者获得空集合; tag.wait(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } if (servers.containsKey(serverType)) { return Collections.unmodifiableSet(servers.get(serverType)); } return null;// } public Set<String> getServerTypes() { return serverTypes; } // ///////////////////////////////////////////inner // work//////////////////////////// /** * 和zk同步数据 */ private void dump() { lock.lock(); try { for (String serverType : serverTypes) { dump(serverType); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 同步制定serverType的数据 * * @param serverType */ private void dump(String serverType) { lock.lock(); try { String parent = "/" + serverType; List<String> children = zkClient.getChildren(parent, false, null);// 注册watch if (children == null || children.isEmpty()) { return; } Set<String> snap = new HashSet<String>();; for (String path : children) { byte[] data = zkClient.getData(parent + "/" + path, false, null); snap.add(new String(data)); } servers.put(serverType, snap);// 直接替换 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } class InnerZK implements Watcher { public void process(WatchedEvent event) { // 如果是“数据变更”事件,不关注数据变更事件,事实上,我们也不会注册此类型事件 if (event.getType() != EventType.None) { return; } // 如果是链接状态迁移 // 
参见keeperState switch (event.getState()) { case SyncConnected: System.out.println("Connected..."); init = false; dump();// 每次链接重建,都需要手动dump一下数据 init = true; synchronized (tag) { tag.notifyAll(); } break; case Expired: System.out.println("Expired..."); // 将在DumpThread中自动创建 break; // session过期 case Disconnected: // 链接断开,或session迁移 System.out.println("Connecting...."); break; case AuthFailed: init = true; synchronized (tag) { tag.notifyAll(); } if (thread.isAlive()) { thread.interrupt(); servers.clear(); } default: break; } } } class DumpThread extends Thread { @Override public void run() { try { Random r = new Random(); int i = 0; while (true) { System.out.println("Client handler,running...tid: " + Thread.currentThread().getId()); // 如果zk尚未实例化,或者链接异常 if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) { lock.lock(); try { // 回话重建等异常行为 zkClient = new ZooKeeper(Constants.connectString, 10000, dw, true); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); i++; // 惰性延迟,每失败一次,多休眠100ms Thread.sleep(2000 + i * 100); } finally { lock.unlock(); } continue; } if (zkClient.getState().isConnected()) { // 休眠,为了避免client网络“大规模”故障时,同时访问zk带来的性能波动 Thread.sleep(1000 + r.nextInt(6000)); dump(); i = 0;// reset } } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("Exit..."); if (zkClient != null) { try { zkClient.close(); } catch (Exception ze) { ze.printStackTrace(); } } } } } }