MySQL导入导出实践
最近一次数据迁移,需要将MySQL的数据导出、处理后导入到新表和ES。这里做个简单记录,方便后续查询。
注: 为了写文章方便及隐私安全,实际内容会有所简化。例如表结构简化、数据库连接部分全部用 xxx 表示、目录及文件名均为化名等。
实践过程
原表:
book_db 库 - b_book(id,create_time,update_time,price,title,intro)
新表:
book 库 - book(id,price,title,create_time,update_time) - book_ext(id,book_id,intro,create_time)
MySQL导出
mkdir -p /tmp/ # 导出原始数据 mysql -hxxx -uxxx -pxxx book_db --default-character-set=utf8 -e 'select id,create_time,update_time,price,title,intro from b_book' | sed 's/NULL//g' > /tmp/b_book.csv
sed 's/NULL//g'是因为导出的数据有些字段存的NULL,新表不需要存储NULL,所以去掉。
导出的数据每行默认以 分隔,第一行包含字段名。这里我们删掉第一行:
sed -i '1d' /tmp/b_book.csv
数据处理
cd /tmp/ # 处理create_time,update_time,price,并生成文件 book.csv cat b_book.csv | awk -F ' ' -v OFS=' @@@ ' '{gsub(/[-:]/," ",$2); $2=mktime($2);gsub(/[-:]/,"",$3);$3=mktime($3);$4=$4*100;$6="";print $0}' > book.csv # 生成文件 book_ext.csv cat b_book.csv | awk -F ' ' -v OFS=' @@@ ' '{print $1,$6}' > book_ext.csv # 生成文件 book_es.csv cat b_book.csv | awk -F ' ' -v OFS=' @@@ ' '{$4=$4*100;print $0}' > book_es.csv
因为原表里时间都是datetime格式,新表是时间戳格式,这里处理成时间戳格式。价格原表是以元为单位,这里*100是为了处理成以分为单位。
-v OFS=' @@@ '表示输出的时候每列以@@@为分隔符。原因是原表里的intro字段存储的是html,可能包含常用转义字符,这里使用@@@确保能正确分隔每列。
导入到MySQL
mysql -hxxx -uxxx -pxxx book Load Data LOCAL InFile '/tmp/book.csv' Into Table book character set utf8 Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By ' ' (id,create_time,update_time,price,title); Load Data LOCAL InFile '/tmp/book_ext.csv' Into Table book_ext character set utf8 Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By ' ' (book_id,intro);
说明:
- Terminated 字段分隔符(列分隔符)。一般是空格或者
- Enclosed 字段括起字符。没有为空字符即可
- Escaped 转义字符。没有为空字符即可
- Terminated 记录分隔符(行结束符)
Into Table 代表插入,记录已存在(唯一键约束)则失败不再往下执行。Replace Into Table 代表覆盖,记录已存在则覆盖(是整条记录覆盖,没有列出的字段给默认值)。Ignore Into Table 遇到已存在直接跳过。
导入到ES
由于生产的book_es.csv文件比较大,所以这里按20000条生成一个文件,防止文件过大,ES导入失败。
cd /tmp/ awk '{filename = "book_es.csv." int((NR-1)/20000) ".csv"; print >> filename}' book_es.csv
ConvertBookToEs.php是PHP脚本,生成ES批量导入的文件。见附录。执行后生成很多book_es.csv.*.csv.json文件。
php ConvertBookToEs.php
importToEs.sh是ES批量导入脚本,如下:
#!/bin/bash for file in `ls /tmp/book_es.csv.*.csv.json` do echo $file; curl -XPOST http://xxx:9200/book/doc/_bulk -H "Content-Type: application/json" --data-binary "@$file" >> importToEs.log done
执行脚本:
sh importToEs.sh
等待数分钟,便执行完毕了。
实现MySQL LOAD DATA按字段更新
为了将大量数据加载到MySQL中,LOAD DATA INFILE是迄今为止最快的选择。但是,虽然这可以以INSERT IGNORE或REPLACE的方式使用,但目前不支持ON DUPLICATE KEY UPDATE。
如果我们想批量更新某个字段,ON DUPLICATE KEY UPDATE如何使用LOAD DATA INFILE模拟?
stackoverflow 上有网友给了答案。步骤是:
1)创建一个新的临时表。
CREATE TEMPORARY TABLE temporary_table LIKE target_table;
2)从临时表中删除所有索引以加快速度。(可选)
SHOW INDEX FROM temporary_table; DROP INDEX `PRIMARY` ON temporary_table; DROP INDEX `some_other_index` ON temporary_table;
3)将CSV加载到临时表中
LOAD DATA INFILE 'your_file.csv' INTO TABLE temporary_table Fields Terminated By ' ' Enclosed By '' Escaped By '' Lines Terminated By ' ' (field1, field2);
4)使用ON DUPLICATE KEY UPDATE复制数据
SHOW COLUMNS FROM target_table; INSERT INTO target_table SELECT * FROM temporary_table ON DUPLICATE KEY UPDATE field1 = VALUES(field1), field2 = VALUES(field2);
MySQL将假定=之前的部分引用INSERT INTO子句中指定的列,第二部分引用SELECT列。
5)删除临时表
DROP TEMPORARY TABLE temporary_table;
使用SHOW INDEX FROM和SHOW COLUMNS FROM此过程可以针对任何给定的表自动执行。
注:官方文档里 INSERT ... SELECT ON DUPLICATE KEY UPDATE语句被标记为基于语句的复制不安全。所以上述方案请在充分测试后再实施。详见:
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
附录
ConvertBookToEs.php
<?php /** * 转换wish_book为ES 批量格式(json) */ //id,create_time,update_time,price,title,intro function dealBook($file) { $fp = fopen($file, 'r'); while (!feof($fp)) { $line = explode(' @@@ ', fgets($fp, 65535)); if ($line && isset($line[1])) { $arr_head = [ 'index' => [ '_id' => (int)$line[0] ] ]; $arr = [ 'id' => (int)$line[0], 'create_time' => strtotime($line[1]), 'update_time' => strtotime($line[2]), 'price' => intval($line[3]), 'title' => (string)$line[4], 'intro' => (string)$line[18], ]; file_put_contents($file . '.json', json_encode($arr_head, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND); file_put_contents($file . '.json', json_encode($arr, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND); } } } try { //处理CSV文件为es bluk json格式 //参考 https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html $files = glob("/tmp/book_es.csv.*.csv"); if (false === $files) { exit("can not find csv file"); } $pids = []; foreach ($files as $i => $file) { $pid = pcntl_fork(); if ($pid < 0) { exit("could not fork"); } if ($pid > 0) { $pids[$pid] = $pid; } else { echo time() . " new process, pid:" . getmypid() . PHP_EOL; dealBook($file); exit(); } } while (count($pids)) { foreach ($pids as $key => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if ($res == -1 || $res > 0) { echo 'Child process exit,pid ' . $pid . PHP_EOL; unset($pids[$key]); } } sleep(1); } } catch (Exception $e) { $message = $e->getFile() . ':' . $e->getLine() . ' ' . $e->getMessage(); echo $message; }
参考
1、Linux命令行文本工具 - 飞鸿影~ - 博客园
https://www.cnblogs.com/52fhy/p/5836429.html
2、mysqldump 导出 csv 格式 --fields-terminated-by=, :字段分割符; - superhosts的专栏 - CSDN博客
https://blog.csdn.net/superhosts/article/details/26054997
3、Batch Processing | Elasticsearch Reference [6.4] | Elastic
https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
4、mysql导入数据load data infile用法整理 - conanwang - 博客园
https://www.cnblogs.com/conanwang/p/5890753.html
5、MySQL LOAD DATA INFILE with ON DUPLICATE KEY UPDATE - Stack Overflow
https://stackoverflow.com/questions/15271202/mysql-load-data-infile-with-on-duplicate-key-update
6、mysql - INSERT INTO ... SELECT FROM ... ON DUPLICATE KEY UPDATE - Stack Overflow
https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update
7、MySQL :: MySQL 5.6参考手册:: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE语法
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
8、复制表结构和数据SQL语句