Kafka Producer: Custom Serialization

The Kafka producer serializes each record into bytes before pushing it to the broker. Below is an example of a custom serializer for a User object.

First, add the jackson-mapper-asl dependency:

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.12</version>
</dependency>

Next, define the entity class to be serialized:

package cn.org.fubin;

public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;

    public User() {
    }

    public User(String firstName, String lastName, int age, String address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.address = address;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "firstName=‘" + firstName + ‘\‘‘ +
                ", lastName=‘" + lastName + ‘\‘‘ +
                ", age=" + age +
                ", address=‘" + address + ‘\‘‘ +
                ‘}‘;
    }
}

Next, create the serializer class by implementing the Serializer interface provided by the Kafka client:

package cn.org.fubin;

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class UserSerializer implements Serializer<User> {

    private ObjectMapper objectMapper;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(String topic, User data) {
        byte[] ret = null;
        try {
            // Serialize the User object to a JSON string, then to UTF-8 bytes
            ret = objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
        } catch (IOException e) {
            System.out.println("Serialization failed");
            e.printStackTrace();
        }
        return ret;
    }

    @Override
    public void close() {
        // Nothing to release
    }
}
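Before wiring the serializer into a producer, it can be exercised on its own. The following is a minimal sketch; the UserSerializerTest class name and the printed JSON are illustrative, not part of the original post:

package cn.org.fubin;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class UserSerializerTest {

    public static void main(String[] args) {
        UserSerializer serializer = new UserSerializer();
        // configure() must run first so that the ObjectMapper is created
        serializer.configure(Collections.<String, Object>emptyMap(), false);

        byte[] bytes = serializer.serialize("test-topic", new User("a", "b", 23, "china"));
        // Prints the JSON that will be sent to the broker, e.g.
        // {"firstName":"a","lastName":"b","age":23,"address":"china"}
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        serializer.close();
    }
}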

For common key and value types, Kafka already ships default Serializer implementations in the org.apache.kafka.common.serialization package, such as StringSerializer, IntegerSerializer, LongSerializer, DoubleSerializer and ByteArraySerializer; a custom serializer is only needed for application-specific types like User.
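For comparison, a producer that only sends plain strings can rely entirely on these built-in classes. A minimal sketch, assuming a broker at localhost:9092 and reusing test-topic from this example (it additionally needs an import of org.apache.kafka.common.serialization.StringSerializer):

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Built-in serializers can be referenced by class instead of a hard-coded string
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> plainProducer = new KafkaProducer<String, String>(props);
        plainProducer.send(new ProducerRecord<String, String>("test-topic", "key", "plain string value"));
        plainProducer.close();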

Finally, in the main class, register the custom serializer in the producer configuration and send a User entity:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.RetriableException;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Retriable exceptions
 * 1. A partition replica is temporarily unavailable
 * 2. The controller is currently unavailable
 * 3. A transient network failure
 *
 * The producer can recover from these on its own; once the configured retries
 * are exhausted, the application still has to handle the failure itself
 * (see the callback sketch after this class).
 *
 * Non-retriable exceptions
 * 1. The message is too large
 * 2. Serialization failure
 * 3. Other exception types
 */

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("value.serializer", "cn.org.fubin.UserSerializer");

        properties.put("acks", "-1");
        System.out.println(ProducerConfig.ACKS_CONFIG);
        properties.put("retries", "3");
        properties.put("batch.size", 1048576);
        properties.put("linger.ms", 10);
        properties.put("buffer.memory", "33554432");
        System.out.println(ProducerConfig.BUFFER_MEMORY_CONFIG);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        properties.put("max.block.ms", "3000");

        String topic = "test-topic";
        Producer<String, User> producer = new KafkaProducer<String, User>(properties);

        User user = new User("a", "b", 23, "china");
        ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, user);
        // Synchronous send: get() blocks until the broker acknowledges the record
        producer.send(record).get();
        producer.close();

    }

}
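The demo above uses a synchronous send. With an asynchronous send, the same retriable/non-retriable distinction is made inside a callback. The following sketch is an illustration, not part of the original code; it could replace the producer.send(record).get() call in the main method above, reusing the producer and record variables:

        // Asynchronous send with a callback that distinguishes the exception types
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                } else if (exception instanceof RetriableException) {
                    // The producer has already retried up to "retries" times;
                    // the application must now decide whether to re-send or give up
                    System.out.println("Retriable failure, consider re-sending: " + exception);
                } else {
                    // e.g. RecordTooLargeException or SerializationException: retrying will not help
                    System.out.println("Non-retriable failure: " + exception);
                }
            }
        });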

