大数据系列(6)——ZooKeeper
1. ZooKeeper
开源的分布式的协调服务,是Google的Chubby一个开源的实现,它是一个为分布式应用提供一致性服务的软件
2. ZooKeeper提供的功能
- 配置维护
- 域名服务
- 分布式锁
- 组服务
3. ZooKeeper的特点
简单
- ZooKeeper的核心是一个精简的文件系统 ,它支持一些简单的操作和一些抽象操作
丰富
- ZooKeeper的操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”
高可靠
- ZooKeeper支持集群模式,可以很容易的解决单点故障问题
松耦合交互
- 不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在ZooKeeper中留下消息后,该进程结束后其它进程还可以读这条消息
资源库
- ZooKeeper实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议
4. ZooKeeper的角色
leader(领导者)
- 负责进行投票的发起和决议,更新系统状态
learner (学习者)
- 包括跟随者(follower)和观察者(observer),follower用于接
受客户端请求并向客户端返回结果,在选举过程中参与投票。Observer可以接受客户端连接,
将写请求转发给leader,但observer不参与投票过程,只同步leader的状态,observer的
目的是为了扩展系统,提高读取速度
- 包括跟随者(follower)和观察者(observer),follower用于接
客户端(client)
- 请求发起方
5. ZooKeeper数据模型
- 层次化的目录结构,命名符合常规文件系统规范
- 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
- 节点znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
- znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
- 客户端应用可以在节点上设置监视器
- 节点不支持部分读写,而是一次性完整读写
6. ZooKeeper的节点
zookeeper中的节点包含下面两个类型的节点
- 临时节点(ephemeral)
- 持久节点(persistent)
znode的类型在创建时确定并且之后不能再修改
znode默认不指定类型是持久节点
ephemeral类型的节点
- 在节点客户端会话结束时,会将zookeeper中的节点删除
- 不能有子节点
- persistent节点不依赖与客户端会话,只有当客户端明确要删除该persistent节点时才会被删除
- ZooKeeper的客户端和服务器通信采用长连接方式 ,每个客户端和服务器通过心跳来保持连接,这个连接状态称之为session ,如果znode是临时节点,这个seesion失效,znode也就删除了
7. ZooKeeper中的选举机制
服务器的ID
- 分别1,2,3
- 编号越大在选择算法中的权重越大
数据的ID
- 服务器中存放的最大数据ID.
- 值越大说明数据越新,在选举算法中数据越新权重越大
编辑时钟
- 投票的次数
- 同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断
选举状态
- LOOKING,竞选状态。
- FOLLOWING,随从状态,同步leader状态,参与投票。
- OBSERVING,观察状态,同步leader状态,不参与投票。
- LEADING,领导者状态。
选举信息的内容
在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。
- 服务器ID
- 数据ID
- 逻辑时钟
- 选举状态
选举机制的结果
- zk启动之后通过选举机制,来选举出来一个leader
8. ZooKeeper集群搭建
安装zookeeper集群要求大于1的奇数台机器
8.1 准备安装包
zookeeper-3.4.6.tar.gz
8.2 解压
tar -zxvf zookeeper-3.4.6.tar.gz -C /opt/
8.3 重命名
mv zookeeper-3.4.6/ zookeeper
8.4 配置环境变量
export ZOOKEEPER_HOME=/opt/zookeeper export PATH=PATH:ZOOKEEPER_HOME/bin
8.5 配置zookeeper
cp zoo_sample.cfg zoo.cfg
dataDir=/opt/zookeeper/data
8.6 myid文件
在zookeeper的各个机器中分别创建myid文件(/opt/zookeeper/data ),内容分别为
1 2 3
8.7 配置zoo.cfg
server.1=uplooking03:2888:3888 server.2=uplooking04:2888:3888 server.3=uplooking05:2888:3888
8.8 查看zookeeper集群的时间
保证zookeeper 集群中的时间不能有超过20秒的误差
ntpdate -u ntp.api.bz 根据时间同步服务器同步时间,-u是绕过防火请
8.9 启动zookeeper服务
zkServer.sh start
9. ZooKeeper
9.1 zookeeper的Shell操作
- create znodename data :创建znode
- get znodename :查看znode
- set znodename data : 修改znode的数据
- rmr znodename : 删除znode节点
- quit :退出会话
- delete : 删除节点
- setquota -b 长度配额 :设置节点长度配额
- setquota -n 数量配额 :设置节点数量配额
- listquoat path : 列出配额
delquota path :删除配额
- zookeeper中的配额管理超出配额的大小之后依然可以进行操作
create -e znode :创建临时节点
- 临时节点不能创建子节点
- 临时节点可以有数据
- 临时节点会话结束时会自动删除
9.2 zookeeper原生的API操作
package com.uplooking.bigdata.zookeeper; import org.apache.zookeeper.*; import org.junit.After; import org.junit.Before; import org.junit.Test; public class ZookeeperTest { private ZooKeeper zooKeeper; @Before public void init() throws Exception { String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181"; zooKeeper = new ZooKeeper(connStr, 3000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("watch..." + watchedEvent.getType()); } }); } /** * 创建节点 */ @Test public void testCreateZNode() throws Exception { String path = "/test01"; zooKeeper.exists(path, true); String ret = zooKeeper.create(path, "HELLO2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(ret); } @Test public void testSetZnode() throws KeeperException, InterruptedException { zooKeeper.setData("/test02", "uplooking02".getBytes(), 1); } @Test public void testGetZnode() throws KeeperException, InterruptedException { byte[] data = zooKeeper.getData("/test02", true, null); System.out.println(new String(data, 0, data.length)); } @Test public void testDeleteZnode() throws KeeperException, InterruptedException { zooKeeper.delete("/test02", -1); } @After public void destory() throws Exception { zooKeeper.close(); } }
实现注册监听的方法
- exists
- getData
- getChild
事件的类型
- NodeCreated
- NodeDeleted
- NodeDataChanged
- .......
10 ZooKeeper客户端神器Curator
- Netflix公司开源的一套Zookeeper客户端框架
- 封装了原生的zookeeper的API操作
- apache的顶级项目
- 基于Fluent的编程风格(链式编程)
- Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性
- 推荐使用curator2.x的版本
11 Curator中的组件
名称 | 描述 |
---|---|
Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
Utilities | 为Zookeeper提供的各种实用程序。 |
Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
12 依赖POM
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
package com.uplooking.bigdata.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryForever; import org.junit.Before; import org.junit.Test; public class CuratorTest { public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181"; private CuratorFramework zkClient; @Before public void init() { zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000)); zkClient.start(); } /** * 创建空节点(其实不是空节点,是给节点默认设置了ip地址) * * @throws Exception */ @Test public void createZnode() throws Exception { zkClient.create().forPath("/test03"); } @Test public void createZnode1() throws Exception { zkClient.create().forPath("/test02", "h".getBytes()); } @Test public void deleteZnode() throws Exception { zkClient.delete().deletingChildrenIfNeeded().forPath("/test02"); } @Test public void setZnode() throws Exception { zkClient.setData().forPath("/test03"); } @Test public void getZnode() throws Exception { byte[] bytes = zkClient.getData().forPath("/test03"); System.out.println(new String(bytes, 0, bytes.length)); } }
10 ZooKeeper客户端神器Curator
10.1 Curator简介
- Netflix公司开源的一套Zookeeper客户端框架
- 封装了原生的zookeeper的API操作
- apache的顶级项目
- 基于Fluent的编程风格(链式编程)
- Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性
- 推荐使用curator2.x的版本
10.2 Curator中的组件
名称 | 描述 |
---|---|
Recipes | Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。 |
Framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
Utilities | 为Zookeeper提供的各种实用程序。 |
Client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
Errors | Curator如何处理错误,连接问题,可恢复的例外等。 |
10.3 Curator的基本API操作
/*Curator的基本的节点操作*/ package com.uplooking.bigdata.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryForever; import org.junit.Before; import org.junit.Test; public class CuratorTest { public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181"; private CuratorFramework zkClient; @Before public void init() { zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000)); zkClient.start(); } /** * 创建空节点(其实不是空节点,是给节点默认设置了ip地址) * * @throws Exception */ @Test public void createZnode() throws Exception { zkClient.create().forPath("/test03"); } @Test public void createZnode1() throws Exception { zkClient.create().forPath("/test02", "h".getBytes()); } @Test public void deleteZnode() throws Exception { zkClient.delete().deletingChildrenIfNeeded().forPath("/test02"); } @Test public void setZnode() throws Exception { zkClient.setData().forPath("/test03"); } @Test public void getZnode() throws Exception { byte[] bytes = zkClient.getData().forPath("/test03"); System.out.println(new String(bytes, 0, bytes.length)); } }
10.4 Curator的监听器
PathChildrenCache(监听一级子节点的改变)
- 监听的节点如果不存在则会自动创建一个
- 监听的节点如果途中被删除了,那么监听器则无效了
NodeCache(当前节点的的改变)
- 节点的改变(节点的添加也属于节点的改变)
- 节点的删除
- TreeCache(当前节点以及后代节点的改变)
10.4.1 NodeCache监听器
String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181"; RetryPolicy retryPolicy = new RetryNTimes(3, 3000); CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy); zkClient.start(); String path = "/test01"; //创建监听监听器 NodeCache nodeCache = new NodeCache(zkClient, path); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { if (nodeCache.getCurrentData() == null) { System.out.println("删除了节点...."+path); } else { System.out.println("节点改变.." + "路径为:" + nodeCache.getCurrentData().getPath() + "数据为:" + new String(nodeCache.getCurrentData().getData())); } } }); Thread.sleep(Integer.MAX_VALUE); zkClient.close();
10.4.2 PathChildrenCache监听器
String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181"; RetryPolicy retryPolicy = new RetryNTimes(3, 3000); CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy); zkClient.start(); String path = "/test01"; //创建监听监听器 PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("一级子节点改变.."+event.getData()+ event.getType()); } }); Thread.sleep(Integer.MAX_VALUE); zkClient.close();