RocketMq-半消息(十)
概念:
半消息: 在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知rocketmq发送
操作步骤 :
1.(生产者)发送-【半消息】
2.(生产者)本地监听-【半消息】处理结果
3.(消费者)处理-【半消息】
1.(生产者)发送-【半消息】
// 消息体 @Data @Builder @ToString public class UserMoneyParams { int userId; String act; double money; String info; String infoParams; } // 发送消息 // 发送-队列半消息: rocketMQ @RequestMapping("rocketMQHalf") public ApiResult rocketMQHalf() { int orderId = 2; double money = 10; // 用户余额变更-参数体 UserMoneyParams userMoneyParams = UserMoneyParams.builder() .act("pay-order") .userId(orderId) .money(money) .build(); // 用户数据变更-参数 UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1); log.info("发送前参数: "+userMoneyParams.toString()); rocketMQTemplate.sendMessageInTransaction( // 半消息-分组 "tsca-group-half", // 半消息-topic "member-change-money-half-topic", // 半消息-数据体 MessageBuilder .withPayload(userMoneyParams) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID()) .build(), userOrder ); return ApiResult.success("发送队列-半消息"); }
2.(生产者)本地监听-【半消息】处理结果
@RocketMQTransactionListener(txProducerGroup = "tsca-group-half") @RequiredArgsConstructor @Slf4j public class UserMoneyHalfListener implements RocketMQLocalTransactionListener { @Autowired RedisUtil redisUtil; @Autowired UserOrderService userOrderService; // 生产者-消息处理完毕,继续执行本地方法(含事务) @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try { Object userMoneyParams=message.getPayload(); log.info("消息-args:"+arg); // 消息主体加密无法获取 log.info("消息-主体:"+ JSON.toJSONString(userMoneyParams)); log.info("消息-主体-头部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID)); log.info("半消息-本地-处理完成"); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.warn("半消息-本地-发生异常,回滚: "+e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } // 生产者-消息处理超时 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 查询消息是否已经处理 String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id")); Object messageData = this.redisUtil.getValue(messageID, String.class); if (messageData != null && messageData.equals("ok")) { // 超时且消息已经处理完毕 log.info("半消息-本地消息超时-且已经处理完毕"); return RocketMQLocalTransactionState.COMMIT; } else { log.info("半消息-本地消息超时-且未处理完毕"); // 超时且消息未处理完毕 return RocketMQLocalTransactionState.ROLLBACK; } } }
3.(消费者)处理-【半消息】
@Service @RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic") @Slf4j public class UserMoneyHalfListener implements RocketMQListener<UserMoneyParams> { // @Autowired // UserMoneyService memberOrderService; @Override public void onMessage(UserMoneyParams memberMoneyMessage) { log.info("收到-用户余额变动-半消息"); try { } catch (Exception e) { log.info("更改余额错误: "+e.getMessage()); e.printStackTrace(); } log.info(JSON.toJSONString(memberMoneyMessage)); } }
相关推荐
LCFlxfldy 2020-08-17
IT农场 2020-11-13
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30