Chapter 8 Curator:ZooKeeper API 的高级封装库 —— 笔记

Curator的核心目标就是管理ZooKeeper的相关操作,将连接管理的复杂操作部分隐藏起来。

  • 例如,Curator实现了以下原语的菜谱:

    • 锁(lock)
    • 屏障(barrier)
    • 缓存(cache)
  • 还实现了流畅(fluent)式的开发风格的接口:能够将ZooKeeper 中的create、delete、getData 等操作以流水线式的编程方式链式执行。
  • 提供了命名空间(namespace)
  • 自动重连和其他组件

一、Curator 客户端程序

  • 首先、创建一个客户端实例,实例为 CuratorFramework 类的实力,通过调用 Curator 提供的工厂方法来获得该实例:
CuratorFramework zkc = 
    CuratorFrameworkFactory.newClient(connectString, retryPolicy);
  • connectString:连接的ZooKeeper服务器的列表
  • retryPolicy:指定对于失去连接事件重试操作的处理策略
  • 注意:在工厂类中还提供了其他方法来创建实例,其中有一个CuratorZooKeeperClient类,该类在ZooKeeper客户端实例上提供了某些附加功能,如保证请求操作在不可预见的连接断开情况下也能够安全执行,与CuratorFramework类不同,CuratorZooKeeperClient类中的操作执行与ZooKeeper客户端句柄直接相对应。

二、流畅式 API

【标准 ZooKeeper API】

zk.create("/mypath",
    new byte[0],
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT);

【Curator 流程 API】

zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);

create 调用返回一个 CreateBuilder 类的实力,随后调用的返回均为 CreateBuilder 类所继承的对象,如:

  • CreateBuilder 继承了 CreateModable<ACLBackgroundPathAndBytesable<String>> 类
  • withMode 方法声明了泛型接口 CreateModable<T>

【异步执行方法,只需增加 inBackground】

zkc.create().inBackground().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
inBackground调用可以传入一个上下文对象,通过该参数可以传入一个具体的回调方法的实现,或是一个执行回调的执行器(java.util.concurrent.Executor)。在Java中,执行器(executor)对象可以执行可运行的对象,我们可以通过执行器将回调方法的执行与ZooKeeper客户端线程的运行解耦,采用执行器常常比为每个任务新建一个线程更好。

【设置监视点,只需调用链中增加 watched 方法】

zkc.getData().inBackground().watched().forPath("/mypath")
  • 上面设置的监视点将会通过监听器触发通知,这些通知将会以WATCHED事件传递给指定的监听器
  • 还可以使用usingWathcer方法替换watched方法,usingWathcer方法接受一个普通的ZooKeeper的Wathcer对象,并在接收到通知后调用该监视点方法
  • 第三种选择就是传入一个CuratorWatcher对象,CuratorWatcher的process方法与ZooKeeper的Watcher不同的是,它可能会抛出异常。

三、监听器

1、【实现一个 CuratorListener 接口】

CuratorListener masterListener = new CuratorListener() {
    public void eventRecived(CuratorFramework client, CuratorEvent event) {
        try {
            switch(event.getType()) {
            case CHILDREN:
                ...
                break;
            case CREATE:
                ...
                break;
            case DELETE:
                ...
                break;
            case WATCHED:
                ...
                break;
            }
        } catch(Exception e) {
            LOG.error("Exception while processing event.", e);
            try {
                close();
            } catch(IOException ioe) {
                LOG.error(:IOException while closing.", ioe);
            }
        }
    }
};

2、【注册这个监听器】

client = CuratorFrameworkFactory.newClient(hostPost, retryPolicy);
client.getCuratorListenable().addListener(masterListener);

3、【特殊的监听器,负责处理后台工作线程捕获的异常时的错误报告,提供了底层细节的处理】

UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
    public void UnhandleError(String message, Throwable e) {
        LOG.error("Unrecoverable error:", e);
        try {
            close();
        } catch (IOException ioe) {
            LOG.warn( "Exception when closing.", ioe );
        }
    }
}

4、【将该监听器注册到客户端实例中】

client.getUnhandledErrorListeable().addListener(errorsListener);

四、Curator 中状态的转换

在Curator中暴露了与ZooKeeper不同的一组状态,比如SUSPENDED状态,还有Curator使用LOST来表示会话过期的状态。
图8-1中展示了连接状态的状态机模型,当处理状态的转换时,建议将所有主节点操作请求暂停,因为并不知道ZooKeeper客户端能否在会话过期前重新连接,即使ZooKeeper客户端重新连接成功,也可能不再是主要主节点的角色,因此谨慎处理连接丢失的情况,对应用程序更加安全。
【图8-1:Curator连接状态机模型】
Chapter 8 Curator:ZooKeeper API 的高级封装库 —— 笔记
【READ_ONLY状态】:当ZooKeeper集群启用了只读模式,客户端所连接的服务器就会进入只读模式中,此时的连接状态也将进入只读模式。服务器转换到只读模式后,该服务器就会因隔离问题而无法与其他服务器共同形成仲裁的最低法定数量,当连接状态为制度模式,客户端也将漏掉此时发生的任何更新操作,因为如果集群中存在一个子集的服务器数量,可以满足仲裁最低法定数量,并可以接收到客户端的对ZooKeeper的更新操作,还是会发生ZooKeeper的更新,也许这个子集的服务器会持续运行很久(ZooKeeper无法控制这种情况),那么漏掉的更新操作可能会无限多。漏掉更新操作的结果可能会导致应用程序的不正确的操作行为,所以,强烈建议启用该模式前仔细考虑其后果。

