Flume Cluster Setup

This Flume cluster uses a master agent that load-balances events across two sinks, one per data node.

There are three servers:

192.168.134.131 master
192.168.134.132 datanodea
192.168.134.133 datanodeb

The cluster topology looks like this:

                 Master
                   |
         =====================
         ||                 ||
     DataNodeA          DataNodeB

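Configuration on master. The agent receives events on an Avro source and load-balances them round-robin across two Avro sinks, one per data node: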

#agent
agent.channels = channel
agent.sources = source
agent.sinks = node1 node2
# Put both sinks in one group so the load-balancing processor can pick between them.
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = node1 node2
agent.sinkgroups.g1.processor.type = load_balance
# Temporarily skip a sink that fails, capping the backoff at 10000 ms.
agent.sinkgroups.g1.processor.backoff = true
agent.sinkgroups.g1.processor.selector = round_robin
agent.sinkgroups.g1.processor.selector.maxTimeOut = 10000


#channel
agent.channels.channel.type = memory
agent.channels.channel.capacity = 1000000
agent.channels.channel.transactionCapacity = 1000000
agent.channels.channel.keep-alive = 10


#source
agent.sources.source.channels = channel
agent.sources.source.type = avro
agent.sources.source.bind = master
agent.sources.source.port = 41414
agent.sources.source.threads = 5


#sink

#node1
agent.sinks.node1.channel = channel
agent.sinks.node1.type = avro
agent.sinks.node1.hostname = datanodea
agent.sinks.node1.port = 41414

#node2
agent.sinks.node2.channel = channel
agent.sinks.node2.type = avro
agent.sinks.node2.hostname = datanodeb
agent.sinks.node2.port = 41414
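
To make the sink group settings above more concrete, here is a minimal Python sketch of round-robin selection with exponential backoff. It is a simplified model for illustration only, not Flume's actual implementation: the class name, the method names, and the 1000 ms starting penalty (`base_timeout_ms`) are assumptions, and only the 10000 ms cap comes from `maxTimeOut` in the config.

```python
class RoundRobinBackoffSelector:
    """Simplified model of round-robin sink selection with exponential backoff."""

    def __init__(self, sinks, max_timeout_ms=10000, base_timeout_ms=1000):
        self.sinks = list(sinks)
        self.max_timeout_ms = max_timeout_ms      # mirrors selector.maxTimeOut
        self.base_timeout_ms = base_timeout_ms    # assumed starting penalty
        self.failures = {s: 0 for s in self.sinks}
        self.blocked_until = {s: 0 for s in self.sinks}
        self.next_index = 0

    def candidates(self, now_ms):
        """Return the sinks to try, in rotated order, skipping backed-off ones."""
        n = len(self.sinks)
        order = [self.sinks[(self.next_index + i) % n] for i in range(n)]
        self.next_index = (self.next_index + 1) % n
        return [s for s in order if self.blocked_until[s] <= now_ms]

    def report_failure(self, sink, now_ms):
        """Double the penalty on each consecutive failure, up to the cap."""
        self.failures[sink] += 1
        penalty = min(self.base_timeout_ms * 2 ** (self.failures[sink] - 1),
                      self.max_timeout_ms)
        self.blocked_until[sink] = now_ms + penalty
        return penalty

    def report_success(self, sink):
        """A successful delivery clears the sink's failure history."""
        self.failures[sink] = 0
        self.blocked_until[sink] = 0


selector = RoundRobinBackoffSelector(["node1", "node2"])
print(selector.candidates(now_ms=0))    # both sinks, node1 first
print(selector.candidates(now_ms=0))    # both sinks, node2 first
selector.report_failure("node2", now_ms=0)
print(selector.candidates(now_ms=500))  # node2 is still backed off
```

With `backoff = true`, a data node that goes down stops receiving attempts for a while, so the other node carries the traffic until the failed one is retried.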

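Configuration on datanodea. The Avro source replicates each event into two channels: ch1 feeds a file_roll sink and ch2 feeds an Elasticsearch sink: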

#agent
agent.channels = ch1 ch2
agent.sources = source
agent.sinks = elasticsearch file
agent.sources.source.selector.type = replicating



#channel
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.channels.ch1.transactionCapacity = 1000000
agent.channels.ch1.keep-alive = 10

agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000000
agent.channels.ch2.transactionCapacity = 1000000
agent.channels.ch2.keep-alive = 10

#source
agent.sources.source.channels = ch1 ch2
agent.sources.source.type = avro
agent.sources.source.bind = datanodea
agent.sources.source.port = 41414
agent.sources.source.threads = 5


#sink
agent.sinks.file.channel = ch1
agent.sinks.file.type = file_roll
agent.sinks.file.sink.directory = /opt/flume/data
agent.sinks.file.sink.serializer = TEXT

agent.sinks.elasticsearch.channel = ch2
agent.sinks.elasticsearch.type = elasticsearch
agent.sinks.elasticsearch.hostNames = master:9300
agent.sinks.elasticsearch.indexName = flume_index
agent.sinks.elasticsearch.indexType = flume_type
agent.sinks.elasticsearch.clusterName = elasticsearch
agent.sinks.elasticsearch.batchSize = 1
agent.sinks.elasticsearch.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
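
The effect of the replicating selector in the data node configuration can be pictured with a small Python sketch. This is only an illustrative model, assuming plain in-memory queues in place of Flume's memory channels; the `replicate` helper and the sample events are hypothetical.

```python
from collections import deque


def replicate(event, channels):
    """Put the same event on every channel, as a replicating selector does."""
    for queue in channels.values():
        queue.append(event)


# Two channels, named after ch1 and ch2 in the config.
channels = {"ch1": deque(), "ch2": deque()}
for line in ["event-1", "event-2"]:
    replicate(line, channels)

# Each sink drains only its own channel, so both see every event.
file_sink_input = list(channels["ch1"])   # would go to the file_roll sink
es_sink_input = list(channels["ch2"])     # would go to the elasticsearch sink
print(file_sink_input)
print(es_sink_input)
```

Because every event lands in both channels, a slow Elasticsearch sink does not hold back the local file copy; each channel fills and drains independently.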

Configuration on datanodeb (identical to datanodea except for the bind address):

#agent
agent.channels = ch1 ch2
agent.sources = source
agent.sinks = elasticsearch file
agent.sources.source.selector.type = replicating


#channel
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.channels.ch1.transactionCapacity = 1000000
agent.channels.ch1.keep-alive = 10

agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000000
agent.channels.ch2.transactionCapacity = 1000000
agent.channels.ch2.keep-alive = 10


#source
agent.sources.source.channels = ch1 ch2
agent.sources.source.type = avro
agent.sources.source.bind = datanodeb
agent.sources.source.port = 41414
agent.sources.source.threads = 5


#sink
agent.sinks.file.channel = ch1
agent.sinks.file.type = file_roll
agent.sinks.file.sink.directory = /opt/flume/data
agent.sinks.file.sink.serializer = TEXT

agent.sinks.elasticsearch.channel = ch2
agent.sinks.elasticsearch.type = elasticsearch
agent.sinks.elasticsearch.hostNames = master:9300
agent.sinks.elasticsearch.indexName = flume_index
agent.sinks.elasticsearch.indexType = flume_type
agent.sinks.elasticsearch.clusterName = elasticsearch
agent.sinks.elasticsearch.batchSize = 1
agent.sinks.elasticsearch.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
