日志平台(网关层) - 基于Openresty+ELKF+Kafka
背景介绍
1、问题现状与尝试
没有做日志记录的线上系统,绝对是给系统运维人员留下的坑。尤其是前后端分离的项目,后端的接口日志可以解决对接、测试和运维时的很多问题。之前项目上发布的接口都是通过Oracle Service Bus(OSB)来做统一编排,在编排时加上日志记录,并将接口日志存储到数据库中。最后基于接口日志数据开发日志平台,来统一的接口日志分析。
但我们总不能为了记录日志而使用OSB,这样很不自由。今年我们有很多后台接口使用Spring来开发,后台程序的部署环境也不局限于Oracle中间件的环境。当某些场景时,脱离了OSB,我们该如何记录接口日志,这是本文要解决的问题。
在我写的Spring系列的文章中,有尝试过使用Spring的AOP来记录日志。在每个项目的代码中,定义一个记录日志的切面,该切面会对该项目下的所有接口做日志记录。
对于一个周期很长、规模很大的一个独立项目来说,这个方案是可行的。因为项目周期很长,花个两天做日志记录的AOP开发没啥问题,而且这个日志更契合该系统的业务特征。
但我们团队所面对的开发,基本上都是数量多、周期短的一些小项目。一个项目的开发周期可能只有十天,就算每个项目在日志记录上只用一天的工作量,所占的比重也有十分之一。如果我们每个项目都要独立的记录日志,累积的工作量也挺大的,而且重复这样的工作很枯燥。
就像面向切面编程(AOP),在一个项目的所有接口上设置“切面”统一编程。如果我们的能在所有的项目上设置“切面”统一编程,就能解决我们现在的问题。这个“切面”就是网关。
2、方案设计
这个方案是公司内的两位技术大佬讨论出来的,这样惊奇的想法,让之前困扰的一切迷雾都豁然开朗了起来。我花了两天做了个Demo,验证方案的确行得通,下文会附上本次Demo中实战操作的代码。
简单来说,所有项目接口都通过Nginx的网关,而我们不需要在代码层面上收集日志,而是在Nginx上获取想要的日志信息,配合ELKF(Elasticsearch、Logstash、Kibana、Filebeat)的解决方案,实现统一的日志平台搭建:
- Nginx+Lua编程,按照我们定义的格式,所有通过网关的接口都会留下日志信息,写入log文件。
- Filebeat收集数据,Filebeat实时监测目标log文件,收集数据推送给Logstash。
- Logstash过滤处理数据,Logstash过滤处理数据后,会将数据同时推送给Elasticsearch和Kafka。
- Elasticsearch+Kibana,Elasticsearch作为数据的搜索引擎,而且利用Kibana的可视化界面,将日志数据以报表的形式显示出来。
- Kafka消息队列中间件,日志的数据被推送到Kafka上之后发布消息,而所有订阅者就能从队列中读数据。本次就是写程序实时的读取队列中的数据,存入数据库。
3、系统环境
在本次Demo中,由于资源限制,所有的产品服务都将部署在一台服务器上,服务器上的相关环境如下:
配置项 | 环境配置信息 |
---|---|
服务器 | 阿里云服务器ECS(公网:47.96.238.21 ,私网:172.16.187.25) |
服务器配置 | 2 vCPU + 4 GB内存 |
JDK版本 | JDK 1.8.0_181 |
操作系统 | CentOS 7.4 64位 |
OpenResty | 1.13.6.2 |
Filebeat | 6.2.4 |
Elasticsearch | 6.2.4 |
Logstash | 6.2.4 |
Kibana | 6.2.4 |
Kafka | 2.10-0.10.2.1 |
基于OpenResty的日志记录
OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。
我们选择OpenResty的目的有两个:(1)使用Lua编程,可以在Nginx上更好的拿到想要的日志信息;(2)系统其它功能模块的集成,例如Jwt的集成,可参考同事写的文章《Nginx实现JWT验证-基于OpenResty实现》。
1、OpenResty安装
在安装OpenResty之前需要先安装好依赖库,OpenResty 依赖库有: perl 5.6.1+, libreadline, libpcre, libssl。我们是CentOS系统,可以直接yum来安装。
[root@Kerry ~]# yum install readline-devel pcre-devel openssl-devel perl
接下来我们在当前CentOS系统上使用新的官方 yum 源
[root@Kerry ~]# yum install yum-utils [root@Kerry ~]# yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
这时我们就可以直接安装OpenResty
[root@Kerry ~]# yum install openresty [root@Kerry ~]# yum install openresty-resty
这样OpenResty就安装完成了,默认情况下程序会被安装到 /usr/local/openresty 目录
# 可查看安装成功 [root@Kerry ~]# cd /usr/local/openresty/bin/ [root@Kerry bin]# ./openresty -v nginx version: openresty/1.13.6.2 # 设置环境变量 [root@Kerry sbin]# vi /etc/profile # 在文件最后面加上 export PATH=${PATH}:/usr/local/openresty/nginx/sbin [root@Kerry sbin]# source /etc/profile
2、记录Nginx日志
OpenResty 安装之后就有配置文件及相关的目录的,为了工作目录与安装目录互不干扰,我们单独建一个工作目录。我在根目录下新建了 /openrestyTest/v1/ 的文件夹,并在该目录下创建 logs 和 conf 子目录分别用于存放日志和配置文件。
[root@Kerry ~]# mkdir /openrestyTest /openrestyTest/v1 /openrestyTest/v1/conf /openrestyTest/v1/logs [root@Kerry ~]# cd /openrestyTest/v1/conf/ # 创建并编辑 nginx.conf [root@Kerry conf]# vi nginx.conf
在nginx.conf中复制以下文本作为测试
worker_processes 1; #nginx worker 数量 error_log logs/error.log; #指定错误日志文件路径 events { worker_connections 1024; } http { server { #监听端口,若你的6699端口已经被占用,则需要修改 listen 6699; location / { default_type text/html; content_by_lua_block { ngx.say("HelloWorld") } } } }
该语法是基于Lua,监听6699端口,输出HelloWorld。我们现在启动Openresty中的Nginx。
[root@Kerry ~]# /usr/local/openresty/nginx/sbin/nginx -p '/openrestyTest/v1/' -c conf/nginx.conf # 由于配置或环境变量,也可以直接使用 [root@Kerry ~]# nginx -p '/openrestyTest/v1/' -c conf/nginx.conf [root@Kerry conf]# curl http://localhost:6699 HelloWorld
访问该端口地址,成功的显示HelloWorld。我提前在本服务器的Tomcat上部署了一个接口,端口是8080。我的想法是将8080反向代理成9000,将所有通过8080端口的服务的日志信息获取到,并输出到本地的log文件中。
我暂时需要记录的日志内容包括:接口地址,请求内容,请求时间,响应内容,响应时间等。代码写好了,直接替换 /openrestyTest/v1/conf/nginx.conf 的文件内容。
worker_processes 1; error_log logs/error.log; events { worker_connections 1024; } http { log_format myformat '{"status":"$status","requestTime":"$requestTime","responseTime":"$responseTime","requestURL":"$requestURL","method":"$method","requestContent":"$request_body","responseContent":"$responseContent"}'; access_log logs/test.log myformat; upstream tomcatTest { server 47.96.238.21:8080; } server { server_name 47.96.238.21; listen 9000; # 默认读取 body lua_need_request_body on; location / { log_escape_non_ascii off; proxy_pass http://tomcatTest; set $requestURL ''; set $method ''; set $requestTime ''; set $responseTime ''; set $responseContent ''; body_filter_by_lua ' ngx.var.requestTime=os.date("%Y-%m-%d %H:%M:%S") ngx.var.requestURL=ngx.var.scheme.."://"..ngx.var.server_name..":"..ngx.var.server_port..ngx.var.request_uri ngx.var.method=ngx.var.request_uri local resp_body = string.sub(ngx.arg[1], 1, 1000) ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body if ngx.arg[2] then ngx.var.responseContent = ngx.ctx.buffered end ngx.var.responseTime=os.date("%Y-%m-%d %H:%M:%S") '; } } }
重新启动Nginx,然后进行验证
[root@Kerry conf]# nginx -p '/openrestyTest/v1/' -c conf/nginx.conf -s reload
我准备好的接口地址为:http://47.96.238.21:8080/springboot-demo/hello ,该接口返回的结果都是“Hello!Spring boot”。
现在用POST方式调用接口http://47.96.238.21:9000/springboot-demo/hello,Request中使用application/json方式输入内容:“segmentFault《日志平台(网关层) - 基于Openresty+ELKF+Kafka》”。然后查看logs文件夹,发现多了个 test.log 文件,我们查看该文件。就可以发现,当我们每调用一次接口,就会同步的输出接口日志到该文件中。
[root@Kerry conf]# tail -500f /openrestyTest/v1/logs/test.log {"status":"200","requestTime":"2018-10-11 18:09:02","responseTime":"2018-10-11 18:09:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"segmentFault《日志平台(网关层) - 基于Openresty+ELKF+Kafka》","responseContent":"Hello!Spring boot!"}
到此为止,提取经过Nginx网关的接口信息,并将其写入日志文件就完成了,所有的接口日志都写入了 test.log 文件中。
E+L+K+F=日志数据处理
ELKF是 Elastic + Logstash + Kibana + FileBeat 四个组件的组合,可能ELK对于大家来说更熟悉,ELKF只不过多了Filebeat,它们都是Elastic公司推出的开源产品。刚好这几天Elastic公司成功上市,掀起了一波ELKF产品讨论的热潮。
原ELK架构中,Logstash负责收集日志信息并上报,但后来Elastic公司又推出了Filebeat,大家发现Filebeat在日志文件收集上效果更好,就只让Logstash负责日志的处理和上报了。在这个系统中,Elastic充当一个搜索引擎,Logstash为日志分析上报系统,FileBeat为日志文件收集系统,Kibana为此系统提供可视化的Web界面。
1、Filebeat安装配置
Filebeat:轻量型日志采集器,负责采集文件形式的日志,并将采集来的日志推送给logstash进行处理。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-x86_64.rpm [root@Kerry install]# yum localinstall -y filebeat-6.2.4-x86_64.rpm
安装完成后,我们开始配置Filebeat来采集日志,并推送给Logstash。
[root@Kerry install]# cd /etc/filebeat/ [root@Kerry filebeat]# vi filebeat.yml
该filebeat.yml是filebeat的配置文件,里面大部分的模块都被注释了,本次配置放开的代码有;
filebeat.prospectors: - type: log enabled: true paths: - /openrestyTest/v1/logs/*.log filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 output.logstash: hosts: ["47.96.238.21:5044"]
监听 /openrestyTest/v1/logs/ 目录下的log文件,采集的日志信息输出到logstash,该hosts等我们安装启动了Logstash再说,先启动Filebeat。
[root@Kerry filebeat]# cd /usr/share/filebeat/bin/ [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > admin.out & # 查看admin.out 日志,是否启动成功
2、Logstash安装配置
Logstash:日志处理工具,负责日志收集、转换、解析等,并将解析后的日志推送给ElasticSearch进行检索。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm [root@Kerry install]# yum localinstall -y logstash-6.2.4.rpm #Logstash不建议用root启动 [root@Kerry install]# group add logstash [root@Kerry install]# useradd -g logstash logstash [root@Kerry install]# passwd logstash # 设置密码 [root@Kerry install]# su logstash [root@Kerry install]# mkdir -pv /data/logstash/{data,logs} [root@Kerry install]# chown -R logstash.logstash /data/logstash/ [root@Kerry install]# vi /etc/logstash/conf.d/logstash.conf
创建并编辑/etc/logstash/conf.d/logstash.conf 文件,配置如下:
input { beats { port => 5044 codec => plain { charset => "UTF-8" } } } output { elasticsearch { hosts => "47.96.238.21:9200" manage_template => false index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } }
1、input:是指Logstash的数据来源,启动后使用5044来监听,是否很熟悉,就是上节Filebeat推送日志的hosts。
2、output;是Logstash输出数据的位置,我们这里定义为elasticsearch,下文中会说到,用于ELK架构中的日志分析
接下来我们修改/etc/logstash/logstash.yml
#vim /etc/logstash/logstash.yml path.data: /data/logstash/data path.logs: /data/logstash/logs
现在可以启动Logstash了
[root@Kerry install]# su logstash [logstash@Kerry root]$ cd /usr/share/logstash/bin/ [logstash@Kerry bin]$ touch admin.out [logstash@Kerry bin]$ nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &
3、Elasticsearch安装配置
ElasticSearch:是一个分布式的RESTful风格的搜索和数据分析引擎,同时还提供了集中存储功能,它主要负责将logstash抓取来的日志数据进行检索、查询、分析等。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.rpm [root@Kerry install]# yum localinstall -y elasticsearch-6.2.4.rpm #Elasticsearch不建议用root启动 [root@Kerry install]# group add elsearch [root@Kerry install]# useradd -g elsearch elsearch [root@Kerry install]# passwd elsearch # 设置密码 [root@Kerry install]# su elsearch [elsearch@Kerry bin]$ mkdir -pv /data/elasticsearch/{data,logs} [elsearch@Kerry bin]$ chown -R elsearch.elsearch /data/elasticsearch/ [elsearch@Kerry bin]$ vi /etc/elasticsearch/elasticsearch.yml path.data: /data/elasticsearch/data path.logs: /data/elasticsearch/logs network.host: 0.0.0.0 http.port: 9200
如果想要外网能访问,host就必须要设成0.0.0.0。Elasticsearch的启动如下
[root@Kerry install]# su elsearch [elsearch@Kerry bin]$ cd /usr/share/elasticsearch/bin/ [elsearch@Kerry bin]$ ./elasticsearch -d # -d 保证后台启动
4、Kibana安装配置
Kibana:Web前端,可以将ElasticSearch检索后的日志转化为各种图表,为用户提供数据可视化支持。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-x86_64.rpm [root@Kerry install]# yum localinstall -y kibana-6.2.4-x86_64.rpm [root@Kerry install]# vi /etc/kibana/kibana.yml server.port: 5601 server.host: "0.0.0.0" elasticsearch.url: "http://47.96.238.21:9200"
同样的,host为0.0.0.0,保证外网能访问。Kibana只作为前端展示,日志数据的获取还是借助于elasticsearch,所以这里配置了elasticsearch.url。接着启动Kibana,就能通过页面看到日志的报表。
[root@Kerry ~]# cd /usr/share/kibana/bin/ [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./kibana >admin.out &
我们在浏览器上访问 http://47.96.238.21:5601/ ,正常来说就能访问Kibana的页面。如果 ELKF一整套配置没问题,就能在Kibana的页面上实时的看到所有日志信息。
从Kafka到数据库
在拿到日志的数据后,通过Elasticsearch和Kibana,已经完成了一个日志查看的平台。但我们自己项目内部也已经开发了日志平台,希望把这些日志接入到之前的日志平台中;或者我们希望定制化一个更符合实际使用的日志平台,这些都需要把拿到的日志数据存储到数据库里。
但所有日志的记录,很明显处于高并发环境,很容易由于来不及同步处理,导致请求发生堵塞。比如说,大量的insert,update之类的请求同时到达数据库,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。在比对市场上开源的消息中间件后,我选择了Kafka。
Apache Kafka是一个分布式的发布-订阅消息系统,能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。Kafka将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
- Broker:Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
- Zookeeper:Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
- Producer:生产者将数据推送到broker上,当集群中出现新的broker时,所有的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
- Consumer:因为Kafka的broker是无状态的,所以consumer必须使用partition
offset来记录消费了多少数据。如果一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以通过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。
1、Kafka安装与配置
我们开始Kafka的安装和启动
# 安装 [root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz [root@Kerry install]# tar -zvxf kafka_2.10-0.10.2.1.tgz -C /usr/local/ [root@Kerry install]# cd /usr/local/ [root@Kerry local]# mv kafka_2.10-0.10.2.1 kafka # 启动 [root@Kerry local]# cd /usr/local/kafka/bin/ [root@Kerry bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./kafka-server-start.sh ../config/server.properties >admin.out &
创建一个topic,命名为 kerry
[root@Kerry bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kerry # topic创建成功,下面查看一下 [root@Kerry bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 kerry
我们往这个topic中发送信息
[root@Kerry bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic kerry Hello Kerry!this is the message for test
我们再开一个窗口,从topic中接受消息
[root@Kerry bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning Hello Kerry!this is the message for test # 能成功接收到
2、生产者:Logstash
Kafka已经安装好了,也建好了topic,而我希望往topic中发送消息的对象(生产者)是Logstash。即Logstash从Filebeat中获取数据后,除了输出给Elasticsearch以外,还输出给Logstash,Logstash作为Kafka的生产者。
这里需要修改一下Logstash的配置文件,在output中再加上kafka的信息
vi /etc/logstash/conf.d/logstash.conf input { beats { port => 5044 codec => plain { charset => "UTF-8" } } } output { elasticsearch { hosts => "47.96.238.21:9200" manage_template => false index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } kafka { bootstrap_servers => "localhost:9092" #生产者 topic_id => "kerry" #设置写入kafka的topic compression_type => "snappy" codec => plain { format => "%{message}" } } }
重启Logstash
[root@Kerry bin]# cd /usr/share/logstash/bin [root@Kerry bin]# ps -ef|grep logstash # kill 进程 [root@Kerry bin]# nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &
我们再用POST方式调用之前的测试接口http://47.96.238.21:9000/springboot-demo/hello,请求request为:“这是对kafka的测试”。然后再查看从topic中接受消息
[root@Kerry bin]#./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning {"status":"200","requestTime":"2018-10-12 09:40:02","responseTime":"2018-10-12 09:40:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"这是对kafka的测试","responseContent":"Hello!Spring boot!"}
可以成功的接收到推送过来的日志消息
3、消费者:Springboot编程
日志已经可以保证能够持续不断的推送到Kafka中,那么就需要有消费者订阅这些消息,写入到数据库。我用Spring boot写了个程序,用来订阅Kafka的日志,重要代码如下:
1、application.yml
spring: # kafka kafka: # kafka服务器地址(可以多个) bootstrap-servers: 47.96.238.21:9092 consumer: # 指定一个默认的组名 group-id: kafka1 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 # 服务器地址 bootstrap-servers: 47.96.238.21:9092
2、POM.xml
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.0.6.RELEASE</version> </dependency>
3、KafkaController.java
package df.log.kafka.nginxlog.controller; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.naming.InitialContext; import javax.sql.DataSource; import java.sql.Connection; @RestController @EnableAutoConfiguration public class KafkaController { @RequestMapping("/hello") public String hello(){ return "Hello!Kerry. This is NginxLog program"; } /** * 监听信息 */ @KafkaListener(topics = "kerry" ) public void receive(ConsumerRecord<?, ?> consumer) { // kafkaLog 就是获取到的日志信息 String kafkaLog = (String) consumer.value(); System.out.println("收到一条消息:"+kafkaLog); // 存入数据库的代码省略 } }
当程序部署之后,@KafkaListener(topics = "kerry") 会持续监听topics 为kerry的消息。我们再调用之前的测试接口,会发现新的接口日志会被持续监听到,在控制台上打印出来,并存入数据库。
尾声
本次操作文档是记录Demo的过程,很多地方并不成熟,例如:如何在 Nginx+Lua 时获取更加全面的日志信息;在Logstash上对日志进行再加工;写出漂亮的Spring boot 代码,使得能够很平缓的做写入数据库,用好Kibana的图表等等。
我们下一步就是在项目的生产环境上正式的搭建日志平台,我们已经有了rancher环境,这套架构计划用微服务的方式实现。后续的搭建文档会持续更新。