Integrating Flume with Kafka

1. The Flume configuration file

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels = c1

agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 500
agent1.channels.c1.keep-alive = 30
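
Once the file is saved (the path and file name below are assumptions, e.g. conf/kafka-agent.conf), the agent can be started with the standard flume-ng launcher; the -n value must match the agent name used in the property keys:

bin/flume-ng agent -c conf -f conf/kafka-agent.conf -n agent1 -Dflume.root.logger=INFO,console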

2. The KafkaSink class

package org.apache.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);

	public static final String PARTITION_KEY_NAME = "custom.partition.key";
	public static final String ENCODING_KEY_NAME = "custom.encoding";
	public static final String DEFAULT_ENCODING = "UTF-8";
	public static final String CUSTOM_TOPIC_KEY_NAME = "custom.topic.name";
	public static final String CUSTOM_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer";

	private Properties parameters;
	private Producer<String, String> producer;
	
	@Override
	public synchronized void start() {
		super.start();
		// Build the (Kafka 0.8) producer from the sink's configuration; properties
		// such as metadata.broker.list come straight from the agent file
		ProducerConfig config = new ProducerConfig(parameters);
		this.producer = new Producer<String, String>(config);
	}
	
	@Override
	public Status process() throws EventDeliveryException {

		Status status;

		// Start the channel transaction
		Channel ch = getChannel();
		Transaction tx = ch.getTransaction();
		tx.begin();

		try {
			Event event = ch.take();
			if (event == null) {
				// The channel is empty; commit and back off instead of
				// dereferencing a null event
				tx.commit();
				return Status.BACKOFF;
			}

			String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
			String encoding = StringUtils.defaultIfEmpty(
					(String) parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING);
			String topic = Preconditions.checkNotNull(
					(String) parameters.get(CUSTOM_TOPIC_KEY_NAME), "custom.topic.name is required");

			String eventData = new String(event.getBody(), encoding);

			// Attach the partition key only when one is configured
			KeyedMessage<String, String> data;
			if (StringUtils.isEmpty(partitionKey)) {
				data = new KeyedMessage<String, String>(topic, eventData);
			} else {
				data = new KeyedMessage<String, String>(topic, partitionKey, eventData);
			}

			if (logger.isInfoEnabled()) {
				logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
			}

			producer.send(data);
			tx.commit();
			status = Status.READY;
		} catch (Throwable t) {
			tx.rollback();
			status = Status.BACKOFF;
			logger.error("Failed to publish event to Kafka", t);
			// Re-throw Errors; only recoverable exceptions should be swallowed
			if (t instanceof Error) {
				throw (Error) t;
			}
		} finally {
			tx.close();
		}

		return status;
	}

	@Override
	public void configure(Context context) {
		// Copy every parameter defined under agent1.sinks.k1.* into the
		// Properties object later handed to ProducerConfig
		ImmutableMap<String, String> props = context.getParameters();
		parameters = new Properties();
		parameters.putAll(props);
	}

	
	@Override
	public synchronized void stop() {
		producer.close();
		// Let AbstractSink update its lifecycle state
		super.stop();
	}

}

3. Copy the required Kafka and Scala jars into Flume's lib directory (the sink depends on the kafka.javaapi producer, which in turn needs the Scala runtime).
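
To verify delivery end to end, the sketch below consumes the test22 topic with the Kafka 0.8 high-level consumer API, matching the kafka.javaapi producer used by the sink. The ZooKeeper address and group id are assumptions:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Test22Consumer {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put("zookeeper.connect", "172.18.90.51:2181"); // assumption: ZooKeeper on the broker host
		props.put("group.id", "flume-sink-check");           // assumption: any unused consumer group id
		props.put("auto.offset.reset", "smallest");          // start from the beginning of the topic

		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

		// Request a single stream for the test22 topic and print each message
		Map<String, List<KafkaStream<byte[], byte[]>>> streams =
				consumer.createMessageStreams(Collections.singletonMap("test22", 1));
		ConsumerIterator<byte[], byte[]> it = streams.get("test22").get(0).iterator();
		while (it.hasNext()) {
			System.out.println(new String(it.next().message(), "UTF-8"));
		}
	}
}

Lines appended to /opt/soft/tomcatloging/logs/test.log should then show up in the consumer's output within a few seconds.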
