RabbitMQ(三) RabbitMQ高级整合应用
RabbitMQ整合Spring AMQP实战
常用组件介绍
RabbitAdmin
Spring AMQP声明 通过@Bean注解进行声明
RabbitTemplate
SimpleMessageListenerContainer 对消息消费进行详细配置和优化
MessageListenerAdapter 消息监听适配器,建立在监听器基础之上
MessageConverter
RabbitAdmin
RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
注意:autoSatrtup必须设置为true,否则spring容器不会加载RabbitAdmin类
RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明;
底层使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作;
RabbitMQ简单使用
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置
@Configuration public class RabbitMqConfig1 { /** * 设置连接 * @return ConnectionFactory */ @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * 创建RabbitAdmin * @return RabbitAdmin */ @Bean public RabbitAdmin rabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); //默认就是true rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
测试
@Autowired private RabbitAdmin rabbitAdmin; /** * RabbitAdmin api应用 */ @Test public void testAdmin() { rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); //绑定 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //使用 BindingBuilder 创建绑定 // https://docs.spring.io/spring-amqp/docs/2.1.16.BUILD-SNAPSHOT/reference/html/#builder-api rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.topic.queue", false)) //直接创建队列 .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系 .with("user.#")); //指定路由Key //FanoutExchange 类型exchange不走路由键 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 // rabbitAdmin.purgeQueue("test.topic.queue", false); }
SpringAMQP声明(Exchange、Queue、Binding)
在RabbitMQ基础AP里面声明一个Exchange、声明一个绑定、一个队列
//基础API声明一个exchange channel.exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) //基础API 声明一个队列 channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) //基础API 声明binding channel.queueBind(String queue, String exchange, String routingKey)
使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
//声明Topic 类型的exchange @Bean public TopicExchange topicExchange() { //exchange 持久化 // Exchange springEchange = ExchangeBuilder.topicExchange("spring_amqp_test_echange").durable(true).build(); return new TopicExchange("spring_amqp_test_echange", true, false); } //声明队列 @Bean public Queue queue() { // Queue spring_amqp_test_echange = QueueBuilder.durable("spring_amqp_test_echange").build(); return new Queue("spring_amqp_test_queue"); } //建立绑定 @Bean public Binding binding(TopicExchange topicExchange, Queue queue) { return BindingBuilder.bind(queue).to(topicExchange).with("spring.*"); }
消息模板 RabbitTemplate
RabbitTemplate,即消息模板。
在与SpringAMQP整合的时候进行发送消息的关键类
该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进入注入到Spring容器中,然后直接使用;
在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可;
RabbitTemplate简单使用
配置
@Configuration public class RabbitMqConfig3 { /** * 设置连接 * * @return ConnectionFactory */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * 创建RabbitAdmin * * @return RabbitAdmin */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); //默认就是true rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 消息模板 * * @param connectionFactory connectionFactory * @return RabbitTemplate */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001(TopicExchange exchange001, Queue queue001) { return BindingBuilder.bind(queue001).to(exchange001).with("spring.*"); } }
测试
@Test public void testSendMessage() { //1 创建消息 //AMQP消息的消息属性 //MessageBuilder(也可以构建Message) 使用流利的API从byte[]主体或其他消息构建Spring AMQP消息。 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); }
队列queue001
@Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
队列queue001
队列queue002
简单消息监听容器:SimpleMessageListenerContainer
- 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列)、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
- 设置消费者数量、最小最大数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等等
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大;
思考一下:SimpleMessageListenerContainer为什么可以动态感知配置变更?
配置
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // container.setQueueNames(); 接收字符串的队列名 // container.setQueues(queue001(), queue002(), queue003()); //当前消费者数量 container.setConcurrentConsumers(1); //最大消费者数量 container.setMaxConcurrentConsumers(5); //是否使用重队列 container.setDefaultRequeueRejected(false); //自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setExposeListenerChannel(true); //消费端的标签策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //设置消息监听 //必须设置消息监听 否则 报 No message listener specified - see property ‘messageListener‘ container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); //做消息处理.... } }); return container; }
消息监听适配器:MessageListenerAdapter
通过MessageListenerAdapter的代码我们可以看出如下核心属性:
- defaultListenerMethod默认监听方法名称:用于设置监听方法名称
- Delegate委托对象:实际真实的委托对象,用于处理消息、
- queueOrTagToMethodName: 队列标识与方法名称组成的集合
- 可以一一进行队列与方法名称的匹配;
- 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接收处理;
配置
public class MessageDelegate1 { public void handleMessage(byte[] messageBody) { System.err.println("默认方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(byte[] messageBody) { System.err.println("字节数组方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息内容:" + messageBody); } public void method1(String messageBody) { System.err.println("method1 收到消息内容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息内容:" + new String(messageBody)); } public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息内容:" + messageBody); } }
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // container.setQueueNames(); 接收字符串的队列名 // container.setQueues(queue001(), queue002(), queue003()); //当前消费者数量 container.setConcurrentConsumers(1); //最大消费者数量 container.setMaxConcurrentConsumers(5); //是否使用重队列 container.setDefaultRequeueRejected(false); //自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setExposeListenerChannel(true); //消费端的标签策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //1 适配器方式. 默认是有自己的方法名字的:handleMessage // 可以自己指定一个方法的名字: consumeMessage // 也可以添加一个转换器: 从字节数组转换为String //MessageDelegate1如何写 MessageListenerAdapter 源码里面也给出了一些建议 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate1()); //默认的方法是 public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage"; adapter.setDefaultListenerMethod("consumeMessage"); //TextMessageConverter 自定义的消息转换器 //new TextMessageConverter()-->consumeMessage(byte[] messageBody))->MessageProperties.setContentType("text/plian") //new Jackson2JsonMessageConverter()--->consumeMessage(Map messageBody))->MessageProperties.setContentType("application/json") // adapter.setMessageConverter(new Jackson2JsonMessageConverter()); container.setMessageListener(adapter); return container; }
MessageConverter消息转换器
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter;
自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
重写下面两个方法:
- toMessage:java对象转换为Message
- fromMessage:Message对象转换为java对象
MessageConverter消息转换器:
Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能;
DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系;
自定义二进制转换器:比如图片类型、PDF、PPT、流媒体等
使用转换器的目的是当传入不同的类型的数据(如json,类,PDF,图片等)时,在消息的接收方接收到时也总是以传入的类型接收结果对象;我们通过写入不同的转换器以达到此种效果。具体可百度。
JSON格式转换
默认监听方法的参数为Map
public class Order { private String id; private String name; private String content; public Order() { } public Order(String id, String name, String content) { this.id = id; this.name = name; this.content = content; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
配置
// 1.1 支持json格式的转换器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // public void consumeMessage(Map messageBody) { // System.err.println("map方法, 消息内容:" + messageBody); // } //对应map参数方法 adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
测试
@Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("消息订单"); order.setContent("描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
JSON格式转换支持Java对象
默认监听方法的参数为Java对象
委托对象方法
public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); }
配置
// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); //信任所有的包,否则会报 报不信任 javaTypeMapper.setTrustedPackages("*"); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
测试
@Test public void testSendJavaMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); //__TypeId__ 这个是固定写法 messageProperties.getHeaders().put("__TypeId__", "com.niugang.spring.entity.Order"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
输出
order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
JSON格式转换支持Java对象(二)
委托对象方法
public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package对象, 消息内容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); }
配置
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); idClassMapping.put("order", com.niugang.spring.entity.Order.class); idClassMapping.put("packaged", com.niugang.spring.entity.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
测试
@Test public void testSendMappingMessage() throws Exception { ObjectMapper mapper = new ObjectMapper(); Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); String json1 = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json1); MessageProperties messageProperties1 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties1.setContentType("application/json"); messageProperties1.getHeaders().put("__TypeId__", "order"); Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); Packaged pack = new Packaged(); pack.setId("002"); pack.setName("包裹消息"); pack.setDescription("包裹描述信息"); String json2 = mapper.writeValueAsString(pack); System.err.println("pack 4 json: " + json2); MessageProperties messageProperties2 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties2.setContentType("application/json"); messageProperties2.getHeaders().put("__TypeId__", "packaged"); Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); }
全局消息转化器与自定义转化器
自定义文本转化器
public class TextMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
自定义图片转化器
/** * 图片转化器 */ public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------Image MessageConverter----------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); //目录必须存在 String path = "d:/springbootlog/" + fileName + "." + extName; File f = new File(path); try { //拷贝到指定路径 Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
自定义pdf转化器
public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/springbootlog/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
委托对象
public void consumeMessage(File file) { System.err.println("文件对象 方法, 消息内容:" + file.getName()); }
配置
//1.4 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter);
测试
@Test public void testSendExtConverterMessage() throws Exception { byte[] body = Files.readAllBytes(Paths.get("C:\\Users\\Administrator\\Desktop\\公众号", "spring.png")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("image/png"); messageProperties.getHeaders().put("extName", "png"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "image_queue", message); byte[] body1 = Files.readAllBytes(Paths.get("D:\\Documents\\技术书籍", "Java huashan-2019-06-20.pdf")); MessageProperties messageProperties1 = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message1 = new Message(body1, messageProperties); rabbitTemplate.send("", "pdf_queue", message1); }
SpringBoot整合配置详解(生产端)
- publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
- publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback
注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效;生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。
生产端代码示例
application.properties
spring.rabbitmq.addresses=localhost:5672 #spring.rabbitmq.host=localhost #spring.rabbitmq.port=5762 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # 消息确认模式 spring.rabbitmq.publisher-confirms=true # 消息返回模式 spring.rabbitmq.publisher-returns=true # 为true 消息返回模式才生效 spring.rabbitmq.template.mandatory=true
配置
/** * springboot 消息生产者 * * @author niugang */ @Configuration public class RabbitMqConfig { /** * 自动注入RabbitTemplate模板类 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 回调函数: confirm确认 */ final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); System.err.println("ack: " + ack); if (!ack) { System.err.println("异常处理...."); } } }; /** * 回调函数: return返回 */ final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; /* 队列监听在消费者端配置,没有将会自动创建 @Bean public TopicExchange topicExchange() { return new TopicExchange("exchange-1"); } @Bean public Queue queue() { return new Queue("queue-1"); } @Bean public Binding binding(Queue queue, TopicExchange topicExchange) { return BindingBuilder.bind(queue).to(topicExchange).with("springboot.#"); }*/ /** * 发送消息方法调用: 构建Message消息 * * @param message 消息体 * @param properties 消息属性 */ public void send(Object message, Map<String, Object> properties) { MessageProperties messageProperties = new MessageProperties(); if (properties != null && properties.size() > 0) { Set<Map.Entry<String, Object>> entries = properties.entrySet(); for (Map.Entry<String, Object> entry : entries) { String key = entry.getKey(); Object value = entry.getValue(); messageProperties.setHeader(key, value); } } //org.springframework.amqp.core Message msg = MessageBuilder.withBody(message.toString().getBytes()).andProperties(messageProperties).build(); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //routingKey修改 为 spring.abc 消息将走 returnCallback rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData); } }
测试
在rabbitmq控制台新建,Exchange名为exchange-1,新建队列queue-1,并建立两者之间的绑定,routingKey为springboot.#
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitMqConfig rabbitMqConfig ; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Test public void testSender1() throws Exception { Map<String, Object> properties = new HashMap<>(); properties.put("number", "12345"); properties.put("send_time", simpleDateFormat.format(new Date())); rabbitMqConfig.send("Hello RabbitMQ For Spring Boot!"+System.currentTimeMillis(), properties); } }
注意:进行单元测试,ack一直是false;改为url请求,ack就正常了
SpringBoot整合配置详解(消费端)
消费端核心配置
# NONE, MANUAL, AUTO; 手工消息消息确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual #监听器调用程序线程的最小数量。 spring.rabbitmq.listener.simple.concurrency=5 #监听器调用程序线程的最大数量。 spring.rabbitmq.listener.simple.max-concurrency=10 # spring.rabbitmq.listener.type=simple 默认为 SimpleContainer 模式对应 spring.rabbitmq.listener.simple 前缀相关的
注意点
- 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
- 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。
@RabbitListener注解的使用
- 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用。
- @RabbitListener是一个组合注解,里面可以注解配置
- @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。
消费者端代码示例
类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
properties
#spring.rabbitmq.addresses=localhost:5672 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # NONE, MANUAL, AUTO; 手工消息消息确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual #监听器调用程序线程的最小数量。 spring.rabbitmq.listener.simple.concurrency=5 #监听器调用程序线程的最大数量。 spring.rabbitmq.listener.simple.max-concurrency=10 # spring.rabbitmq.listener.type=simple 默认为 SimpleContainer 模式对应 spring.rabbitmq.listener.simple 前缀相关的 spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-2 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.*
配置
public class Order implements Serializable { private String id; private String name; public Order() { } public Order(String id, String name) { super(); this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
/** * 消费者类 * * @author niugang */ @Configuration public class RabbitMQReceiver { /** * 从1.5.0版开始,您可以在类级别指定@RabbitListener注释。 * 与新的@RabbitHandler批注一起,这使单个侦听器可以根据传入消息的有效负载类型调用不同的方法。 * * @RabbitListener(id="multi", queues = "someQueue") * @SendTo("my.reply.queue") public class MultiListenerBean { * @RabbitHandler public String thing2(Thing2 thing2) { * ... * } * @RabbitHandler public String cat(Cat cat) { * ... * } * @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) { * ... * } * @RabbitHandler(isDefault = true) * public String defaultMethod(Object object) { * ... * } * } * 在这种情况下,如果转换后的有效负载是Thing2,Cat或Hat,则会调用各个@RabbitHandler方法。 * 您应该了解,系统必须能够根据有效负载类型识别唯一方法。 * 检查该类型是否可分配给没有注释或带有@Payload注释的单个参数。 * 请注意,如方法级别@RabbitListener(前面所述)中所述,应用了相同的方法签名。 */ //队列 exchange 绑定 没有 自动创建 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable = "true"), exchange = @Exchange(value = "exchange-1", durable = "true", type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"), key = "springboot.*" //routing key ) ) @RabbitHandler //@RabbitListener 提供了很多灵活的签名 如Message Channel @Payload @Header 等 具体可查看源码 // org.springframework.amqp.core.Message // org.springframework.messaging.Message public void onMessage(Message message, Channel channel) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端Payload: " + new String(message.getBody())); System.err.println("消费端MessageProperties.: " + message.getMessageProperties()); //AmqpHeaders header属性封装 //手工ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } /** * spring.rabbitmq.listener.order.queue.name=queue-2 * spring.rabbitmq.listener.order.queue.durable=true * spring.rabbitmq.listener.order.exchange.name=exchange-1 * spring.rabbitmq.listener.order.exchange.durable=true * spring.rabbitmq.listener.order.exchange.type=topic * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true * spring.rabbitmq.listener.order.key=springboot.* * * @param order order * @param channel channel * @param headers headers * @throws Exception Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable = "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler //@Headers 必须通过Map接收 //@Header("amqp_receivedRoutingKey") String rk 直接获取header中某一个key //默认前缀为amqp_ /** * {amqp_receivedDeliveryMode=PERSISTENT, * amqp_receivedExchange=exchange-2, * amqp_deliveryTag=1, * amqp_consumerQueue=queue-2, * amqp_redelivered=false, amqp_receivedRoutingKey=springboot.def, spring_listener_return_correlation=175a21c4-ffd5-4a3e-ac3a-2f63d60c18a5, spring_returned_message_correlation=0987654321, id=53443ced-0b23-3079-71c2-09997897a553, amqp_consumerTag=amq.ctag-V0hqyVObrHXJeC60MwPSVQ, contentType=application/x-java-serialized-object, timestamp=1591240122842} */ public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); System.err.println("消费端headers: " + headers); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } }