Flume浅度学习指南
flume简介
cloudera 公司开源的,贡献给Apache基金会
http://archive.cloudera.com/c...
只能运行在linux系统上
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
flume用来高效的收集、聚合、移动大量的日志数据
有一个基于流式的简单的有弹性的传输模型
有一个健壮的可容错的机制
使用简单,可以扩展的数据模型运行使用到在线实时分析应用中
简单体现在flume-agent的配置及传输模型简单
在线实时分析应用中
flume日志的实时采集->sparkStreaming/storm/Flink =>mysql/redis=>实时分析的结果进行报表展示
数据(日志)的移动传输工具:
日志=>系统运行日志、web服务器的访问日志、客户端的用户行为日志、软件的运行操作日志
可以将数据从数据源中采集并移动到另外一个目的地:
数据源=>系统本地日志文件中的数据、jms、avro端口、kafka、系统本地目录下... 目的地=>hdfs、hive、hbase、kafka、系统本地一个文件中...
如何将linux本地的一个日志文件中的日志数据采集到hdfs上
脚本+hdfs命令 =>【周期性】上传
#!/bin/sh HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2 $HADOOP_HOME/bin/hdfs -put /.../xx.log /hdfs
针对项目初期数据量较少时可以使用 , 没有容灾性及稳定性
采用flume日志采集框架=>【实时】采集一个日志文件中实时追加的日志数据并写入到目的地
针对不同的应用场景定义并启动对应的flume-agent实例/进程 source -- 定义从哪里采集数据 exec类型的source可以借助Linux的shell命令实现实时读取一个日志文件中动态追加的日志数据 avro类型 …… channel -- 定义了source采集的数据临时存储地 memory 基于内存的管道容器 file 基于磁盘 sink -- 定义将数据最终写入的-目的地 hdfs类型的sink将数据最终写入到hdfs上 hive类型将数据最终写入到hive表 kafka类型将数据最终写入到kafka分布式消息队列中 ……
flume-agent实例的模型
每个flume-agent实例至少由以下三个功能模块组成 source模块 用于监控数据源并进行数据的实时采集,是实时产生数据流的模块 数据源=>系统本地的一个日志文件中、kafka、jms、系统本地的一个目录下、avro端口 。。。 source将采集到的数据提交到channel中 channel模块 用于连接source和sink的管道容器 类似一个队列(FIFO) sink模块 从channel中拉取take(剪切)数据并最终将数据写入到目的地 目的地=>hdfs、hive、hbase、kafka、avro端口... event事件: event事件是flume传输日志数据时基本单元,在flume-agent内部数据都是以事件形式存在 source将采集到的数据封装成一个个的event事件,将事件提交到channel sink从channel消费事件并将事件中封装的数据最终写入到目的地 event事件的数据结构:header + body header 是一个map集合类型 内部的key-value为该事件的元数据信息,主要用来区分不同的事件 body 是一个字节数组类型 body为我们真正要传输的数据
flume的安装使用
flume-ng-1.6.0-cdh5.14.2 安装 1、上次解压flume的安装包 $ tar zxvf /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/ $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2 修改目录名称-可选 2、修改flume配置文件 $ mv conf/flume-env.sh.template conf/flume-env.sh 修改后环境配置文件才能生效 $ vi conf/flume-env.sh export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112 3、针对不同的场景需求配置对应的java属性配置文件并启动flume-agent进程 如何启动一个flume-agent进程 $ bin/flume-ng agent \ --name或-n 当前flume-agent实例的别名 \ --conf或-c 当前flume框架的配置文件目录 \ --conf-file,-f 与当前要启动的flume-agent进程相匹配的java属性配置文件的本地路径 Usage: bin/flume-ng <command> [options]...
案例
flume官方简单案例
定义一个flume-agent去监听读取某台服务器上的某个端口中的数据,并将监听读取到的数据最终写入到flume框架自己的日志文件中
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = centos01 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
提交测试: $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties & 确定目标服务器的端口是否已经成功被flume-agent代理进程监听 $ netstat -antp |grep 44444 --查看端口信息 $ ps -ef | grep flume -- 查看进程信息 安装一个telnet工具并连接服务器端口写入数据 $ sudo yum -y install telnet 发送消息数据 检查flume的日志文件中的数据 $ tail -f logs/flume.log
sources = exec
要求使用flume实时监控读取系统本地一个日志文件中动态追加的日志数据并实时写入到hdfs上的某个目录下
# example.conf: A single-node Flume configuration #同一台Linux上可开启多个flume-agent,但agent别名要区分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停 a2.sources.r2.type = exec # tail -F 文件名 即使没有这个-F后面指定的文件,命令也不会停止,容错能力强 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #声明a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog a2.sinks.k2.hdfs.filePrefix = nginxData # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
报错首先找logs目录
报错:找不到类
缺少jar包
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
sinks = hdfs 优化
解决生成的文件过多过小的问题(希望文件的大小=128M)
将日志文件按照日期分目录存储(按照天分目录存储)
将生成的日志文件的格式改为Text文本格式
修改上个例子的flume-agent属性文件
# 声明当前flume-agent的别名及当前的flume-agent实例包含的模块的别名和个数 a2.sources = s2 a2.channels = c2 a2.sinks = k2 # 定义source模块中的s2的类型及与此类型相关的延伸属性 # exec类型的source可以借助执行一条linux shell命令实现读取linux系统上某个文件中的日志数据,其中 cat是一次性读取,tail可以实现实时读取新增加的数据 # shell属性用来声明要执行的命令的运行环境 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /opt/nginx/access.log a2.sources.s2.shell = /bin/sh -c # 定义channel模块中的c2的类型及与此类型相关的延伸属性 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 定义sink模块中的k2的类型及与此类型相关的延伸属性 a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d #启用根据时间生成路径中的转义字符的具体的时间值 a2.sinks.k2.hdfs.round = true #表示使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #设置文件的前缀 a2.sinks.k2.hdfs.filePrefix = NgnixLog #设置解决文件过多过小问题 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #从性能上考虑,最优的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a2.sinks.k2.hdfs.writeFormat = Text # 将a2中的source及sink模块绑定到对应的channel模块上 # 一个source模块可以同时绑定多个channel模块,但是一个sink模块只能绑定一个唯一的channel a2.sources.s2.channels = c2 a2.sinks.k2.channel = c2
sources = spooldir
利用flume监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到hdfs上
目录 /opt/data/logs nginx-access.log.2018120309 nginx-access.log.2018120310
# example.conf: A single-node Flume configuration #同一台Linux上可开启多个flume-agent,但agent别名要区分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source # includePattern 用正则表达式指定要包含的文件 # ignorePattern 用正则表达式指定要忽略的文件 a2.sources.r2.type = spooldir a2.sources.r2.spoolDir = /home/chen/mylogs # 由于每次读完会给读完的文件增加.COMPLETED从而形成新文件,需要忽略这些文件 a2.sources.r2.ignorePattern = ^.*\.COMPLETED$ # includePattern和ignorePattern会同时生效 a2.sources.r2.includePattern = ^.*$ # Use a channel # file类型更安全 # memory类型效率更高 a2.channels.c2.type = file a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data #声明a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d #启用根据时间生成转义字符的具体的时间值 a2.sinks.k2.hdfs.round = true #使用本地linux系统时间戳作为时间基准,否则会自动参考事件的header中的时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true a2.sinks.k2.hdfs.filePrefix = nginxData #设置解决文件过多过小问题 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #从性能上考虑,最优的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a2.sinks.k2.hdfs.writeFormat = Text # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
一台汇总服务器
需求: Nginx服务器集群 -- 10台 每台服务器上都有一个access.log日志文件 需要将每台服务器上的日志文件中追加的日志数据实时读取并写入到hdfs上 思路1: 每台Nginx服务器上启动一个flume-agent source - exec channel - mem sink - hdfs 多个flume-agent同时写入数据到hfds上不利于hdfs的稳定性 思路2: 每台Nginx服务器上启动一个flume-agent source - exec channel - mem sink - avro type = avro hostname = 主机名 port = 端口号 将数据统一写入到某台服务器某个端口中 启动一个负责对汇总后的数据统一写入到目的地的flum-agent source - avro type = avro bind = port = channel - mem sink - hdfs
nginxs2flume.properties
# example.conf: A single-node Flume configuration #同一台Linux上可开启多个flume-agent,但agent别名要区分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令读取本地文件,Linux的命令不停止flume就不停 a2.sources.r2.type = exec # tail -F 文件名 即使没有这个-F后面指定的文件,命令也不会停止,容错能力强 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #声明a2的sink a2.sinks.k2.type = avro a2.sinks.k2.hostname = centos01 a2.sinks.k2.port = 6666 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
flume2hdfs.properties
# example.conf: A single-node Flume configuration #同一台Linux上可开启多个flume-agent,但agent别名要区分 a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = centos01 a3.sources.r3.port = 6666 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 #声明a3的sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test a3.sinks.k3.hdfs.writeFormat = Text # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
Flume Channel Selectors
作用:
当一个flume-agent中存在多个channel时,为Source选择下游的Channel进行配置
类型一:Replicating Channel Selector (default) 复制模式
source将事件复制并提交给所有与其绑定的channel中 默认属性可以省略定义
flume-agent source s1 -- 希望采集用户日志复制份数为n(n=channel的个数)分配到各个channel中 channel c1 c2 sink k1 -- hdfs k2 -- kafka的某个主题中
# 声明agent名称及此agent要使用的source、channel、sink名称 a2.sources = s2 a2.channels = c1 c2 a2.sinks = k1 k2 # 定义source类型及相关属性,需要声明下运行该命令的shell环境 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log # 默认属性就是replicating a2.sources.s2.selector.type = replicating # 定义c1 # 定义channel类型及相关属性 a2.channels.c1.type = memory # 管道的最大容量 a2.channels.c1.capacity = 1000 # 每次最大从source获取的事件数量和sink每次获取的event最大数量 a2.channels.c1.transactionCapacity = 100 # 定义c2 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # k1 # 定义kafka的sink a2.sinks.k1.type = org.apache.flume.source.kafka.KafkaSource a2.sinks.k1.kafka.bootstrap.servers = centos01:9092 a2.sinks.k1.kafka.topics = nginxTopic # k2 # 定义sinks类型及相关属性 a2.sinks.k2.type = hdfs #设置按天进行生成存储目录,每天生成一个文件夹 a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/nginx/%Y%m%d/ #启用根据时间生成转义字符的值 a2.sinks.k2.hdfs.round = true #使用本地时间戳作为基准 a2.sinks.k2.hdfs.useLocalTimeStamp = true #设置文件的前缀 a2.sinks.k2.hdfs.filePrefix = flumeLog #设置解决文件过多过小问题 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 a2.sinks.k2.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a2.sinks.k2.hdfs.writeFormat = Text # 绑定sink和source到channel a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2 a2.sources.s2.channels = c1 c2
类型二:Multiplexing Channel Selector(网址案例)多路复用模式
source根据事件的header头信息中的不同值提交给对应的channel中 事件中自带头信息 source直接判断即可 https://blog.csdn.net/looklook5/article/details/40430965--官方案例 事件没有头信息 需要自定义header头信息,又称为定义拦截器 http://www.cnblogs.com/Skyar/p/5831935.html --案例 使用正则匹配选择器类型对日志分解并指定key名称 (\\w+)表示任意单词字符串,范围包括a-zA-Z0-9_ file_roll类型本地目录提前创建,默认目录下每30s回滚一个文件 使用echo "192.168.134.101:bigdata01:spark" >> test.log
mul_demo.properties
#声明代理及三种模块的别名及数量 a1.sources = s1 a1.channels = c1 c2 c3 c4 a1.sinks = k1 k2 k3 k4 # 定义s1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /opt/flume/test.log # 声明该source的channel selector类型 # multiplexing类型的channel selector会根据header中的某个key的值的不同提交给不同的channel a1.sources.s1.selector.type = multiplexing # 声明依据header头的key为course的值来判断提交该条数据给那些channel # header map( k1->v1,k2->v2 ,k3->v3 ) a1.sources.s1.selector.header = course # 如果该条数据中的header中的course的值为hadoop,则将该事件提交给c1 a1.sources.s1.selector.mapping.hadoop = c1 a1.sources.s1.selector.mapping.hive = c1 c2 a1.sources.s1.selector.mapping.hbase = c3 # 如果以上都未匹配到则默认提交给c4 a1.sources.s1.selector.default = c4 # 声明一个source的拦截器组件,别名叫 i1 a1.sources.s1.interceptors = i1 # 声明此拦截器的类型 ,regex_extractor为正则匹配拦截器类型 a1.sources.s1.interceptors.i1.type = regex_extractor # (\\w+) 表示可以匹配任意的字符串 范围包括 a-zA-Z0-9_ # abc:erdfd:gsaafdf # 将一条日志通过':'的分割并声明对应的key值,最终定义出一个包含三个key-value对的header头信息 # 192.168.134.101:bigdata01:spark => # header{ip->192.168.134.101,domain->bigdata01,course->spark} body (192.168.134.101:bigdata01:spark ) a1.sources.s1.interceptors.i1.regex = (\\w+):(\\w+):(\\w+) a1.sources.s1.interceptors.i1.serializers = s1 s2 s3 a1.sources.s1.interceptors.i1.serializers.s1.name = ip a1.sources.s1.interceptors.i1.serializers.s2.name = domain a1.sources.s1.interceptors.i1.serializers.s3.name = course # 定义 c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 定义 c2 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 定义 c3 a1.channels.c3.type = memory a1.channels.c3.capacity = 1000 a1.channels.c3.transactionCapacity = 100 # 定义 c4 a1.channels.c4.type = memory a1.channels.c4.capacity = 1000 a1.channels.c4.transactionCapacity = 100 # 定义k1 #file_roll类型为将事件写入到linux本地磁盘 a1.sinks.k1.type = file_roll #定义将事件写入到本地的目录 a1.sinks.k1.sink.directory = /opt/flume/k1 a1.sinks.k1.sink.rollInterval = 0 # 定义k2 a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = /opt/flume/k2 a1.sinks.k2.sink.rollInterval = 0 # 定义k3 a1.sinks.k3.type = file_roll a1.sinks.k3.sink.directory = /opt/flume/k3 a1.sinks.k3.sink.rollInterval = 0 # 定义k4 a1.sinks.k4.type = file_roll a1.sinks.k4.sink.directory = /opt/flume/k4 a1.sinks.k4.sink.rollInterval = 0 # Bind the source and sink to the channel a1.sources.s1.channels = c1 c2 c3 c4 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 a1.sinks.k3.channel = c3 a1.sinks.k4.channel = c4 # 最终可以实现的需求 如果source采集的日志为 192.168.134.101:bigdata01:spark =》日志发送给 c4 192.168.134.101:bigdata01:hive =》日志发送给 c1 c2 192.168.134.106:bigdata04:hbase =》日志发送给 c3 192.168.134.106:bigdata04:hadoop =》日志发送给 c1 192.168.134.106:bigdata04:oozie =》日志发送给 c4
Flume Sink Processors
作用: 将一个flume-agent中的多个sink定义到一个Sinkgroups组中 使组内多sink之间实现故障转移或负载均衡 类型:
Default Sink Processor
一个组内只有一个sink,不强制用户为Sink创建Processor 就是之前的单sink案例
Failover Sink Processor
通过配置维护了多个sink组成的优先级列表 需要为所有的sink分配优先级,所有的优先级数字必须是唯一的 数字越大表示优先获取channel数据 搭配 Replicating Channel Selector 使用
nginx2avro.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 a2.sources = s2 a2.channels = c1 c2 a2.sinks = k1 k2 ## Failover Sink Processor #声明一个由k1 k2组成的组,组名称为g1 a2.sinkgroups = g1 a2.sinkgroups.g1.sinks = k1 k2 #通过以下定义可以声明k1和k2的关系时故障转移关系(ha) a2.sinkgroups.g1.processor.type = failover #配置各个sink的优先级,只要有大小差别就行 a2.sinkgroups.g1.processor.priority.k1 = 10 a2.sinkgroups.g1.processor.priority.k2 = 5 #声明failover前最大等待时间,默认10s a2.sinkgroups.g1.processor.maxpenalty = 10000 # 定义source模块的类型及此类型相关的其他属性 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log a2.sources.s2.shell = /bin/sh -c #定义c1 a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 #定义c2 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 定义k1 # 注意:avro的归属服务器要在不同节点上,否则失去HA意义,端口号因为不在同一台服务器上所以可以相同 # 因为目前是伪分布,所以服务器只能一样并且端口号要区分开 a2.sinks.k1.type = avro a2.sinks.k1.hostname = 192.168.134.101 a2.sinks.k1.port = 4545 # 定义k2 a2.sinks.k2.type = avro a2.sinks.k2.hostname = 192.168.134.101 a2.sinks.k2.port = 4546 # 将source和sink模块绑定到对应的channel管道上 a2.sources.s2.channels = c1 c2 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2
avro2hdfs-active.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 a3.sources = s3 a3.sinks = k3 a3.channels = c3 # 定义source模块的类型及此类型相关的其他属性 a3.sources.s3.type = avro a3.sources.s3.bind = 192.168.134.101 a3.sources.s3.port = 4545 #定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性 a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 定义sink模块的类型及相关属性 a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-active/%Y%m%d/ #启用根据时间生成转义字符的值及自动回滚生产日期目录 a3.sinks.k3.hdfs.round = true #使用本地系统时间戳作为基准进行日期回滚 a3.sinks.k3.hdfs.useLocalTimeStamp = true #设置文件的前缀,如果不设置则默认值为FlumeData a3.sinks.k3.hdfs.filePrefix = HiveLog #设置解决文件过多过小问题,将每个文件的大小控制在128M #将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据 #rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息 a3.sinks.k3.hdfs.rollInterval = 0 a3.sinks.k3.hdfs.rollSize = 128000000 a3.sinks.k3.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a3.sinks.k3.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #hdfs类型的sink将数据写入到hdsf上的底层源码执行过程 #假如batchSize=200,transactionCapacity=100 #系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException a3.sinks.k3.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a3.sinks.k3.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a3.sinks.k3.hdfs.writeFormat = Text # 将source和sink模块绑定到对应的channel管道上 a3.sources.s3.channels = c3 a3.sinks.k3.channel = c3
avro2hdfs-failover.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 a4.sources = s4 a4.sinks = k4 a4.channels = c4 # 定义source模块的类型及此类型相关的其他属性 a4.sources.s4.type = avro a4.sources.s4.bind = 192.168.134.101 a4.sources.s4.port = 4546 #定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性 a4.channels.c4.type = memory a4.channels.c4.capacity = 1000 a4.channels.c4.transactionCapacity = 100 # 定义sink模块的类型及相关属性 a4.sinks.k4.type = hdfs a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-failover/%Y%m%d/ #启用根据时间生成转义字符的值及自动回滚生产日期目录 a4.sinks.k4.hdfs.round = true #使用本地系统时间戳作为基准进行日期回滚 a4.sinks.k4.hdfs.useLocalTimeStamp = true #设置文件的前缀,如果不设置则默认值为FlumeData a4.sinks.k4.hdfs.filePrefix = HiveLog #设置解决文件过多过小问题,将每个文件的大小控制在128M #将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据 #rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息 a4.sinks.k4.hdfs.rollInterval = 0 a4.sinks.k4.hdfs.rollSize = 128000000 a4.sinks.k4.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a4.sinks.k4.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #hdfs类型的sink将数据写入到hdsf上的底层源码执行过程 #假如batchSize=200,transactionCapacity=100 #系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException a4.sinks.k4.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a4.sinks.k4.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a4.sinks.k4.hdfs.writeFormat = Text # 将source和sink模块绑定到对应的channel管道上 a4.sources.s4.channels = c4 a4.sinks.k4.channel = c4
Load balancing Sink Processor
可以配置同属一个组的多个sink之间负载平衡的能力 支持通过round_robin(轮询)或者random(随机)参数来实现事件的分发 默认情况下使用round_robin,也可以自定义分发机制 通常是多个sink绑定在同一个channel上
nginx2avro-balance.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 #管道留c1即可!!!!!!!!!!!!!!!!!! a2.sources = s2 a2.sinks = k1 k2 a2.channels = c1 #拷贝修改!!!!!!!!!!!!!!!!!! ## Load balancing Sink Processor #声明一个由k1 k2组成的组,组名称为g1 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance #是否以剔除失败的Sinks。 a1.sinkgroups.g1.processor.backoff = true #采用何种负载均衡算法,round_robin, random或自定义 a1.sinkgroups.g1.processor.selector = round_robin # 定义source模块的类型及此类型相关的其他属性 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log a2.sources.s2.shell = /bin/sh -c #定义c1 a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # 定义k1 a2.sinks.k1.type = avro a2.sinks.k1.hostname = 192.168.134.101 a2.sinks.k1.port = 4546 # 定义k2 a2.sinks.k2.type = avro a2.sinks.k2.hostname = 192.168.134.101 a2.sinks.k2.port = 4545 # 将source和sink模块绑定到对应的channel管道上 # 修改c2为c1!!!!!!!!!!!!!!!!!! a2.sources.s2.channels = c1 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c1
avro2hdfs-01.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 a3.sources = s3 a3.sinks = k3 a3.channels = c3 # 定义source模块的类型及此类型相关的其他属性 a3.sources.s3.type = avro a3.sources.s3.bind = 192.168.134.101 a3.sources.s3.port = 4545 #定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性 a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 定义sink模块的类型及相关属性 #修改路径!!!!!!!!!!!!!!!!!! a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume-01/%Y%m%d/ #启用根据时间生成转义字符的值及自动回滚生产日期目录 a3.sinks.k3.hdfs.round = true #使用本地系统时间戳作为基准进行日期回滚 a3.sinks.k3.hdfs.useLocalTimeStamp = true #设置文件的前缀,如果不设置则默认值为FlumeData a3.sinks.k3.hdfs.filePrefix = HiveLog #设置解决文件过多过小问题,将每个文件的大小控制在128M #将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据 #rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息 a3.sinks.k3.hdfs.rollInterval = 0 a3.sinks.k3.hdfs.rollSize = 128000000 a3.sinks.k3.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a3.sinks.k3.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #hdfs类型的sink将数据写入到hdsf上的底层源码执行过程 #假如batchSize=200,transactionCapacity=100 #系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException a3.sinks.k3.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a3.sinks.k3.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a3.sinks.k3.hdfs.writeFormat = Text # 将source和sink模块绑定到对应的channel管道上 a3.sources.s3.channels = c3 a3.sinks.k3.channel = c3
avro2hdfs-02.properties
#以及声明当前的flume-agent应用实例实例中三种模块的数量及别名 a4.sources = s4 a4.sinks = k4 a4.channels = c4 # 定义source模块的类型及此类型相关的其他属性 a4.sources.s4.type = avro a4.sources.s4.bind = 192.168.134.101 a4.sources.s4.port = 4546 #定义当前flume-agent应用实例中的channel模块的类型及此类型相关的其他属性 a4.channels.c4.type = memory a4.channels.c4.capacity = 1000 a4.channels.c4.transactionCapacity = 100 # 定义sink模块的类型及相关属性 #修改路径!!!!!!!!!!!!!!!!!! a4.sinks.k4.type = hdfs a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume02/%Y%m%d/ #启用根据时间生成转义字符的值及自动回滚生产日期目录 a4.sinks.k4.hdfs.round = true #使用本地系统时间戳作为基准进行日期回滚 a4.sinks.k4.hdfs.useLocalTimeStamp = true #设置文件的前缀,如果不设置则默认值为FlumeData a4.sinks.k4.hdfs.filePrefix = HiveLog #设置解决文件过多过小问题,将每个文件的大小控制在128M #将rollInterval和rollCount属性的值改为0后文件的回滚将不再由时间间隔及事件的数量为依据 #rollSize的值需要是一个127M左右的值,因为每个文件是一个block块,每个block块都还包含元数据信息 a4.sinks.k4.hdfs.rollInterval = 0 a4.sinks.k4.hdfs.rollSize = 128000000 a4.sinks.k4.hdfs.rollCount = 0 #写入到hdfs的最小副本数,不设置会导致上面的三个参数不生效 a4.sinks.k4.hdfs.minBlockReplicas = 1 #批量写入到hdfs上文件中的最大event数量 #batchSize的值需要小于等于transactionCapacity的值 #hdfs类型的sink将数据写入到hdsf上的底层源码执行过程 #假如batchSize=200,transactionCapacity=100 #系统创建一个容量为100个event的临时队列容器(transactionCapacity)-》sink会最多取出200(batchSize)个event塞到transactionCapacity容器中-》因为transactionCapacity的容量不够会报错channelException a4.sinks.k4.hdfs.batchSize = 100 # fileType定义的是数据流的格式,默认的数据流的格式为SequenceFile a4.sinks.k4.hdfs.fileType = DataStream # 写入到hdfs上的文件的格式(序列化方法) # 格式改为text后,可以通过cat 或 text 命令查看文件中的日志内容 a4.sinks.k4.hdfs.writeFormat = Text # 将source和sink模块绑定到对应的channel管道上 a4.sources.s4.channels = c4 a4.sinks.k4.channel = c4