springboot与rabbitMQ实现延迟加载

为什么要延迟加载:

制定一项任务,在某个时间之后去执行,这种场景比较适合使用延迟加载的模式。

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

原理:

Time To Live(TTL)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

A: 通过队列属性设置,队列中所有消息都有相同的过期时间。

B: 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

消息或者队列的TTL过期

队列达到最大长度

消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

利用这两个特性,设置消息的过期时间,当生产的消息没有消费者去接收(这样的队列称为死信队列),消息在

rabbitServer的到达设置的过期时间时,就会将死信队列中的过期消息发送到DLX中设置的Exchange中,这样

就实现了延迟加载。

上图:

springboot与rabbitMQ实现延迟加载

集成过程如下:

生产者端配置

1.引入依赖:

[java] view plain copy

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-amqp</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-devtools</artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>com.alibaba</groupId>
  21. <artifactId>fastjson</artifactId>
  22. <version>1.2.41</version>
  23. </dependency>
  24. </dependencies>

2.配置application.properties

[java] view plain copy

  1. server.port=10001
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.publisher-confirms=true
  7. spring.rabbitmq.virtual-host=/

3.配置AMQP

[java] view plain copy

  1. @Configuration
  2. public class AmqpConfig {
  3. @Bean
  4. RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  5. return new RabbitAdmin(connectionFactory);
  6. }
  7. @Bean
  8. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  9. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  10. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  11. return template;
  12. }
  13. }

4.声明队列、交换机

[java] view plain copy

  1. @Configuration
  2. public class ExchangeConfig {
  3. /******************************************死信队列***************************************************/
  4. //exchange name
  5. public static final String DEFAULT_EXCHANGE = "KSHOP";
  6. //DLX QUEUE
  7. public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";
  8. //DLX repeat QUEUE 死信转发队列
  9. public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";
  10. //信道配置
  11. @Bean
  12. public DirectExchange defaultExchange() {
  13. return new DirectExchange(DEFAULT_EXCHANGE, true, false);
  14. }
  15. @Bean
  16. public Queue repeatTradeQueue() {
  17. Queue queue = new Queue(DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);
  18. return queue;
  19. }
  20. @Bean
  21. public Binding drepeatTradeBinding() {
  22. return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  23. }
  24. @Bean
  25. public Queue deadLetterQueue() {
  26. Map<String, Object> arguments = new HashMap<>();
  27. arguments.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
  28. arguments.put("x-dead-letter-routing-key", DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  29. Queue queue = new Queue(DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);
  30. System.out.println("arguments :" + queue.getArguments());
  31. return queue;
  32. }
  33. @Bean
  34. public Binding deadLetterBinding() {
  35. return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(DEFAULT_DEAD_LETTER_QUEUE_NAME);
  36. }
  37. }

这里指定了声明了死信队列失效之后的发送的交换机和routing-key,其实这里可以指定两个交换机,一个是死信队列的交换机1绑定死信队列,一个是失效之后到达的交换机2绑定延迟队列,死信队列的交换机没有消费者去监听,而交换机2绑定的队列就是真正的延迟队列了,消费者去监听这个队列。

5.定义业务service

[java] view plain copy

  1. @Service
  2. public class DeadLetterService {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(LogCarrier logCarrier) {
  6. MessagePostProcessor processor = new MessagePostProcessor() {
  7. @Override
  8. public Message postProcessMessage(Message message) throws AmqpException {
  9. message.getMessageProperties().setExpiration(30000 + "");
  10. return message;
  11. }
  12. };
  13. rabbitTemplate.convertAndSend(ExchangeConfig.DEFAULT_EXCHANGE, ExchangeConfig.DEFAULT_DEAD_LETTER_QUEUE_NAME,
  14. JSON.toJSONString(logCarrier), processor);
  15. }
  16. }

这里指定了超时时间为30秒

6.创建controller去调用

[java] view plain copy

  1. @RestController
  2. public class DeadLetterController {
  3. <span style="white-space:pre;"> </span>@Autowired
  4. <span style="white-space:pre;"> </span>private DeadLetterService deadLetterService;
  5. <span style="white-space:pre;"> </span>@GetMapping("deadLetter")
  6. <span style="white-space:pre;"> </span>public void direct() throws InterruptedException {
  7. <span style="white-space:pre;"> </span>long i = 0;
  8. <span style="white-space:pre;"> </span>while(i<10) {
  9. <span style="white-space:pre;"> </span>LogCarrier contract = new LogCarrier();
  10. <span style="white-space:pre;"> </span>contract.setId(i++);
  11. <span style="white-space:pre;"> </span>contract.setType("direct");
  12. <span style="white-space:pre;"> </span>deadLetterService.send(contract);<span style="white-space:pre;"> </span>
  13. <span style="white-space:pre;"> </span>}
  14. <span style="white-space:pre;"> </span>System.out.println("消息发送时间:"+new Date());
  15. <span style="white-space:pre;"> </span>}
  16. }

这样,生产者端的基本都配置完成。

消费者端配置,消费者端的配置很简单:

1.依赖

[java] view plain copy

  1. <dependencies>
  2. <!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  3. </dependency> -->
  4. <dependency>
  5. <groupId>org.springframework.boot</groupId>
  6. <artifactId>spring-boot-starter-web</artifactId>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-test</artifactId>
  11. <scope>test</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-amqp</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-devtools</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. <version>1.2.41</version>
  25. </dependency>
  26. </dependencies>

2.application.properties

[java] view plain copy

  1. server.port=0
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.publisher-confirms=true
  7. spring.rabbitmq.virtual-host=/

3.AMQP配置

[java] view plain copy

  1. @Configuration
  2. @EnableRabbit
  3. public class ConsumerConfig implements RabbitListenerConfigurer {
  4. @Bean
  5. public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
  6. DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
  7. factory.setMessageConverter(new MappingJackson2MessageConverter());
  8. return factory;
  9. }
  10. @Bean
  11. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  12. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  13. factory.setConnectionFactory(connectionFactory);
  14. factory.setPrefetchCount(1);//设置预读取数,可以进行有效的负载均衡。
  15. factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//自动ask
  16. return factory;
  17. }
  18. @Override
  19. public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
  20. registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
  21. }
  22. }

