高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

在消息队列中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

上图中红色的部分代表超出消息处理能力的部分。

我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息,这时候我们就需要一个能够控制消费端消息匀速处理的利器 — AHAS 流控降级,来为消息队列削峰填谷,保驾护航。

https://www.aliyun.com/product/ahas

AHAS 是如何削峰填谷的

AHAS 的流控降级是面向分布式服务架构的专业流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性,同时提供强大的聚合监控和历史监控查询功能。

AHAS 专门为这种场景提供了匀速排队的控制特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。

https://help.aliyun.com/document_detail

比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间,预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图如下图所示:

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

RocketMQ Consumer 接入示例

本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。

1. 开通 AHAS

首先您需要到AHAS 控制台开通 AHAS 功能(免费)。可以根据 开通 AHAS 文档 里面的指引进行开通。

https://ahas.console.aliyun.com

https://help.aliyun.com/document_detail/90323.html

2. 代码改造

在结合阿里云 RocketMQ Client 使用 Sentinel 时,用户需要引入 AHAS Sentinel 的依赖 ahas-sentinel-client (以 Maven 为例):

<dependency>
 <groupId>com.alibaba.csp</groupId>
 <artifactId>ahas-sentinel-client</artifactId>
 <version>1.1.0</version>
</dependency>

由于 RocketMQ Client 未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户在处理消息时需要手动进行资源定义(埋点)。我们可以在处理消息的逻辑处手动进行埋点,资源名可以根据需要来确定(如 groupId + topic 的组合):

private static Action handleMessage(Message message, String groupId, String topic) {
 Entry entry = null;
 try {
 // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则
 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);
 
 // 在此处编写真实的处理逻辑
 System.out.println(System.currentTimeMillis() + " | handling message: " + message);
 return Action.CommitMessage;
 } catch (BlockException ex) {
 // 在编写处理被流控的逻辑
 // 示例:可以在此处记录错误或进行重试
 System.err.println("Blocked, will retry later: " + message);
 return Action.ReconsumeLater; // 会触发消息重新投递
 } finally {
 if (entry != null) {
 entry.exit();
 }
 }
 }

消费者订阅消息的逻辑示例:

Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(topic, "*", (message, context) -> {
 return handleMessage(message);
});
consumer.start();

更多关于 RocketMQ SDK 的信息可以参考 消息队列 RocketMQ 入门文档

https://help.aliyun.com/document_detail

3. 获取 AHAS 启动参数

注意:若在本地运行接入 AHAS Sentinel 控制台需要在页面左上角选择 公网 环境,若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。

我们可以进入 AHAS 控制台(https://ahas.console.aliyun.com),点击左侧侧边栏的 流控降级,进入 AHAS 流控降级控制台应用总览页面。在页面右上角,单击添加应用,选择 SDK 接入页签,到 配置启动参数 页签拿到需要的启动参数(详情请参考 SDK 接入文档),类似于:

SDK 接入文档:https://help.aliyun.com/document_detail/90328.html

-Dproject.name=AppName -Dahas.license=<License>

其中 project.name 配置项代表应用名(会显示在控制台,比如 MqConsumerDemo),ahas.license 配置项代表自己的授权 license(ECS 环境不需要此项)。

4. 启动 Consumer,配置规则

接下来我们添加获取到的启动参数,启动修改好的 Consumer 应用。由于 AHAS 流控降级需要进行资源调用才能触发初始化,因此首先需要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后,我们就可以在 AHAS Sentinel 控制台上看到我们的应用了。点击应用卡片,进入详情页面后点击左侧侧边栏的“机器列表”。我们可以在机器列表页面看到刚刚接入的机器,代表接入成功:

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

点击“请求链路”页面,我们可以看到之前定义的资源。点击右边的“流控”按钮添加新的流控规则:

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

我们在“流控方式”中选择“排队等待”,设置 QPS 为 10,代表每 100ms 匀速通过一个请求;并且设置最大超时时长为 2000ms,超出此超时时间的请求将不会排队,立即拒绝。配置完成后点击新建按钮。

5. 发送消息,查看效果

下面我们可以在 Producer 端批量发送消息,然后在 Consumer 端的控制台输出处观察效果。可以看到消息消费的速率是匀速的,大约每 100 ms 消费一条消息:

1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871

同时不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息。

我们也可以点击左侧侧边栏的“监控详情”进入监控详情页面,查看处理消息的监控曲线:

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

对比普通限流模式的监控曲线(最右面的部分):

高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

如果不开启匀速模式,只是普通的限流模式,则只会同时处理 10 条消息,其余的全部被拒绝,即使后面的时间系统资源充足多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。

Kafka 接入代码示例

Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 类似,这里给出一个简单的代码示例:

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
 pool.submit(() -> {
 Entry entry = null;
 try {
 // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则
 entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);
 // 在此处理消息.
 System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
 } catch (BlockException ex) {
 // Blocked.
 // NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息
 System.err.println("Blocked: " + record.toString());
 } finally {
 if (entry != null) {
 entry.exit();
 }
 }
 });
}

消费消息的逻辑:

while (true) {
 try {
 ConsumerRecords<String, String> records = consumer.poll(1000);
 // 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG
 // 建议开一个单独的线程池来消费消息,然后异步返回结果
 for (ConsumerRecord<String, String> record : records) {
 handleMessage(record, groupId, topic);
 }
 } catch (Exception e) {
 try {
 Thread.sleep(1000);
 } catch (Throwable ignore) {
 }
 e.printStackTrace();
 }
}

其它

以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速,它还可以处理更复杂的各种情况,比如:

  • 流量控制:可以针对不同的调用关系,以不同的运行指标(如 QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up 等)。
  • 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均 RT 增高、异常比例升高的时候,会使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
  • 系统负载保护:对系统的维度提供保护。当系统负载较高的时候,提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。

您可以参考 AHAS 流控降级文档 来挖掘更多的场景。

https://help.aliyun.com/document_detail

作者:中间件小哥

相关推荐