Kafka Producer参数配置,如何保证消息严格有序?

一、retries参数

配置为大于0的值的话,客户端会在消息发送失败时重新发送。重试等同于在producer发送有异常时重新发送消息。如果不把max.in.flight.requests.per.connection设为1,重试可能会改变消息的顺序。两条消息同时发送到同一个分区,第一条失败了,并在第二条发送成功后重新发送,那么第二条消息可能在第一条消息前到达。

设置最大重试retries 次数,默认值=0。

二、max.in.flight.requests.per.connection 参数

max.in.flight.requests.per.connection - affects ordering,设置为1可以保证有序性,但是发送性能会受影响。不为1的时候,如果发生消息重发则会乱序。

在阻塞之前,客户端在单个连接上发送的未确认(unacknowledged)请求的最大数目。当这个参数设置>1时且开启了retries(重试)参数,遇到失败请求时,就有可能造成消息重排序。当这个参数设置为1是,消费顺序正常。

默认值:5(大于1)

英文解释:

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

三、示例代码

如果某些场景要求消息是有序的,那么不建议把retries设置成0。可以把max.in.flight.requests.per.connection设置成1,虽然会严重影响生产者的吞吐量,但是可以保证严格有序。

package com.rickie.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerParamsDemo {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 3);
 props.put("max.in.flight.requests.per.connection", 1);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("max.block.ms", 30000);
 props.put("request.timeout.ms", 30000);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++) {
 producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 }
 producer.close();
 }
}

Kafka Producer参数配置,如何保证消息严格有序?

相关推荐