4.监听service

[java] view plain copy

  1. /**
  2. * 死信队列
  3. *
  4. * @author cfyj 2017年11月24日 下午3:11:05
  5. *
  6. */
  7. @Service
  8. public class CustomService4 {
  9. private static int num = 0;
  10. @RabbitListener(queues = "kshop.repeat.trade.queue")
  11. @RabbitHandler
  12. public void process(String obj) {
  13. LogCarrier logCarrier= JSON.parseObject(obj, LogCarrier.class);
  14. System.out.println(num+":------消息接收时间"+new Date()+logCarrier);
  15. }
  16. }

传输实体:

[java] view plain copy

  1. package com.cfyj.demo.domain;
  2. public class LogCarrier {
  3. private Long id;
  4. private String type;
  5. public String getType() {
  6. return type;
  7. }
  8. public void setType(String type) {
  9. this.type = type;
  10. }
  11. public Long getId() {
  12. return id;
  13. }
  14. public void setId(Long id) {
  15. this.id = id;
  16. }
  17. @Override
  18. public String toString() {
  19. return "LogCarrier [id=" + id + ", type=" + type + "]";
  20. }
  21. }

这样生产者和消费者都配置完成了。注意生产者和消费者端传输的对象实体类信息必须一致。

这样就开始测试吧,测试之前我们带着几个问题去测试:

只启动生产者,然后向死信队列发送信息,消息失效后会怎么样?

如果指定交换机的类型为fanout,没有消费者监听是否会将信息直接丢弃呢?

1.测试,启动生产者和消费者(先启动生产者来声明交换机和队列)

发送消息的时间

springboot与rabbitMQ实现延迟加载

死信队列中的消息数

springboot与rabbitMQ实现延迟加载

延迟队列的消息,这时因为过期时间还没到,所以死信队列中的信息还没有到达延迟队列中

springboot与rabbitMQ实现延迟加载

消费者收到延迟队列的时间

springboot与rabbitMQ实现延迟加载

接收-发送的时间正好为过期时间30s,这样就实现了消息的延迟消费,在到达过期时间后,死信队列的消息会发送到指定x-dead-letter-exchange的交换机中,由交换机发送到设置的延迟队列。

2.当我们只启动生产者时,发送消息,消息会怎么样?(topic类型和direct类型的测试结果一致)

发送请求时间:

springboot与rabbitMQ实现延迟加载

死信队列中的消息:

springboot与rabbitMQ实现延迟加载

当达到过期时间后,延迟队列的消息(注意两个队列收到消息的时间):

springboot与rabbitMQ实现延迟加载

当只启动生产者服务然后发送消息到死信队列时,消息会先堆积到死信队列,然后到达过期时间后重发到延迟队列中。

3.如果指定交换机的类型为fanout,没有消费者监听是否会将信息直接丢弃呢?

发送信息后,消息会先进入死信队列中,并没有直接丢弃消息

死信队列,注意接收消息的时间:

springboot与rabbitMQ实现延迟加载

死信队列中的消息到达过期时间后:

springboot与rabbitMQ实现延迟加载

延迟队列:

springboot与rabbitMQ实现延迟加载

测试结果与direct类型相同。

相关推荐