Bulk Upsert for MySQL & PostgreSQL
什么是 Upsert
"UPSERT" is a DBMS feature that allows a DML statement's author to atomically either insert a row, or on the basis of the row already existing, UPDATE that existing row instead, while safely giving little to no further thought to concurrency. One of those two outcomes must be guaranteed, regardless of concurrent activity, which has been called "the essential property of UPSERT".
简而言之,就是,不存在就插入,存在就更新。
单记录 Upsert
MySQL有INSERT...ON DUPLICATE KEY UPDATE语法,可以实现Upsert:
INSERT INTO customers (id, first_name, last_name, email) VALUES (30797, 'hooopo1', 'wang', '[email protected]') ON DUPLICATE KEY UPDATE first_name = VALUES(first_name), last_name = VALUES(last_name);
PostgreSQL 从 9.5 也有了INSERT ... ON CONFLICT UPDATE语法,效果和 MySQL 类似:
INSERT INTO customers (id, first_name, last_name, email) VALUES (30797, 'hooopo1', 'wang', '[email protected]') ON CONFLICT(id) DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name;
批量 Upsert
之前研究 MySQL 里如何插入最快 ,里面提到 LOAD INFILE
方式批量插入,并且 MySQL 的 bulk insert
是支持 REPLACE
语意的,即批量插入的同时还可以 upsert
。
LOAD DATA LOCAL INFILE '/Users/hooopo/data/out/product_sales_facts.txt' REPLACE INTO TABLE product_sale_facts FIELDS TERMINATED BY ',' (`id`,`date_id`,`order_id`,`product_id`,`address_id`,`unit_price`,`purchase_price`,`gross_profit`,`quantity`,`channel_id`,`gift`)
当然 PostgreSQL 也有 Copy功能,和 MySQL 的 LOAD INFILE
类似。然而,copy
命令不支持 Upsert,这使一些增量 ETL 的工作非常不方便。
不过有一种利用 staging 表的方式实现 bulk upsert,大致步骤如下:
一. 目标表
二. 把增量数据批量插入中间表
CREATE TABLE IF NOT EXISTS staging LIKE customers INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES; COPY staging (id, email, first_name, last_name) FROM STDIN WITH DELIMITER ',' NULL '\N' CSV;
三. 把目标表中与 staging 表冲突部分删掉
DELETE FROM customers USING staging WHERE customers.id = staging.id
四. 把 staging 表批量插入到目标表,因为冲突部分已经删掉,所以这步不会有任何冲突。
INSERT INTO customers (SELECT * FROM staging);
五. 把 staging 表清空
TRUNCATE TABLE staging;
上面过程确实很麻烦,如果使用 kiba-plus 的话,只需要简单的 DSL:
destination Kiba::Plus::Destination::PgBulk2, { :connect_url => DEST_URL, :table_name => "customers", :truncate => false, :columns => [:id, :email, :first_name, :last_name], :incremental => true, :unique_by => :id }
相关链接:
相关推荐
CoderToy 2020-11-16
emmm00 2020-11-17
王艺强 2020-11-17
ribavnu 2020-11-16
bianruifeng 2020-11-16
wangshuangbao 2020-11-13
苏康申 2020-11-13
vivenwan 2020-11-13
moyekongling 2020-11-13
云中舞步 2020-11-12
要啥自行车一把梭 2020-11-12
aydh 2020-11-12
kuwoyinlehe 2020-11-12
minerk 2020-11-12
vitasfly 2020-11-12
jazywoo在路上 2020-11-11
敏敏张 2020-11-11
世樹 2020-11-11