【大数据实践】Kafka生产者编程(5)——ProducerConfig详解(下)

前言

上一篇文章【大数据实践】Kafka生产者编程(4)——ProducerConfig详解(上)中,对kafka producer的相关配置项进行了部分介绍,在本文章中,将继续完成剩下配置项的介绍。

ProducerConfig类

buffer.memory

重要性:高
类型:Long
默认值:33554432字节,即32M

producer可以用于缓存等待发送到服务端的消息记录的缓冲区大小,当消息记录发送到缓冲区的速度大于传输到server的速度,那么等待发送的消息记录将会放在缓冲区,缓冲区如果满了,那么producer会阻塞max.block.ms指定的毫秒数,超过该毫秒数时,将抛出异常。

注意:该缓冲区的大小设置与整个producer需要使用到的producer大体一致,但是要注意并不是所有的缓冲区都是用来存放待发送的records的,比如还有一部分用于压缩数据(当压缩数据的选项开启),还有一部分用于维护in-flight(正在发送)的请求列表。

retry.backoff.ms

重要性:低
类型:Long
默认值:100毫秒

当一个producer到指定的partition的请求request失败时,在重连之前,需要等待的毫秒数。这是为了避免在某些失败的场景下,过于密集地重复发送请求。

compression.type

重要性:高
类型:String
默认值:"none"

producer对数据使用的压缩类型,包括:

  • none:无压缩类型
  • gzip
  • snappy
  • lz4

producer在压缩数据时,是对所有batches的数据一起进行压缩,而不是一个batch一个batch压缩,所以,一次压缩的batches越多,压缩率越高,压缩效果越好。

metrics.sample.window.ms

重要性:低
类型:Long
默认值:30000毫秒,即30秒

计算度量样本的时间窗口,度量用于kafka监控。

metrics.num.samples

重要性:低
类型:int
默认值:2

维护用于计算度量metrics的样本数量。

metrics.recording.level

重要性:低
类型:String
默认值:info

用于metrics的最高纪录等级。

metric.reporters

重要性:低
类型:List
默认值:Collections.emptyList()

被作为metrics reporter的类的列表,这些类都实现了接口org.apache.kafka.common.metrics.MetricsReporter,因此当有新的metric生成时,这些reporters类都可以接收到通知。通常都会包含JmxReporter类,以用于注册JMX统计。

JMX(Java Management Extensions)

kafka使用jmx调取kafka broker的内部数据,来监控一些敏感的数据。

JMX相关资料:

从零开始玩转JMX(一)——简介和Standard MBean
从零开始玩转JMX(二)——Condition
从零开始玩转JMX(三)——Model MBean
从零开始玩转JMX(四)——Apache Commons Modeler & Dynamic MBean

max.in.flight.requests.per.connection

重要性:低
类型:int
默认值:5

在单个连接中,producer客户端在阻塞之前,可以允许未被确认的最大请求数,即当一个连接中未被确认的请求数超过了该设置,那么该producer客户端将会阻塞。注意:如果该值设置得比1大,当出现发送失败的情况,且retries配置项又开启时,那么存在消息被重新排序的风险。

retries

重要性:低
类型:int
默认值:0,表示不重试

当该值被设置成大于0时,客户端会重新发送消息,并且记录发送失败的错误。注意,该重试配置项和客户端因收到错误而重发是一样的。当retries配置项大于0,且max.in.flight.requests.per.connection配置项的值大于1时,存在将重试记录重新排序的风险,也就是说,消息记录的顺序可能会被打乱。原因是:当两个batch被发送到同一个partition时,如果第一个失败,而第二个成功,那么第一个会被重试,此时第二个batch就排在前面了。

key.serializer

重要性:高
类型:Class
默认值:无

消息记录key的序列化类。

value.serializer

重要性:高
类型:Class
默认值:无

消息记录中value的序列化类。

connections.max.idle.ms

重要性:中
类型:Long
默认值:540000毫秒,即9分钟

如果一个连接空闲时间超过该配置值,那么该连接将会被关闭。

partitioner.class

重要性:中
类型:Class
默认值:无

计算消息记录要分配到哪个partitioner的类。在前面的文章【大数据实践】Kafka生产者编程(3)——Interceptor & Partitioner中,对partitioner有详细讲解。

interceptor.classes

重要性:低
类型:List
默认值: Collections.emptyList()

拦截链,在前面的文章【大数据实践】Kafka生产者编程(3)——Interceptor & Partitioner有详细讲解。

enable.idempotence

重要性:低
类型:Boolean
默认值:false

是否使用幂等性。如果设置为true,表示producer将确保每一条消息都恰好有一份备份;如果设置为false,则表示producer因发送数据到broker失败重试使,可能往数据流中写入多分重试的消息。

注意:如果使用idempotence,即enable.idempotence为true,那么要求配置项max.in.flight.requests.per.connection的值必须小于或等于5;配置项retries的值必须大于0;acks配置项必须设置为all。如果这些值没有被用户明确地设置,那么系统将自动选择合适的值。如果设置的值不合适,那么会抛出ConfigException异常。

transaction.timeout.ms

重要性:低
类型:Int
默认值:60000毫秒,即60秒

在主动中止(aborting)一个事务transaction之前,事务协调员(transaction coordinator)最多等待的最长时间——为了让producer更新事务状态。如果该值大于kafka broker中设置的transaction.max.timeout.ms配置项的值,那么producer 的请求将因为InvalidTransactionTimeout错误而失败。

transactional.id

重要性:低
类型:String
默认值:null,表示transactions不能被使用

配置TransactionalId,用于事务的递交(delivery)。该配置项可以为跨多个producer的session提供可靠性语义,因为它可以保证在开始一个新的事务之前,使用相同事务ID(TransactionalId)的事务一定会完成。

注意:如果配置了transactional.id的值,那么必须配置enable.idempotence为true。在发布环境中,事务要求一个kafka集群必须有最少3个brokers(推荐设置)。在开发环境中,可以通过调整broker的配置项transaction.state.log.replication.factor来进行调整,以方便开发。

小结

终于将producer的各个配置项过了一遍,通常我们需要关注的大概都是重要性为高的配置项。从上面的配置项中,我们也了解到一些新的概念,比如度量(metrics)事务transactions等。这些概念可能会在接下来的文章中进行讲解。

相关推荐