Integrating Flume NG with Kafka

1. Flume configuration for the producer agent: the source is netcat and the sink is Kafka

#agent section  
producer.sources = s  
producer.channels = c  
producer.sinks = r  
  
#source section  
#producer.sources.s.type = seq  
producer.sources.s.type = netcat  
producer.sources.s.bind = localhost  
producer.sources.s.port = 44444  
producer.sources.s.channels = c  
  
# Each sink's type must be defined  
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink  
producer.sinks.r.metadata.broker.list=127.0.0.1:9092  
producer.sinks.r.partition.key=0  
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition  
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder  
producer.sinks.r.request.required.acks=0  
producer.sinks.r.max.message.size=1000000  
producer.sinks.r.producer.type=sync  
producer.sinks.r.custom.encoding=UTF-8  
producer.sinks.r.custom.topic.name=test  
  
#Specify the channel the sink should use  
producer.sinks.r.channel = c  
  
# Each channel's type is defined.  
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000
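
The configuration above only works if every source and sink is bound to a channel that the agent actually declares. As a rough sanity check, the sketch below parses a properties file and reports wiring problems. The helper names parse_properties and check_wiring are made up for this illustration and are not part of Flume, and the parser only handles the plain key = value lines and # comments used in this post.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of channel-wiring problems for one agent (empty if none)."""
    problems = []
    declared = set(props.get(agent + ".channels", "").split())
    # A source lists its channels under "<agent>.sources.<name>.channels".
    for src in props.get(agent + ".sources", "").split():
        bound = props.get("%s.sources.%s.channels" % (agent, src), "").split()
        if not bound:
            problems.append("source %s has no channels" % src)
        for ch in bound:
            if ch not in declared:
                problems.append("source %s uses undeclared channel %s" % (src, ch))
    # A sink names exactly one channel under "<agent>.sinks.<name>.channel".
    for sink in props.get(agent + ".sinks", "").split():
        ch = props.get("%s.sinks.%s.channel" % (agent, sink))
        if ch is None:
            problems.append("sink %s has no channel" % sink)
        elif ch not in declared:
            problems.append("sink %s uses undeclared channel %s" % (sink, ch))
    return problems
```

Running check_wiring(parse_properties(text), "producer") on the contents of the producer file should return an empty list when the source s and the sink r both point at channel c.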

2. Configure the consumer agent: the source is Kafka and the sink is logger

consumer.sources = s  
consumer.channels = c  
consumer.sinks = r  
  
#consumer.sources.s.type = seq  
consumer.sources.s.channels = c  
consumer.sinks.r.type = logger  
  
consumer.sinks.r.channel = c  
consumer.channels.c.type = memory  
consumer.channels.c.capacity = 100  
  
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource  
consumer.sources.s.zookeeper.connect=127.0.0.1:2181  
consumer.sources.s.group.id=testGroup  
consumer.sources.s.zookeeper.session.timeout.ms=400  
consumer.sources.s.zookeeper.sync.time.ms=200  
consumer.sources.s.auto.commit.interval.ms=1000  
consumer.sources.s.custom.topic.name=test  
consumer.sources.s.custom.thread.per.consumer=4
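
A key such as a source's type can end up set twice when a test config is edited in place. Assuming the file is loaded as a standard Java properties file, the later value replaces the earlier one without any warning, so it helps to check for repeats. The sketch below is one way to do that. find_duplicate_keys is a hypothetical helper, not a Flume API, and it only understands the key = value and # comment forms used in this post.

```python
from collections import OrderedDict


def find_duplicate_keys(text):
    """Map each key that is set more than once to its values, in file order."""
    seen = OrderedDict()
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # commented-out settings do not count as duplicates
        key, sep, value = line.partition("=")
        if sep:
            seen.setdefault(key.strip(), []).append(value.strip())
    return {k: v for k, v in seen.items() if len(v) > 1}
```

The last entry in each reported list is the value that takes effect under the last-one-wins assumption.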

3. Start the two agents, each with its own command

bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf --conf-file conf/consumer1.properties --name consumer -Dflume.root.logger=INFO,console

4. Now telnet to port 44444 and type a few lines

hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444  
Trying ::1...  
Trying 127.0.0.1...  
Connected to localhost.  
Escape character is '^]'.  
1111111111111111  
OK  
kak^Hfkakakkakakakkakkakkaakaknnnm  
OK  
abcdefghijklmnopqrstuvwxyz  
OK
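
The same test can be scripted instead of typed into telnet. The sketch below opens a TCP connection, sends one line per event, and reads back one reply line for each, mirroring the OK acknowledgements in the session above. send_lines is a name made up for this example, and it assumes the source answers every line with a single reply line, as in that session.

```python
import socket


def send_lines(host, port, lines, timeout=5.0):
    """Send each line to a netcat-style source and collect one reply per line."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # One newline-terminated line is treated as one event.
                sock.sendall((line + "\n").encode("utf-8"))
                # Wait for the acknowledgement before sending the next line.
                replies.append(reader.readline().strip())
    return replies
```

With the producer agent running, calling send_lines("localhost", 44444, ["1111111111111111", "abcdefghijklmnopqrstuvwxyz"]) should return one reply per line sent.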

At this point both agents print log output.

For the code behind org.apache.flume.plugins, see https://github.com/baniuyao/flume-kafka, which also has detailed usage instructions.
