flume与kafka集成
1、flume配置文件
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels = c1

agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 500
agent1.channels.c1.keep-alive = 30
2、KafkaSink类
package org.apache.flume.sink; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.commons.lang.StringUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; public class KafkaSink extends AbstractSink implements Configurable{ private static final Logger logger = LoggerFactory.getLogger(AbstractSink.class); public static final String PARTITION_KEY_NAME = "custom.partition.key"; public static final String ENCODING_KEY_NAME = "custom.encoding"; public static final String DEFAULT_ENCODING = "UTF-8"; public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name"; public static final String CUSTOME_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer"; private Properties parameters; private Producer<String, String> producer; @Override public synchronized void start() { super.start(); ProducerConfig config = new ProducerConfig(parameters); this.producer = new Producer<String,String>(config); } @Override public Status process() throws EventDeliveryException { Status status = null; //start transaction Channel ch = getChannel(); Transaction tx = ch.getTransaction(); tx.begin(); try{ Event event = ch.take(); String partitionKey = (String)parameters.get(PARTITION_KEY_NAME); String encoding = StringUtils.defaultIfEmpty((String)parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING); String topic = Preconditions.checkNotNull((String)parameters.get(CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required"); String eventData = new String(event.getBody(),encoding); KeyedMessage<String, String> data; 
if(StringUtils.isEmpty(partitionKey)){ data = new KeyedMessage<String, String>(topic, eventData); }else{ data = new KeyedMessage<String, String>(topic,partitionKey, eventData); } if(logger.isInfoEnabled()){ logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]"); } producer.send(data); tx.commit(); status = Status.READY; }catch(Throwable t){ tx.rollback(); status = Status.BACKOFF; if(t instanceof Error){ throw (Error)t; } }finally{ tx.close(); } return status; } @Override public void configure(Context context) { ImmutableMap<String, String> props = context.getParameters(); parameters = new Properties(); for(String key : props.keySet()){ String value = props.get(key); parameters.put(key, value); } } @Override public synchronized void stop() { producer.close(); } }
3、把相关的kafka及scala包导入到flume的lib中。
相关推荐
chenguangchun 2020-07-26
myt0 2020-07-18
IT影风 2020-07-18
chenguangchun 2020-06-28
jiaomrswang 2020-06-26
myt0 2020-06-16
xiaoxiaojavacsdn 2020-06-08
zzjmay 2020-06-07
strongyoung 2020-06-04
ErixHao 2020-05-20
啦啦啦啦啦 2020-05-15
wanfuchun 2020-05-14
onwaygoahead 2020-05-05
xiaoxiaojavacsdn 2020-05-01
chenguangchun 2020-04-18
QAnyang 2020-03-14
wsong 2020-03-13