Kafka Producer参数配置,如何保证消息严格有序?
一、retries参数
配置为大于0的值的话,客户端会在消息发送失败时重新发送。重试等同于在producer发送有异常时重新发送消息。如果不把max.in.flight.requests.per.connection设为1,重试可能会改变消息的顺序。两条消息同时发送到同一个分区,第一条失败了,并在第二条发送成功后重新发送,那么第二条消息可能在第一条消息前到达。
设置最大重试retries 次数,默认值=0。
二、max.in.flight.requests.per.connection 参数
max.in.flight.requests.per.connection - affects ordering,设置为1可以保证有序性,但是发送性能会受影响。不为1的时候,如果发生消息重发则会乱序。
在阻塞之前,客户端在单个连接上发送的未确认(unacknowledged)请求的最大数目。当这个参数设置>1时且开启了retries(重试)参数,遇到失败请求时,就有可能造成消息重排序。当这个参数设置为1是,消费顺序正常。
默认值:5(大于1)
英文解释:
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
三、示例代码
如果某些场景要求消息是有序的,那么不建议把retries设置成0。可以把max.in.flight.requests.per.connection设置成1,虽然会严重影响生产者的吞吐量,但是可以保证严格有序。
package com.rickie.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerParamsDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 3); props.put("max.in.flight.requests.per.connection", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("max.block.ms", 30000); props.put("request.timeout.ms", 30000); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); } producer.close(); } }