logstash同步mysql数据到Elasticsearch

安装logstash查看我的另一篇文章  Docker 部署 logstash

同步数据我们首先需要安装好对应的插件,然后下载对应的数据库链接jar包,下面是具体的步骤

1、进入容器中

docker  exec  it  logstash  bash

2、进入到bin 目录下,我这里是/usr/share/logstash/bin,可以看到logstash-plugin文件,然后安装插件

logstash-plugin install logstash-input-jdbc

3、看到如下输出,则表示安装成功 

logstash同步mysql数据到Elasticsearch

二、logstash同步mysql数据到Es

1、场景简介  比如我们需要检索资讯文章,单纯用mysql实现效率实在太低,特别是数据量大的时候。这时候我们就可以用到es,logstash定时把新增和更新的文章同步到es,业务上我们可以直接调用es的API检索文章。

2、在conf.d目录下配置jdbc.conf文件和jdbc.sql文件

(1)配置文件jdbc.conf

input {
    stdin {}
    jdbc {
     #连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连 
        jdbc_connection_string => "jdbc:mysql://192.168.16.241:3306/wsy_blockchain?characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=UCT"
        jdbc_user => "root"
        jdbc_password => "12345678"
        #连接mysql的jar包存放路径
        jdbc_driver_library => "/opt/mysql-connector-java-8.0.15.jar"
        #驱动
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        codec => plain {
            charset => "UTF-8"
        }
        #追踪的字段  我这里用的是创建时间(主要用来测试)
        tracking_column => createtime
        record_last_run => true
        #这个一直找不到原因为什么打开就会报错
        last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run"
        #这个一直找不到原因为什么打开就会报错
        #jdbc_default_timezone => "Asia/Shanghai"                                            #同步数据的语句存放位置             
        statement_filepath => "/usr/share/logstash/bin/jdbc.sql"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false

        #这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
        schedule => "* * * * *"
        type => "std"
    }

}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    ruby { 
        code => "event.set(‘timestamp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set(‘@timestamp‘,event.get(‘timestamp‘))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
  
}

output {

    elasticsearch {

        hosts => "192.168.16.241:9200"

        index => "test_data"

        document_type => "users"

        document_id => "%{id}"
    }

    stdout {

        codec => json_lines

    }
}

(2)配置jdbc.sql

select id,nickname,mobile, createtime  from  users where createtime >  :sql_last_value

3、启动配置文件jdbc.conf文件,开始同步数据

logstash -f  jdbc.conf

4、可以看到开始同步了

主要遇到的两个问题

1、查询出的数据库用户创建时间少了8个小时

解决:数据库配置链接需要加参数  serverTimezone=UCT

2、同步数据时间戳少了8个小时,

解决:同步时过滤对应字段timestsmp,获取后加上8小时,如下

filter {
    ruby { 
        code => "event.set(‘timestamp‘, event.get(‘@timestamp‘).time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set(‘@timestamp‘,event.get(‘timestamp‘))"
    }
    mutate {
        remove_field => ["timestamp"]
    }  
}

相关推荐