kafka 消费重试 实现
第一个文章
在分布式系统中,重试是不可避免的,我们经常使用后台跑定时进行数据同步,同步不成功就实现重试,重试次数多少取决于你追求一致性还是可用性,如果希望两个系统之前无论如何都必须一致,那么你设置重试次数为无限,当然这是理想情况,实际情况是有重试次数限制和重试时间限制,如果超过不成功怎么办?丢弃会造成数据丢失进而永久不一致,人工介入又非常复杂,通过引入死信队列可以优雅处理这种问题。本文是优步Uber工程师夏宁(Ning Xia)发布的一篇如何使用Kafka的死信队列实现重试处理的。
从网络错误到复制问题甚至下游依赖关系等场景中随时可能发生的中断,大规模运行的服务必须尽可能地优雅发现、识别并处理故障。
考虑到优步Uber的运维范围和效率,我们的系统发生故障时必须智能化地具有容错性和不妥协性。为了实现这一目标,我们决定使用开源分布式消息传递平台Apache Kafka,该平台已经过业界测试,并能提供大规模的高性能。
利用这些属性,优步行车保险工程团队通过扩展卡夫卡,在我们现有的事件驱动架构中使用无阻塞请求重新处理和死信队列(DLQ),实现错误处理的解耦,在不中断实时流量情况下实现可观察的错误处理。这一策略有助于我们遍布200多个城市的驾驶员能够可靠地实现每次行程的保费扣除。
在本文中,我们重点介绍了使用实时SLA重新处理大型系统中的请求并分享经验教训的方法。
在事件驱动的体系结构中工作
优步的驾驶员损伤保护系统的后端位于Kafka消息传递架构中,该架构贯穿优步大型微服务生态系统内的多个依赖关系的Java服务。本文我们更专注于我们的重试和死信的策略,并通过一个总的应用程序来管理不同产品的预订,以实现蓬勃发展的在线业务。
在这个模型中,我们希望提供以下功能:
a)能进行支付
b)为每个用户的每个产品的预购订单创建单独的报表记录,以生成实时产品分析。
每个功能都可以通过其各自服务的API提供。根据功能要求,设计了两个服务(消费组),一个是支付消费组完成a功能,一个是报表消费组完成b功能,这两个消费组都预订了相同的预订事件频道(也就是订阅了Kafka主题PreOrder):
当系统收到预订请求时,商店服务发布包含相关请求数据的PreOrder消息。两个消费组都会监听这个PreOrder消息,从而执行自己的业务逻辑并调用其相应的服务。
实施重试的简单快速的解决方案是在客户端调用呼叫时使用定时循环重试。例如,如果支付服务正在发生延迟等待并开始抛出超时异常,则商店服务将继续在指定重试次数下进行重试以完成支付),直到它成功或达到另一个停止条件为止。
简单的重试问题
虽然在客户端层次进行定时循环的重试可能很有用,但大量大规模的系统重试仍可能会受到以下因素影响:
1.阻止批处理。当我们需要实时处理大量消息时,反复重试产生的失败消息可能会阻塞正常的批处理。最严重的情况是超过重试时间限制,这意味会花最长时间,使用的资源会最多。如果没有成功的回应,卡夫卡消费者将会不断提交.
2.难以检索元数据。在重试上获取元数据会很麻烦,比如时间戳和第n次重试。
如果下游支付服务出现重大变化,例如,对于之前是有效的预购订单却遭遇收费策略调整导致拒绝接受,那么这些消息的所有重试都会无效。接收到该特定消息的消费者不会提交该消息的Kafka指针(偏移量),这意味着该消息将被一次又一次消费,代价是导致到达该通道的大量新消息被迫处于等待而无法被正常读取。
如果请求在重试后继续重试失败,我们希望在DLQ中收集这些故障以进行可视性查看和诊断。DLQ应允许以列表方式查看队列的内容,清除管理这些内容,并合并重新处理死信消息,允许全面解决所有受共享问题影响的故障。在优步,我们需要一个可靠并且可扩展地为我们提供这些功能的重试策略。
在单独的队列中处理
为了解决批处理被重试处理阻塞的问题,我们使用单独定义的Kafka主题,专门为重试设计单独队列。在这种情况下,当消费者处理程序在指定的消息重试次数之后会返回特定消息的失败响应,消费者将该消息发布到相应的重试主题中。该处理程序然后将true 返回给原始使用者,该使用者会确认提交了它的Kafka偏移量,从而保证Kafka消息能够持续向下读取。
在这种类型的系统中重试请求非常简单。与主处理流程一样,单独一组消费者将读取重试队列。这些消费者的行为与原始架构中的消费者行为类似,只是消费者使用不同的卡夫卡话题。同时,执行多次重试是通过创建多个主题来完成的,其中每一组不同的监听器(也就是消费组)订阅每个重试主题。当特定主题的处理程序返回给定消息的错误响应时,它会将该消息发布到它下面的下一个重试主题。
最后,在此设计中,DLQ被定义为最终的卡夫卡主题。如果最后一次重试主题的消费者仍然没有成功,那么它会将该消息发布到死信主题。在那里,可以使用许多技术来以主题方式进行数据列表,清除和合并,
重要的是不要一个接一个地立即重新尝试失败的请求; 这样做会放大调用的数量,实质上是等同于垃圾邮件的恶意请求。相反,每个后续级别的重试使用者都可以执行处理延迟,换句话说,随着消息在每个重试主题中逐步下降,超时会增加。此机制遵循漏桶模式。因此,这种重试队列其实是延迟处理队列。
我们通过基于队列的重新处理获得了什么
现在,我们讨论这种方法的好处,因为它涉及确保可靠和可扩展的重新处理:
1.不会都是正常批处理
失败的消息输入他们自己的指定通道,使正在进行批处理能够成功继续进行,而不是要求在出现故障时重新处理它们。因此,传入请求的消耗向前畅通无阻,实现更高的实时吞吐量。
2.解耦
独立工作流在同一个事件上运行,每个工作流都有自己的消费者流程,重试有单独的再处理和死信队列。一个队列中处理失败并不需要重试那些已经成功的其他消息。
3.可配置
创建新主题实际上不会产生开销,并且这些主题产生的消息可以遵循相同的架构。原始处理以及每个重试通道都可以分别在易于编写的较高级别的消费者级别下进行管理,该级别由配置进行管理。
我们还可以区分不同类型错误的处理方式,允许重新尝试网络脆弱等情况,而空指针异常和其他代码错误应该直接进入DLQ,因为重试不会修复它们。
4.观测
将消息处理分割成不同的主题有助于容易地跟踪错误消息的路径,重试消息的时间和次数以及其有效负载的确切属性。将生产率与再处理主题和DLQ的生产率相比较,可以为自动警报提供阈值并跟踪实时服务正常运行时间。
5.灵活性
虽然Kafka本身是用Scala和Java编写的,但Kafka支持多种语言的客户端库。例如,优步的许多服务都使用Go作为他们的Kafka客户端。
使用像Avro这样的序列化框架的Kafka消息格式支持可演化的模式。如果我们的数据模型如果需要更新,则只需要最小的调整来反映这一变化。
6.性能和可靠性
Kafka默认提供至少一次的语义。这种耐久性保证在容错和消息失败的情况下非常有价值; 当谈到提供关键业务数据时(如Uber的情况),消息无损(消息不丢失)是最重要的。而且,Kafka的并行模型和基于拉的系统可实现高吞吐量和低延迟。
其他考虑
由于Kafka只能保证分区内的顺序处理,而跨分区接受无法保证顺序,因此应用程序必须能够处理事件发生的确切顺序以外的事件。此外,至少一次消息传递需要消费者依赖性幂等性,这是任何分布式系统的共同特征。
前面阐述了死信队列提供的显著优势,但真正实施可能因用例而异。例如,根据指定的应用程序处理的数据类型的数量,每个主题代表不同的事件类型,这可能导致需要管理大量主题。在这种情况下,基于计数队列的替代方案可能是比较好的选择,将事件类型与其他字段一起打包,从而以更易于管理的方式跟踪重试次数和时间戳。这种权衡还需要重新考虑如何执行调度,因为这是通过一系列队列阶梯进行管理的。
使用基于计数的卡夫卡主题可实现死信队列,进行重试的单独的重新处理执行,使我们能够在基于事件的系统中重试请求,而不会阻止实时流量。在此框架内,工程师可以根据需要配置,扩展,更新和监控消息传递,但不会对开发人员时间或应用程序正常运行时间造成任何损失。
第二个文章
https://www.cnblogs.com/xsirfly/p/11533501.html
背景
在kafka的消费者中,如果消费某条消息出错,会导致该条消息不会被ack,该消息会被不断的重试,阻塞该分区的其他消息的消费,因此,为了保证消息队列不被阻塞,在出现异常的情况下,我们一般还是会ack该条消息,再另外对失败的情况进行重试
目标
实现一个完善的重试逻辑,一般需要考虑一下几个因素:
- 重试的时间间隔
- 最大重试次数
- 是否会漏掉消息
实现
扔回队尾
在消息出错时,将消息扔回队尾
优点:
- 实现简单,没有别的依赖项
缺点:
- 无法控制重试时间间隔
基于数据库任务表的扫描方案
在数据库中增加一个任务的状态表,然后用一个定时任务去扫描任务表中,失败的任务,然后进行重试,其中记录下重试的次数即可
优点:
- 实现简单,一般这种离线任务,根据统计的需求,都会有一个任务状态表的,所以仅仅是增加一个定时任务去扫表
缺点:
- 性能较差,定时任务,一般都在无意义的扫描,浪费性能
新增重试队列的方案
新增一个重试队列,消费消息出错时,将时间戳和消息发送到重试队列,然后在重试队列中,根据时间,来判断阻塞时间,代码如下:
func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) { defer common.Recover(ctx, &err) log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent") retryEvent := &MergeRetryEvent{} err = json.Unmarshal(data, retryEvent) if err != nil { log.WithError(err).Error("failed to unmarshal data") return nil } log.WithField("contact_id", retryEvent.ContactId).Info("receive message") delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix() if delaySecond <= 0 { log.Info("send message to account merge event") err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId) return err } else { log.Infof("sleep %d seconds", delaySecond) time.Sleep(time.Duration(delaySecond) * time.Second) err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId) return err } }
优点:
相对于扫表的方案,改方案没有无意义的扫表操作,性能更好
注意:之前在网上看到一个重试队列的实现,因为害怕开过多的线程(协程),作者用了一个channel来缓存重试消息,然后在一个协程池中去消费消息,消费的逻辑和上面的实例代码差不多,这样做是有风险的,因为channel是在本机的内存中,没有本地存储的,是存在丢消息的风险的(服务重启等情况)
参考链接:
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
第三个文章
https://www.cnblogs.com/caoweixiong/p/11181587.html
消息处理问题
在从Kafka主题接收消息之后立即处理消息的消费者的实现非常简单。不幸的是,现实要复杂得多,并且由于各种原因,消息处理可能会失败。其中一些原因是永久性问题,例如数据库约束失败或消息格式无效。其他,如消息处理中涉及的依赖系统的临时不可用,可以在将来解决。在这些情况下,重试消息处理可能是一种有效的解决方案。
非阻塞重试逻辑
在像Kafka这样的流媒体系统中,我们不能跳过消息并在以后回复它们。一旦我们移动当前消息中指向Kafka的指针,我们就无法返回。为简单起见,我们假设消息偏移在成功的消息处理之后就被记住了。在这种情况下,除非我们成功处理当前消息,否则我们无法接收下一条消息。如果处理单个消息不断失败,则会阻止系统处理下一条消息。很明显,我们希望避免这种情况,因为通常一次消息处理失败并不意味着下一次消息处理失败。此外,在较长时间(例如一小时)之后,由于各种原因,失败消息的处理可能成功。对他们来说,我们所依赖的系统可以再次出现。
在消息处理失败时,我们可以将消息的副本发布到另一个主题并等待下一条消息。让我们将新主题称为‘retry_topic‘。‘retry_topic‘的消费者将从Kafka接收消息,然后在开始消息处理之前等待一些预定义的时间,例如一小时。通过这种方式,我们可以推迟下一次消息处理尝试,而不会对‘main_topic‘消费者产生任何影响。如果‘retry_topic‘消费者中的处理失败,我们只需放弃并将消息存储在‘failed_topic‘中,以便进一步手动处理此问题。
业务重试场景
现在让我们考虑以下场景。一条新消息被写入主题‘main_topic‘。如果此消息的处理失败,那么我们应该在5分钟内再次尝试。我们怎么做?我们应该向‘retry_topic‘写一条新消息,它包装失败的消息并添加2个字段:
- ‘retry_number‘,值为1
- ‘retry_timestamp‘,其值计算为现在+ 5分钟
这意味着‘main_topic‘使用者将失败的消息处理的责任委托给另一个组件。‘main_topic‘消费者未被阻止,可以接收下一条消息。‘retry_topic‘消费者将立即收到‘main_topic‘消费者发布的消息。它必须从消息中读取‘retry_timestamp‘值并等到那一刻,阻塞线程。线程唤醒后,它将再次尝试处理该消息。如果成功,那么我们可以获取下一个可用消息。否则我们必须再次尝试。我们要做的是克隆消息,递增‘attempt_number‘值(它将为2)并将‘retry_timestamp‘值设置为now + 5分钟。消息克隆将再次发布到‘retry__topic。
如果我们到达重试最高次数。现在是时候说“停止”了。我们将消息写入‘failed_topic‘并将此消息视为未处理。有人必须手动处理它。
下面的图片可以帮助您理解消息流:
总结
正如您所注意到的,在发生某些故障时实施推迟消息处理并不是一件容易的事情。请记住:
- 可以仅按顺序从主题分区中使用消息
- 您不能跳过消费并稍后再处理此消息
- 如果要推迟处理某些消息,可以将它们重新发布到单独的主题,每个延迟值一个
- 处理失败的消息可以通过克隆消息并将其重新发布到重试主题之一来实现,其中包含有关尝试次数和下次重试时间戳的更新信息
- 除非是时候处理消息,否则重试主题的消费者应该阻止该线程
- 重试主题中的消息按时间顺序自然组织,必须按顺序处理