Kafka 0.8 Producer (0.9以前版本适用)
Kafka旧版本producer由scala编写,0.9以后已经废除
示例代码如下:
import kafka.producer.KeyedMessage; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("request.requird.acks", "1"); ProducerConfig config = new ProducerConfig(properties); Producer<String, String> producer = new Producer<String, String>(config); KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","key","hello"); producer.send(msg); } }
自定义partition示例代码如下:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算