Chapter 3 API 代码记录
一、Master.java
package com.dayw.zk;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master candidate: tries to create the ephemeral znode {@code /master} with
 * its own {@link #serverId} as data and records in {@link #isLeader} whether
 * it succeeded. All ZooKeeper calls issued by the election are asynchronous;
 * their results arrive in the callbacks declared below.
 */
public class Master implements Watcher {
    static final Logger LOG = LoggerFactory.getLogger(Master.class);

    /** Connect string used by {@link #main} when none is given on the command line. */
    private static final String DEFAULT_HOST_PORT = "172.16.16.176:2181";

    /** Upper bound on how long {@link #main} waits for the election callbacks. */
    private static final long ELECTION_TIMEOUT_SECONDS = 30;

    ZooKeeper zk;

    // Kept static because existing code may read Master.hostPort; note that
    // every constructor call overwrites it.
    static String hostPort;

    Random ran = new Random();
    String serverId = Integer.toHexString(ran.nextInt());

    // Written by the callbacks below and read by main(), hence volatile.
    static volatile boolean isLeader = false;

    /** Released once a callback has determined the outcome of the election. */
    private final CountDownLatch electionDone = new CountDownLatch(1);

    /**
     * @param host ZooKeeper connect string, stored in the static {@link #hostPort}
     */
    public Master(String host) {
        super();
        hostPort = host;
    }

    /**
     * Opens the ZooKeeper session with this object as the default watcher.
     *
     * @param host ZooKeeper connect string
     * @throws UncheckedIOException if the client cannot be created; previously
     *         the error was only printed and {@code zk} stayed {@code null}
     */
    void startZK(String host) {
        try {
            zk = new ZooKeeper(host, 15000, this);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot create ZooKeeper client for " + host, e);
        }
    }

    /** Closes the ZooKeeper session if one was opened. */
    void stopZK() {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever runs this thread.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while closing the ZooKeeper session", e);
        }
    }

    /** Default watcher: prints every event it receives. */
    @Override
    public void process(WatchedEvent e) {
        System.out.println(e);
    }

    /** Receives the result of the asynchronous create issued by {@link #runForMaster()}. */
    StringCallback masterCreateCallback = new StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                // The create may or may not have been applied; read /master to find out.
                checkMaster();
                return;
            case OK:
                isLeader = true;
                break;
            default:
                isLeader = false;
                break;
            }
            System.out.println("I'm " + (isLeader ? "" : "not ") + "the leader");
            electionDone.countDown();
        }
    };

    /** Receives the result of the asynchronous read issued by {@link #checkMaster()}. */
    DataCallback masterCheckCallback = new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                checkMaster();
                return;
            case NONODE:
                try {
                    runForMaster();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while running for master", e);
                    electionDone.countDown();
                }
                return;
            case OK:
                // /master exists: this process is the leader only if the znode
                // carries its own id (same test as the synchronous variant).
                isLeader = data != null && new String(data).equals(serverId);
                electionDone.countDown();
                return;
            default:
                LOG.error("Error when reading " + path + ": " + Code.get(rc));
                electionDone.countDown();
                break;
            }
        }
    };

    /** Asynchronously reads {@code /master}; the result goes to {@link #masterCheckCallback}. */
    void checkMaster() {
        zk.getData("/master", false, masterCheckCallback, null);
        // Synchronous variant, kept for reference:
        //
        // while (true) {
        //     Stat stat = new Stat();
        //     try {
        //         byte data[] = zk.getData("/master", false, stat);
        //         isLeader = new String(data).equals(serverId);
        //         return true;
        //     } catch (KeeperException e) {
        //         return false;
        //     } catch (InterruptedException e) {
        //     }
        // }
    }

    /**
     * Asynchronously tries to create the ephemeral {@code /master} znode; the
     * result goes to {@link #masterCreateCallback}.
     *
     * @throws InterruptedException never thrown by the asynchronous variant;
     *         declared so existing callers that catch it keep compiling
     */
    void runForMaster() throws InterruptedException {
        // Asynchronous variant.
        zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL, masterCreateCallback, null);
        // Synchronous variant, kept for reference:
        //
        // while (true) {
        //     try {
        //         zk.create("/master", serverId.getBytes(),
        //                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        //         isLeader = true;
        //         break;
        //     } catch (NodeExistsException e) {
        //         isLeader = false;
        //         break;
        //     } catch (KeeperException e) {
        //     }
        //     if (checkMaster()) break;
        // }
    }

    /** Creates the persistent parent znodes, each with empty data. */
    public void bootstrap() {
        createParent("/workers", new byte[0]);
        createParent("/assign", new byte[0]);
        createParent("/tasks", new byte[0]);
        createParent("/status", new byte[0]);
    }

    /** Receives the result of {@link #createParent}; {@code ctx} carries the znode data for retries. */
    StringCallback createParentCallback = new StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                createParent(path, (byte[]) ctx);
                break;
            case OK:
                LOG.info("Parent created");
                break;
            case NODEEXISTS:
                LOG.warn("Parent already registerd: " + path);
                break;
            default:
                LOG.error("Something went wrong: " + KeeperException.create(Code.get(rc), path));
                break;
            }
        }
    };

    /**
     * Asynchronously creates a persistent znode.
     *
     * @param path znode path
     * @param data znode data, also passed as callback context so a retry can reuse it
     */
    void createParent(String path, byte[] data) {
        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                createParentCallback, data);
    }

    /**
     * Runs one election and, if this process wins, holds the session for 60 seconds.
     *
     * @param args optional: {@code args[0]} is the ZooKeeper connect string
     */
    public static void main(String[] args) throws Exception {
        String connectString = args.length > 0 ? args[0] : DEFAULT_HOST_PORT;
        Master z = new Master(connectString);
        z.startZK(hostPort);
        try {
            z.runForMaster();
            // The election is asynchronous: wait for a callback to settle it
            // before reading isLeader.
            if (!z.electionDone.await(ELECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn("Election result still unknown after {} seconds", ELECTION_TIMEOUT_SECONDS);
            }
            if (isLeader) {
                System.out.println("I'm the leader");
                Thread.sleep(60000);
            } else {
                System.out.println("Someone else is the leader");
            }
        } finally {
            z.stopZK();
        }
    }
}
二、Worker.java
package com.dayw.zk;

