Automating ELK log ingestion when a site goes live
How logs flow through the ELK architecture:
1. When Filebeat is installed on a business-tier host, it automatically picks up the domain and environment information of the sites running on that host, adds channel and env fields, and uses the value of channel as the Kafka topic.
2. When Kafka receives Filebeat's new fields and topic name, it creates the topic automatically; the messages then wait there for Logstash to consume.
3. Logstash generates its input and output configuration from a script.
The topic here must always be identical to Filebeat's channel.
Example:
Filebeat layer:
- type: log
  processors:
    - add_fields:
        fields:
          env: "prod"                        ## derived automatically by ansible calling Python on the subnet
          ip: "10.12.11.27"                  ## derived automatically by ansible calling Python on the subnet
          apptype: "service"                 ## derived automatically by ansible calling Python on the domain
          channel: "cms.prod.tarscorp.com"   ## generated by ansible calling Python on the site directory
  enabled: true
  paths:
    - /data1/logs/cms.prod.tarscorp.com/*.log

output.kafka:
  codec.json:
    pretty: true
    escape_html: false
  hosts: ["kafka1.mgt.tarscorp.com:9092", "kafka2.mgt.tarscorp.com:9092", "kafka3.mgt.tarscorp.com:9092"]
  topic: 'cms.prod.tarscorp.com'             ## topic and channel come from the same value
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Kafka layer: nothing special to do here; run a cluster with automatic topic creation enabled (see the sketch below).
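For reference, a minimal sketch of the broker-side setting this relies on. The file path and the two sizing values are illustrative assumptions; only the first line is required for auto-creation:

vim /etc/kafka/server.properties

auto.create.topics.enable=true    ## Filebeat's first message creates the topic
num.partitions=3                  ## assumed default for auto-created topics
default.replication.factor=2      ## assumed; size to your cluster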
Logstash layer:
vim prod-input.conf   ## input side

input {
  kafka {
    topics => "cms.prod.tarscorp.com"   ## the topic in Kafka
    bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
    decorate_events => false
    group_id => "logstash-tars"
    # consumer_threads => 5
    client_id => "mgt-elk-logstash1-prod"
    codec => "json"
    add_field => {"topic" => "cms.prod.tarscorp.com"}   ## added mainly so the output stage can route on it
  }
}

vim prod-javasite.conf   ## output side

output {
  if [topic] == "cms.prod.tarscorp.com" {
    elasticsearch {
      hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
      manage_template => false
      index => "prod-javasite-%{+YYYY.MM.dd}"
    }
  }
}
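Before reloading, the assembled pipeline can be syntax-checked. A minimal sketch, assuming the stock package install path:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --config.test_and_exit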
Note: our stack is Java Spring, so every Java site lands in the prod-javasite-%{+YYYY.MM.dd} index. Because there are so many configuration steps, a site launch works like this: the distribution machine deploys the service first, then ansible deploys Filebeat. During that run, ansible calls a Python script to read the server's subnet and site information, and fills in the channel, topic, apptype, env, and ip fields through templates to generate the config. The values are all derived automatically (a sketch of that detection logic follows), which takes the manual steps off the operations team.
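The derivation itself happens on the ansible side via a Python helper. Below is a hypothetical sketch of what that logic could look like; the subnet map, the apptype rule, and the function names are illustrative assumptions, not the production script:

#!/usr/bin/env python3
## Hypothetical sketch of the per-host detection; mappings below are assumed.
import ipaddress

ENV_BY_SUBNET = {                                   ## assumed subnet -> env map
    ipaddress.ip_network("10.12.11.0/24"): "prod",
    ipaddress.ip_network("10.12.21.0/24"): "dev",
}

def detect_env(host_ip):
    ## env is chosen by which known subnet the host address falls into
    addr = ipaddress.ip_address(host_ip)
    for net, env in ENV_BY_SUBNET.items():
        if addr in net:
            return env
    raise ValueError("no env mapping for %s" % host_ip)

def detect_apptype(domain):
    ## assumed rule: service-style hostnames map to "service", the rest to "web"
    return "service" if domain.split(".")[0] in ("cms", "api") else "web"

def detect_channel(site_dir):
    ## channel is simply the site directory name
    return site_dir.rstrip("/").rsplit("/", 1)[-1]

if __name__ == "__main__":
    print(detect_env("10.12.11.27"))                            ## -> prod
    print(detect_apptype("cms.prod.tarscorp.com"))              ## -> service
    print(detect_channel("/data1/logs/cms.prod.tarscorp.com"))  ## -> cms.prod.tarscorp.com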
Example of generating the Logstash config with the script:
./add_info.py --env prod --topic cms.prod.tarscorp.com --module javasite
vim add_info.py
#!/usr/bin/env python3
import argparse
import os
import sys

parser = argparse.ArgumentParser(description='Logstash configuration file add tool')
parser.add_argument('--env', type=str, required=True, help='environment, e.g. prod')
parser.add_argument('--topic', type=str, required=True, help='Kafka topic (same value as the Filebeat channel)')
parser.add_argument('--module', type=str, required=True, help='module, e.g. javasite')
args = parser.parse_args()

env_info = args.env
topic_name = args.topic
module_info = args.module
date = "%{+YYYY.MM.dd}"

## kafka block appended to <env>-input.conf; the trailing "}" re-closes the
## surrounding input {} section that the seek(-2, 2) below opens up
template_input = '''
  kafka {
    topics => "%s"
    bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
    decorate_events => false
    group_id => "logstash-tars"
    # consumer_threads => 5
    client_id => "mgt-elk-logstash1-%s"
    codec => "json"
    add_field => {"topic" => "%s"}
  }
}
''' % (topic_name, env_info, topic_name)

template_output = '''
  if [topic] == "%s" {
    elasticsearch {
      hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
      manage_template => false
      index => "%s-%s-%s"
    }
  }
}
''' % (topic_name, env_info, module_info, date)

## empty skeletons; the closing "}\n" is exactly what seek(-2, 2) overwrites
## on each append, so the file stays balanced after every run
init_input = '''input {
}
'''
init_output = '''output {
}
'''

path_home = "/etc/logstash/conf.d/"
input_file = "/etc/logstash/conf.d/%s-input.conf" % env_info
output_file = "/etc/logstash/conf.d/%s-%s.conf" % (env_info, module_info)

if not os.path.exists(path_home):
    print('please run this script on the logstash host')
    sys.exit(255)

if not os.path.exists(input_file):
    with open(input_file, mode='w', encoding='utf-8') as f:
        f.write(init_input)
if not os.path.exists(output_file):
    with open(output_file, mode='w', encoding='utf-8') as f:
        f.write(init_output)

## step back over the final "}\n", splice in the new block, and re-close the section
with open(input_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_input.encode('utf-8'))
with open(output_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_output.encode('utf-8'))
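After the script has appended the new blocks, Logstash still has to load them; assuming a systemd package install without automatic config reload enabled, something like:

systemctl restart logstash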