使用rabbitmq手动确认消息的,定时获取队列消息实现

描述问题

最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。

相关业务

本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送

相关分析

网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。

还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧

了解rabbit的相关几个概念

 了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer

 我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,

  但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单

下面我们就对这个问题的答案来个coding

解决办法

首先我们因为需要失败重试,使用spring的<a target="_blank" href="https://www.ancii.com/link/v1/8D52KvqFgjIEIEpaimQOPtXA-R5xOYA-pyuU27wq0aeMNIjvgD6RTBCfaXAPB5kajjtqL9PzceNNWD5NTGYHl1I4UoGUGX_3ZHQYgUGjQUuDSysXiDeViJ0HX4FcegSYwv66seecQaj8U6VgT6MjYSGwbHd2kdm3xKBQxMGCuHl7UkYyfCphbdG_nm-NsznE/" rel="nofollow" title="RepublishMessageRecoverer">RepublishMessageRecoverer</a>可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列

相关配置

@Bean
    ObjectMapper objectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        DateFormat dateFormat = objectMapper.getDateFormat();
        JavaTimeModule javaTimeModule = new JavaTimeModule();

        SimpleModule module = new SimpleModule();
        module.addSerializer(new ToStringSerializer(Long.TYPE));
        module.addSerializer(new ToStringSerializer(Long.class));
        module.addSerializer(new ToStringSerializer(BigInteger.class));

        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));

        objectMapper.registerModule(module);
        objectMapper.registerModule(javaTimeModule);
        objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return objectMapper;
}



  @Bean
  RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
    return new RabbitAdmin(aConnectionFactory);
  }

  @Bean
  MessageConverter jacksonAmqpMessageConverter( ) {
    return new Jackson2JsonMessageConverter(objectMapper());
  }


  @Bean
  Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
    rabbitAdmin.declareQueue(queue);
    return queue;
  }
  @Bean
  Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
    rabbitAdmin.declareQueue(queue);
    return queue;
  }
  @Bean
  Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
    rabbitAdmin.declareQueue(queue);
    return queue;
  }
    /**
     * 声明一个交换机
     * @return
     */
  @Bean
  TopicExchange controlExchange () {
      return new TopicExchange(Exchanges.ExangeTOPIC);
  }


    /**
     * 延时重试队列
     */
    @Bean
    public Queue bcwPayControlRetryQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 10 * 1000);
        arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
//        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
        arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
        return new Queue("@retry", true, false, false, arguments);
    }
    /**
     * 延时重试队列
     */
    @Bean
    public Queue bcwPushControlRetryQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 10 * 1000);
        arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
//        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
        arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
        return new Queue("@retry", true, false, false, arguments);
    }
    /**
     * 延时重试队列
     */
    @Bean
    public Queue bcwPullControlRetryQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 10 * 1000);
        arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
//        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
//        arguments.put("x-dead-letter-routing-key", "queue_bcw");
        return new Queue("@retry", true, false, false, arguments);
    }
    @Bean
    public Binding  bcwPayControlRetryBinding() {
        return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
    }
    @Bean
    public Binding  bcwPushControlRetryBinding() {
        return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
    }
    @Bean
    public Binding   bcwPullControlRetryBinding() {
        return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
    }

  /**
   * 队列绑定并关联到RoutingKey
   *
   * @param queueMessages 队列名称
   * @param exchange      交换机
   * @return 绑定
   */
  @Bean
  Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
  }
  /**
   * 队列绑定并关联到RoutingKey
   *
   * @param queueMessages 队列名称
   * @param exchange      交换机
   * @return 绑定
   */
  @Bean
  Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
  }
  /**
   * 队列绑定并关联到RoutingKey
   *
   * @param queueMessages 队列名称
   * @param exchange      交换机
   * @return 绑定
   */
  @Bean
  Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
  }

  @Bean
  @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
          SimpleRabbitListenerContainerFactoryConfigurer configurer,
          ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setMessageConverter(jacksonAmqpMessageConverter());
    return factory;
  }

下面就是我们的主题,定时任务使用的是org.springframework.scheduling

/**
 * 手动确认消息的,定时获取队列消息实现
 */
public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private List<Message> body = new LinkedList<>();
    public long start_time;
    private Channel channel;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public QuartzSimpleMessageListenerContainer() {
        // 手动确认
        this.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
            long current_time = System.currentTimeMillis();
            int time = (int) ((current_time - start_time)/1000);
            logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue());
            Long retryCount = getRetryCount(message.getMessageProperties());
            if (retryCount > 3) {
                logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue());
                try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
                return;
            }

            this.body.add(message);
            /**
             * 判断数组数据是否满了,判断此监听器时间是否大于执行时间
             * 如果在最后延时时间段内没有业务消息,此监听器会一直开着
             */
            if(body.size()>=3 || time>60){
                this.channel = channel;
                callback();
            }
        });



    }
    private void callback(){
//         channel = getChannel(getTransactionalResourceHolder());
        if(body.size()>0 && channel !=null &&  channel.isOpen()){
            try {
                callbackWork();
            }catch (Exception e){
                logger.error("推送数据出错:{}",e.getMessage());

                body.stream().forEach(message -> {
                    Long retryCount = getRetryCount(message.getMessageProperties());
                    if (retryCount <= 3) {
                        logger.info("将消息置入延时重试队列,重试次数:" + retryCount);
                        rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
                    }
                });

            } finally{

                logger.info("flsher too data");

                body.stream().forEach(message -> {
                    //手动acknowledge
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    } catch (IOException e) {
                        logger.error("手动确认消息失败!");
                        e.printStackTrace();
                    }
                });

                body.clear();
                this.stop();

            }
        }

    }
    abstract void callbackWork() throws Exception;
    /**
     * 获取消息失败次数
     * @param properties
     * @return
     */
    private long getRetryCount(MessageProperties properties){
        long retryCount = 0L;
        Map<String,Object> header = properties.getHeaders();
        if(header != null && header.containsKey("x-death")){
            List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
            if(deaths.size()>0){
                Map<String,Object> death = deaths.get(0);
                retryCount = (Long)death.get("count");
            }
        }
        return retryCount;
    }

    @Override
    @Scheduled(cron = "0 0/2 * * * ? ")
    public void start() {
        logger.info("start push data scheduled!");
        //初始化数据,将未处理的调用stop方法,返还至rabbit
        body.clear();
        super.stop();
        start_time = System.currentTimeMillis();
        super.start();

        logger.info("end push data scheduled!");
    }

    public List<WDNJPullOrder> getBody() {

        List<WDNJPullOrder> collect = body.stream().map(data -> {
                    byte[] body = data.getBody();
                    WDNJPullOrder readValue = null;
                    try {
                        readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
                        });
                    } catch (IOException e) {
                        logger.error("处理数据出错{}",e.getMessage());
                    }
                    return readValue;
                }
        ).collect(Collectors.toList());

        return collect;


    }

}

后续

当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现

@Scheduled(cron = "0 0/2 * * * ? ")

public void start()
 

相关推荐