重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

暴走大数据点击右侧关注,暴走大数据!

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

作者:wwwzw

By 暴走大数据

场景描述:2019年10月16日,在荷兰阿姆斯特丹举行的 Spark+AI 欧洲峰会上,DataBricks 和 Linux 基金会联合宣布,开源项目 Delta Lake 正式成为 Linux 基金会的托管项目。

关键词:Delta Lake 数据湖

2019年10月16日,在荷兰阿姆斯特丹举行的 Spark+AI 欧洲峰会上,DataBricks 和 Linux 基金会联合宣布,开源项目 Delta Lake 正式成为 Linux 基金会的托管项目。

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

Delta Lake前世今生

2019年4月24日在美国旧金山召开的 Spark+AI Summit 2019 会上,Databricks 的联合创始人及 CEO Ali Ghodsi 宣布将 Databricks Runtime 里面的 Delta Lake 基于 Apache License 2.0 协议开源。

Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力,其通过写和快照隔离之间的乐观并发控制(optimistic concurrency control),在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖(data lakes)带来可靠性。

Delta Lake 还提供内置数据版本控制,以便轻松回滚。目前 Delta Lake 项目地址为 https://delta.io/,代码维护地址 https://github.com/delta-io/delta

Spark 做为一个计算引擎,应该无须质疑是当前大数据行业的领导者。而 Parquet 做为 Spark 的缺省数据存储格式,其实相当薄弱,缺少了太多关键特性,让Spark的用户不胜其扰,简直是Spark易用性的最大敌人!社区的抱怨可谓绵绵不绝,这种对于技术完美主义者,是无法容忍的!在这种背景下,Delta 开始了设计和实现。Databricks一年多前推出Delta之后,各位客户好评不断,但是只在有限的cloud上提供服务。这个实在无法满足那些大量部署Spark的整个社区!

于是乎,今年Spark Summit,使用Apache license 开源了!

Delta Lake的关键特性

现在很多公司内部数据架构中都存在数据湖,数据湖是一种大型数据存储库和处理引擎。它能够存储大量各种类型的数据,拥有强大的信息处理能力和处理几乎无限的并发任务或工作的能力,最早由 Pentaho 首席技术官詹姆斯迪克森在2011年的时候提出。虽然数据湖在数据范围方面迈出了一大步,但是也面临了很多问题,主要概括如下:

  • 数据湖的读写是不可靠的。数据工程师经常遇到不安全写入数据湖的问题,导致读者在写入期间看到垃圾数据。他们必须构建方法以确保读者在写入期间始终看到一致的数据。
  • 数据湖中的数据质量很低。将非结构化数据转储到数据湖中是非常容易的。但这是以数据质量为代价的。没有任何验证模式和数据的机制,导致数据湖的数据质量很差。因此,努力挖掘这些数据的分析项目也会失败。
  • 随着数据的增加,处理性能很差。随着数据湖中存储的数据量增加,文件和目录的数量也会增加。处理数据的作业和查询引擎在处理元数据操作上花费大量时间。在有流作业的情况下,这个问题更加明显。
  • 数据湖中数据的更新非常困难。工程师需要构建复杂的管道来读取整个分区或表,修改数据并将其写回。这种模式效率低,并且难以维护。

