Flume详解

1、Flume简介

         Flume是Cloudera 开发的分布式日志收集系统,是 hadoop 周边组件之一。提供分布式、高可靠和高可用的海量日志聚合的服务,支持在系统中定制各类数据发送方;同时,Flume提供对数据进行简单处理,并写到各种数据接收方(可定制)。当前 Flume 已纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。

1.1、设计目标

1.1.1、可靠性

         当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱以此分别为:end-to-end(收集数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送)、Store on failure(这也是scribe采用的策略,当数据接受方crash时,将数据写到本地,待恢复后,继续发送)、best effort(数据发送到接收方后,不会进行确认)。

1.1.2、可扩展性

         Flume采用了三层架构,分别为agent、collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用zookeeper进行管理和负载均衡),这就避免了单点故障问题。

1.1.3、可管理性

         所有agent和collector由master统一管理,这使得系统便于维护。多master情况,flume利用zookeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web和shell script command两种形式对数据流进行管理。

1.1.4、功能可扩展性

         用户可以根据需要添加自己的agent、collector或者storage。此外,flume自带了很多组件,包括各种agent(file、syslog等),collector和storage(file、HDFS等)。

1.2、Flume架构

1.2.1、Flume逻辑架构

Flume详解 

         正如前面提到的,flume采用了分层架构,分别为:agent、collector和storage。其中,agent和collector均由两部分组成:source和sink,source是数据来源,sink是数据去向。

         Flume使用两个组件:master和node。Node根据在master shell或者web中动态配置,决定其作为agent还是collector。

1.2.2、Agent详解

         Agent的作用是将数据源的数据发送给collector。

  1、Flume自带了很多直接可用的数据源(source),例如:

  • Text(“filename”):将文件filename作为数据源,按行发送
  • tail(“filename”):将探测filename新产生的数据,按行发送出去
  • fsyslogTcp(5140):监听TCP的5140端口,并且接受到的数据发送出去
  • tailDir(“dirname[,fileregex=”.*”[,startFromEnd=false[,recurseDepth=0]]]):监听目录中的文件末尾,使用这则去选定需要监听的文件(不包括目录),recurseDepth为递归监听其子目录的深度。
     注:更多可参见http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050465.html

  2、 同时提供了很多sink,如:

  • console(“format”):直接将数据显示在console上
  • text(“txtfile”):将数据写到文件txtfile中
  • dfs(“dfsfile”):将数据写到HDFS上的dfsfile文件中
  • syslogTcp(“host”,prot):将数据通过tcp传递给host节点
  • agentSink[(“machine”[,port])]:等价于agentE2ESink,如果省略machine参数,默认使用flume.collector.event.host;如果省略port参数,默认使用flume.collector.event.port。
  • agentDFOSink[("machine" [,port])]:本地热备agent,agent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中
  • agentBESink[("machine"[,port])]:不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃
  • agentE2EChain:指定多个collector提高可用性。当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍

     注:想了解更多见http://www.cnblogs.com/zhangmiao-chp/archive/2011/05/18/2050472.html

1.2.3Collector详解

         Collector的作用是将多个agent的数据汇总后,添加到storage中,它的source和sink与agent类似。

1)数据源(source)如:

  • collectorSource[(port)]:Collector source,监听端口汇聚数据
  • autoCollectorSource:通过master协调物理节点自动汇聚数据
  • logicalSource:逻辑source,由master分配端口并监听rpcSink

 2)Sink如:

  • collectorSink( "fsdir","fsfileprefix",rollmillis):collectorSink,数据通过collector汇聚之后发送到hdfs, fsdir 是hdfs目录,fsfileprefix为文件前缀码
  • customdfs(“hdfspath”[,”format”]):自定义格式dfs

1.2.4、Storage详解

         Storage是存储系统,可以是一个普通file,也可以是hdfs、hive、hbase等分布式存储。

