Netflix是如何实现每秒200万次的数据处理?

Netflix 是如何实现每秒 200 万次的数据处理,并查询超过 1.5 万亿行的数据?

在推动技术创新升级的同时,还要确保 Netflix 始终如一的良好体验,这并非易事。

如何才能确保更新不会影响到用户呢?如果确保我们的改进是可度量的呢?Netflix 使用来自回放设备的实时日志作为事件源来获得度量,以便理解和量化用户设备浏览和回放的流畅度。

Netflix是如何实现每秒200万次的数据处理?

一旦有了这些度量,我们就把它们输入数据库。每一项指标都附有与所使用设备类型相关的匿名细节,例如,该设备是智能电视、iPad 还是 Android 手机。这样,我们就可以对设备进行分类,并从不同的方面来查看数据。同样,我们还能够只隔离影响特定群体的问题,如应用的版本、特定类型的设备或特定国家。

这些聚合数据可以立即用于查询,可以通过仪表板查询,也可以通过即席查询。这些指标还会持续检查报警信号,比如新版本是否会影响某些用户或设备的回放或浏览。这些检查用于通知负责的团队,让他们可以尽快处理问题。

在软件更新期间,我们为一部分用户启用新版本,并使用这些实时指标来比较新版本与旧版本的性能。在度量中,如果有任何不合适,我们可以中止更新并将那些已获得新版本的用户恢复到以前的版本。

由于这些数据的处理速度超过每秒 200 万次,所以将其存入一个可以快速查询的数据库非常困难。我们需要足够的数据维数,以便能够有效地隔离问题,如此一来,我们每天生成超过 1150 亿行数据。在 Netflix,我们利用 Apache Druid 帮助我们在这种规模下解决这一挑战。

1. Druid

Apache Druid 是一个高性能的实时分析数据库。它是针对特别注重快速查询和摄取的工作流而设计。Druid 特别适合于即时的数据可视化、即席查询、操作分析和高并发处理。——druid.io

因此,Druid 非常适合我们的用例,事件数据摄取率很高,而且具有高基数(high cardinality)和快速查询需求。

Druid 不是一个关系型数据库,但是一些概念是可以转化的。我们有数据源,而不是表。与关系型数据库一样,有表示为列的数据逻辑分组。与关系型数据库不同的是,没有连接的概念。因此,我们需要确保在每个数据源中都包含希望的筛选或分组的列。

数据源中主要有三种列——时间、维度和度量。

Druid 中的一切都有时间标记。每个数据源都有一个时间戳列,这是主要的分区机制。维度是可用于筛选、查询或分组的值。度量是可以聚合的值,并且几乎总是数值。

通过移除执行连接的能力,并假设数据都有时间戳,Druid 可以在存储、分发和查询数据方面做一些优化,这样我们就可以将数据源扩展到数万亿行,并且仍然可以实现查询响应时间在 10 毫秒以内。

为了达到这种程度的可扩展性,Druid 把存储的数据分成时间块。时间块的长度是可配置的。可以根据数据和用例选择适当的区间。对于数据和用例,我们使用 1 小时的时间块。时间块中的数据存储在一个或多个 段 中。每个段包含所有属于这个时间块的数据行,时间块由它的时间戳列决定。段的大小可以配置为行数上限或段文件的总大小。

Netflix是如何实现每秒200万次的数据处理?

在查询数据时,Druid 将查询发送到集群中所有那些拥有的段所属的时间块在查询范围内的节点。在将中间结果发送回查询代理节点之前,每个节点都并行地针对其持有的数据处理查询。在将结果集发送回客户端之前,代理将执行最后的合并和聚合。

Netflix是如何实现每秒200万次的数据处理?

2. 摄取

这个数据库的数据插入是实时的,不是将单个记录插入到数据源中,而是从 Kafka 流读取事件(就是我们的度量)。每个数据源使用一个主题。在 Druid 中,我们使用 Kafka 索引任务,它创建了多个分布在实时节点(中间管理器)上的索引工作器。

这些索引器都订阅主题,并从流中读取其事件。索引器根据摄取规范从事件消息中提取值,并将创建的行累积到内存中。一旦创建了一行,就可以查询它。对于索引器正在填充的段的时间块进行查询,将由索引器本身提供服务。由于索引任务本质上是执行两项工作,即摄取和处理查询,所以及时将数据发送到历史节点,以更优化的方式将查询工作卸载给它们是很重要的。

Druid 可以在摄取时汇总数据,以尽量减少需要存储的原始数据量。Rollup 是一种汇总或预聚合的形式。在某些情况下,汇总数据可以极大地减少需要存储的数据的大小,可能会减少行数数量级。然而,这种存储减少是有代价的:我们失去了查询单个事件的能力,只能在预定义的查询粒度上进行查询。对于我们的用例,我们选择了 1 分钟的查询粒度。

