Logstash v7.3 处理基于json filter的日志处理与转换
利用Logstash 提供的filter机制,可以方便地按照用户要求解析日志信息。如果需要抽取、分析、计算可能蕴含的元数据信息,可以利用Logstash 中的filter 机制完成相应的功能。
Logstash v7.3的安装,可以参考如下文章:
Logstash 7.3 概述、原理、安装、配置以及实际应用演示
一、基于json filter的日志处理与转换
如果日志数据源是json格式,则利用json filter,可以解析json 数据。
如下是基于json filter的Logstash 配置文件,文件名为 filter-demo.conf。
input {
stdin{}
}
filter {
json {
source=>"message"
}
}
output {
stdout { codec => "rubydebug"}
}
在input、output 中间的filter 部分嵌入了对json 数据的解析方法,其中的source => 是指让 json filter 插件解析指定字段的json 格式数据。
sournce=>"message" 是让 json filter 解析message 字段的数据。
启动logstash,并指定配置文件:
当我们输入如下json 数据后,Logstash的filter 会对输入信息进行加工处理,经过处理后返回的结果如下:
{"name":"rickie", "age":16}
{
"host" => "Thinkpad-T460P",
"message" => "{\"name\":\"rickie\", \"age\":16}\r",
"@timestamp" => 2019-10-04T05:48:21.122Z,
"@version" => "1",
"name" => "rickie",
"age" => 16
}
其中,name和age是解析出来的结构化数据。
对比一下,没有配置json filter 插件的情况。
如下是没有基于json filter的Logstash 配置文件,文件名为 no-filter-demo.conf。
input {
stdin{}
}
output {
stdout { codec => "rubydebug"}
}
启动logstash,并指定配置文件:
当我们输入如下json 数据后,Logstash 处理后返回的结果如下:
{"name":"rickie", "age":16}
{
"@version" => "1",
"@timestamp" => 2019-10-04T05:53:26.708Z,
"host" => "Thinkpad-T460P",
"message" => "{\"name\":\"rickie\", \"age\":16}\r"
}
没有前面 json filter 处理后的name、age节点信息。
二、将处理后的日志通过TCP输出
前面的输出通过stdout{} 标准输出,这里演示Logstash 将处理后的日志从TCP Socket 中输出。
Logstash 配置文件,输出中添加tcp 输出,文件名为 tcp-output.conf:
input {
stdin{}
}
filter {
json {
source=>"message"
}
}
output {
stdout { codec => "rubydebug"}
tcp {
host=>"127.0.0.1"
port=>6000
mode=>"client"
}
}
设计一个Java应用程序作为TCP Server端,接收来自Logstash output输出的日志信息。
package com.rickie.Output;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class TCPServer_output {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(6000);
// 阻塞等待消息
Socket socket = serverSocket.accept();
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] buf = new byte[1024];
int len = inputStream.read(buf);
System.out.println(new String(buf, 0, len));
}
}
}
TCP 端口号和Logstash 中配送的port 需要保持一致。
依次启动Java应用程序、Logstash:
(1)启动Java应用程序(TCP Server)
(2)启动Logstash,并设置配置文件
bin\logstash -f ..\logstash-conf\tcp-output.conf
输入:{"name":"rickie", "age":16}
在TCP Server端输出的内容: