流式系统如何引入Watermark支持:以Pravega和Flink为例
在流式计算的世界中,时间问题一直是困扰着业界的难点与痛点:如何能够更加精确地进行基于事件时间窗口的计算?Watermark 的概念应运而生。Watermark 试图将更加精确的时间参考引入流式计算,并取得了越来越多的流式平台的支持。Pravega 也不例外,在最近的版本更新中(v0.6),Pravega 已经加入了 Watermark 的完整支持。由于 Pravega 原生支持 Segment 级别的企业级动态缩放特性,在此基础上要实现 Watermark 并非易事。本文将按照“发现问题 - 解决问题”的线索,循序渐进地讨论 Watermark 机制在 Pravega 中的设计和实现,并对比 Flink 的实现。本文最早出自 Pravega 的官方博客。
1 动机
流式处理(Stream Processing)从广义上指的是从无界数据源注入数据并在注入的过程中进行数据处理的能力。这些数据可以是用户生成的数据,例如社交网络或其它在线应用;也可以是机器生成的数据,例如来自物联网和边缘应用的服务器遥测数据或传感器样本。
典型的流式数据处理应用通常按照数据产生的顺序依次处理数据。在实际应用中,由于以下原因,严格按照全序处理数据通常是无法实现的:
- 数据源本身就不是一个单一的元素,它可能由多个用户,服务器或者网关组成;
- 应用的内在设计也可能导致不同数据项目被乱序注入和处理。
因此,在 Pravega 和其它类似系统中,顺序都指的是数据注入的顺序,并且由“键”确定。“键”这一概念连结了数据流中的各个元素。
按生成顺序处理数据是流式处理最有意思的一面,因为这使得应用程序可以在不同事件中建立起一种临时的相关关系,尽管这种关系比较松散。例如,某个应用程序能够涉及这样的提问:在过去的一小时中有多少不同的用户登录了,或者在过去 的十分钟内有多少传感器报告了异常读数。为了实现并回答这些查询,应用程序必须能够为每一个报告周期生成相应的结果(第一个例子的报告周期是一小时,而第二个例子的报告周期是十分钟)。这些报告周期通常被称作时间窗口(Time Window)。
在数据生成时就进行数据处理使得应用程序可以在数据生成的同时就输出结果。对于有界数据集(不会新增数据),可以通过使用 map-reduce 对所有窗口并行地进行窗口聚合。而这对无界数据集(流)却并不可行,因为数据一直在动态持续增长。因此,对于持续生成的数据源,可以选择用 map-reduce 的方式周期性地处理数据集快照或增量(这将引入更长的处理时间),也可以用流的方式在数据注入的同时就进行处理。相对于周期性地处理,后者可以提供更低的端到端时延。
为了进行诸如窗口聚合之类的计算,首先必须拥有某种时间参考,并且使得每个数据元素(例如:消息,事件,记录等)都与一个时间值相关联。如果没有一个时间参考,应用程序就无法确定一个数据元素究竟属于哪个时间窗口。典型的用于讨论时间参考的时域包括事件时间(Event Time)和处理时间(Processing Time)。事件时间指的是数据源赋予事件的时间,通常用的是事件生成时的挂钟时间。处理时间用的是事件被进行数据处理时的时间作参考。某个事件所关联的时间要么是在应用程序从 Pravega 读取数据的时候被确定,要么是在事件被处理的时候确定。此外,我们还考虑注入时间(Ingestion Time),即进行注入的应用程序收到事件的事件。例如,在某个利用 Pravega 进行流式数据存储的应用程序中,注入时间就是事件被写入 Pravega Stream 的时间。图 1 展示了上述三个时域。
图 1 Pravega 中的时间
由于这三种时域各自将事件生命周期中的不同时间点与一个事件相关联,它们必然存在差异。当数据源在事件生成的同时就立刻进行发送时,事件时间和注入时间之间的差异通常较小。但是由于网络连接原因,也可能出现一些具有显著偏差的离群点。注入时间和处理时间之间的差异取决于注入过程和处理过程的实际发生时间。事实上,对于 Pravega 来说,这一差值可能会相当大,因为 Pravega 是一个存储系统,数据在被注入之后可能在任意时间之后才被应用程序处理。在 Pravega 中,我们将这种在任意长时间之前就已经注入的老数据称为历史数据(Historical Data)。
能够使用我们上述讨论过的时域之一将一个时间值关联到一个事件上还远远不够。应用程序的确可以从一个时间戳推断出某个事件属于哪个时间窗口,但它如何能知道它已经收到了某个时间窗口内的所有事件并且可以关闭当前窗口了呢?在处理时间是连续递增的假设下,关闭一个基于处理时间的窗口非常简单,但对于事件时间和注入时间就完全不同了。对于事件和处理时间,进行数据处理的应用程序需要知道它何时(即便只是估计)能够关闭一个给定的窗口并报告计算结果。当然,应用程序也可以选择永远不关闭时间窗口并且持续重复处理窗口内数据。但是,在某个时间点,应用程序总是需要调用和使用最终的计算结果,然后向前推进,这已经等价于关闭当前时间窗口了。
为了让应用程序能够对时间窗口结束进行断言,我们需要知道事件关联时间的下界,而这些下界就被称作 Watermark。Watermark w 保证所有时间戳小于 w 的事件都已经被读取或者处理了(究竟是读取还是处理的语义则要依赖上下文确定)。然而,迟到的事件(Late Event)总是有几率发生。如何处理和最小化迟到事件则依赖具体的应用程序实现。图 2 展示了 Watermark 的概念。
图 2 时间和 Watermark
为了计算基于时间窗口的聚合,我们需要能够将事件映射到窗口并且知晓何时能够关闭窗口(计算当前窗口内的聚合)。即便我们假设只有单一的事件序列,顺序赋值的方法也是行不通的,因为事件可以乱序出现。图中,事件 7 和事件 8 就出现了这种乱序。因此,对于每一个事件我们都需要一个时间参考,以便确定将它分配到哪个时间窗口。我们还需要知道何时能够关闭一个时间窗口,而 Watermark 正是这样一种抽象:通过提供时间下界允许窗口正确关闭。现实中,要提供严格的 Watermark 保证是极其困难的。分布式系统的异步本质使得为迟到事件提供强保证变得非常复杂。另外,从进度所关注的角度看,提前关闭时间窗口并允许一小部分迟到事件往往是一种较好的选择。这种选择通常是依赖具体应用程序的。
在这篇文章中,我们将会讨论 Pravega 新增的对事件时间和注入时间的支持。我们必须克服的关键难点之一就是如何在流式数据的 Segment 集合会因自动缩放机制而动态变化的情况下提供 Watermark 的支持。我们对 Pravega 的 Reader Group 加入了内部支持,以便简化与流式处理器的关联,例如 Apache Flink。我们用 Apache Flink 作为基于 Watermark 的流式处理器的一个典型例子,讨论 Flink 对 Watermark 的支持以及与 Pravega 的 Flink 连接器(Connector)的集成问题。我们还会对如何与任意应用程序集成进行总结,并根据我们对该特性现有的经验给出建议。
2 示例:Apache Flink
Apache Flink 是一个为流式和批式数据而设计的开源平台,而我们编写了一个连接器允许应用程序可以使用 Flink 处理 Pravega 的流式数据。Flink 由一个允许应用程序编写作业(Job)的编程模型和一个执行 Flink 程序的分布式运行时环境构成。在运行时,Flink 环境把一个程序映射成一个数据流,而这个数据流由一个或多个源(Source),一系列变换算子(Operator)以及一个或多个汇(Sink)组成。在本文关于 Watermark 的讨论中,源是最有意思的元素,因为正是由它利用 Pravega 的时间信息产生 Watermark。
Watermark 是 Apache Flink 中的核心概念。它允许一系列基于时间的计算,例如不同时域下的时间窗口:事件时间,注入时间和处理时间。在 Flink 中,它们被称作时间特征(Time Characteristics)。事件时间和注入时间在 Flink 中有着不同的定义。在 Flink 中,注入时间代表事件进入 Flink 数据流时的时间,而不是指事件被注入数据管道(例如写入 Pravega)的时间。事件时间代表应用程序赋予的时间值,它涵盖了由应用程序确定的任意形式的时间和 Watermark,包括在源端进行的基于 Pravega 传播的时间信息的赋值。因此,事件时间是 Flink 的时间特征,包含了 Pravega 所提供的事件时间和注入时间。图 3 展示了这些不同的时间特征以及与 Pravega 的区别。
图 3 Flink 和 Pravega 中的时间
为了确定在一个作业究竟使用何种时间特征,Flink 需要程序在执行环境中设置:
复制代码
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
当在 Flink 中使用事件时间时,必须对事件进行时间戳赋值,并且系统需要 Watermark 作为事件时间的进度指标量。这有两种实现方法:通过源直接进行或者通过时间戳赋值器(同时负责产生 Watermark)。时间戳赋值器是作业规格(Job Specification)的一部分,它必须在第一个使用时间的操作发生之前被指明(通常在源之后)。时间戳赋值器将覆写源直接生成的时间戳和 Watermark。
与 Pravega 最相关的选项就是让源赋值时间戳并且产生 Watermark。当使用这一选项时,我们可以使用本文所描述的方法,在 Pravega 连接器中加入时间戳赋值和 Watermark 生成的支持。支持事件时间的 Flink 源需要调用如下方法:
- SourceContext#collectWithTimestamp(T element, long timestamp): 从源中产生一个事件,并赋值时间戳。
- SourceContext#emitWatermark(Watermark mark): 产生 Watermark。
在接下来两个小节,我们将给出我们的设计,在 Pravega 中支持 Watermark。在我们讨论完设计与实现之后,我们将回头展示更多与连接器集成的细节。
3 难点
假设现在我们有一个简单的应用程序和一组产生事件的传感器,一个 Pravega 流,以及一个 Flink 作业。就目前的讨论而言,究竟这个作业在进行怎样的操作并不重要,但我们假设它正基于 Pravega 的流进行某种形式的时间窗口聚合,并且它需要知道时间窗口的边界。
如果传感器本身能够进行时间戳赋值,这样写入 Pravega 的事件都附加着时间戳信息,那么 Flink 的作业源就可以提取这些时间戳并具有某种时间进度的概念。尽管这是一个合法的方法,但这么做有两个严重的缺点:
- 对于一个给定的时间戳,我们并不知道是否还有一个具有相同时间戳的事件,因此我们无法推进 Watermark。
- 如果 Flink 源没有收到事件,它就不知道究竟是事件时间仍在向前推进而仅仅只是没有新事件产生,还是系统正在经历异步过程(例如事件被任意延迟)。
通常说来,迟到事件不可能完全避免,因为有太多情况可能导致迟到事件,例如连接或者节点不可用。但是,源和应用程序一般都有事件时间的相关信息(例如自身的时钟),并且在理想情况下我们应当传播这些信息以便 Flink 源可以更加精确地推进事件时间。
现在我们看一下如何用 Pravega 实现这些。假设我们周期性地向 Pravega 流的字节序列里写入标记来表征时间进度。这些标记指明所有事件时间早于这个标记的所有事件都已经写入了。这么做会带来三个问题:
- 具有多个 Writer 的 Pravega 流需要协调标记的写入,保证它们反映出所有 Writer 的状态。
- Pravega 流通常都不是一个简单的字节序列,它一般由多个并行的 Segment 构成。
- Segment 的内部实现是一组字节序列,因此将标记这种控制数据与应用程序数据混合存储在一起并不是一个好方法。
为了解决问题 1,我们需要某种机制来参考所有已知的 Writer,而问题 2 要求标记能够反映跨 Segment 的位置。对于问题 3,我们需要在外部维护标记。图 4 展示了一个往 Pravega 注入事件并进行处理的应用程序的时间流。
图 4 Pravega 中的时间流
参考所有 Writer 并不是一件简单的事,因为 Writer 可以在线也可以离线。我们选择的任何机制都必须考虑 Writer 集合的这种动态性。在外部保存标记的同时还要能够将它们映射到跨 Segment 的位置,我们需要某种数据结构来维护这种 Segment 到偏移量的映射关系,并且我们需要在流数据之外维护这些标记,例如在一个单独的 Segment 中。
现在,还有一个问题需要解决:空闲 Reader。Reader Group 协调对组内 Reader 的 Segment 分配。假设一个给定的 Reader 没有被分配到 Segment。这种场景是可能存在的,例如,当组内 Reader 的数量大于 Segment 数量的时候。在这种情况下,一个没有被分配到 Segment 的 Reader 如何能够知道事件时间在向前推进?为了让空闲 Reader 在没有被分配到 Segment 的时候也能够产生 Watermark,我们通过 Reader Group 的状态同步器(State Synchronizer)来协调事件时间的推进。这种协调使得所有 Reader 可以不依赖 Segment 的分配而推进事件时间。
到目前为止,我们一直在讨论时间却始终没有说明时间参考究竟从何而来。这是有意而为之:我们不想限制应用程序使用特定的时间参考,或者限定这种时间参考何时开始存在。这种时间参考可以是挂钟时间,非常接近数据生成时的当前时间,也可以是从文件读取事件时的任意过去时间点。我们不想试图规定或强制任何对时间赋值的方法,尤其是对于事件时间,我们希望应用程序可以根据自身的设计使用任何有意义的方法设置这个值。
在接下来的几个小节中,我们会详述我们的设计和实现。许多我们已经讨论过的抽象概念都会在余下的章节中具现化。
4 Pravega 对 Watermark 的支持
Pravega 的 Watermark 机制由三个主要部分组成,如提案文档所述:获取时间,时间戳聚合以及时间窗口的获取。
4.1 获取时间
首先是 EventStreamWriter 上的一个 API,用于记录时间。这允许一个进行数据写入的应用程序向 Pravega 表明当前正在写入的数据所对应的时间。
复制代码
EventStreamWriter<EventType> writer = clientFactory.createEventWriter(stream, serializer, EventWriterConfig.builder().build()); //... write events ... writer.noteTime(currentTime);
这里,“noteTime ”API 可以被周期性地调用,指明所有已经写入的事件都发生在某个时间之前。
这个 API 的结构使得那些不关心 Watermark 的应用程序不必额外做任何事情。此外,它还允许应用程序定义自己的时间概念。
类似地,对于事务性 Writer,在事务的 commit() 方法上有一个可选参数,允许应用程序指明当前事务所写入事件的时间。
复制代码
Transaction<EventType> txn = writer.beginTxn();//... write events to transaction. txn.commit(txnTimestamp);
noteTime() 方法和 commit() 方法都接受一个时间戳参数,而并非直接查询系统时钟。这允许用事件时间的方式定义时间。
如果正在进行事件写入的进程并不是事件的真正生产者,例如事件来自 Web 前端,移动 App,或者嵌入式系统,那么事件的发生与写入之间一定存在时间差。这同样适用于事件本身就是从某个上游源头导出的场景。例如,从某个流读取数据,用某种方式进行数据处理(比如聚合),然后再将其写入另一个流,这是非常常见的应用。
如果你的应用程序不需要定义时间,那么可以直接使用注入时间:有一个名为 automaticallyNoteTime 配置参数可以提供这一行为。你可以这样配置:
复制代码
EventStreamWriter<EventType> writer = clientFactory.createEventWriter(stream, serializer, EventWriterConfig.builder().automaticallyNoteTime(true).build());
当这一选项开启时,就无需再调用 noteTime() 方法了。
一旦获取了时间,流上的所有 Writer 都必须形成一个统一的视图。为了进行这种聚合,客户端在内部会将时间值与 Writer 的当前位置进行组合,并将信息发送给控制器(Controller)。
4.2 从多个 Writer 进行时间戳聚合
控制器从所有的 Writer 接收这些时间戳与位置信息。控制器这样做信息聚合:它从一个流上的所有 Writer 收集时间戳并输出一个 Stream Cut,这个 Stream Cut 大于等于所有 Writer 当前位置的最大值,同时还输出所有 Writer 报告时间的最小值作为时间戳。如下:
图 5 聚合 Writer 的时间戳
通过这样的方式聚合时间戳,当一个 Reader 的当前位置超过一个给定 Stream Cut 的时候就一定能保证已经读取了所有对应的事件。
当然,Writer 可以在线也可以离线。很自然的,如果一个 Writer 关闭并且不再上线,我们不希望一直持有它的递增时间信息。为了排除这种情况,流上有一个名为 timestampAggregationTimeout 的配置参数。这一配置项指明当超过多长时间没有收到一个 Writer 的信息后,就把它排除在时间窗口计算之外。
为了让 Reader 可以读取这些聚合后的信息,控制器将聚合后的时间和 Stream Cut 信息写入一个特殊的 Segment。这个 Segment 在 Pravega 内部被称为 Mark Segment。Reader 可以从这个特殊的 Segment 读取相应的信息来确定它们在流中的位置。
4.3 Reader 获取时间窗口
最终,所有的 Reader 协调它们各自的位置信息,得到一个 Stream Cut 形式的组合位置信息。这有一点难理解,因为为了知道 Reader 相对于 Mark Segment 中所记录的 Stream Cut 的位置,Reader 必须首先生成一个聚合后的 Stream Cut。这需要 Reader Group 中所有 Reader 共同协作。我们是这样实现的:让每一个 Reader 都把它们的位置信息记录在一个状态同步器中。
一旦获取了一个位置信息,接下来就需要对它进行比较。事实上,比较的结果并不是一个单一的数值。例如,考虑如下 Stream Cut 上的一个 Reader Group:
图 6 一个时间上下界分别为 T5 和 T2 的 Stream Cut
在这个例子中,Reader Group 部分超越某个时间值,但又部分落后于它。如果你从 Watermark 的设计初衷考虑,这一切就都说得通了。数据在多个主机上被并行处理,我们想要确定这样一个时间点:在该时间点之前的所有事件都已经被处理了。
正因为如此,Reader 收到的是一个 TimeWindow 数据结构而不是一个简单的时间数值。这正是 Reader 的分布区间。在上述例子中,时间上下界分别为 T5 和 T2。这一时间窗口可以通过调用如下方法获取:
复制代码
TimeWindow window = reader.getTimeWindow();
在这一过程中始终保持的不变量是,一个 Reader 得到的时间下界意味着所有早于这个时间点的事件都一定已经被读取了。还有一些极端的例子需要注意。
- Reader Group 可能位于当前流第一次记录的时间戳之前,在这种情况下,时间戳的下界无法准确定义。唯一可以确定的是,Reader Group 处于第一个时间戳之前。
- Reader Group 可能位于控制器所记录的最后一个标记之后。例如,如果一个 Reader Group 正处于流的尾端并且消费速度紧跟注入数据,那么它很有可能在控制器聚合时间之前就完成了事件处理。此时,当应用程序调用 getTimeWindow() 方法时,返回的 TimeWindow 结构中,upperTimeBound 成员可能为空值。类似地,lowerTimeBound 成员也可能滞后于 Reader 的实际位置,因为它必须等待时间信息进行聚合操作。
- TimeWindow 结构是基于 Reader 当前已经读取的位置而不是应用程序处理的位置(因为 Pravega 根本无从知晓这一信息)。所以,如果应用程序由于 Reader 死亡而调用了 readerOffline() 方法指明需要重新处理事件,那么 TimeWindow 可能倒退以便反映出某些事件需要被重新处理,因为在 Reader 死亡的过程中,这些正在被处理的事件已经丢失了。
4.4 与处理逻辑的联系
在 EventStreamReader 接口上,getTimeWindow() 方法返回一个 TimeWindow 对象。TimeWindow 对象提供了时间的上界和下界。
这是一个基于拉取(Pull)而不是推送(Push)的模型,也就是说我们可以假设往流中注入一些“伪事件”。这一模型有如下优点:它无需强制为每一个流都处理时间,它允许时间在一个没有任何事件的流上向前推进,但最重要的是它为 TimeWindow 的计算频率提供了灵活性。
TimeWindow 反映了流上的当前位置,因此,如果需要的话,可以在每次调用完 readNextEvent() 方法之后都调用它,或者也可以周期性地调用它以便将事件按窗口分组。
4.5 Flink 连接器的示例
在 Pravega 的 Flink 连接器中就有这样一个例子,实现了 TimestampsAndPeriodicWatermarksOperator 接口:
复制代码
@Override public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
此处,连接器获取时间窗口,如果条件满足,则推进 Flink 的 Watermark,生成新的 Watermark 并调度任务在一个可配的时间间隔后重新运行。
由于这段逻辑是在连接器上实现的,所有使用 Pravega 的 Flink 应用程序都可以通过使用标准 Flink 的 API 享受到基于事件时间或注入时间的 Watermark 的好处。
5 总结
处理流的尾部数据和历史数据是 Pravega 的两大组成特性。Pravega 存储流式数据并使用统一的 API 允许应用程序在数据可用时立即处理或者在将来任意时间处理。为了结果的一致性,流式数据需要有一个时间参考以便使得结果与流式数据何时被处理无关,并且这也绑定了时间允许进行时间窗口计算,这是非常关键的一点。正是流式数据的这种对时间信息的需求使得我们对 Pravega 加入了 Watermark 的支持。
我们对 Watermark 的支持由以下几部分组成:将时间戳关联到 Pravega 的写入数据上,根据时间戳生成表征位置信息的 Stream Cut,以及通过 Reader 对外暴露时间信息以便允许应用程序生成 Watermark。某个 Reader 得到的时间信息是一个根据各 Reader 的位置生成的跨 Reader 时间范围。这个时间范围给出了所有 Reader 已经读取数据的下界以及在 Reader Group 上的分布跨度。
本文的方法是一个通用方法,并且支持任意应用程序生成单调递增的时间戳。我们选择 Apache Flink 作为首个集成对象,因为它对窗口聚合和 Watermark 具有高级支持。我们在 Pravega 的 Flink 连接器上加入了 Flink 支持,使得使用 Pravega 的 Flink 作业可以从 Watermark 中获益。我们期待未来的 Pravega 连接器可以提供类似的支持,并且独立的应用程序可以自己实现这种支持,因为本文已经展示了使用该 API 所需加入的逻辑是非常简单直白的。
References
[1] T. Kaitchuck, “Pravega Watermarking Support,” [Online]. Available: http://blog.pravega.io/2019/11/08/pravega-watermarking-support/.
[2] J. Manyika, R. Dobbs, M. Chui, J. Bughin, P. Bisson and J. Woetzel, “The Internet of Things: Mapping the value beyond the hype,” McKinsey Global Institute, McKinsey & Company, 2015.
[3] T. Akidau, R. Bradshaw, C. Chambers, S. Chernyak, R. J. Fernández-Moctezuma, R. Lax, S. McVeety, D. Mills, F. Perry, E. Schmidt and S. Whittle, “The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing,” in Proceedings of the VLDB Endowment, Kohala Coast, Hawaii, 2015.
[4] “Apache Flink,” [Online]. Available: https://flink.apache.org.
[5] “Pravega Connector for Flink,” [Online]. Available: https://github.com/pravega/flink-connectors .
[6] “Flink Event Time,” [Online]. Available: https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html.
[7] “Generating Timestamps / Watermarks,” [Online]. Available: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html
[8] F. Junqueira, “Streams In and out of Pravega,” [Online]. Available: http://blog.pravega.io/2018/02/12/streams-in-and-out-of-pravega/ .
[9] “PDP-33 Watermarking,” [Online]. Available: https://github.com/pravega/pravega/wiki/PDP-33:-Watermarking.