由于存在这些挑战,许多大数据项目无法实现其愿景,有时甚至完全失败。我们需要一种解决方案,使数据从业者能够利用他们现有的数据湖,同时确保数据质量。这就是 Delta Lake 产生的背景。Delta Lake 解决了上述问题,简化了数据湖构建。以下是 Delta Lake 提供的主要功能:

  • ACID 事务:Delta Lake 提供多个写操作之间的 ACID 事务。每个写操作都是一个事务,事务日志中记录的写操作有一个串行顺序。事务日志会跟踪文件级的写操作,并使用乐观并发控制,这非常适合数据湖,因为尝试修改相同文件的多个写操作并不经常发生。在存在冲突的场景中,Delta Lake 会抛出一个并发修改异常,以便用户处理它们并重试它们的作业。Delta Lake 还提供了强大的序列化隔离级别,允许工程师不断地对目录或表进行写操作,而用户可以不断地从相同的目录或表中读取数据。读取者将看到读操作开始时存在的最新快照。
  • 模式管理:Delta Lake 会自动验证正在写入的 DataFrame 模式是否与表的模式兼容。表中存在但 DataFrame 中不存在的列会被设置为 null。如果 DataFrame 中有额外的列在表中不存在,那么该操作将抛出异常。Delta Lake 具有可以显式添加新列的 DDL 和自动更新模式的能力。
  • 可伸缩的元数据处理:Delta Lake 将表或目录的元数据信息存储在事务日志中,而不是存储在元存储(metastore)中。这使得 Delta Lake 能够在固定的时间内列出大型目录中的文件,并且在读取数据时非常高效。
  • 数据版本控制和时间旅行:Delta Lake 允许用户读取表或目录先前的快照。当文件在写期间被修改时,Delta Lake 将创建文件的新版本并保存旧版本。当用户希望读取表或目录的旧版本时,他们可以向 Apache Spark 的读操作 API 提供一个时间戳或版本号,Delta Lake 根据事务日志中的信息构建该时间戳或版本的完整快照。这使得用户可以重新进行试验并生成报告,如果需要,还可以将表还原为旧版本。
  • 统一的批处理和流接收(streaming sink):除了批处理写之外,Delta Lake 还可以使用 Apache Spark 的结构化流作为高效的流接收。再结合 ACID 事务和可伸缩的元数据处理,高效的流接收现在支持许多接近实时的分析用例,而且无需维护复杂的流和批处理管道。
  • 记录更新和删除(即将到来):Delta Lake 将支持合并、更新和删除 DML 命令。这使得工程师可以轻松地维护和删除数据湖中的记录,并简化他们的变更数据捕获和 GDPR 用例。由于 Delta Lake 在文件粒度上跟踪和修改数据,因此,比读取和覆写整个分区或表要高效得多。
  • 数据期望(即将到来):Delta Lake 还将支持一个新的 API,用于设置表或目录的数据期望。工程师将能够通过指定布尔条件及调整严重程度来处理数据期望。当 Apache Spark 作业写入表或目录时,Delta Lake 将自动验证记录,当出现违规时,它将根据所预置的严重程度处理记录。

Delta Lake ACID 保证是建立在存储系统的原子性和持久性基础之上的。具体来说,该存储系统需要提供以下特性:

  • 原子可见性:必须有一种方法使文件完全可见或完全不可见。
  • 互斥:只有一个写入者能够在最终目的地创建(或重命名)文件。
  • 一致性清单:一旦在目录中写入了一个文件,该目录未来的所有清单都必须返回该文件。

Delta Lake 仅在 HDFS 上提供所有这些保证。通过插件的方式加入 LogStore API 的自定义实现,可以使它与其他存储系统一起工作。

Delta Lake牛刀初试

官网提供了QuickStart方便我们快速学习。

创建一个Maven工程,加入以下依赖:

<dependency>
 <groupId>io.delta</groupId>
 <artifactId>delta-core_2.11</artifactId>
 <version>0.4.0</version>
</dependency>

Create a table

创建一个 Delta 类型的表方法很简单,如下。

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
SparkSession spark = ... // create SparkSession
Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");

然后我们到目录下看看:


➜ delta-table tree├── _delta_log│ └── 00000000000000000000.json├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet

从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。

Update table data

Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");

Read Data


scala> val df = spark.read.format("delta").load("/tmp/delta-table")df: org.apache.spark.sql.DataFrame = [id: bigint]scala> df.show()+---+| id|+---+| 8|| 9|| 5|| 7|| 6|+---+

Conditional update without overwrite相当于upsert

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// 所有偶数加100
deltaTable.update(
 functions.expr("id % 2 == 0"),
 new HashMap<String, Column>() {{
 put("id", functions.expr("id + 100"));
 }}
);
// 删除所有偶数
deltaTable.delete(condition = functions.expr("id % 2 == 0"));
// 更新
Dataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData")
 .merge(
 newData.as("newData"),
 "oldData.id = newData.id")
 .whenMatched()
 .update(
 new HashMap<String, Column>() {{
 put("id", functions.col("newData.id"));
 }})
 .whenNotMatched()
 .insertExpr(
 new HashMap<String, Column>() {{
 put("id", functions.col("newData.id"));
 }})
 .execute();
deltaTable.toDF().show();

欢迎点赞+收藏+转发朋友圈素质三连

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

重磅 | Delta Lake正式加入Linux基金会,重塑数据湖存储标准

文章不错?点个【在看】吧!

相关推荐