基于Apache Flume Datahub插件同步上云
简介
Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。
环境要求
JDK (1.6以上,推荐1.7)
Flume-NG 1.x
Apache Maven 3.x
插件部署
下载插件源码
$ git clone [email protected]:aliyun/aliyun-odps-flume-plugin.git
使用maven编译项目
$ cd aliyun-odps-flume-plugin/
$ mvn clean package -DskipTests
编译成功后,插件所在目录为:target/odps_sink
部署Datahub Sink插件
将文件夹odps_sink移动到Apache Flume安装目录下
$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
$ mv target/odps_sink/ {YOUR_FLUME_DIRECTORY}/plugins.d/
移动后,核验ODPS Sink插件是否已经在相应目录:
$ ls { YOUR_APACHE_FLUME_DIR }/plugins.dodps_sink
配置示例
Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。
需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):
# test_basic.logsome,log,line1some,log,line2
...
下面将创建Datahub Topic,并把每行日志的第一列和第三列作为一条记录写入Topic中。
创建Datahub Topic
使用datahub console创建topic语句示例如下:
ct test_project test_topic 1 1 (string c1, string c2);
Flume配置文件
在Flume安装目录的**conf/**文件夹下创建名为**datahub_basic.conf**的文件,并输入内容如下:
# A single-node Flume configuration for Datahub# Name the components on this agenta1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log# Describe the sinka1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8a1.sinks.k1.shard.number = 1a1.sinks.k1.shard.maxTimeOut = 60# Use a channel which buffers events in memorya1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 1000# Bind the source and sink to the channela1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。
启动Flume
配置完成后,启动Flume并指定agent的名称和配置文件路径,添加**-Dflume.root.logger=INFO,console**选项可以将日志实时输出到控制台。
$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
写入成功,显示日志如下:
...Write success. Event count: 2...
数据使用
日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。