在摄取期间,如果任何行具有相同的维度,并且它们的时间戳在同一分钟内(我们的查询粒度),则将这些行汇总。这意味着,通过将所有度量值相加合并行并增加计数器,我们就可以知道有多少事件对这一行的值有贡献。这种形式的 Rollup 可以显著地减少数据库中的行数,从而加快查询速度。

一旦累积的行数达到某个阈值,或者段打开的时间太长,这些行就被写入段文件并被卸载到深层存储中。然后,索引器通知协调器片段已经做好准备,以便协调器可以告诉一个或多个历史节点来加载它。一旦段被成功地加载到历史节点中,它就会从索引器中卸载,任何针对该数据的查询现在都将由历史节点提供服务。

3. 数据管理

可以想象,随着维度基数的增加,在同一分钟内发生相同事件的可能性会降低。管理基数(以便汇总)是实现良好查询性能的强大手段。

为了达到我们需要的摄取速度,可以运行许多索引器实例。即使索引任务使用 Rollup 合并相同的行,在一个索引任务的同一个实例中获得这些相同行的机会也非常低。为了解决这个问题并实现尽可能好的 Rollup,我们会在给定时间块的所有段都传递给历史节点之后运行一个任务。

预定的压缩任务从深度存储中获取时间块的所有段,并运行 map/reduce 作业来重新创建段并实现完美的汇总。然后,由历史节点加载和发布新的段,替换和取代原来的、未充分汇总的段。在我们的例子中,通过使用这个额外的压缩任务,行数减少到了 1/2。

知道何时收到给定时间块的所有事件并不是一件小事。Kafka 上可能有延迟到达的数据,或者索引器将片段传递给历史节点可能需要花些时间。为了解决这个问题,我们会在运行压缩之前执行一些限制和检查。

首先,我们丢弃所有非常晚才到达的数据。我们认为,这些数据在我们的实时系统已经过时。这设置了数据延迟的界限。其次,压缩任务被延迟调度,这使得段有足够的时间可以卸载到正常流中的历史节点。最后,当给定时间块的预定压缩任务启动时,它将查询段元数据,检查是否仍然有相关的段被写入或传递。如果有,它将等待几分钟后再试一次。这将确保所有数据都由压缩作业处理。

没有这些措施,我们发现有时会丢失数据。在开始压缩时仍有写入的段将被新压缩的段所覆盖,这些段具有更高的版本,因此会优先。这可以有效地删除包含在那些尚未完成传递的段中的数据。

4. 查询

Druid 支持两种查询语言:Druid SQL 和原生查询。在底层,Druid SQL 查询会被转换成原生查询。原生查询以 JSON 格式提交给 REST 端点,这是我们使用的主要机制。

我们集群的大多数查询都是由自定义的内部工具(如仪表板和预警系统)生成的。这些系统最初是为了与我们内部开发的开源时序数据库 Atlas 一起工作而设计的。因此,这些工具使用 Atlas Stack 查询语言。

为了加速查询 Druid 的采用,并实现现有工具的重用,我们添加了一个翻译层来接收 Atlas 查询,将它们重写为 Druid 查询,发送查询并将结果重新格式化为 Atlas 结果。这个抽象层允许现有的工具按原样使用,用户要访问我们 Druid 数据存储中的数据也不需要额外学习。

5. 调优

在调整集群节点的配置时,我们以较高的速度运行一系列可重复和可预测的查询,从而获得每个给定配置的响应时间和查询吞吐量的基准。这些查询在设计时隔离了集群的各个部分,以检查查询性能方面的改善或退化。

例如,我们对最近的数据进行有针对性的查询,以便只对 Middle Manager 进行查询。同样,对于较长的时间段但较旧的数据,我们只查询历史节点来测试缓存配置。同样,使用按高基数维分组的查询检查结果合并受到了什么影响。我们继续调整和运行这些基准测试,直到我们对查询性能满意为止。

在这些测试中,我们发现调整缓冲区的大小、线程的数量、查询队列的长度和分配给查询缓存的内存对查询性能有实际的影响。然而,压缩作业的引入对查询性能有更重要的影响,它会将未充分汇总的段重新压缩,实现完美汇总。

我们还发现,在历史节点上启用缓存非常有好处,而在代理节点上启用缓存效果则不是很明显。因此,我们不在代理上使用缓存。这可能是由我们的用例造成的,但是几乎每一次查询都会错过代理上的缓存,这可能是因为查询通常包含最新的数据,这些数据不在任何缓存中,因为一直有数据到达。

6. 小结

针对我们的用例和数据率,经过多次优化调整,Druid 已经被证明具备我们最初希望的能力。