Zookeeper入门实战
Zookeeper是一个为分布式应用提供一致性协调服务的中间件,主要用来解决分布式应用中经常遇到的一些一致性问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。本文主要包括Zookeeper简介、安装、命令行操作、java操作Zookeeper等,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、junit 4.13、Centos 7.6。
1、简介
1.1、设计目标
ZooKeeper is simple.
ZooKeeper is replicated.
ZooKeeper is ordered.
ZooKeeper is fast.
ZooKeeper是简单、可复制、有序、快速的。
1.2、数据模型和层次命名空间
ZooKeeper提供的命名空间与标准文件系统的命名空间非常类似。命名空间由一系列路径组成,用/分隔。ZooKeeper命名空间中的每个节点使用一个具体路径来标识。ZooKeeper的层次命名空间结构如下:
1.3、节点
与标准文件系统不同的是,ZooKeeper命名空间的每个节点可以保存数据,就像一个文件系统中的文件,它既是文件也是目录。ZooKeeper用来存储状态信息、配置、位置信息等,因此存储在每个节点上的数据通常很小,在字节到千字节范围内。有四种类型的节点:
临时节点(EPHEMERAL):会话结束该节点自动被删除,临时节点不能拥有子节点
临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会在节点名称后面增加一个序列号,分布式锁中会用到该类型节点
持久节点(PERSISTENT):创建后永久存在,可以自动删除;也可以设置一个存活时间,当指定存活时间过去以后,如果相应的节点没有得到更新且没有直接的,就会被自动删除
持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会在节点名称后面增加一个序列号
注:顺序节点中序列号对于此节点的父节点是唯一的,它是一个10位的数字,如果这个序列号大于2^32-1就会溢出。
1.4、更新和监视
客户端可以监视一个节点,当该节点发生变化时会,客户端会收到该节点变化的通知;一个监视器只会触发一次,触发后会删除该监视器。如果客户端和其中一个ZooKeeper服务器之间的连接中断,则客户端将收到一个本地通知。
1.5、状态信息
zxid:zookeeper每次状态改变都收到一个zxid(ZooKeeper Transaction Id),zxid是全局有序的,每次更新都会产生一个新的,且后面的大于前面的。
版本:每次节点改变都会使该节点的版本号增加,有三中版本号:dataversion(数据版本号)、cversion(子节点版本号)、aclversion(节点所拥有的ACL版本号)
通过stat [-w] path可以查看节点的具体状态信息:
cZxid 创建节点时的事务ID
ctime 创建节点时的时间
mZxid 最后修改节点的事务ID
mtime 最后修改节点的时间
pZxid 该节点的子节点最后一次修改的事务ID,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该ID
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralOwner 节点的会话id,只有临时节点有,持久节点值为0
dataLength 节点的数据长度
numChildren 节点的子节点数量
1.6、特性
ZooKeeper的目标是作为构建其他复杂服务的基石,因此它提供了一系列的特性:
一致性:数据一致性, 数据按照顺序分批入库
原子性:事务要么成功要么失败
单一视图:客户端连接集群中的任意zk节点, 数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性::客户端可以读取到zk服务端的最新数据
2、zoo.cfg参数说明
clientPort zookeeper服务器对客户端暴露的端口
dataDir zookeeper服务器存储快照文件的目录,事务日志文件默认也保存在该目录下,除非另外指定。
dataLogDir 服务器存储事务日志文件的目录,默认与dataDir一致。建议将它和dataDir分别配置,防止磁盘的并发读写,影响服务器性能。可将其配置在一个单独的磁盘上。
tickTime 服务器最小时间单元,默认值3000ms
initLimit leader服务器等待Follewer服务器启动,并完成数据同步的时间,默认为10,表示10*tickTime
syncLimit leader服务器和Follewer服务器之间进行心跳检测的间隔时间,默认为5,表示5*tickTime
server.id zookeeper集群的机器列表,其中id为serverId,与myid文件中的值对应。第一个端口用于指定Leader服务器和Follewer服务器进行运行时通信和数据同步所使用的端口,第二个端口用于进行Leader选举过程中的投票通信
3、安装
3.1、单机版安装
3.1.1、下载并解压Zookeeper
下载地址:http://zookeeper.apache.org/releases.html
解压:tar zxvf zookeeper-3.6.0.tar.gz
3.1.2、修改配置文件
zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg
配置文件zoo.cfg中的内容可以使用文件中的默认值,也可以根据实际需要修改配置项:
dataDir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data
3.1.3、启动停止
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin zkServer.sh start #启动 zkServer.sh stop #停止
3.2、集群安装
假设在172.17.139.160、172.17.139.161、172.17.139.162三台机器上安装。
3.2.1、下载并解压Zookeeper(每台机器)
下载地址:http://zookeeper.apache.org/releases.html
解压:tar zxvf zookeeper-3.6.0.tar.gz
3.2.2、修改zoo.cfg配置文件(每台机器)
zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg
zoo.cfg中集群与单机的配置不同的地方是server.id参数,其他根据实际需要修改配置项:
dataDir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data server.1=172.17.139.160:2555:3555 server.2=172.17.139.161:2555:3555 server.3=172.17.139.162:2555:3555
3.2.3、创建myid文件(每台机器)
在dataDir(/home/hadoop/app/apache-zookeeper-3.6.0-bin/data)目录下创建myid文件,文件内容为该zookeeeper在集群中的id,对应上面zoo.cfg中server.后的数字。
172.17.139.160:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
1
172.17.139.161:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
2
172.17.139.162:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
3
3.2.4、启动停止(每台机器)
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin zkServer.sh start #启动 zkServer.sh stop #停止
4、命令行
bin/zkCli.sh可以启动一个客户端连接到Zookeeper:
bin/zkCli.sh [-server host:port]
不加server参数,默认连接到本地2181端口;启动后可以输入help/h查看使用方法:
[zk: localhost:2181(CONNECTED) 4] help ZooKeeper -server host:port cmd args addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE addauth scheme auth close config [-c] [-w] [-s] connect host:port create [-s] [-e] [-c] [-t ttl] path [data] [acl] delete [-v version] path deleteall path [-b batch size] delquota [-n|-b] path get [-s] [-w] path getAcl [-s] path getAllChildrenNumber path getEphemerals path history listquota path ls [-s] [-w] [-R] path printwatches on|off quit reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*] redo cmdno removewatches path [-c|-d|-a] [-l] set [-s] [-v version] path data setAcl [-s] [-v version] [-R] path acl setquota -n|-b val path stat [-w] path sync path version Command not found: Command not found help [zk: localhost:2181(CONNECTED) 5]
4.1、列出子节点
ls [-s] [-w] [-R] path
-s:显示节点状态信息
-w:监听该节点
-R:递归查看所有子节点
如:ls /
4.2、创建节点
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
-s:顺序节点
-e:临时节点
-t:设置存活时间(针对持久节点,单位秒);需要开启,默认是关闭的,参见第6小节:TTL(Time To Life)
acl:权限控制
如:create /test test
4.3、查看节点
get [-s] [-w] path
-s:显示状态
-w:监听该节点
如:get /test
4.4、设置节点
set [-s] [-v version] path data
-s:返回状态信息
-v:设置版本信息
如:set /test testaa
4.4、查看节点状态
stat [-w] path
-w:监视该节点
如:stat /test
4.5、删除节点
delete [-v version] path
-v:指定版本信息
如:delete /test
4.6、设置权限
setAcl [-s] [-v version] [-R] path acl
-s:返回状态信息
-v:指定版本信息
-R:递归设置权限
4.7、查看权限
getAcl [-s] path
-s:返回状态信息
5、权限控制ACL(Access Control List)
ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,子节点不会继承父节点的权限;ACL由三个字段组成:schema:id:permission。
5.1、schema(权限模式)
world 只有一个id,anyone,代表所有人
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证
ip 使用IP地址认证
x509 使用客户端X500 Principal认证
5.2、id(授权对象)
权限赋予的用户或者一个实体
word对应的id只有一个:anyone
digest自定义id,通常为“usernmae:BASE64(SHA-1(username:password))”
ip对应的id为一个ip或ip段,如10.49.196.10、10.49.196.0、24
5.3、permission(权限)
CREATE(c) 可以创建子节点
READ(r) 可以读取节点数据及显示子节点列表
WRITE(w) 可以设置节点数据
DELETE(d) 可以删除子节点(仅下一级节点)
ADMIN(a) 可以设置节点权限
5.4、例子
5.4.1、word例子
setAcl /acltest world:anyone:cdrwa
创建节点时如果没有设置权限,这是默认的权限。
5.4.2、auth例子
addauth digest jack:123456 #先添加认证用户 setAcl /acltest auth:jack:cdrwa
再开一个终端需先添加认证用户(addauth digest jack:123456)才能访问/actltest
5.4.3、digest例子
echo -n jack:123456 | openssl dgst -binary -sha1 | openssl base64#得到密文tgi9UCnyPo5FJjVylKr05nAlWeg= setAcl /acltest digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
添加认证用户(addauth digest jack:123456)后才能访问/actltest。
5.4.4、ip例子
setAcl /acltest ip:10.49.196.10:cdrwa
10.49.196.10的机器才能访问/actltest。
6、TTL(Time To Life)
在zookeeper中,当创建一个PERSISTENT或者PERSISTENT_SEQUENTIAL节点的时候,可以有选择的给这个节点设置一个存活时间(TTL);当指定存活时间过去以后,如果该节点没有得到更新且没有直接的,就会被自动删除。
默认该特性是关闭的,如果需要设置java系统属性:zookeeper.extendedTypesEnabled;由于TTL节点是在3.5.3版本增加的,3.5.4/3.6.0版本并不支持,所以在3.5.4/3.6.0等其他版本还需设置另外一个java系统属性:Dzookeeper.emulate353TTLNodes。可以修改zkServer.sh脚本,增加:
-Dzookeeper.extendedTypesEnabled=true -Dzookeeper.emulate353TTLNodes=true
在zkServer.sh脚本里查找到start关键字,在如下图所示的地方增加上面的代码,如何重启Zookeeper即可。
7、Java操作Zookeeper
7.1、原生API操作Zookeeper
7.1.1、引入依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency>
7.1.2、基本操作
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; /** * Zookeeper基本操作列子 */ public class ZookeeperCase { //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static String connectString = "10.49.196.10:2181"; private static int sessionTimeout = 2 * 1000; private ZooKeeper zooKeeper; @Before public void before() { try { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); System.out.println(zooKeeper.getState()); } catch (IOException e) { e.printStackTrace(); } } @After public void after() throws Exception { zooKeeper.close(); } /** * 创建节点 */ @Test public void create() throws Exception { /* * 同步创建持久节点,ACL为world:anyone:cdrwa * 等同于该命令:create /javatest/node1 test world:anyone:cdrwa */ zooKeeper.create("/javatest/node1", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); /* * 同步创建持久节点,ACL为world:anyone:cr * 等同于该命令:create /javatest/node2 test world:anyone:cr */ zooKeeper.create("/javatest/node2", "test".getBytes(), Collections.singletonList(new ACL((ZooDefs.Perms.CREATE + ZooDefs.Perms.READ), ZooDefs.Ids.ANYONE_ID_UNSAFE)), CreateMode.PERSISTENT); /* * 异步创建临时顺序节点,ACL为ip:127.0.0.1:c * 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c */ CountDownLatch counter = new CountDownLatch(1); zooKeeper.create("/javatest/node3", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))), CreateMode.EPHEMERAL_SEQUENTIAL ,new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name); counter.countDown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); /* * 同步创建持久节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa * 等同于该命令:create /javatest/node4 test digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa * 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4 */ zooKeeper.create("/javatest/node4", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg="))) , CreateMode.PERSISTENT); /* * 同步创建顺序持久节点,ACL为world:anyone:cdrwa,存活时间为5秒 * 等同于该命令:create -s -t 5000 /javatest/node5 test */ Stat stat = new Stat(); zooKeeper.create("/javatest/node5", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, stat, 5000); System.out.println(stat); } /** * 获取节点数据 * @throws Exception */ @Test public void getData() throws Exception { //同步读取数据 Stat stat = new Stat(); byte[] data = zooKeeper.getData("/javatest/node1", false, stat); System.out.println(new String(data)); System.out.println(stat); //异步读取数据 zooKeeper.addAuthInfo("digest", "jack:123456".getBytes()); CountDownLatch counter = new CountDownLatch(1); zooKeeper.getData("/javatest/node4", false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { String s = ""; if (data != null) { s = new String(data); } System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat); counter.countDown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } @Test public void setData() throws Exception { //同步设置数据,version为-1表示匹配任何版本 Stat stat = zooKeeper.setData("/javatest/node1", "test2".getBytes(), -1); System.out.println(stat); //异步设置数据 zooKeeper.addAuthInfo("digest", "jack:123456".getBytes()); CountDownLatch counter = new CountDownLatch(1); zooKeeper.setData("/javatest/node4", "test2".getBytes(), -1, new AsyncCallback.StatCallback(){ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat); counter.countDown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } @Test public void delete() throws Exception { //同步删除数据 zooKeeper.delete("/javatest/node1", -1); //异步删除数据 CountDownLatch counter = new CountDownLatch(1); zooKeeper.delete("/javatest/node2", -1, new AsyncCallback.VoidCallback(){ @Override public void processResult(int rc, String path, Object ctx) { System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx); counter.countDown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } }
7.1.3、监控节点
DataMonitor类实现对节点的监控,节点有变化时会回调DataMonitorListener.process方法,该方法由调用方根据业务来实现;WatcherCase类传入需要的参数来启动DataMonitor。
该例子是根据官网例子改造而来,相较官网更简单了些。
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Runnable { private ZooKeeper zk; private DataMonitorListener listener; /** * 节点变化时会回调该方法,把监控变化类型及新数据带过来 */ public interface DataMonitorListener { void process(WatchedEvent event, byte[] data); } public DataMonitor(String hostPort, String znode, DataMonitorListener listener) throws Exception { this.listener = listener; AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat); switch (rc) { case KeeperException.Code.Ok: case KeeperException.Code.NoNode: return; case KeeperException.Code.SessionExpired: case KeeperException.Code.NoAuth: close(); return; default: zk.exists(znode, true, this, null); return; } } }; //监视器 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event); if (event.getType() == Event.EventType.None) { switch (event.getState()) { case SyncConnected: break; case Expired: close(); break; } } else { try { byte[] bytes = zk.getData(event.getPath(), false, null); listener.process(event, bytes); } catch (Exception e) { e.printStackTrace(); } if (event.getPath() != null && event.getPath().equals(znode)) { //再次监控 zk.exists(znode, true, callback, null); } } } }; zk = new ZooKeeper("10.49.196.10:2181", 20000, watcher); zk.exists(znode, true, callback, null); } @Override public void run() { try { synchronized (this) { wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void close() { synchronized (this) { notifyAll(); } } }
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; /** * 监视节点样例 */ public class WatcherCase { public static void main(String[] args) throws Exception { DataMonitor.DataMonitorListener listener = new DataMonitor.DataMonitorListener() { @Override public void process(WatchedEvent event, byte[] data) { //todo:根据实际情况处理 if (event.getType() == Watcher.Event.EventType.NodeDataChanged) { System.out.println(new String(data)); } } }; new DataMonitor("10.49.196.10:2181", "/watchtest", listener).run(); } }