Flume
Concept
Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting massive amounts of log data.
Model
a) Source: the collection component; connects to the data source and ingests data
b) Sink: the delivery component; passes data to the next-tier agent or to the final storage system
c) Channel: the transport component; buffers data between the source and the sink
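To make the model concrete, here is a minimal sketch of a single-agent configuration that wires the three components together. The netcat source, memory channel, and logger sink are standard Flume component types; the agent name a1 and port 44444 are illustrative assumptions.

# Minimal agent: one source -> one channel -> one sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# netcat source: listens on a TCP port and turns each received line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# logger sink: prints events to the Flume log, useful for smoke tests
a1.sinks.k1.type = logger
# memory channel: buffers events between the source and the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1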
Examples
Collecting a directory
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Note: never drop a file with the same name into the monitored directory twice
# We are collecting a directory, so the source type is spooldir
a1.sources.r1.type = spooldir
# The directory to watch
a1.sources.r1.spoolDir = /export/servers/dirfile
# Attach a header with the path of the source file
a1.sources.r1.fileHeader = true

# Describe the sink
# The data is stored on HDFS, so the sink type is hdfs
a1.sinks.k1.type = hdfs
# The channel this sink drains
a1.sinks.k1.channel = c1
# Target location on HDFS
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
# Prefix for the generated files
a1.sinks.k1.hdfs.filePrefix = events-
# Round the event timestamp down to 10-minute buckets for the %H%M directory
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
# Roll when the file reaches 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
# Roll after 5 events
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# Maximum number of events per transaction when taking from the source or giving to the sink
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
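A sketch of launching this agent, assuming the file above is saved as conf/spooldir.conf (the filename is an assumption; any name works). The spool directory must exist before startup:

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
mkdir -p /export/servers/dirfile
bin/flume-ng agent -c conf -f conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console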
Collecting a file
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1

# Configure a host interceptor for the source (optional, commented out)
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
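A sketch of preparing and starting this agent, assuming the config is saved as conf/tail-file.conf (the filename is an assumption). tail -F tolerates a missing file, but creating it up front makes testing easier:

mkdir -p /export/servers/taillogs
touch /export/servers/taillogs/access_log
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console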
Cascading two agents
The first Flume agent tails the data and sends it to the second one.
The second Flume agent aggregates the data.
The collecting agent
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/taillogs/access_log
a1.sources.r1.channels = c1

# Describe the sink
# An avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.52.120
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 10

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The aggregating agent
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# An avro source is a receiving server
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.52.120
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/avro/hdfs/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start order
Start flume2 (the aggregator) first, then start flume1 (the collector): the collector's avro sink can only connect once the aggregator's avro source is listening.
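A sketch of the two launch commands, run from the Flume install directory; the config filenames avro-hdfs.conf and tail-avro.conf are assumptions:

# flume2 on 192.168.52.120: the aggregator, started first
bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# flume1: the collector, started second
bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 -Dflume.root.logger=INFO,console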
High availability: failover
Name        HOST    Role
Agent1      node01  Web Server   (collects data)
Collector1  node02  AgentMstr1   (aggregates and stores data)
Collector2  node03  AgentMstr2   (aggregates and stores data)
Agent1
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

##set group
agent1.sinkgroups = g1

##set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/servers/taillogs/access_log

agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

##set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

##set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

##set sink group
agent1.sinkgroups.g1.sinks = k1 k2

##set failover: k1 (node02) has the higher priority and is preferred while healthy
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
Collector1
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

##avro source receiving from the upstream agent
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = node02
a1.sources.r1.channels = c1

##set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/flume/failover/
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
Collector2
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

##avro source receiving from the upstream agent
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
# mark events with this collector's own host (the original had node02 here, a copy-paste error)
a1.sources.r1.interceptors.i1.value = node03
a1.sources.r1.channels = c1

##set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/flume/failover/
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
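A sketch of a failover test (assumes shell access to node02; the kill-based failure injection is only one way to simulate an outage): start both collectors, then Agent1. Events land on node02 first because k1 has priority 10; after killing node02's agent, the sink group fails over to node03, and restarting node02 fails back:

# on node02: Flume agents show up in jps as "Application"
jps | grep Application
kill -9 <pid>   # simulate a Collector1 outage; traffic moves to node03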
Load balancing: load_balance
node01: collects data and sends it to node02 and node03
node02: receives part of node01's data
node03: receives part of node01's data
node01
#agent name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2

#set group
a1.sinkgroups = g1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/taillogs/access_log

# set sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 52020

# set sink2
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 52020

#set sink group
a1.sinkgroups.g1.sinks = k1 k2

#set load balance: round-robin between k1 and k2, backing off failed sinks
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
node02
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
node03
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node03
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
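To watch the round-robin distribution (a sketch; reuses the access_log path from the node01 config): start the node02 and node03 agents first, then node01, then generate lines and compare the two logger outputs, which should each print roughly every other event:

while true; do date >> /export/servers/taillogs/access_log; sleep 0.5; done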
Monitoring multiple files
Two log servers, A and B, produce three types of logs in real time: access.log, nginx.log, and web.log.
Requirement: collect access.log, nginx.log, and web.log from machines A and B, aggregate them on machine C, and then write everything to HDFS.
The required directory layout on HDFS is:
/source/logs/access/20180101/**
/source/logs/nginx/20180101/**
/source/logs/web/20180101/**
Server-side configuration
Create the Flume configuration file on node03:
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim avro_source_hdfs_sink.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.52.120
a1.sources.r1.port = 41414
# Add a timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Define the sink; %{type} reads the "type" header set by the upstream agents
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.52.100:8020/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp for the time escapes in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
# Roll files by time (seconds)
a1.sinks.k1.hdfs.rollInterval = 30
# Roll files by size (bytes; 10485760 = 10 MB)
a1.sinks.k1.hdfs.rollSize = 10485760
# Number of events per batch written to HDFS
a1.sinks.k1.hdfs.batchSize = 10000
# Number of threads Flume uses for HDFS operations (open, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 10
# Timeout for HDFS operations (ms)
a1.sinks.k1.hdfs.callTimeout = 30000
# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Collection-side log-generation script
Create a shell script on node01 and node02 to simulate log generation:
cd /export/servers/shells
vim server.sh
#!/bin/bash
while true
do
date >> /export/servers/taillogs/access.log;
date >> /export/servers/taillogs/web.log;
date >> /export/servers/taillogs/nginx.log;
sleep 0.5;
done
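The launch commands below reference conf/exec_source_avro_sink.conf on node01 and node02, which is not reproduced here. A minimal sketch of what it needs: one exec source per log file, each with a static interceptor that sets the type header consumed by %{type} on node03 (the exact values are assumptions chosen to match the server config above):

a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# one exec source per log file, tagged with its log type
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/taillogs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/servers/taillogs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/servers/taillogs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# avro sink pointing at the node03 collector
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.52.120
a1.sinks.k1.port = 41414
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1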
Start the services in order
Start Flume on node03 first so it can receive and aggregate the data:
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
Then start Flume on node01 and node02 to tail and ship the data:
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
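Once all three agents are up and server.sh is appending lines, the per-type directories can be checked from any HDFS client (the date directory will reflect the current day rather than 20180101):

hdfs dfs -ls /source/logs/access
hdfs dfs -ls /source/logs/nginx
hdfs dfs -ls /source/logs/web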