Hive事务原理和Datax同步事务表问题解决
一、事务的概述
1、定义
事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。
2、特点
- 原子性:一个事务是一个不可再分割的工作单位,事务中的所有操作要么都发生,要么都不发生。
- 一致性:事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。这是说数据库事务不能破坏关系数据的完整性以及业务逻辑上的一致性。
- 隔离性:多个事务并发访问,事务之间是隔离的,一个事务不影响其它事务运行效果。这指的是在并发环境中,当不同的事务同时操作相同的数据时,每个事务都有各自完整的数据空间。事务查看数据更新时,数据所处的状态要么是另一事务修改它之前的状态,要么是另一事务修改后的状态,事务不会查看到中间状态的数据。事务之间的相应影响,分别为:脏读、不可重复读、幻读、丢失更新。
- 持久性(Durability):意味着在事务完成以后,该事务锁对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。
3、事务实现的原理
- 预写日志(Write-ahead logging):保证原子性和持久性。
- 锁(locking):保证隔离性;锁是指在并发环境中通过读写锁来保证操作的互斥性。根据隔离程度不同,锁的运用也不同。
- 一致性,是因为一致性是应用相关的话题,它的定义一个由业务系统来定义,什么样的状态才是一致?而实现一致性的代码通常在业务逻辑的代码中得以体现。
二、使用事务的准备条件和限制
1、准备条件
a、 默认事务是关闭的,需要设置开启。
b、需要添加如下配置
<property>
<name>hive.support.concurrency</name>
<value>
true
</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>
true
</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>
true
</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>
1
</value>
</property>
2、限制条件
- BEGIN, COMMIT, ROLLBACK 暂时不支持,所有操作自动提交
- 目前只支持ORC 的文件格式。
- 表必须支持分桶(即条件1的设置)
- 必须设置事务管理器 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ,否则事务表无法工作(条件1的设置)
- 目前支持快照级别的隔离。就是当一次数据查询时,会提供一个数据一致性的快照
- 已有的zookeeper和内存的锁管理和Hive的事务不冲突
- LOAD DATA. 语句目前在事务表中暂时不支持
三、Hive中实现事务的原理
1、Hive事务实现原理
HDFS是不支持文件的修改,并且当有数据追加到文件,HDFS不对读数据的用户提供一致性的。为了在HDFS上支持以上的特性,Hive借鉴了其他数据仓库工具的方法。如果Hive中的表开启transaction,数据是insert进去的,则insert进去的数据会放到delta文件夹里面,hive后台会有个进程定时去跟base里的数据合并,然后删除delta文件。即:表和分区的数据都被存在base files。 新的记录和更新,删除都存在delta files。一次事务操作创建一系列的delta files,将其合并为base。在读取的时候,将基础文件和修改,删除合并,最后返回给查询。
2、几个名词解释
创建一个事务表:CREATE table demo (
num string,
create_date int,
number(19,4)
) clustered by (num ) into 40 buckets stored as orc tblproperties(‘transactional‘=‘true‘);
a、表的Base and Delta Directories 目录:
hive> dfs -ls -R /user/hive/warehouse/demo;
drwxr-xr-x - ekoifman staff 0 2016-06-09 17:07 /user/hive/warehouse/demo/delta_0000044_0000044_0000
-rw-r--r-- 1 ekoifman staff 610 2016-06-09 17:07 /user/hive/warehouse/demo/delta_0000044_0000044_0000/bucket_00000
- Minor compaction :将已有的delta files重写到一个单独的delta file,每个分桶一个。
- Major compaction: 将delta文件和base 重写到一个新的base file,每个分桶一个。 这个合并操作的代价更大。
- 所有的合并操作都是后台进行,不会影响并行的数据读取和写入。合并完成之后,系统会等到所以的读操作完成再删除旧的文件。
d、Worker
该进程检查在 hive.txn.timeout 内没有心跳的事务并丢弃。对于一个初始化过事务的client,如果心跳停止了,它所锁住的资源会被释放。
四、datax通过数据中遇到的问题
1、前提条件
当开启了事务,但是没有配置参数,即事务限制条件:
- 客户端
hive.support.concurrency – true
hive.enforce.bucketing – true (Hive 2.0 默认)
hive.exec.dynamic.partition.mode – nonstrict
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager - 服务端 (Metastore)
hive.compactor.initiator.on – true
hive.compactor.worker.threads – a positive number
当直接向表格中插入数据时,HDFS中存储的都是delta数据,datax无法读取,会报错:
java.lang.IllegalArgumentException: delta_0051045_0051045 does not start with base_
at org.apache.hadoop.hive.ql.io.AcidUtils.parseBase(AcidUtils.java:144)
at org.apache.hadoop.hive.ql.io.AcidUtils.parseBaseBucketFilename(AcidUtils.java:172)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.run(OrcInputFormat.java:544)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-02-12 12:58:21.367 [0-0-36-reader] ERROR HdfsReader$Job - 从orcfile文件路径[hdfs://10.20.30.50:8020/inceptor1/user/hive/warehouse/iri.db/hive/var_scr/delta_0051045_0051045/bucket_00017]中读取数据发生异常,请联系系统管理员。