高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用
在消息队列中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:
上图中红色的部分代表超出消息处理能力的部分。
我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息,这时候我们就需要一个能够控制消费端消息匀速处理的利器 — AHAS 流控降级,来为消息队列削峰填谷,保驾护航。
https://www.aliyun.com/product/ahas
AHAS 是如何削峰填谷的
AHAS 的流控降级是面向分布式服务架构的专业流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性,同时提供强大的聚合监控和历史监控查询功能。
AHAS 专门为这种场景提供了匀速排队的控制特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
https://help.aliyun.com/document_detail
比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间,预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图如下图所示:
RocketMQ Consumer 接入示例
本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。
1. 开通 AHAS
首先您需要到AHAS 控制台开通 AHAS 功能(免费)。可以根据 开通 AHAS 文档 里面的指引进行开通。
https://ahas.console.aliyun.com
https://help.aliyun.com/document_detail/90323.html
2. 代码改造
在结合阿里云 RocketMQ Client 使用 Sentinel 时,用户需要引入 AHAS Sentinel 的依赖 ahas-sentinel-client (以 Maven 为例):
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>1.1.0</version> </dependency>
由于 RocketMQ Client 未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户在处理消息时需要手动进行资源定义(埋点)。我们可以在处理消息的逻辑处手动进行埋点,资源名可以根据需要来确定(如 groupId + topic 的组合):
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 在此处编写真实的处理逻辑 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 在编写处理被流控的逻辑 // 示例:可以在此处记录错误或进行重试 System.err.println("Blocked, will retry later: " + message); return Action.ReconsumeLater; // 会触发消息重新投递 } finally { if (entry != null) { entry.exit(); } } }
消费者订阅消息的逻辑示例:
Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message); }); consumer.start();
更多关于 RocketMQ SDK 的信息可以参考 消息队列 RocketMQ 入门文档。
https://help.aliyun.com/document_detail
3. 获取 AHAS 启动参数
注意:若在本地运行接入 AHAS Sentinel 控制台需要在页面左上角选择 公网 环境,若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。
我们可以进入 AHAS 控制台(https://ahas.console.aliyun.com),点击左侧侧边栏的 流控降级,进入 AHAS 流控降级控制台应用总览页面。在页面右上角,单击添加应用,选择 SDK 接入页签,到 配置启动参数 页签拿到需要的启动参数(详情请参考 SDK 接入文档),类似于:
SDK 接入文档:https://help.aliyun.com/document_detail/90328.html
-Dproject.name=AppName -Dahas.license=<License>
其中 project.name 配置项代表应用名(会显示在控制台,比如 MqConsumerDemo),ahas.license 配置项代表自己的授权 license(ECS 环境不需要此项)。
4. 启动 Consumer,配置规则
接下来我们添加获取到的启动参数,启动修改好的 Consumer 应用。由于 AHAS 流控降级需要进行资源调用才能触发初始化,因此首先需要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后,我们就可以在 AHAS Sentinel 控制台上看到我们的应用了。点击应用卡片,进入详情页面后点击左侧侧边栏的“机器列表”。我们可以在机器列表页面看到刚刚接入的机器,代表接入成功:
点击“请求链路”页面,我们可以看到之前定义的资源。点击右边的“流控”按钮添加新的流控规则:
我们在“流控方式”中选择“排队等待”,设置 QPS 为 10,代表每 100ms 匀速通过一个请求;并且设置最大超时时长为 2000ms,超出此超时时间的请求将不会排队,立即拒绝。配置完成后点击新建按钮。
5. 发送消息,查看效果
下面我们可以在 Producer 端批量发送消息,然后在 Consumer 端的控制台输出处观察效果。可以看到消息消费的速率是匀速的,大约每 100 ms 消费一条消息:
1550732955137 | handling message: Hello MQ 2453 1550732955236 | handling message: Hello MQ 9162 1550732955338 | handling message: Hello MQ 4944 1550732955438 | handling message: Hello MQ 5582 1550732955538 | handling message: Hello MQ 4493 1550732955637 | handling message: Hello MQ 3036 1550732955738 | handling message: Hello MQ 1381 1550732955834 | handling message: Hello MQ 1450 1550732955937 | handling message: Hello MQ 5871
同时不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息。
我们也可以点击左侧侧边栏的“监控详情”进入监控详情页面,查看处理消息的监控曲线:
对比普通限流模式的监控曲线(最右面的部分):
如果不开启匀速模式,只是普通的限流模式,则只会同时处理 10 条消息,其余的全部被拒绝,即使后面的时间系统资源充足多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。
Kafka 接入代码示例
Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 类似,这里给出一个简单的代码示例:
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) { pool.submit(() -> { Entry entry = null; try { // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则 entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic); // 在此处理消息. System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString()); } catch (BlockException ex) { // Blocked. // NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息 System.err.println("Blocked: " + record.toString()); } finally { if (entry != null) { entry.exit(); } } }); }
消费消息的逻辑:
while (true) { try { ConsumerRecords<String, String> records = consumer.poll(1000); // 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG // 建议开一个单独的线程池来消费消息,然后异步返回结果 for (ConsumerRecord<String, String> record : records) { handleMessage(record, groupId, topic); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } }
其它
以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速,它还可以处理更复杂的各种情况,比如:
- 流量控制:可以针对不同的调用关系,以不同的运行指标(如 QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up 等)。
- 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均 RT 增高、异常比例升高的时候,会使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
- 系统负载保护:对系统的维度提供保护。当系统负载较高的时候,提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
您可以参考 AHAS 流控降级文档 来挖掘更多的场景。
https://help.aliyun.com/document_detail
作者:中间件小哥