Logstash 参考指南(Logstash配置示例)
Logstash配置示例
下面的示例演示如何配置Logstash来过滤事件,处理Apache日志和syslog消息,并使用条件来控制哪些事件由过滤器或输出处理。
如果你需要帮助构建grok模式,请尝试grok调试器,Grok调试器是基本许可证下的X-Pack特性,因此可以免费使用。配置过滤器
过滤器是一种在线处理机制,它提供了根据需要对数据进行切片和切割的灵活性,让我们看一下活动中的一些过滤器,下面的配置文件设置了grok
和date
过滤器。
input { stdin { } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
使用此配置运行Logstash:
bin/logstash -f logstash-filter.conf
现在,将下面的行粘贴到你的终端并按Enter键,这样它就会被stdin
输入处理:
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你应该会看到返回到stdout
的是这样的:
{ "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"", "@timestamp" => "2013-12-11T08:01:45.000Z", "@version" => "1", "host" => "cadenza", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "11/Dec/2013:00:01:45 -0800", "verb" => "GET", "request" => "/xampp/status.php", "httpversion" => "1.1", "response" => "200", "bytes" => "3891", "referrer" => "\"http://cadenza/xampp/navi.php\"", "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"" }
如你所见,Logstash(在grok
过滤器的帮助下)能够解析日志行(碰巧是Apache的“组合日志”格式),并将其分解为许多不同的离散信息,一旦开始查询和分析日志数据,这就非常有用。例如,你将能够轻松地在HTTP响应码、IP地址、referrers等上运行报表。Logstash有很多现成的grok
模式,因此如果你需要解析通用的日志格式,很可能已经有人为你完成了这项工作,有关更多信息,请参阅GitHub上Logstash grok模式的列表。
本例中使用的另一个过滤器是date
过滤器,这个过滤器会解析一个时间戳,并将其用作事件的时间戳(不管你什么时候使用日志数据)。你将注意到,本例中的@timestamp
字段设置为2013年12月11日,尽管Logstash在随后的某个时间摄取了该事件,这在备份日志时非常方便,它使你能够告诉Logstash“使用此值作为此事件的时间戳”。
处理Apache日志
让我们做一些有用的事情:处理apache2访问日志文件!我们将从本地主机上的文件中读取输入,并根据需要使用条件处理事件。首先,创建一个名为logstash-apache.conf
的文件包含以下内容(你可以根据需要更改日志文件路径):
input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { if [path] =~ "access" { mutate { replace => { "type" => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
然后,使用以下日志条目(或使用你自己的webserver中的一些日志条目)创建上面配置的输入文件(在本例中为“/tmp/access_log”):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3" 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)" 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在,使用-f
标志运行Logstash以将其传递到配置文件:
bin/logstash -f logstash-apache.conf
现在你应该在Elasticsearch中看到你的apache日志数据了,Logstash打开并读取指定的输入文件,处理遇到的每个事件。记录到此文件的任何追加行也将被捕获,由Logstash作为事件处理,并存储在Elasticsearch中。还有一个额外的好处,他们储藏的字段“type
”设置为“apache_access
”(这是由输入配置中的type ⇒ "apache_access"
行)。
在这个配置中,Logstash只查看apache access_log,但是通过更改上面配置中的一行就可以同时查看access_log和error_log(实际上是任何文件匹配*log
):
input { file { path => "/tmp/*_log" ...
当你重新启动Logstash时,它将同时处理error和access日志,但是,如果你检查数据(可能使用elasticsearch-kopf
),你会看到access_log被分解为离散字段,而error_log则不是。这是因为我们使用了grok
过滤器来匹配标准的组合apache日志格式,并自动将数据分割为单独的字段。如果我们能根据它的格式来控制行是如何被解析的,不是很好吗?嗯,我们可以…
注意,Logstash不会重新处理access_log文件中已经查看的事件,从文件中读取数据时,Logstash保存其位置,并只在添加新行时处理它们。
使用条件
你可以使用条件来控制哪些事件由过滤器或输出处理,例如,你可以根据出现在哪个文件(access_log、error_log以及以“log”结尾的其他随机文件)中来标记每个事件。
input { file { path => "/tmp/*_log" } } filter { if [path] =~ "access" { mutate { replace => { type => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } else if [path] =~ "error" { mutate { replace => { type => "apache_error" } } } else { mutate { replace => { type => "random_logs" } } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
这个示例使用type
字段标记所有事件,但实际上不解析error
或random
文件,有很多类型的错误日志,它们应该如何标记取决于你使用的日志。
类似地,你可以使用条件将事件定向到特定的输出,例如,你可以:
- 警告nagios任何状态为5xx的apache事件
- 将任何4xx状态记录到Elasticsearch
- 通过statsd记录所有的状态代码
要告诉nagios任何具有5xx状态码的http事件,首先需要检查type
字段的值,如果是apache,那么你可以检查status
字段是否包含5xx错误,如果是,发送到nagios。如果不是5xx错误,检查status
字段是否包含4xx错误,如果是,发送到Elasticsearch。最后,将所有apache状态码发送到statsd
,无论状态字段包含什么:
output { if [type] == "apache" { if [status] =~ /^5\d\d/ { nagios { ... } } else if [status] =~ /^4\d\d/ { elasticsearch { ... } } statsd { increment => "apache.%{status}" } } }
处理Syslog消息
Syslog是Logstash最常见的用例之一,而且它处理得非常好(只要日志行大致符合RFC3164),Syslog实际上是UNIX网络日志记录标准,它将消息从客户端发送到本地文件,或通过rsyslog发送到集中式日志服务器。对于本例,你不需要一个功能正常的syslog实例;我们将从命令行中伪造它,这样你就可以了解发生了什么。
首先,让我们为Logstash + syslog创建一个简单的配置文件,名为logstash-syslog.conf
。
input { tcp { port => 5000 type => syslog } udp { port => 5000 type => syslog } } filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
使用这个新配置运行Logstash:
bin/logstash -f logstash-syslog.conf
通常,客户端将连接到端口5000上的Logstash实例并发送消息,对于本例,我们将telnet到Logstash并输入一条日志行(类似于前面在STDIN中输入日志行),打开另一个shell窗口与Logstash syslog输入进行交互并输入以下命令:
telnet localhost 5000
复制粘贴以下行作为示例(你可以自己尝试一些,但是要记住,如果grok
过滤器对你的数据不正确,它们可能无法解析)。
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
现在,当你的原始shell处理和解析消息时,你应该会看到Logstash的输出!
{ "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "@timestamp" => "2013-12-23T22:30:01.000Z", "@version" => "1", "type" => "syslog", "host" => "0:0:0:0:0:0:0:1:52617", "syslog_timestamp" => "Dec 23 14:30:01", "syslog_hostname" => "louis", "syslog_program" => "CRON", "syslog_pid" => "619", "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "received_at" => "2013-12-23 22:49:22 UTC", "received_from" => "0:0:0:0:0:0:0:1:52617", "syslog_severity_code" => 5, "syslog_facility_code" => 1, "syslog_facility" => "user-level", "syslog_severity" => "notice" }