五、两种边界情况

有两种有趣的错误场景,在Curator中都可以处理得很好,第一种是在有序节点的创建过程中发生的错误情况的处理,第二种为删除一个节点时的错误处理。

  • 【有序节点的情况】
    如果客户端所连接的服务器崩溃了,但还没来得及返回客户端所创建的有序节点的节点名称(即节点序列号),或者客户端只是连接丢失,客户端没接收到所请求操作的响应信息,结果,客户端并不知道所创建的znode节点路径名称。回忆对于有序节点的应用场景,例如,建立一个有序的所有客户端列表。为了解决这个问题,CreateBuilder提供了一个 withProtection 方法来通知Curator客户端,在创建的有序节点前添加一个唯一标识符,如果create操作失败了,客户端就会开始重试操作,而重试操作的一个步骤就是验证是否存在一个节点包含这个唯一标识符。
  • 【删除节点的保障】
    在进行delete操作时也可能发生类似情况,如果客户端在执行delete操作时,与服务器之间的连接丢失,客户端并不知道delete操作是否成功执行。如果一个znode节点删除与否表示某些特殊情况,例如,表示一个资源处于锁定状态,因此确保该节点删除才能确保资源的锁定被释放,以便可以再次使用。Curator客户端中提供了一个方法,对应用程序的delete操作的执行提供了保障,Curator客户端会重新执行操作,直到成功为止,或Curator客户端实例不可用时。使用该功能,只需要使用DeleteBuilder接口中定义的 guaranteed 方法。

六、菜谱

1、群首闩(leader latch)

【创建一个LeaderLatch的实例】

leaderLatch = new LeaderLatch(client, "/master", myId);
  • 传入一个Curator框架客户端的实例
  • 一个用于表示集群管理节点的群组的ZooKeeper路径
  • 以及一个表示当前主节点的标识符

【注册一个LeaderLatchListener接口的实现,该接口中有两个方法:isLeader和notLeader。以下为isLeader实现的代码:】

@Override
public void isLeader()
{
    ...
    /*
    * Start workersCache  ①
    */
    workersCache.getListeable().addListener(workersCacheListener);
    workersCache.start();
    (new RecoveredAssignments(
        client.getZooKeeperClient().getZooKeeper())).recover(
            new RecoveryCallback() {
                public void recoveryComplete(int rc, List<String> tasks) {
                    try {
                        if(rc == RecoveryCallback.FAILED) {
                            LOG.warn("Recovery of assigned tasks failed.");
                        } else {
                            LOG.info( "Assigning recovered tasks" );
                            recoveryLatch = new CountDownLatch(tasks.size());
                            assignTasks(tasks); ② 
                        }
                        new Thread( new Runnable() {③
                            public void run() {
                                try {
                                    /*
                                    * Wait until recovery is complete
                                    */
                                        recoveryLatch.await();
                                    /*
                                    * Start tasks cache
                                    */
                                    tasksCache.getListenable().
                                        addListener(tasksCacheListener); ④
                                    tasksCache.start();
                                } catch (Exception e) {
                                    LOG.warn("Exception while assigning and getting tasks.", e );
                                }
                            }
                        }).start();
                    } catch(Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                    }    
                }
            });
        }
/* ① 首先初始化一个从节点缓存列表的实例,以确保有可以分配任务的从节点。
 * ② 一旦发现存在之前的主节点没有分配完的任务需要分配,将继续进行任务分配。
 * ③ 实现了一个任务分配的屏障,这样就可以在开始分配新任务前,等待已恢复的任务的分配完成,如果不这样做,新的主节点会再次分配所有已恢复的任务。启动了一个单独的线程进行处理,以便不会锁住ZooKeeper客户端回调线程的运行。
 * ④ 当主节点完成恢复任务的分配操作,开始进行新任务的分配操作。 */

【需要在具体流程开始前注册监听器。在runForMaster方法中进行这两步操作,同时,还将注册另外两个监听器,来处理事件的监听和错误:】

pubic void runForMaster() {
    client.getCuratorListenable().adddListener(masterLisener);
    client.getUnhandledErrorListenable().addListener(errorsListener);
    leaderLatch.start();
}

