第一次的Kafka实践

工作也有三年了,也是因为比较懒惰和自身水平的原因,之前对一些没用过的技术进行实践后总没有过什么过多的总结。这次也是因为在工作之中遇到的一些问题需要使用到Kafka,过程中也遇到并解决了一些问题。在阶段性完成工作后,以为同事提及是否有什么总结和经验可以分享,使我突然意识到需要总结一下自己遇到问题与技术,从而更好地去认识这陌生的01世界。

一、kafka简介

Kafka是一个分布式的消息队列,有所耳闻的基本都了解它是个什么样的“人物”。简要的介绍一下我对它的一些认识。

  1. 基于Scala和Java编写,利用JVM运行;
  2. 队列在Kafka中称为Topic,每个Topic可定义replication(副本)和partition(分片)信息;
  3. Consumer(消费者)有Group(组)概念,各组之间的消费情况互不影响;

二、Kafka服务安装

了解了Kafka一些很基础的概念后,还是需要动手去体验它是如何操作的。下面是在CentOS7的一些安装过程。

1. 服务文件下载

可在Kafka的官网http://kafka.apache.org/下载最新的资源文件

2. 修改配置

目前仅使用单机版本的Kafka来提供服务,对于文件解压等基础操作不在阐述。主要需要修改的配置文件有:server.properties,配置文件kafka目录下的config文件夹下。

修改内容如下:
#    listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers.
advertised.listeners=PLAINTEXT://:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

简单的说明

  • listeners 配置服务监控的socket的地址
  • advertised.listeners 配置节点通知producer和consumer的hostname和port
  • log.dirs 配置消息数据存储的目录

3.启动服务

这里我使用的是单机模式,并不是集群的运行方式。Kafka使用的是1.1.0的版本,内部自带了Zookeeper那么就不需要额外的下载Zookeeper了。

1)首先启动Zookeeper服务

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log &

2)启动Kafka服务

nohup ./bin/kafka-server-start.sh config/server.properties > kafka.log &

简单提一下,nohup和&是为了让Zookeeper和Kafka的服务能够在后台运行,并将执行命令输出的日志分别记录到对应文件中去;

4.队列创建与查看

我们使用kafka-topics.sh这个脚本来进行一系列关于Topics(队列)的定义等操作
1)建立一个队列

./bin/kafka-topics.sh --create --zookeeper 192.168.3.203:2181 --topic test --replication-factor 1 --partitions 2
### 进行一些说明
## --create 指定命令为创建
## --zookeeper 指定Zookeeper的服务
## --topic 指定队列名称
## --replication-factor 指定队列的副本数,这个副本数需要和集群的节点数相对应
## --partitions 指定队列的分片数量,适当的分片可以提升性能

2)查看队列列表

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --list
### 说明
## --list list命令指定列出当前的topic列表

3)查看指定队列

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --describe
### 说明
## --topic 指定需要查看的队列名
## --describe 该命令用于描述topic队应的基础信息

4)删除队列

./bin/kafka-topics.sh --zookeeper 192.168.3.203:2181 --topic test --delete
### 说明
## --topic 指定需要删除的队列名
## --delete 该命令用于删除队列

其他的一些使用方式,可以查看kafka-topics的帮助说明;

5.消费者信息查看

我们使用kafka-consumer-groups.sh这个脚本来了解关于consumer的一些基础信息;
1)查看消费者组列表

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --list
### 说明
## --bootstrap-server 指定Kafka的服务地址 
## --list 列出所有消费者

2)查看消费者组信息

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --describe
### 说明
## --group 指定消费者组组名
## --describe 指定为描述命令

3)删除消费者组

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.3.203:9092 --group test --delete
### 说明
## --delete 指定为删除命令

三、Kafka客户端实践

1.Producer数据发送

在Kafka的官方API中Producer有如下的使用例子

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

还有如下针对,发送数据时使用事务去提交的例子:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();

在个人使用中出现过如下的两个问题,希望大家也能规避;

1.频繁发送数据导致的connection创建过多

  每次新建一个Producer时,会利用网络去建立一个连接。如果使用线程频繁地去创建Producer的话,在Client端会出现java.net.BindException: Address already in use: JVM_Bind的异常。
  这是由于在Producer与Kafka Server建立连接时,需要在客户端分配一个端口,每个机器的端口数量有限,当被耗尽时,变无法在与Kafka Server建立连接了。

2.使用事务发送数据的事务ID不能重复

  当producer使用事务进行数据发送时,事务结束之后,需要开启一个新的事务进行数据提交。这个时候的事务ID不能重复,需要重新设置一个transactional.id,否则进行数据提交时将会出现异常。

2.Consumer数据处理

同样Consumer在官方的API中也有使用说明,大致使用如下:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }

在个人使用中也遇到一些问题,拿出来与大家分享一下;

1.offset设置,获取数据

  offset对应的是一个消费组的偏移量,按照如上的Consumer的代码获取使用脚本一个新创建Topic的数据,将获取不到任何数据。无论使用Producer往Topic中发送了对多数据,我都没有办法取到。
  这使我有点困惑,起初我使用命令进行数据拉取后,改代码可以拉取到数据,命令如下:

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.203:9092 --from-beginning --topic test

于是,通过查看kafka-console.consumer.sh,了解到它在Kafka包中对用的Scala代码中,发现参数--from-beginning对应设置了一个配置,"auto.offset.reset"为"earliest"。
auto.offset.reset参数说明如下:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

设置配置后,便可以在未初始化时获取到数据了。

2.查看队列的数量的方法

  当我们使用到队列作为中间件缓存消息时,我们常常会关心当前队列中剩余的数据量。这便于我们去判断当前我们的数据处理程序的响应能力,以便于调整部署情况。
  当前基于我的了解,Kafka没有提供直接的方法返回队列中的数据剩余量,需要我们手工的去计算。我是这样做的:

List<PartitionInfo> partInfos = consumer.partitionsFor(topic);
     List<TopicPartition> partitions = new ArrayList<TopicPartition>();
     for (PartitionInfo info : partInfos) {
           partitions.add(new TopicPartition(topic, info.partition()));
     }
            
     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
     for (TopicPartition topicPartitionKey : endOffsets.keySet()) {
         count += (endOffsets.get(topicPartitionKey) - consumer.position(topicPartitionKey));
     }
     return count;

遍历消费者在Topic中的每一个分片,在每个分片中对offset - position的差值做累加得到最终的未处理数量。这样做只能获取一个大概的计算值,不是十分准确。因为在Producer使用transition发送数据时,完成一个事务后offset会加1去做记录,而且可能还存在一些未知的情况。
  当然这个方法是一种很笨的方法,或许你有更好的方式呢,欢迎交流。

最后,我目前使用的Kafka的经验也总结完,Kafka内部设计很复杂也很有趣,需要进一步地探索,学习总会有回报的。

相关推荐