1.2.5、Master详解

         Master是管理协调agent和collector的配置等信息,是flume集群的控制器。

1.3、flume数据流

         在flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生、传输、处理并最终写入目标的一条路径。

Flume详解 

      1、对于agent数据流配置就是从那里得到数据,并把数据发送到那个collector。

      2、对于collector是接收agent发送过来的数据,并把数据发送到指定的目标机器上。

注:flume框架对hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将hadoop和zookeeper服务也启动。

2、Flume-NG详解

2.1、Flume-ng架构

Flume详解

         Flume NG是一个从flume继承保留来的,因此大部分概念是相同的,官网给出的解释如下:

  • you still have sources and sinks and they still do the same thing.they are now connected by channels.
  • channels are pluggable and dictate durability.flume NG ships with an in-memory channel for fast,but non-durable event;delivery and a jdbc-based channel for durable event delivery.we have recently added a file-based durable channel too.
  • there’s no more logical or physical nodes.we call all physical nodes agents and agents can run zero or more sources and sinks.
  • There's no master and no ZooKeeper dependency anymore. At this time, Flume runs with a simple file-based configuration system
  • Just about everything is a plugin, some end user facing, some for tool and system developers. (Specifically, sources, sinks, channels, configuration providers, lifecycle management policies, input and output formats, compression, source and sink channel adapters, and the kitchen sink.)
  • Tons of things are not yet implemented. Please file JIRAs and / or vote for features you deem important.

2.1.1、event

         事件是flume ng中一种广义的数据单位。事件是类似于JMS和类似邮件系统的邮件,一般都比较小。事件是在一个更大的数据集常用单记录。事件被做成头和身体的,前者是一个键/值映射,后者是一个任意字节数组。

2.1.2、source

A source of data from which Flume NG receives data. Sources can be pollable or event driven. Pollable sources, like they sound, are repeatedly polled by Flume NG source runners where as event driven sources are expected to be driven by some other force. An example of a pollable source is the sequence generator which simple generates events whose body is a monotonically increasing integer. Event driven sources include the Avro source which accepts Avro RPC calls and converts the RPC payload into a Flume event and the netcat source which mimics the nc command line tool running in server mode. Sources are a user accessible API extension point.

2.1.3、sink

A sink is the counterpart to the source in that it is a destination for data in Flume NG. Some of the builtin sinks that are included with Flume NG are the Hadoop Distributed File System sink which writes events to HDFS in various ways, the logger sink which simply logs all events received, and the null sink which is Flume NG's version of /dev/null. Sinks are a user accessible API extension point.

2.1.4、channel

         通道是一个源和一个接收器之间的管道事件。渠道也决定了一个源和一个接收器之间的事件持久性。例如,一个通道可能会在内存中,在内存虽然快,但不作任何保证防止数据丢失,它也可以全面持久的(从而可靠),其中每一个事件,保证交付连接的接收器,即使在失败的案例,如断电。渠道是一个用户访问API的扩展点。

2.1.5、agent

Flume NG 归纳代理的概念:代理人是任何物理的JVM中运行的Flume NG。一般每台机器运行一个agent,但是在一个单一的agent中可以运行任意数量的source、sink和channel。

2.1.6、client

客户端并不一定是一个Flume NG组件尽可能连接到Flume 和发送数据到源。一个流行和良好的客户端的一个例子将是一个像的Log4j Appender直接发送事件到flume avro源的日志记录。另一个例子可能是syslog守护进程。

2.2、flume NG安装配置

Flume-ng来源于cloudera公司开发的flume-og系统,flume-ng对flume-og系统进行了重构差生的。因此如果我们采用cloudera manager安装的hadoop集群,则安装flume-ng的过程非常简单。

结合实际生成环境的配置,在这里主要讲解一下source、channel、sink和interceptors的配置以及他们的作用,如果想深入理解可以查看如下链接:

https://flume.apache.org/FlumeUserGuide.html

