【大数据实践】Kafka生产者编程(2)——producer发送流程

前言

在上一篇文章【大数据实践】Kafka生产者编程(1)——KafkaProducer详解中,主要对KafkaProducer类中的函数进行了详细的解释,但仅针对其中的一些方法,对于producer背后的原理、机制,没有做深入讲解。因此,在本文章中,尝试介绍kafka producer整个发送流程。在写作此文章时,自己也处于对Kafka的学习阶段,可能有些细节掌握的并不精确,希望大家指正。

Producer消息发送流程

【大数据实践】Kafka生产者编程(2)——producer发送流程

备注:图片来源:https://blog.csdn.net/zhanglh...

构造KafkaProducer对象

在上一篇文章中,详细介绍了KafkaProducer的构造函数,其主要是对producer的一些选项进行配置。配置项可在类ProducerConfig中找到:

package org.apache.kafka.clients.producer;

public class ProducerConfig extends AbstractConfig {
...
}

其中,除了可以配置一些简单的数值,还可以配置一些kafka自带或者我们自定义的类,如:

  • key.serializer:key的序列化类,kafka在包package org.apache.kafka.common.serialization;中实现了一系列常用的序列化和反序列化的类。若要自定义序列化类,则需要实现接口org.apache.kafka.common.serialization.Serializer,如Integer的序列化类:

    package org.apache.kafka.common.serialization;
    
     import java.util.Map;
    
     public class IntegerSerializer implements Serializer<Integer> {
     public IntegerSerializer() {
     }
    
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
    
     public byte[] serialize(String topic, Integer data) {
         return data == null ? null : new byte[]{(byte)(data.intValue() >>> 24), (byte)(data.intValue() >>> 16), (byte)(data.intValue() >>> 8), data.byteValue()};
     }
    
     public void close() {
     }
     }
  • value.serializer:value的序列化类。
  • partitioner.class:partition分配的类,使消息均匀发送到topic的各个分区partition中,Kafka默认partition为org.apache.kafka.clients.producer.internals.DefaultPartitioner。若要自定义负载均衡算法,需要实现org.apache.kafka.clients.producer.Partitioner接口。
  • 拦截链Interceptors:为拦截器List,可以让用户在消息记录发送之前,或者producer回调方法执行之前,对消息或者回调信息做一些逻辑处理。可以通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口来定义自己的拦截器。

构造ProducerRecord

ProducerRecord即消息记录,记录了要发送给kafka集群的消息、分区等信息:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
  • topic:必须字段,表示该消息记录record发送到那个topic。
  • value:必须字段,表示消息内容。
  • partition:可选字段,要发送到哪个分区partition。
  • key:可选字段,消息记录的key,可用于计算选定partition。
  • timestamp:可选字段,时间戳;表示该条消息记录的创建时间createtime,如果不指定,则默认使用producer的当前时间。
  • headers:可选字段,(作用暂时不明,待再查证补充)。

发送ProducerRecord

异步发送 & 同步发送

异步发送时,直接将消息记录扔进发送缓冲区,立即返回,有另外的线程负责将缓冲区中的消息发送出去。异步发送时,需要设置callback方法,当收到broker的ack确认时,将调用callback方法。下面直接贴kafka官方例子中,展示的异步和同步发送方法:

package kafka.examples;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
     * non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
     *                  occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

拦截链拦截处理ProducerRecord

调用send方法时,首先拦截器Interceptor将拦截ProducerRecord,调用Interceptor的onSend方法,对消息记录进行一些处理,返回处理后的ProducerRecord。

对ProducerRecord的Key和Value序列化

调用配置的key 和 value的序列化类,对ProducerRecord的key和value进行序列化,并设置到ProducerRecord中。

设置ProducerRecord的partition

通过DefaultPartitioner类或者配置项中指定的自定义Partitioner类中的partiton方法,计算出消息要发送到topic中某个分区partition。设置到ProducerRecord中。

检查ProducerRecord长度是否超过限制

根据配置项max.request.sizebuffer.memory进行检查,超出任何一项就会抛出异常。

设置ProducerRecord时间戳

如果ProducerRecord构建时已经指定时间戳,则用构建时指定的,否则用当前时间。

ProducerRecord放入缓冲区

ProducerRecord放入缓存区(RecordAccumulator维护)时,发往相同topic的相同partition的消息记录将会被捆绑batch压缩,压缩到ProducerBatch中。也就是说,ProducerBatch中可能包含多个ProducerRecord。这样做的目的是为了一次请求发送多个record,提高性能。

RecordAccumulator为每一个TopicPartition维护了一个双端队列:

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

相同topic的相同partition的ProducerBatch将被放在对应的队列中。

压缩策略有:

·NONE:就是不压缩。
·GZIP:压缩率为50%
·SNAPPY:压缩率为50%
·LZ4:压缩率为50%

唤醒Sender

当一个ProducerBatch已满或者有新的ProducerBatch到达时,会唤醒真正发送消息记录的发送线程Sender,将ProducerBatch发送到kafka集群。

Sender的发送逻辑如下:

  1. 检查kafka集群中是否存在要发送的ProducerBatch对应的leader partition,存在则认为可发送,不存在说明服务端出现了问题,则该Batch暂不发送。
  2. 过滤掉过期的ProducerBatch,对于过期的ProducerBatch,会通过Sensor通知Interceptor发送失败。
  3. 发送Batch。
  4. 处理发送结果,调用callback和拦截器的onAcknowledge进行处理。

小结

本文章对producer消息大体发送流程进行一次梳理,其中有一些自己还不是特别懂,也就没有写得特别详细,后续如果有进一步的了解,将修改本文进行补充。后面的文章将对发送过程中构建Producer时,自定义Inteceptor和自定义Partitioner进行介绍。

相关推荐