字节跳动自研强一致在线 KV &表格存储实践 - 上篇
背景
互联网产品中存在很多种类的数据,不同种类的数据对于存储系统的一致性,可用性,扩展性的要求是不同的。比如,金融、账号相关的数据对一致性要求比较高,社交类数据例如点赞对可用性要求比较高。还有一些大规模元数据存储场景,例如对象存储的索引层数据,对一致性,扩展性和可用性要求都比较高,这就需要底层存储系统在能够保证数据强一致的同时,也具有良好的扩展性。在数据模型上,有些数据比如关系,KV 模型足够用;有些数据比如钱包、账号可能又需要更丰富的数据模型,比如表格。
分布式存储系统对数据分区一般有两种方式:Hash 分区和 Range 分区。Hash 分区对每条数据算一个哈希值,映射到一个逻辑分区上,然后通过另外一层映射将逻辑分区映射到具体的机器上,很多数据库中间件、缓存中间件都是这样做的。这种方式的优点是数据写入一般不会出现热点,缺点是原本连续的数据经过 Hash 后散落在不同的分区上变成了无序的,那么,如果需要扫描一个范围的数据,需要把所有的分区都扫描一遍。
相比而言,Range 分区对数据进行范围分区,连续的数据是存储在一起的,可以按需对相邻的分区进行合并,或者中间切一刀将一个分区一分为二。业界典型的系统像 HBase。这种分区方式的缺点是一、对于追加写处理不友好,因为请求都会打到最后一个分片,使得最后一个分片成为瓶颈。优点是更容易处理热点问题,当一个分区过热的时候,可以切分开,迁移到其他的空闲机器上。
从实际业务使用的角度来说,提供数据强一致性能够大大减小业务的负担。另外 Range 分区能够支持更丰富的访问模式,使用起来更加灵活。基于这些考虑,我们使用 C++ 自研了一套基于 Range 分区的强一致 KV 存储系统 ByteKV,并在其上封装一层表格接口以提供更为丰富的数据模型。
架构介绍
系统组件
整个系统主要分为 5 个组件:SQLProxy, KVProxy, KVClient, KVMaster 和 PartitionServer。其中,SQLProxy 用于接入 SQL 请求,KVProxy 用于接入 KV 请求,他们都通过 KVClient 来访问集群。KVClient 负责和 KVMaster、PartitionServer 交互,KVClient 从 KVMaster 获取全局时间戳和副本位置等信息,然后访问相应的 PartitionServer 进行数据读写。PartitionServer 负责存储用户数据,KVMaster 负责将整个集群的数据在 PartitionServer 之间调度。
集群中数据会按照 range 切分为很多 Partition,每个 Partition 有多个副本,副本之间通过 Raft 来保证一致性。这些副本分布在所有的 PartitionServer 中,每个 PartitionServer 会存储多个 Partition 的副本,KVMaster 负责把所有副本均匀的放置在各个 PartitionServer 中。各个 PartitionServer 会定期汇报自身存储的副本的信息给 KVMaster,从而 KVMaster 有全局的副本位置信息。Proxy 接到 SDK 请求后,会访问 KVMaster 拿到副本位置信息,然后将请求路由到具体的 PartitionServer,同时 Proxy 会缓存一部分副本位置信息以便于后续快速访问。由于副本会在 PartitionServer 之间调度,故 Proxy 缓存的信息可能是过期的,这时当 PartitionServer 给 Proxy 回应副本位置已经变更后,Proxy 会重新向 KVMaster 请求副本位置信息。
分层结构
如上图所示是 ByteKV 的分层结构。
接口层对用户提供 KV SDK 和 SQL SDK,其中 KV SDK 提供简单的 KV 接口,SQL SDK 提供更加丰富的 SQL 接口,满足不同业务的需求。
事务层提供全局一致的快照隔离级别(Snapshot Isolation),通过全局时间戳和两阶段提交保证事务的 ACID 属性。
弹性伸缩层通过 Partition 的自动分裂合并以及 KVMaster 的多种调度策略,提供了很强的水平扩展能力,能够适应业务不同时期的资源需求。
一致性协议层通过自研的 ByteRaft 组件,保证数据的强一致性,并且提供多种部署方案,适应不同的资源分布情况。
存储引擎层采用业界成熟的解决方案 RocksDB,满足前期快速迭代的需求。并且结合系统未来的演进需要,设计了自研的专用存储引擎 BlockDB。
空间管理层负责管理系统的存储空间,数据既可以存储在物理机的本地磁盘,也可以接入其他的共享存储进行统一管理。
对外接口
KV 接口
ByteKV 对外提供两层抽象,首先是 namespace,其次是 table,一个 namespace 可以有多个 table。具体到一个 table,支持单条记录的 Put、Delete 和 Get 语义。其中 Put 支持 CAS 语义,仅在满足某种条件时才写入这条记录,如仅在当前 key 不存在的情况下才写入这条记录,或者仅在当前记录为某个版本的情况下才写入这条记录等,同时还支持 TTL 语义。Delete 也类似。
除了这些基本的接口外,还提供多条记录的原子性写入接口 WriteBatch, 分布式一致性快照读 MultiGet, 非事务性写入 MultiWrite 以及扫描一段区间的数据 Scan 等高级接口。WriteBatch 可以提供原子性保证,即所有写入要么全部成功要么全部失败,而 MultiWrite 不提供原子性保证,能写成功多少写成功多少。MultiGet 提供的是分布式一致性快照读的语义:MultiGet 不会读到其他已提交事务的部分修改。Scan 也实现了一致性快照读的语义,并且支持了前缀扫描,逆序扫描等功能。
表格接口
表格接口在 KV 的基础上提供了更加丰富的单表操作语义。用户可以使用基本的 Insert,Update,Delete,Select SQL 语句来读写数据,可以在 Query 中使用过滤(Where/Having)排序(OrderBy),分组(GroupBy),聚合(Count/Max/Min/Avg)等子句。同时在 SDK 端我们也提供了 ORM 库,方便用户的业务逻辑实现。
关键技术
以下我们将详细介绍 Raft、存储引擎、分布式事务、分区自动分裂和合并、负载均衡这几个技术点。(其中 Raft、存储引擎 会在本篇详述,其他几个技术点会在下篇详述)
自研 ByteRaft
作为一款分布式系统,容灾能力是不可或缺的。冗余副本是最有效的容灾方式,但是它涉及到多个副本间的一致性问题。ByteKV 采用 Raft[1]作为底层复制算法来维护多个副本间的一致性。由于 ByteKV 采用 Range 分片,每个分片对应一个 Raft 复制组,一个集群中会存在非常多的 Raft Group。组织、协调好 Raft Group 组之间的资源利用关系,对实现一个高性能的存储系统至关重要;同时在正确实现 Raft 算法基础上,灵活地为上层提供技术支持,能够有效降低设计难度。因此我们在参考了业界优秀实现的基础上,开发了一款 C++ 的 Multi-Raft 算法库 ByteRaft。
日志复制是 Raft 算法的最基本能力,ByteKV 将所有用户写入操作编码成 RedoLog,并通过 Raft Leader 同步给所有副本;每个副本通过回放具有相同序列的 RedoLog,保证了一致性。有时服务 ByteKV 的机器可能因为硬件故障、掉电等原因发生宕机,只要集群中仍然有多数副本存活,Raft 算法就能在短时间内自动发起选举,选出新的 Leader 进行服务。最重要的是,动态成员变更也被 Raft 算法所支持,它为 ByteKV 的副本调度提供了基础支持。ByteKV 的 KVMaster 会对集群中不同机器的资源利用率进行统计汇总,并通过加减副本的方式,实现了数据的迁移和负载均衡;此外,KVMaster 还定期检查机器状态,将长时间宕机的副本,从原有的复制组中摘除。
ByteRaft 在原有 Raft 算法的基础上,做了很多的工程优化。如何有效整合不同 Raft Group 之间的资源利用,是实现有效的 Multi-Raft 算法的关键。ByteRaft 在各种 IO 操作路径上做了请求合并,将小粒度的 IO 请求合并为大块的请求,使其开销与单 Raft Group 无异;同时多个 Raft Group 可以横向扩展,以充分利用 CPU 的计算和 IO 带宽资源。ByteRaft 网络采用 Pipeline 模式,只要网络通畅,就按照最大的能力进行日志复制;同时 ByteRaft 内置了乱序队列,以解决网络、RPC 框架不保证数据包顺序的问题。ByteRaft 会将即将用到的日志都保留在内存中,这个特性能够减少非常多不必要的 IO 开销,同时降低同步延迟。ByteRaft 不单单作为一个共识算法库,还提供了一整套的解决方案,方便各类场景快速接入,因此除了 ByteKV 使用外,还被字节内部的多个存储系统使用。
除了上述功能外,ByteRaft 还为一些其他企业场景提供了技术支持。
Learner
数据同步是存储系统不可或缺的能力。ByteKV 提供了一款事务粒度的数据订阅方案。这种方案保证数据订阅按事务的提交顺序产生,但不可避免的导致扩展性受限。在字节内部,部分场景的数据同步并不需要这么强的日志顺序保证,为此 ByteRaft 提供了 Learner 支持,我们在 Learner 的基础上设计了一款松散的按 Key 有序复制的同步组件。
同时,由于 Learner 不参与日志提交的特性,允许一个新的成员作为 Learner 加入 Raft Group,等到日志差距不大时再提升为正常的跟随者。这个过程可以使得 KVMaster 的调度过程更为平滑,不会降低集群可用性。
Witness
在字节内部,ByteKV 的主要部署场景为三中心五副本,这样能够保证在单机房故障时集群仍然能够提供服务,但是这种方式对机器数量要求比较大,另外有些业务场景只能提供两机房部署。因此需要一种不降低集群可用性的方案来降低成本。Witness 作为一个只投票不保存数据的成员,它对机器的资源需求较小,因此 ByteRaft 提供了 Witness 功能。
有了 Witness,就可以将传统的五副本部署场景中的一个副本替换为 Witness,在没有降低可用性的同时,节省出了部分机器资源。另外一些只有两机房的场景中,也可以通过租用少量的第三方云服务,部署上 Witness 来提供和三中心五副本对等的容灾能力。更极端的例子场景,比如业务有主备机房的场景下,可以通过增加 Witness 改变多数派在主备机房的分布情况,如果主备机房隔离,少数派的机房可以移除 Witness 降低 quorum 数目从而恢复服务。
存储引擎
RocksDB
和目前大多数存储系统一样,我们也采用 RocksDB 作为单机存储引擎。RocksDB 作为一个通用的存储引擎,提供了不错的性能和稳定性。RocksDB 除了提供基础的读写接口以外,还提供了丰富的选项和功能,以满足各种各样的业务场景。然而在实际生产实践中,要把 RocksDB 用好也不是一件简单的事情,所以这里我们给大家分享一些经验。
Table Properties
Table Properties 是我们用得比较多的一个功能。RocksDB 本身提供一些内置的 SST 统计信息,并且支持用户自定义的 Table Properties Collector,用于在 Flush/Compaction 过程中收集统计信息。具体来说,我们利用 Table Properties 解决了以下几个问题:
- 我们的系统是采用 Range 切分数据的,当一个 Range 的数据大小超过某个阈值,这个 Range 会被分裂。这里就涉及到分裂点如何选取的问题。一个简单的办法是把这个 Range 的数据扫一遍,根据数据大小找到一个中点作为分裂点,但是这样 IO 开销会比较大。所以我们通过 Table Properties Collector 对数据进行采样,每隔一定的数据条数或者大小记录一个采样点,那么分裂的时候只需要根据这些采样点来估算出一个分裂点即可。
- 多版本数据进行启发式垃圾回收的过程,也是通过 Table Properties 的采样来实现的。在存储引擎中,一条用户数据可能对应有一条或多条不同版本的数据。我们在 Table Properties Collector 中采集了版本数据的条数和用户数据的条数。在垃圾回收的过程中,如果一个 Range 包含的版本数据的条数和用户数据的条数差不多,我们可以认为大部分用户数据只有一个版本,那么就可以选择跳过这个 Range 的垃圾回收。另外,垃圾回收除了要考虑多版本以外,还需要考虑 TTL 的问题,那么在不扫描数据的情况下如何知道一个 Range 是否包含已经过期的 TTL 数据呢?同样是在 Table Properties Collector 中,我们计算出每条数据的过期时间,然后以百分比的形式记录不同过期时间的数据条数。那么,在垃圾回收的过程中,给定一个时间戳,我们就能够估算出某一个 Range 里面包含了多少已经过期的数据了。
- 虽然 RocksDB 提供了一些参数能够让我们根据不同的业务场景对 compaction 的策略进行调整,比如 compaction 的优先级等,但是实际上业务类型多种多样,很难通过一套单一的配置能够满足所有的场景。这时候其实我们也可以根据统计信息来对 compaction 进行一定的“干预”。比方说有的数据区间经常有频繁的删除操作,会留下大量的 tombstone。如果这些 tombstone 不能被快速的 compaction 清除掉,会对读性能造成很大,并且相应的空间也不能释放。针对这个问题,我们会在上层根据统计信息(比如垃圾数据比例)及时发现并主动触发 compaction 来及时处理。
遇到的问题和解决办法
除了上面提到的几个用法以外,这里我们再给大家分享 RocksDB 使用过程中可能遇到的一些坑和解决办法:
- 你是否遇到过数据越删越多或者已经删除了很多数据但是空间长时间不能释放的问题呢?我们知道 RocksDB 的删除操作其实只是写入了一个 tombstone 标记,而这个标记往往只有被 compact 到最底层才能被丢掉的。所以这里的问题很可能是由于层数过多或者每一层之间的放大系数不合理导致上面的层的 tombstone 不能被推到最底层。这时候大家可以考虑开启 level_compaction_dynamic_level_bytes这个参数来解决。
- 你是否遇到过 iterator 的抖动导致的长尾问题呢?这个可能是因为 iterator 在释放的时候需要做一些清理工作的原因,尝试开启 avoid_unnecessary_blocking_io 来解决。
- 你是否遇到过 ingest file 导致的抖动问题?在 ingest file 的过程中,RocksDB 会阻塞写入,所以如果 ingest file 的某些步骤耗时很长就会带来明显的抖动。例如如果 ingest 的 SST 文件跟 memtable 有重叠,则需要先把 memtable flush 下来,而这个过程中都是不能写入的。所以为了避免这个抖动问题,我们会先判断需要 ingest 的文件是否跟 memtable 有重叠,如果有的话会在 ingest 之前先 flush,等 flush 完了再执行 ingest。而这个时候 ingest 之前的 flush 并不会阻塞写,所以也就避免了抖动问题。
- 你是否遇到过某一层的一个文件跟下一层的一万个文件进行 compaction 的情况呢?RocksDB 在 compaction 生成文件的时候会预先判断这个文件跟下一层有多少重叠,来避免后续会产生过大的 compaction 的问题。然而,这个判断对 range deletion 是不生效的,所以有可能会生成一个范围非常广但是实际数据很少的文件,那么这个文件再跟下一层 compact 的时候就会涉及到非常多的文件,这种 compaction 可能需要持续几个小时,期间所有文件都不能被释放,磁盘很容易就满了。由于我们需要 delete range 的场景很有限,所以目前我们通过 delete files in range + scan + delete 的方式来替换 delete range。虽然这种方式比 delete range 开销更大,但是更加可控。虽然也可以通过 compaction filter 来进一步优化,但是实现比较复杂,我们暂时没有考虑。
由于篇幅有限,上面只是提了几个可能大家都会遇到的问题和解决办法。这些与其说是使用技巧,还不如说是“无奈之举”。很多问题是因为 RocksDB 是这么实现的,所以我们只能这么用,即使给 RocksDB 做优化往往也只能是一些局部调整,毕竟 RocksDB 是一个通用的存储引擎,而不是给我们系统专用的。因此,考虑到以后整个系统的演进的需要,我们设计了一个专用的存储引擎 BlockDB。
BlockDB
功能需求
BlockDB 需要解决的一个核心需求是数据分片。我们每个存储节点会存储几千上万个数据分片,目前这些单节点的所有分片都是存储在一个 RocksDB 实例上的。这样的存储方式存在以下缺点:
- 无法对不同数据分片的资源使用进行隔离,这一点对于多租户的支持尤为重要。
- 无法针对不同数据分片的访问模式做优化,比如有的分片读多写少,有的分片写多读少,那么我们希望对前者采取对读更加友好的 compaction 策略,而对后者采取对写更加友好的 compaction 策略,但是一个 RocksDB 实例上我们只能选择一种单一的策略。
- 不同数据分片的操作容易互相影响,一些对数据分片的操作在 RocksDB 中需要加全局锁(比如上面提到的 ingest file),那么数据分片越多锁竞争就会越激烈,容易带来长尾问题。
- 不同数据分片混合存储会带来一些不必要的写放大,因为我们不同业务的数据分片是按照前缀来区分的,不同数据分片的前缀差别很大,导致写入的数据范围比较离散,compaction 的过程中会有很多范围重叠的数据。
虽然 RocksDB 的 Column Family 也能够提供一部分的数据切分能力,但是面对成千上万的数据分片也显得力不从心。而且我们的数据分片还需要支持一些特殊的操作,比如分片之间的分裂合并等。因此,BlockDB 首先会支持数据分片,并且在数据分片之上增加资源控制和自适应 compaction 等功能。
除了数据分片以外,我们还希望减少事务的开销。目前事务数据的存储方式相当于在 RocksDB 的多版本之上再增加了一层多版本。RocksDB 内部通过 sequence 来区分不同版本的数据,然后在 compaction 的时候根据 snapshot sequence 来清除不可见的垃圾数据。我们的事务在 RocksDB 之上通过 timestamp 来区分不同版本的用户数据,然后通过 GC 来回收对用户不可见的垃圾数据。这两者的逻辑是非常相似的,目前的存储方式显然存在一定的冗余。因此,我们会把一部分事务的逻辑下推到 BlockDB 中,一方面可以减少冗余,另一方面也方便在引擎层做进一步的优化。采用多版本并发控制的存储系统有一个共同的痛点,就是频繁的更新操作会导致用户数据的版本数很多,范围查找的时候需要把每一条用户数据的所有版本都扫一遍,对读性能带来很大的影响。实际上,大部分的读请求只会读最新的若干个版本的数据,如果我们在存储层把新旧版本分离开来,就能够大大提升这些读请求的性能。所以我们在 BlockDB 中也针对这个问题做了设计。
性能需求
除了功能需求以外,BlockDB 还希望进一步发挥高性能 SSD(如 NVMe)随机 IO 的特性,降低成本。RocksDB 的数据是以文件单位进行存储的,所以 compaction 的最小单位也是文件。如果一个文件跟下一层完全没有重叠,compaction 可以直接把这个文件 move 到下一层,不会产生额外的 IO 开销。可以想象,如果一个文件越小,那么这个文件跟下一层重叠的概率也越小,能够直接复用这个文件的概率就越大。但是在实际使用中,我们并不能把文件设置得特别小,因为文件太多对文件系统并不友好。基于这一想法,我们在 BlockDB 中把数据切分成 Block 进行存储,而 Block 的粒度比文件小得多,比如 128KB。这里的 Block 可以类比为 SST 文件里的 Block,只是我们把 SST 文件的 Block 切分开来,使得这些 Block 能够单独被复用。但是以 Block 为单位进行存储对范围扫描可能不太友好,因为同一个范围的数据可能会分散在磁盘的各个地方,扫描的时候需要大量的随机读。不过在实际测试中,只要控制 Block 的粒度不要太小,配合上异步 IO 的优化,随机读依然能够充分发挥磁盘的性能。
另外,为了进一步发挥磁盘性能,减少文件系统的开销,BlockDB 还设计了一个 Block System 用于 Block 的存储。Block System 类似于一个轻量级的文件系统,但是是以 Block 为单位进行数据存储的。Block System 既可以基于现有的文件系统来实现,也可以直接基于裸盘来实现,这一设计为将来接入 SPDK 和进一步优化 IO 路径提供了良好的基础。
小结
以上,是我们对于自研强一致在线 KV&表格存储的部分介绍,涵盖整体结构,接口和关键技术中的 Raft、存储引擎。下篇我们会继续介绍关键技术中的分布式事务、分区自动分裂和合并、负载均衡,以及表格层相关内容。欢迎大家持续关注,明天我们会准时更新。