大数据系列(6)——ZooKeeper

1. ZooKeeper

开源的分布式的协调服务,是Google的Chubby一个开源的实现,它是一个为分布式应用提供一致性服务的软件

2. ZooKeeper提供的功能

  • 配置维护
  • 域名服务
  • 分布式锁
  • 组服务

3. ZooKeeper的特点

  • 简单

    • ZooKeeper的核心是一个精简的文件系统 ,它支持一些简单的操作和一些抽象操作
  • 丰富

    • ZooKeeper的操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”
  • 高可靠

    • ZooKeeper支持集群模式,可以很容易的解决单点故障问题
  • 松耦合交互

    • 不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在ZooKeeper中留下消息后,该进程结束后其它进程还可以读这条消息
  • 资源库

    • ZooKeeper实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议

4. ZooKeeper的角色

  • leader(领导者)

    • 负责进行投票的发起和决议,更新系统状态
  • learner (学习者)

    • 包括跟随者(follower)和观察者(observer),follower用于接

      受客户端请求并向客户端返回结果,在选举过程中参与投票。Observer可以接受客户端连接,

      将写请求转发给leader,但observer不参与投票过程,只同步leader的状态,observer的

      目的是为了扩展系统,提高读取速度

  • 客户端(client)

    • 请求发起方

5. ZooKeeper数据模型

  • 层次化的目录结构,命名符合常规文件系统规范
  • 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
  • 节点znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
  • znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
  • 客户端应用可以在节点上设置监视器
  • 节点不支持部分读写,而是一次性完整读写

6. ZooKeeper的节点

  • zookeeper中的节点包含下面两个类型的节点

    • 临时节点(ephemeral)
    • 持久节点(persistent)

znode的类型在创建时确定并且之后不能再修改

znode默认不指定类型是持久节点

  • ephemeral类型的节点

    • 在节点客户端会话结束时,会将zookeeper中的节点删除
    • 不能有子节点
  • persistent节点不依赖与客户端会话,只有当客户端明确要删除该persistent节点时才会被删除
  • ZooKeeper的客户端和服务器通信采用长连接方式 ,每个客户端和服务器通过心跳来保持连接,这个连接状态称之为session ,如果znode是临时节点,这个seesion失效,znode也就删除了

7. ZooKeeper中的选举机制

  • 服务器的ID

    • 分别1,2,3
    • 编号越大在选择算法中的权重越大
  • 数据的ID

    • 服务器中存放的最大数据ID.
    • 值越大说明数据越新,在选举算法中数据越新权重越大
  • 编辑时钟

    • 投票的次数
    • 同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断
  • 选举状态

    • LOOKING,竞选状态。
    • FOLLOWING,随从状态,同步leader状态,参与投票。
    • OBSERVING,观察状态,同步leader状态,不参与投票。
    • LEADING,领导者状态。
  • 选举信息的内容

    ​ 在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。

    • 服务器ID
    • 数据ID
    • 逻辑时钟
    • 选举状态
  • 选举机制的结果

    • zk启动之后通过选举机制,来选举出来一个leader

8. ZooKeeper集群搭建

安装zookeeper集群要求大于1的奇数台机器

8.1 准备安装包

zookeeper-3.4.6.tar.gz

8.2 解压

tar -zxvf zookeeper-3.4.6.tar.gz -C /opt/

8.3 重命名

mv zookeeper-3.4.6/ zookeeper

8.4 配置环境变量

export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=PATH:ZOOKEEPER_HOME/bin

8.5 配置zookeeper

cp zoo_sample.cfg zoo.cfg

dataDir=/opt/zookeeper/data

8.6 myid文件

在zookeeper的各个机器中分别创建myid文件(/opt/zookeeper/data ),内容分别为

1 2 3

8.7 配置zoo.cfg

server.1=uplooking03:2888:3888
server.2=uplooking04:2888:3888
server.3=uplooking05:2888:3888

8.8 查看zookeeper集群的时间

保证zookeeper 集群中的时间不能有超过20秒的误差

ntpdate -u ntp.api.bz 根据时间同步服务器同步时间,-u是绕过防火请

8.9 启动zookeeper服务

zkServer.sh start

9. ZooKeeper

