Kafka 2.3 Producer (0.9以后版本适用)
kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。
这里直接使用最新2.3版本,0.9以后的版本都适用。
注意引用的包为:org.apache.kafka.clients.producer
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); kafkaProducer.send(new ProducerRecord<>("topic", "value")); kafkaProducer.close(); } }
0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class TransactionsProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close(); } }
更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算