基于ZooKeeper的事件驱动设计
事件驱动在软件设计中是一种常见的方法,它有很多直观的优点。比如它的思想符合开闭原则,具有极好的可扩展性,提供更好的响应性,等等。观察者模式算得上是最简单的事件驱动设计了,但是直接使用它,也存在一些难以解决的问题。比如如何保证事件的顺序消费?在流量大的时候如何把系统的负载均衡到多个服务器?本文为大家介绍一种利用ZooKeeper实现的分布式事件驱动设计的实现。
事件驱动,首先我们要约定好什么是一个事件?事件是一个抽象的概念,用户点击某个按钮是一个事件、用户下订单是一个事件、用户支付订单是一个事件、商品发货了是一个事件,所以我们约定事件包含几个关键的属性:
- 类型
用于区分不同类型的事件,不同类型的事件有着不同的观注者。
- 事件的内容
记录事件的信息,比如:谁点击了哪个按钮?谁下了订单、订单对应的商品是什么、价格多少?谁支付了订单、支付方式是什么?
- 时间
表示事件发生的时间。
- 事件的分组
很多业务场景对事件的处理顺序有严格的要求,只通过事件的类型来实现顺序的隔离在某个场景中会产生问题,比如创建订单事件,和订单变更事件是两个不同类型的事件,但它们确有先后依赖关系,订单只有先被创建出来才可能进行变更。为了解决事件消费的顺序问题,我们需要为事件定义一个额外的属性——分组,属于同一个分组的事件,必须按顺序处理。
好了,我们现在已经定义了一个事件的基本属性,现在我们看一下系统的总体架构设计。
在系统中一共有三类角色,分别是Master服务器,Worker服务器和客户端,它们的职责如下:
- 客户端
客户端用于发布事件,即把事件保存到ZooKeeper中。
- Master服务器
Master服务器相当于集群中的管理员,它有两个主要的职责:一是监控Worker服务器,当Worker服务器发生故障下线、或者有新的Worker服务器加入集群时,重新对每个Worker进行任务分配,避免Worker单点问题的同时实现了系统的负载均衡。二是监控客户端提交的事件,如果事件对应的分组还没有分配给Worker的话,就将该分组分配给当前负载最小的Worker。
- Worker服务器
Worker服务器用来消费事件,客户端提交的事件会由Master服务器以分组为单位分配给各个Worker。
基于以上的构想,我们还需要对存放在ZooKeeper的信息加以一定的设计,来满足我们的需要。
在ZooKeeper中,request节点用来保存客户端提交的事件。前面我们提到,每个事件都有一个对应的分组,分组的作用是保证事件的顺序消费,所以我们不能简单的把所有的事件都放在request下面,而是把分组挂在request节点下面,事件本身挂在分组节点下面。这样,每个分组由一个Worker来顺序消费,一个worker可以处理多个分组,不同的分组可以被并行消费,取决于Master如何进行分配。这样,即解决了事件消费的顺序问题,也实现了负载的均衡。
对于Worker服务器,上线时会在workers节点下创建一个临时节点,同时监听该节点。节点的名称可以是任何能识别该Worker的标识(比如IP地址),节点的内容由Master负责更新,是一个事件分组的列表,表示分配给当前Worker处理的分组。Worker服务器在监听到自己的节点内容变化时,就读取节点内容获取分配给自己的分组,并监听request节点下对应的分组节点,当这些分组下有新的事件时,读取事件信息进行处理。
此外,我们需要一个master暂时节点来表示当前的Master服务器,成功创建master节点的成为当前的Master服务器,其它Master服务器则监听该节点,成为备用服务器。Master服务器监控request节点和workers节点。当request节点下产生了新的分组,就将该分组分配给当前负载最小的Worker服务器,即更新worker服务器的节点内容。当workers节点发生变化,说明有Worker服务器发生故障下线、或者有新的Worker服务器加入集群,这时Master服务器就重新分配所有的分组。
实现
既然是一个事件驱动框架,我们自然需要为不同类型的事件注册不同的订阅者,那么框架为我们提供了哪些接口呢?我们来看一下几个关键的类。
Event类是事件的封装实体,这个没什么好讲的。Client类的作用也很明显,客户端使用它来提交事件。重点看看Subscribers与BaseSubscriber两个接口。BaseSubscriber是订阅者接口,所有的订阅者都需要实现onEvent方法,当订阅的事件发生时,这个方法就会被调用,我们可以在这里实现业务逻辑,参数event表示当前的事件。而Subscribers接口设计的目的是需要我们提供一个从事件类型到订阅者的关系转换,在启动WorkServer时(start方法),我们需要提供这样一个实现。在事件发生时,WorkServer通过Subscribers得到观注该事件的订阅者,然后逐个调用它们的onEvent方法,下面是一段删减过的代码示例:
String json = this.getContent(group + Config.SEPARATOR + event, null); Event e = Event.fromJson(json); subscribers.getSubscribers(e.getType()).forEach((s) -> { s.onEvent(e); });
对于Master的实现,它在分配的算法上存在扩展点,所以我们在分配方面做了一个抽象,相关的类图如下。
Worker类是一个Worker服务器实例的抽象,包括它的标识(name属性)和当前分配给它处理的分组任务(group属性)。Situation类是集群当前状态的一个现状,或者说是快照吧,记录了哪些集群的配置情况(Configuration对象),当前一共有多少个分组(groups属性),当前有多少Worker服务器实例以及每个Worker的分配情况。当Master服务器监听到新的分组请求,或者发现Worker故障下线、或新的Worker加入集群时,就会分别通过调用Assigner的onNewGroup方法和onWorkerChange方法进行任务分配。框架本身提供了一个DefaultAssigner类,基于公平的原则实现的分配算法。这里还是存在一起扩展的余地的,比如最常见的基于权值的分配、基于Hash的分配等等。
目前存在哪些问题
- 系统的并行性是基于事件的分组来设计的,在实际场景中,不同类型的事件,在数量级上往往差别巨大。比如会员升级和会员下单的频率明显不一样。所以我们在考虑分组的粒度问题上,需要结合实际场景,尽量使得每个分组的事件产生频度不要相差太大,否则Worker之间的负载会不均衡。
- 这个限制来自于ZooKeeper,ZK建议每个节点的内容最好不要超过1M,也就是说我们的事件内容最大不应该超过这个限制。对象一个事件来说,这通常不是问题,但如果事件的内容确实很大,我们也可以只把事件的内容存在其它地方,ZK中仅仅中保留事件的标识。
- 提交到request分组下的事件节点,我们用的是PERSISTENT_SEQUENTIAL类型的节点。熟悉ZK的同学都知道connection_loss异常需要我们进行重试,而前一次请求我们并不知道是否已经成功。即使我们在重试前,把当前的事件查询出来检查一遍,也不能彻底解决问题,因为事件有可能已经被worker消费掉。所以实现上我们只是简单的进行了重试,这也意味着事件有可能会被重复消费,对于关键场景,我们需要在订阅者端对事件进行判重。
- 前面我们提到,当Worker故障下线、或新的Worker加入集群时,master会进行重新分配,这样就存在同一个分组被多个Worker同时处理的时间片断,虽然很短暂,但我们也不得不约定,Worker能够处理一个事件的前提是先删除这个节点,否则就会存在事件重复消费的情况。但如果在删除时发生了connection_less异常,并且前一次请求已经删除成功的话,重试的结果会是失败,这样当前Worker又不能直接处理该事件,这样又会发生事件丢失。
最后附上源码地址:https://github.com/OuYangLiang/ZK-eventdriven