9.1 zookeeper的Shell操作

  • create znodename data :创建znode
  • get znodename :查看znode
  • set znodename data : 修改znode的数据
  • rmr znodename : 删除znode节点
  • quit :退出会话
  • delete : 删除节点
  • setquota -b 长度配额 :设置节点长度配额
  • setquota -n 数量配额 :设置节点数量配额
  • listquoat path : 列出配额
  • delquota path :删除配额

    • zookeeper中的配额管理超出配额的大小之后依然可以进行操作
  • create -e znode :创建临时节点

    • 临时节点不能创建子节点
    • 临时节点可以有数据
    • 临时节点会话结束时会自动删除

9.2 zookeeper原生的API操作

package com.uplooking.bigdata.zookeeper;

import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ZookeeperTest {
    private ZooKeeper zooKeeper;

    @Before
    public void init() throws Exception {
        String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
        zooKeeper = new ZooKeeper(connStr, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("watch..." + watchedEvent.getType());
            }
        });
    }

    /**
     * 创建节点
     */
    @Test
    public void testCreateZNode() throws Exception {
        String path = "/test01";
        zooKeeper.exists(path, true);
        String ret = zooKeeper.create(path, "HELLO2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(ret);
    }

    @Test
    public void testSetZnode() throws KeeperException, InterruptedException {
        zooKeeper.setData("/test02", "uplooking02".getBytes(), 1);
    }


    @Test
    public void testGetZnode() throws KeeperException, InterruptedException {
        byte[] data = zooKeeper.getData("/test02", true, null);
        System.out.println(new String(data, 0, data.length));
    }

    @Test
    public void testDeleteZnode() throws KeeperException, InterruptedException {
        zooKeeper.delete("/test02", -1);
    }

    @After
    public void destory() throws Exception {
        zooKeeper.close();
    }
}
  • 实现注册监听的方法

    • exists
    • getData
    • getChild
  • 事件的类型

    • NodeCreated
    • NodeDeleted
    • NodeDataChanged
    • .......

10 ZooKeeper客户端神器Curator

  • Netflix公司开源的一套Zookeeper客户端框架
  • 封装了原生的zookeeper的API操作
  • apache的顶级项目
  • 基于Fluent的编程风格(链式编程)
  • Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性
  • 推荐使用curator2.x的版本

11 Curator中的组件

名称描述
RecipesZookeeper典型应用场景的实现,这些实现是基于Curator Framework。
FrameworkZookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities为Zookeeper提供的各种实用程序。
ClientZookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
ErrorsCurator如何处理错误,连接问题,可恢复的例外等。

12 依赖POM

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {
        zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();
    }

    /**
     * 创建空节点(其实不是空节点,是给节点默认设置了ip地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {
        zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {
        zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {
        zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {
        zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {
        byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10 ZooKeeper客户端神器Curator

10.1 Curator简介

  • Netflix公司开源的一套Zookeeper客户端框架
  • 封装了原生的zookeeper的API操作
  • apache的顶级项目
  • 基于Fluent的编程风格(链式编程)
  • Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性
  • 推荐使用curator2.x的版本

10.2 Curator中的组件

名称描述
RecipesZookeeper典型应用场景的实现,这些实现是基于Curator Framework。
FrameworkZookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities为Zookeeper提供的各种实用程序。
ClientZookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
ErrorsCurator如何处理错误,连接问题,可恢复的例外等。

10.3 Curator的基本API操作

/*Curator的基本的节点操作*/
package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {
        zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();
    }

    /**
     * 创建空节点(其实不是空节点,是给节点默认设置了ip地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {
        zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {
        zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {
        zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {
        zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {
        byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10.4 Curator的监听器

  • PathChildrenCache(监听一级子节点的改变)

    • 监听的节点如果不存在则会自动创建一个
    • 监听的节点如果途中被删除了,那么监听器则无效了
  • NodeCache(当前节点的的改变)

    • 节点的改变(节点的添加也属于节点的改变)
    • 节点的删除
  • TreeCache(当前节点以及后代节点的改变)

10.4.1 NodeCache监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
//创建监听监听器
NodeCache nodeCache = new NodeCache(zkClient, path);
nodeCache.start();

nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        if (nodeCache.getCurrentData() == null) {
            System.out.println("删除了节点...."+path);
        } else {
            System.out.println("节点改变.." + "路径为:" + nodeCache.getCurrentData().getPath() + "数据为:" + new String(nodeCache.getCurrentData().getData()));
        }
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();

10.4.2 PathChildrenCache监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
//创建监听监听器
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        System.out.println("一级子节点改变.."+event.getData()+ event.getType());
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();

相关推荐