2.2.1、flume-ng安装

         在采用hadoop构建集群的过程中,为了保证服务器版本的稳定性,我们一般都会自己搭建hadoop的yum源,采用yum的方式安装flume-ng是一个非常愉快的过程,我们需要的就是在/etc/yum.repo.d/中配置我们自己搭建yum源(如果是cloudera的话,配置cdh的源),然后执行yum search flume-ng查看是否能找到所需的依赖包。然后执行yum install flume-ng就可以完成flume-ng的安装。

         Flume-ng比较复杂的地方就是flume配置文件的配置。如果采用cloudera mananger cdh4源安装出来的配置文件位于:/usr/lib/flume-ng/conf目录下的flume.conf文件中;

2.2.2、flume-ng配置source源

         下图是flume-ng支持的所有source的配置,在生产环境中,不是所有的配置都能用到,常用的有avro和exec配置。因为我们主要讲解的就是这两种源的配置。

Flume详解

      ① 关exec配置方式:

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as datewill probably not - the former two commands produce streams of data where as the latter produces a single event and exits.

Flume详解

配置实例:

Flume详解

         注意:在使用的command的时候,最好用tial –n 0 –F /var/log/source,这样配置只会监听到后续刷新到日志文件中的最新日志内容。

       ②关于avro源配置方式

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

Flume详解

         配置实例:

Flume详解

         注意:当源为avro源配置时,需要采用avro-client或者配置avro sink将日志数据发送到该源。通常在把该agent配置为collector的时候,会把source配置为avro源,用来接收上一个源(agent)发送来的数据。

2.2.3flume-ng配置channel管道

         下图是flume-ng支持的所有channel类型,在生产环境中,我们主要需要考虑两个方面,一个是数据传输速度,另一个是持久化。这两方面是一个矛盾体,只能根据我们的实际情况来决定,channel更偏重哪一个方向。因此在这里讲解一下memory和file两种方式的配置:

Flume详解

      ①基于memory的channel配置

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

Flume详解

         配置实例:

Flume详解

注意:是实际应用中,主要配置capacity和transactionCapacity这两个参数。内存方式传输速度快,但数据没有持久化,一旦发生异常,则存储在里面的数据丢失。但是在实际的应用中,该配置是应用最广泛的。

      ②基于file的channel配置

File的配置项非常多,但是常用的配置项只有type、checkpointDir和dataDirs。其中type是必须的,就是声明管道类型为file;checkpointDir是配置用来存储checkpoint(在File header里前8个字节存储了版本号,接下来24个字节是sequeuece no,接下来4个字节存储了checkpoint的状态)的目录;dataDir就是配置存储日志文件的路径。

Flume详解

注:如果想深入了解filechannel,可以查看该文章:http://blog.csdn.net/xiaochawan/article/details/8996102

2.2.4、flume-ng配置sink

         Sink就是用来配置日期最终保存在那里,在hadoop集群应用中,sink通常有两种配置方式比较常用,分别是avro和hdfs。如果配置为avro通常是将日志发送给下一个agent(collector)处理;如果配置为hdfs通常是把日志存储到hdfs中。

         下图是sink支持的所有配置类型:

Flume详解

      ① Avro配置详解

      在前面也讲解过了,配置为avro用来中继日志传输。配置实例如下:

Flume详解

      注:这里的hostname和port,不是说该服务器会监听该ip地址和端口号,而是将日志发送到的主机和对应的端口号。

      ② Hdfs配置详解

如果配置为hdfs类型,则是将日志保存到hdfs文件系统中。在hdfs配置中,我们需要先了解一些hdfs配置类型中,会自动解析的变量。

Flume详解

         常用的配置实例如下:

