Logstash Oracle同步设置

input {

  jdbc {
    #jdbc驱动包位置
    jdbc_driver_library => "D:\tools\elk\logstash-7.6.1\ojdbc8-12.2.0.1.jar"
    #jdbc驱动类
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # 数据库相关配置
    jdbc_connection_string => "jdbc:oracle:thin:callcard///callcardsitoracle01.beta.hic.cloud:1521/callcardsit_srv"
    jdbc_user => "callcard"
    jdbc_password => "huawei123"
    # 是否清除sql_last_value的记录,需要增量同步时此字段必须为false;
    clean_run => true
    # 同步频率(分 时 天 月 年),默认每分钟同步一次;
    schedule => "*/10 * * * * *"

    use_column_value => true
    tracking_column_type => timestamp
    tracking_column => updated_at

    # 同步SQL
    statement =>"select *
from user n
       
where updated_at>:sql_last_value and updated_at<sysdate order by updated_at desc"
    # 索引类型,不需要指定type,否则会在同步ES后生成type字段
    #type => "user"
    # 设置时区
    #jdbc_default_timezone =>"Asia/Shanghai"
  }

}
 
filter {
 # 删除无用字段
  mutate {   
          remove_field => "@timestamp"
          remove_field => "@version"
  }     

  # 时间+8个时区,思想是找临时变量,最后+8后替换
  ruby {
        code => "event.set(‘@created_at‘, event.get(‘created_at‘).time.localtime + 8*60*60)"
  }       
  ruby {   
        code => "event.set(‘created_at‘,event.get(‘@created_at‘))"
  }       
  mutate { 
          remove_field => ["@created_at"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  #if [type]=="user" {
    elasticsearch {
      # ES host:port
      hosts => ["127.0.0.1:9200"]
      #将mysql数据加入blog索引下,会自动创建
      index => "user"
      # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号_id
      document_id => "%{id}"
    }
 # }
 
}

相关推荐