import java.io.IOException;
import java.util.Random;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker: registers itself as the ephemeral znode
 * {@code /workers/worker-<serverId>} and stores its current status as that
 * znode's data.
 */
public class Worker implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);

    ZooKeeper zk;
    String hostPort;
    String status;
    String serverId = Integer.toHexString(new Random().nextInt());

    /**
     * @param hostPort ZooKeeper connect string
     */
    public Worker(String hostPort) {
        this.hostPort = hostPort;
    }

    /**
     * Opens the ZooKeeper session with this object as the default watcher.
     *
     * @throws IOException if the client cannot be created
     */
    void startZK() throws IOException {
        // 15000 ms, the same session timeout the other clients in this package use.
        zk = new ZooKeeper(hostPort, 15000, this);
    }

    /** Default watcher: logs every event together with the connect string. */
    @Override
    public void process(WatchedEvent event) {
        LOG.info(event.toString() + ", " + hostPort);
    }

    /** Name of this worker's znode below {@code /workers}. */
    private String workerName() {
        return "worker-" + serverId;
    }

    /** Receives the result of the asynchronous create issued by {@link #register()}. */
    StringCallback createWorkerCallback = new StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                register();
                break;
            case OK:
                LOG.info("Registered successfully:" + serverId);
                break;
            case NODEEXISTS:
                LOG.warn("Already registered:" + serverId);
                break;
            default:
                LOG.error("Something went wrong:" + KeeperException.create(Code.get(rc), path));
                break;
            }
        }
    };

    /** Asynchronously creates this worker's ephemeral znode with initial data "Idle". */
    void register() {
        zk.create("/workers/" + workerName(), "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL, createWorkerCallback, null);
    }

    /** Receives the result of the setData issued by updateStatus; {@code ctx} is the status sent. */
    StatCallback statusUpdateCallback = new StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            switch (Code.get(rc)) {
            case CONNECTIONLOSS:
                // Retry with the status that was being written.
                updateStatus(workerName(), (String) ctx);
                return;
            case OK:
                break;
            default:
                LOG.error("Status update failed: " + KeeperException.create(Code.get(rc), path));
                break;
            }
        }
    };

    /**
     * Writes {@code status} to {@code /workers/<name>}, but only if it still
     * equals the most recently requested status, so a retry of an outdated
     * status cannot overwrite a newer one.
     *
     * @param name   znode name below {@code /workers}
     * @param status status text to store; ignored when {@code null}
     */
    private synchronized void updateStatus(String name, String status) {
        if (status != null && status.equals(this.status)) {
            zk.setData("/workers/" + name, status.getBytes(), -1, statusUpdateCallback, status);
        }
    }

    /**
     * Records {@code status} as the current status and pushes it to ZooKeeper.
     *
     * @param status new status text
     */
    public void setStatus(String status) {
        this.status = status;
        updateStatus(workerName(), status);
    }

    /**
     * Registers a worker and keeps the session open for 30 seconds.
     *
     * @param args {@code args[0]} is the ZooKeeper connect string
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: Worker <hostPort>");
        }
        Worker w = new Worker(args[0]);
        w.startZK();
        w.register();
        Thread.sleep(30000);
    }
}
三、Client.java
package com.dayw.zk; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Client implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(Client.class); ZooKeeper zk; String hostPort; Client(String hostPort) { this.hostPort = hostPort; } void startZK() throws Exception { zk = new ZooKeeper(hostPort, 15000, this); } String queueCommand(String command) throws KeeperException { String name = null; while (true) { try { name = zk.create("/tasks/task-", command.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return name; } catch (NodeExistsException e) { LOG.error(name + " already appears to be running"); } catch (ConnectionLossException e) { } catch (InterruptedException e) { // TODO: handle exception } } } @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub } }
四、AdminClient.java
package com.dayw.zk;

import java.util.Date;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Admin client: prints the current master, the registered workers with their
 * status, and the children of {@code /assign}.
 */
