【技术实验】mysql准实时同步数据到Elasticsearch
摘要: Elasticsearch作为大数据场景下搜索和分析的引擎,广泛应用于实时数据分析等场景。本文作者梳理了从MySQL准实时同步数据到Elasticsearch的实操步骤,帮助开发者理解和快速上手。
实验背景
Elasticsearch在阿里云商业化已经有一段时间,它作为大数据场景下搜索和分析的引擎,可以用于很多场景。前两天有同学提到需要将MySQL中的数据准实时的同步到ElasticSearch中的需求,由于自己对ES也很感兴趣但一直没有机会实操,恰好趁这个机会学习验证了一下,并把过程记录下来,方便新人尽快上手少走弯路。
本次实操采用Logstash实现将MySQL数据实时同步到ElasticSearch,过程中主要以实操步骤为主,并没有详细介绍这两个产品本身,所以文档适合对Logstash和Elasticsearch有一些基础的人阅读。使用Logstash过程中会用到一些参数,每个参数的详细解释请参考官方文档。
实验步骤
1. 开通阿里云Elasticsearch
开通阿里云Elasticsearch。产品链接
ES控制台中修改配置,允许自动创建索引。
2. 开通对应的ECS和RDS(MySQL)
开通与Elasticsearch相同Region、相同专有网络下的ECS一台,建议配置2核8G,操作系统:Aliyun Linux 17.1 64位,用于运行Logstash程序。
开通RDS(MySQL)
3. 在ECS上安装和配置curl
SSH登陆到ESC中,检查ECS是否已经安装curl。
curl是利用URL语法在命令行方式下工作的开源文件传输工具,可以用于测试访问ES服务。(Aliyun Linux 17.1 64位默认是预装curl的,若其他系统没有预装请自行安装)。
检查ECS中是否可以通过curl命令访问到ES服务。
curl -u ES用户名:ES密码 -XGET 'http://ES服务IP:9200/?pretty'
红色字体部分要替换相应的用户名、密码和ES服务IP或域名。如下图所以,能返回红框内的内容说明curl是可以访问到ES服务的。
4. 安装JDK8、MySQL5.6驱动以及Logstash -6.0.0
ECS中分别安装JDK8、MySQL5.6驱动以及Logstash -6.0.0。如下图:
安装Logstash input、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。
安装插件的命令分别是(在Logstash主目录下运行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
5. MySQL中创建数据库、测试的数据表
如下图所示
建表语句(其中updatetime用于记录数据更新时间戳):
create table jm_es_employee ( id varchar(10), first_name varchar(20), last_name varchar(20), age int(10), about varchar(100), interests varchar(100), updatetime timestamp null default current_timestamp on update current_timestamp );
6. 配置Logstash作业文件
ECS中创建Logstash作业配置文件,文件名为logstash-mysql-es.conf。
配置文件内容:
input{ jdbc { jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "1000" jdbc_default_timezone =>"Asia/Shanghai" schedule => "* * * * *" statement => "select * from jm_es_employee where updatetime > :sql_last_value" use_column_value => true tracking_column => "updatetime" last_run_metadata_path => "./logstash_jdbc_last_run" } } output{ elasticsearch { hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "employee" document_id => "%{id}" } stdout { codec => json_lines } }
其中红色字体部分要做相应的替换,input中的 schedule参数用于配置数据刷新频率,schedule => " *"表示每分钟刷新一次,这也是MySQL数据同步的最小频率。Logstash支持丰富的参数配置,详情请参考Elasitc官网文档。
7. 同步数据
ECS中指定参数启动Logstash服务,执行命令:
logstash -f logstash-mysql-es.conf
之后每分钟会去MySQL中刷新数据
RDS中写入几条测试数据,脚本如下:
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');
由于之前在Logstash配置文件中,output部分既配置了输出到ES,同时也输出到控制台。所以当检测到MySQL中有更新时,数据会输出到控制台中,如下图:
此时说明MySQL中的数据更新已经被Logstash推送到ES服务。通过在ECS执行命令检查ES服务中的索引是否被创建。执行命令:
curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'
红框内的employee即我们在配置文件中指定的索引名,说明ES中的索引已经被成功创建。
8. 结果验证
通过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令通过关键字“Smith “来检索数据:
curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty'
至此,MySQL中的数据已经被成功索引到Elasticsearch,并也可以被准实时的检索到。
作者:奇米 阿里巴巴高级工程师
相关推荐
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。