ElasticSearch MySQL 增量同步
主要用到了一个 JDBC importer for Elasticsearch的库。
想要增量同步,有一些先决条件。首先数据库中要维护一个update_time的时间戳,这个字段表示了该记录的最后更新时间。然后用上面的那个库,定时执行一个任务,这个任务中执行的sql就是根据时间戳判断该记录是否应该被更新。
这里先写一个最简单的例子来展示一下。
从上方插件官网中下载适合的dist包,然后解压。进入bin目录,可以看到一堆sh脚本。在bin目录下创建一个test.sh:
bin=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/bin
lib=/home/csonezp/Dev/elasticsearch-jdbc-2.3.1.0/lib
echo '{
"type" : "jdbc",
"statefile" : "statefile.json",
"jdbc": {
"url" : "jdbc:mysql://myaddr",
"user" : "myuser",
"password" : "mypwd",
"type" : "mytype",
"index": "myindex",
"schedule" : "0 * * * * ?",
"metrics" : {
"enabled" : true
},
"sql" : [
{
"statement" : "select * from gd_actor_info where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
]
}
}' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporterschedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。
Elasticsearch mysql 增量同步 三表联合 脚本
上面简略的说了一下es同步数据脚本的大致情况,但是实际情况里肯定不会像上一篇里面的脚本那么简单。比如目前我就有三张表,两张实体表,一张关联表。大致实现如下:
bin目录建立一个statefile.json文件:
{
"type" : "jdbc",
"statefile" : "statefile.json",
"jdbc": {
"url" : "jdbc:mysql://",
"user" : "",
"password" : "",
"type" : "actor",
"index": "test",
"schedule" : "0 * * * * ?",
"metrics" : {
"lastexecutionstart" : "0",
"lastexecutionend" : "0",
"counter" : "1"
},
"sql" : [
{
"statement" : "select a.actor_id as _id ,a.*,GROUP_CONCAT(b.tag_name ) as tag_name from ( ( gd_actor_info as a left join gd_actor_tag as ab on a.actor_id = ab.actor_id ) left join gd_tag_actor as b on ab.tag_id = b.tag_id) where a.update_time >? or ab.update_time > ? group by a.actor_id ",
"parameter" : [ "$metrics.lastexecutionstart" ,"$metrics.lastexecutionstart" ]
}
]
}
} 主要是lastexecutionstart设置为0,为了让第一次执行能进行一次全量备份。
其实sh脚本信息也就都在上面了,再写一个就好了
ElasticSearch 的详细介绍:请点这里
ElasticSearch 的下载地址:请点这里
相关推荐
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。