Integrating FlumeNG with Kafka
1. Flume configuration on the producer side, using netcat as the source and Kafka as the sink
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
#producer.sources.s.type = seq
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

#Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
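If the broker does not auto-create topics, the test topic referenced by producer.sinks.r.custom.topic.name has to exist before the producer agent starts. A minimal sketch, assuming a Kafka 0.8-era installation (run from the Kafka directory) and the local ZooKeeper at 127.0.0.1:2181; skip this if auto.create.topics.enable is on:

# Assumption: illustrative only; not needed when the broker auto-creates topics.
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test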
2. Consumer-side configuration, with Kafka as the source and logger as the sink
consumer.sources = s
consumer.channels = c
consumer.sinks = r

#consumer.sources.s.type = seq
consumer.sources.s.channels = c

consumer.sinks.r.type = logger
consumer.sinks.r.channel = c

consumer.channels.c.type = memory
consumer.channels.c.capacity = 100

consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect=127.0.0.1:2181
consumer.sources.s.group.id=testGroup
consumer.sources.s.zookeeper.session.timeout.ms=400
consumer.sources.s.zookeeper.sync.time.ms=200
consumer.sources.s.auto.commit.interval.ms=1000
consumer.sources.s.custom.topic.name=test
consumer.sources.s.custom.thread.per.consumer=4
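Both configurations reference org.apache.flume.plugins.KafkaSink and org.apache.flume.plugins.KafkaSource, which are not shipped with stock Flume 1.4, so the plugin jar and the Kafka client dependencies must be on the agent classpath before either agent starts. A minimal sketch, assuming the plugin jar has already been built; all jar names and versions below are illustrative, not exact:

# Assumption: jar names/versions are examples; copy whatever the plugin
# build actually produces, plus the Kafka client and its dependencies.
cp flume-kafka-plugin.jar  ~/bigdata/apache-flume-1.4.0-bin/lib/
cp kafka_2.9.2-0.8.0.jar   ~/bigdata/apache-flume-1.4.0-bin/lib/
cp scala-library-2.9.2.jar ~/bigdata/apache-flume-1.4.0-bin/lib/
cp zkclient-0.3.jar        ~/bigdata/apache-flume-1.4.0-bin/lib/
cp metrics-core-2.2.0.jar  ~/bigdata/apache-flume-1.4.0-bin/lib/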
3. Start the two agents separately
bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/consumer1.properties --name consumer -Dflume.root.logger=INFO,console
4. Now telnet to port 44444
hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1111111111111111
OK
kak^Hfkakakkakakakkakkakkaakaknnnm
OK
abcdefghijklmnopqrstuvwxyz
OK
Both agents now produce output.
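As an extra check independent of the Flume consumer, the same messages can be read straight from Kafka with the console consumer. A quick sanity check, assuming a Kafka 0.8-era console consumer that still reads via ZooKeeper (run from the Kafka installation directory):

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning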
The code for org.apache.flume.plugins is available at https://github.com/baniuyao/flume-kafka, which also documents its usage in detail.
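If you build the plugin from that repository yourself, the steps are roughly as follows; the build tool and the output jar are assumptions here, so check the repository's README for the actual instructions:

# Assumption: Maven build; adjust to whatever the repository actually uses.
git clone https://github.com/baniuyao/flume-kafka.git
cd flume-kafka
mvn clean package   # copy the resulting jar into Flume's lib/ directory as shown above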