**【对于notLeader方法,会在主节点失去管理权时进行调用,在本例中,只是简单地关闭了所有对象实例,对这个例子来说,这些操
作已经足够了。在实际的应用程序中,也许还需要进行某些状态的清理操作并等待再次成为主节点。如果LeaderLatch对象没有关闭,Curator客户端有可能再次获得管理权】**

2、群首选举器

选举主节点时还可以使用的另一个菜谱为LeaderSelector。
LeaderSelector和LeaderLatch之间主要区别在于使用的监听器接口不同:

  • LeaderSelector使用了LeaderSelectorListener接口,该接口中定义了takeLeadership方法,并继承了stateChanged方法,可以在应用程序中使用群首闩原语来进行一个主节点的选举操作

【首先需要创建一个LeaderSelector实例】

leaderSelector = new LeaderSelector(client, "/master", this);

【takeLeadership方法用于获取管理权,该代码实现与isLeader类似】

CountDownLatch leaderLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1); 
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
    ...
    /*
    * Start workersCache
    */
    workersCache.getListenable().addListener(workersCacheListener);
    workersCache.start();
    (new RecoveredAssignments(
        client.getZooKeeperClient().getZooKeeper())).recover(
            new RecoveryCallback() {
                public void recoveryComplete (int rc, List<String> tasks) {
                try {
                    if(rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info( "Assigning recovered tasks" );
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }
                    new Thread( new Runnable() {
                        public void run() {
                            try {
                                /*
                                * Wait until recovery is complete
                                */
                                recoveryLatch.await();
                                /*
                                * Start tasks cache
                                */
                                tasksCache.getListenable().
                                addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                LOG.warn("Exception while assigning and getting tasks.", e);
                            }
                        }
                    }).start();
                /*
                * Decrement latch
                */
                leaderLatch.countDown(); ①
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                }
            }
    });
    /*
    * This latch is to prevent this call from exiting. If we exit, then
    * we release mastership.
    */
    closeLatch.await(); ②
}
/* ① 通过一个单独的CountDownLatch原语来等待该Curator客户端获取管理权。
 * ② 如果主节点退出了takeLeadership方法,也就放弃了管理权,通过CountDownLatch来阻止退出该方法,直到主节点关闭为止。
 */
实现的这个方法为CuratorMaster类的一部分,而CuratorMaster类实现了LeaderSelectorListener接口。对于主节点来说,如果想要释放管理权只能退出takeLeadership方法,所以需要通过某些锁等机制来阻止该方法的退出,在实现中,在退出主节点时通过递减闩(latch)值来实现。

【依然在runForMaster方法中启动我们的主节点选择器】

public void runForMaster() {
    client.getCuratorListenable().addListener(masterListener);
    client.getUnhandledErrorListenable().addListener(errorsListener);
    leaderSelector.setId(myId);
    leaderSelector.start();
}
另外还需要给这个主节点一个任意的标识符,虽然在本例中并未实现,但可以设置群首选择器在失去管理权后自动重新排队(LeaderSelector.autoRequeue)。重新排队意味着该客户端会一直尝试获取管理权,并在获得管理权后执行takeLeadership方法。

【作为LeaderSelectorListener接口实现的一部分,还实现了一个处理连接状态变化的方法:】

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
    switch(newState) {
    case CONNECTED:
        // Nothing to do in this case.
        break;
    case RECONNECTED:
        // Reconnected, so I should ①
        // still be the leader.
        break;
    case SUSPENDED:
        LOG.warn("Session suspended");
        break;
    case LOST:
        try {
            close(); ②
        } catch (IOException e) {
            LOG.warn( "Exception while closing", e );
        }
        break;
    case READ_ONLY:
        // We ignore this case.
        break;
    }
}
/* ① 所有操作均需要通过ZooKeeper集群实现,因此,如果连接丢失,主节点也就无法先进行任何操作请求,因此在这里最好什么都不做。
 * ② 如果会话丢失,只是关闭这个主节点程序。
 */

3、子节点缓存器

最后一个菜谱是子节点缓存器(PathChildrenCached类)

为了处理每一个缓存器实例的变化情况,需要一个 PathChildrenCacheListener 接口的实现类,该接口中只有一个方法
childEvent 。对于从节点信息的列表,只关心从节点离开的情况,因为需要重新分配已经分给这些节点的任务,而列表中添加信息对于分配新任务更加重要:
PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener()
{
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
    {
        if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
            /*
            * Obtain just the worker's name
            */
            try {
                getAbsentWorkerTasks(event.getData().getPath().replaceFirst("/workers/", ""));
            } catch (Exception e) {
                LOG.error("Exception while trying to re-assign tasks.", e);
            }
        }
    }
};
对于任务列表,通过列表增加的情况来触发任务分配的过程:
PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
            try {
                assignTask(event.getData().getPath().replaceFirst("/tasks/",""));
            } catch (Exception e) {
                LOG.error("Exception when assigning task.", e);
            }
        }
    }
};

相关推荐