使用多线程增加kafka消费能力
前提:本例适合那些没有顺序要求的消息主题。
kafka通过一系列优化,写入和读取速度能够达到数万条/秒。通过增加分区数量,能够通过部署多个消费者增加并行消费能力。但还是有很多情况下,某些业务的执行速度实在是太慢,这个时候我们就要用到多线程去消费,提高应用机器的利用率,而不是一味的给kafka增加压力。
使用Spring创建一个kafka消费者是非常简单的。我们选择的方式是继承kafka的ShutdownableThread
,然后实现它的doWork
方法即可。
参考:https://github.com/apache/kaf...
多线程消费某个分区的数据
即然是使用多线程,我们就需要新建一个线程池。
我们创建了一个最大容量为20的线程池,其中有两个参数需要注意一下。(参考《JAVA多线程使用场景和注意事项简版》)。
我们使用了了零容量的SynchronousQueue
,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除因为阻塞队列丢消息的可能。
然后使用了CallerRunsPolicy
饱和策略,使得多线程处理不过来的时候,能够阻塞在kafka的消费线程上。
然后,我们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,我们都手工的commit一次ack
,表明这条消息我已经处理了。由于是线程池认领了这些任务,顺序性是无法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。o.O
不过这暂时不重要,首先让它并行化运行就好。
可惜的是,当我们运行程序,直接抛出了异常,无法进行下去。
程序直接说了:
KafkaConsumer is not safe for multi-threaded access
显然,kafka的消费端不是线程安全的,它拒绝你这么调用它的api。kafka的初衷是好的,想要避免一些并发环境的问题,但我确实需要使用多线程处理。
kafka消费者通过比较调用者的线程id来判断是否是由外部线程发起请求。
long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet();
}
得,只能将commitSync
函数放在线程外面了,先提交ack、再执行任务。
加入管道
我们获取的消息,可能在真正被执行之前,会进行一些过滤,比如一些空值或者特定条件的判断。虽然可以直接放在消费者线程里运行,但显的特别的乱,可以加入一个生产者消费者模型(你可以认为这是画蛇添足)。这里采用的是阻塞队列依然是SynchronousQueue
,它充当了管道的功能。
我们把任务放入管道后,立马commit。如果线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。然后,我们单独启动了一个线程,用来接收这些数据,然后提交到这部分的代码看起来大概这样。
应用能够启动了,消费速度贼快。
参数配置
kafka的参数非常的多,我们比较关心的有以下几个参数。
max.poll.records
调用一次poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出max.poll.interval.ms
的值(默认5分钟),造成消费者的离线。在耗时非常大的消费中,是需要特别注意的。
enable.auto.commit
是否开启自动提交(offset)如果开启,consumer已经消费的offset信息将会间歇性的提交到kafka中(持久保存)
当开启offset自动提交时,提交请求的时间频率由参数`
auto.commit.interval.ms`控制。
fetch.max.wait.ms
如果broker端反馈的数据量不足时(fetch.min.bytes),fetch请求等待的最长时间。如果数据量满足需要,则立即返回。
session.timeout.ms
consumer会话超时时长,如果在此时间内,server尚未接收到consumer任何请求(包括心跳检测),那么server将会判定此consumer离线。此值越大,server等待consumer失效、rebalance时间就越长。
heartbeat.interval.ms
consumer协调器与kafka集群之间,心跳检测的时间间隔。kafka集群通过心跳判断consumer会话的活性,以判断consumer是否在线,如果离线则会把此consumer注册的partition分配(assign)给相同group的其他consumer。此值必须小于“session.timeout.ms”,即会话过期时间应该比心跳检测间隔要大,通常为session.timeout.ms的三分之一,否则心跳检测就失去意义。
在本例中,我们的参数简单的设置如下,主要调整了每次获取的条数和检测时间。其他的都是默认。
消息保证
仔细的同学可能会看到,我们的代码依然不是完全安全的。这是由于我们提前提交了ack导致的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,很可能会丢失,对于一致性要求非常高的应用,我们要从两个手段上进行保证。
使用关闭钩子
第一种就是考虑kill -15的情况。这种方式比较简单,只要覆盖ShutdownableThread的shutdown方法即可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。
@Override public void shutdown() { super.shutdown(); executor.shutdown(); }
使用日志处理
应用oom,或者直接kill -9了,事情就变得麻烦起来。
维护一个单独的日志文件(或者本地db),在commit之前写入一条日志,然后在真正执行完毕之后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,重新执行。
想要效率,还想要可靠,是得下点苦力气的。
借助redis处理
这种方式与日志方式类似,但由于redis的效率很高(可达数万),而且方便,是优于日志方式的。
可以使用Hash结构,提交任务的同时写入Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。
在系统启动时,首先检测一下redis中是否有异常数据。如果有,首先处理这些数据,然后正常消费。
End
多线程是为了增加效率,redis等是为了增加可靠性。业务代码是非常好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增加它的效率、照顾它的边界。
以程序员的角度来说,最有竞争力的代码都是为了照顾小概率发生的边界异常。
kafka在吞吐量和可靠性方面,有各种的权衡,很多都是鱼和熊掌的关系。不必纠结于它本身,我们可以借助外部的工具,获取更大的收益。在这种情况下,redis当机与应用同时当机的概率还是比较小的。5个9的消息保证是可以做到的,剩下的那点不完美问题消息,你为什么不从日志里找呢?
扩展阅读:
3、360度测试:KAFKA会丢数据么?其高可用是否满足需求?