public class AdminClient implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClient.class);

    ZooKeeper zk;
    String hostPort;

    /**
     * @param hostPort ZooKeeper connect string
     */
    AdminClient(String hostPort) {
        this.hostPort = hostPort;
    }

    /** Opens the ZooKeeper session with this object as the default watcher. */
    void start() throws Exception {
        zk = new ZooKeeper(hostPort, 15000, this);
    }

    /** Decodes znode data for display; a znode without data prints as an empty string. */
    private static String text(byte[] data) {
        return data == null ? "" : new String(data);
    }

    /**
     * Prints the master, workers and assignments to standard output.
     *
     * @throws KeeperException if a ZooKeeper call fails for a reason other
     *         than a missing {@code /master} or a worker znode that vanished
     *         while being listed
     * @throws InterruptedException if the calling thread is interrupted
     */
    void listState() throws KeeperException, InterruptedException {
        // Master
        try {
            Stat stat = new Stat();
            byte[] masterData = zk.getData("/master", false, stat);
            Date startDate = new Date(stat.getCtime());
            System.out.println("Master: " + text(masterData) + " since " + startDate);
        } catch (NoNodeException e) {
            System.out.println("No Master");
        }

        // Workers
        System.out.println("Workers:");
        for (String w : zk.getChildren("/workers", false)) {
            try {
                byte[] data = zk.getData("/workers/" + w, false, null);
                System.out.println("\t" + w + ": " + text(data));
            } catch (NoNodeException e) {
                // The znode was deleted between getChildren and getData;
                // skip it instead of aborting the whole listing.
                LOG.debug("Worker {} disappeared while listing", w);
            }
        }

        // Tasks (listed from the children of /assign)
        System.out.println("Tasks:");
        for (String t : zk.getChildren("/assign", false)) {
            System.out.println("\t" + t);
        }
    }

    /** Default watcher: prints every event it receives. */
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event);
    }

    /**
     * Connects, prints the state once and closes the session.
     *
     * @param args {@code args[0]} is the ZooKeeper connect string
     */
    public static void main(String args[]) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: AdminClient <hostPort>");
        }
        AdminClient c = new AdminClient(args[0]);
        c.start();
        try {
            c.listState();
        } finally {
            c.zk.close();
        }
    }
}
相关推荐
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26
梦的天空 2020-08-25