使用rabbitmq手动确认消息的,定时获取队列消息实现
描述问题
最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。
相关业务
本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送
相关分析
网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。
还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧
了解rabbit的相关几个概念
了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer
我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,
但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单
下面我们就对这个问题的答案来个coding
解决办法
首先我们因为需要失败重试,使用spring的<a target="_blank" href="https://www.ancii.com/link/v1/8D52KvqFgjIEIEpaimQOPtXA-R5xOYA-pyuU27wq0aeMNIjvgD6RTBCfaXAPB5kajjtqL9PzceNNWD5NTGYHl1I4UoGUGX_3ZHQYgUGjQUuDSysXiDeViJ0HX4FcegSYwv66seecQaj8U6VgT6MjYSGwbHd2kdm3xKBQxMGCuHl7UkYyfCphbdG_nm-NsznE/" rel="nofollow" title="RepublishMessageRecoverer">RepublishMessageRecoverer</a>可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列
相关配置
@Bean ObjectMapper objectMapper() { ObjectMapper objectMapper = new ObjectMapper(); DateFormat dateFormat = objectMapper.getDateFormat(); JavaTimeModule javaTimeModule = new JavaTimeModule(); SimpleModule module = new SimpleModule(); module.addSerializer(new ToStringSerializer(Long.TYPE)); module.addSerializer(new ToStringSerializer(Long.class)); module.addSerializer(new ToStringSerializer(BigInteger.class)); javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); objectMapper.registerModule(module); objectMapper.registerModule(javaTimeModule); objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持 objectMapper.enable(SerializationFeature.INDENT_OUTPUT); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); return objectMapper; } @Bean RabbitAdmin admin (ConnectionFactory aConnectionFactory) { return new RabbitAdmin(aConnectionFactory); } @Bean MessageConverter jacksonAmqpMessageConverter( ) { return new Jackson2JsonMessageConverter(objectMapper()); } @Bean Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) { Queue queue = new Queue(Queues.QUEUE_BCW_PUSH); rabbitAdmin.declareQueue(queue); return queue; } @Bean Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) { Queue queue = new Queue(Queues.QUEUE_BCW_PAY); rabbitAdmin.declareQueue(queue); return queue; } @Bean Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) { Queue queue = new Queue(Queues.QUEUE_BCW_PULL); rabbitAdmin.declareQueue(queue); return queue; } /** * 声明一个交换机 * @return */ @Bean TopicExchange controlExchange () { return new TopicExchange(Exchanges.ExangeTOPIC); } /** * 延时重试队列 */ @Bean public Queue bcwPayControlRetryQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10 * 1000); arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key arguments.put("x-dead-letter-routing-key", "queue_bcw.push"); return new Queue("@retry", true, false, false, arguments); } /** * 延时重试队列 */ @Bean public Queue bcwPushControlRetryQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10 * 1000); arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key arguments.put("x-dead-letter-routing-key", "queue_bcw.push"); return new Queue("@retry", true, false, false, arguments); } /** * 延时重试队列 */ @Bean public Queue bcwPullControlRetryQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10 * 1000); arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key // arguments.put("x-dead-letter-routing-key", "queue_bcw"); return new Queue("@retry", true, false, false, arguments); } @Bean public Binding bcwPayControlRetryBinding() { return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry"); } @Bean public Binding bcwPushControlRetryBinding() { return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry"); } @Bean public Binding bcwPullControlRetryBinding() { return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry"); } /** * 队列绑定并关联到RoutingKey * * @param queueMessages 队列名称 * @param exchange 交换机 * @return 绑定 */ @Bean Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push"); } /** * 队列绑定并关联到RoutingKey * * @param queueMessages 队列名称 * @param exchange 交换机 * @return 绑定 */ @Bean Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay"); } /** * 队列绑定并关联到RoutingKey * * @param queueMessages 队列名称 * @param exchange 交换机 * @return 绑定 */ @Bean Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull"); } @Bean @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(jacksonAmqpMessageConverter()); return factory; }
下面就是我们的主题,定时任务使用的是org.springframework.scheduling
/** * 手动确认消息的,定时获取队列消息实现 */ public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer { protected final Logger logger = LoggerFactory.getLogger(getClass()); private List<Message> body = new LinkedList<>(); public long start_time; private Channel channel; @Autowired private ObjectMapper objectMapper; @Autowired private RabbitTemplate rabbitTemplate; public QuartzSimpleMessageListenerContainer() { // 手动确认 this.setAcknowledgeMode(AcknowledgeMode.MANUAL); this.setMessageListener((ChannelAwareMessageListener) (message,channel) -> { long current_time = System.currentTimeMillis(); int time = (int) ((current_time - start_time)/1000); logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue()); Long retryCount = getRetryCount(message.getMessageProperties()); if (retryCount > 3) { logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue()); try { channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ex) { ex.printStackTrace(); } return; } this.body.add(message); /** * 判断数组数据是否满了,判断此监听器时间是否大于执行时间 * 如果在最后延时时间段内没有业务消息,此监听器会一直开着 */ if(body.size()>=3 || time>60){ this.channel = channel; callback(); } }); } private void callback(){ // channel = getChannel(getTransactionalResourceHolder()); if(body.size()>0 && channel !=null && channel.isOpen()){ try { callbackWork(); }catch (Exception e){ logger.error("推送数据出错:{}",e.getMessage()); body.stream().forEach(message -> { Long retryCount = getRetryCount(message.getMessageProperties()); if (retryCount <= 3) { logger.info("将消息置入延时重试队列,重试次数:" + retryCount); rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message); } }); } finally{ logger.info("flsher too data"); body.stream().forEach(message -> { //手动acknowledge try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { logger.error("手动确认消息失败!"); e.printStackTrace(); } }); body.clear(); this.stop(); } } } abstract void callbackWork() throws Exception; /** * 获取消息失败次数 * @param properties * @return */ private long getRetryCount(MessageProperties properties){ long retryCount = 0L; Map<String,Object> header = properties.getHeaders(); if(header != null && header.containsKey("x-death")){ List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death"); if(deaths.size()>0){ Map<String,Object> death = deaths.get(0); retryCount = (Long)death.get("count"); } } return retryCount; } @Override @Scheduled(cron = "0 0/2 * * * ? ") public void start() { logger.info("start push data scheduled!"); //初始化数据,将未处理的调用stop方法,返还至rabbit body.clear(); super.stop(); start_time = System.currentTimeMillis(); super.start(); logger.info("end push data scheduled!"); } public List<WDNJPullOrder> getBody() { List<WDNJPullOrder> collect = body.stream().map(data -> { byte[] body = data.getBody(); WDNJPullOrder readValue = null; try { readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() { }); } catch (IOException e) { logger.error("处理数据出错{}",e.getMessage()); } return readValue; } ).collect(Collectors.toList()); return collect; } }
后续
当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现
@Scheduled(cron = "0 0/2 * * * ? ") public void start()