Flume详解

         注:在hdfs配置中还有这个参数比较重要就是useLocalTimeStamp,默认情况下该变量为false,即系统中所取的timestamp来源于header中。讲到这里,需要强调两点,一个是host也是从header中提取的,两一个是默认情况下header中存在timestamp和host两个参数选项,但都为空,所以我们在hdfs中是获取到的都是空的,这通常是通过最初的agent在source配置中,添加上interceptors配置解决(下面会讲解)。而且如果event中间经过多个agent进行中继,如果中继的agent没有再显示的配置interceptors重写header中的信息,则event中的header信息就是来源于最初的agent,中继过程不会对其修改。

2.2.5、flume-ng配置interceptors

         为什么要配置interceptors,在hdfs配置这一章节中已经详解说明了原因。其实通过给source添加interceptors配置,可以灵活的传递很多有用的信息。在interceptors配置这一章节中,主要讲解timestamp、host和static配置方式,这也是我们生产环境中常用的配置,其他的方式感兴趣就去自己看吧。

         下面是flume-ng支持的所有interceptors方式:

Flume详解

      Timestamp配置:

通过interceptorstimestamp配置,将time时间插入到event header中,并且timestamp的精确度为millis (普通的timestamp精确度为10位,而这个为13位)。配置参数如下:

Flume详解

         配置实例:

Flume详解

         注:默认情况下event header中是存在timestamp的,不过值为空;所以上面的即使配置了也不能改变event header中的timestamp的值,所以还需要添加一个参数,即preserveExisting=true,替换掉原有的timestamp参数。

     ② Host配置

通过配置该项,将agent的hostname或者ip地址插入到event header的host变量中,常用的配置参数如下:

Flume详解

      配置实例:

Flume详解

      注意:我们是时间的配置过程中,需要最好显示的指明是用ip地址还是hostname;还有就是痛timestamp一样的问题,默认情况下event header的hostname为空,所以只是简单的如实例中的那种配置,对实际的hostname不会产生任何影响,需要配置preserveExisting=true,替换掉原有的hostname参数。

      Static配置

      Static配置是为用户自定义变量和其值传入到event header中传输。他的配置参数如下:

Flume详解

     配置实例:

Flume详解

     注意:在这里我着重强调的还是preserveExisting问题。为了配置的变量能够生效,我们最好是配置preserveExisting变量,并设置为true。

3、flume-ng的一个完成配置实例

# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1 sink2

# configurate source
tier1.sources.source1.channels = channel1
tier1.sources.source1.type     = exec
tier1.sources.source1.command = tail --follow=name /usr/local/openresty/nginx/logs/logapi/api.log
# configurate channel
tier1.channels.channel1.type   = memory
tier1.channels.channel1.capacity = 500
tier1.channels.channel1.transactionCapacity = 400
# configurate sink
tier1.sinks.sink1.channel      = channel1
tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /logs/orignal/%Y%m%d/
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.fileType = CompressedStream
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.filePrefix = api-152.%Y%m%d%H%M
tier1.sinks.sink1.hdfs.inUseSuffix = .tmp
tier1.sinks.sink1.hdfs.codeC = gzip
tier1.sinks.sink1.hdfs.round = true
tier1.sinks.sink1.hdfs.roundValue = 5
tier1.sinks.sink1.hdfs.roundUnit = minute

tier1.sinks.sink2.channel      = channel1
tier1.sinks.sink2.type         = avro
tier1.sinks.sink2.hostname = *.*.*.*
tier1.sinks.sink2.port = 9999

在该实例中source、channel、sink和interceptors都存在了。Source为avro类型,即可以接收avro-agent读取日志文件发送过来的数据,也可以接受来源于上一个agent通过avro方式发送来的数据;interceptors的配置是为event header中添加timestamp和host,并且如果event需要穿过多个agent,且中继agent没有在配置interceptors替换掉event header中的值,则源agent配置的event header的值会最终传输到sink中使用;channel配置为memory类型,为了使保证传输效率,可以容忍部分日志的丢失;sink配置为hdfs类型,就是将日志保存到我们指定路径下的hdfs文件中,并且每隔1小时对日志拆分,并压缩保存的日志。

相关推荐