使用Logstash把MySQL数据导入到Elasticsearch中

总结:这种适合把已有的MySQL数据导入到Elasticsearch中

有一个csv文件,把里面的数据通过Navicat Premium 软件导入到数据表中,共有998条数据

文件下载地址:https://files.cnblogs.com/files/sanduzxcvbnm/SalesJan2009.zip

csv文件格式如下:
使用Logstash把MySQL数据导入到Elasticsearch中

Logstash 配置
1.下载连接mysql的驱动包,放到指定目录下
在地址https://dev.mysql.com/downloads/connector/j/下载最新的Connector。下载完这个Connector后,把这个connector存入到Logstash安装目录下的如下子目录中:

logstash-core/lib/jars/

使用Logstash把MySQL数据导入到Elasticsearch中

conf文件内容如下:

input {
    jdbc {
       jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/salesjan2009?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
       jdbc_user => "root"
       jdbc_password => "root"
       jdbc_validate_connection => true
       jdbc_driver_library => ""
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       parameters => { "Product_id" => "Product1" }
       statement => "SELECT * FROM salesjan2009 WHERE Product = :Product_id"
    }    
}
 
filter {
	mutate {
	 	rename => {
        	"longitude" => "[location][lon]"
        	"latitude" => "[location][lat]"
    	}
    }
}
 
output {
    stdout {
    
    }
 
   elasticsearch {
        hosts => ["192.168.75.21:9200"]
     	index => "sales" # 指定索引名
     	document_type => "_doc"
        user => "elastic"
        password => "GmSjOkL8Pz8IwKJfWgLT"
    } 
}

说明:
1.jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/salesjan2009?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
连接的数据库地址,端口号,数据库名,字符编码,时区等
2.
jdbc_user => "root"
jdbc_password => "root"
连接数据库使用的用户名和密码,根据自己的实际情况而定

3.jdbc_driver_library
驱动包路径,若是在logstash指定目录下则留空,若不是则需要指定绝对路径

4.jdbc_driver_class
最新使用的驱动包类

5.parameters
设置一个参数Product_id,其值是Product1

6.statement
sql语句,结合上面的理解,是查询salesjan2009数据表中条件Product的值是Product_id也即是Product1的数据

7.filter mutate
新增一个字段,重构经纬度参数值结构

运行Logstash来加载我们的MySQL里的数据到Elasticsearch中:

./bin/logstash --debug -f  ./config/conf.d/sales.conf

可以在Kibana中查看到最新的导入到Elasticsearch中的数据:
使用Logstash把MySQL数据导入到Elasticsearch中

注意数据总数,并不是数据表中的全部数据,而是根据查询条件获得的部分数据。

相关推荐