消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)
前言
1. AMQP 核心组件
- RabbitAdmin
- SpringAMQP声明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
2. RabbitAdmin
RabbitAdmin类可以很好的才注意RabbitMQ,在Spring中直接进行诸如即可。
注意:
- autoStartUp必须要设置为true,否则Spring容器不会加载RabbitAdmin类
- RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
- 使用RabbitTemplate的execute方法执行对应的什么、修改、删除等一系列RabbitMQ基础功能操作
- 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等
2.1 代码演示
2.1.1 引入Pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cp</groupId> <artifactId>rabbitmq-spring</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-spring</name> <description>rabbitmq-spring</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.14.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.1.2 配置Bean
@Configuration @ComponentScan({"com.cp.spring.*"}) public class RabbitMQConfig { //相当于<Bean id="connectionFactory"></Bean> @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("user_cp"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhost_cp"); return connectionFactory; } //形参名称要与bean的方法名保持一致 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.1.3 测试类
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //直连监听 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); //第一个参数:具体的队列 第二个参数:绑定的类型 第三个参数:交换机 第四个参数:路由key 第五个参数:arguments 参数 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //BindingBuilder 链式编程 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.topic.queue", false)) //直接创建队列 .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系 .with("user.#")); //指定路由Key rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } }
通过以上代码,可以自行测试一下结果。
RabbitAdmin源码
实现了InitializingBean
接口,表明在Bean配置加载完后再加载RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。
this.applicationContext.getBeansOfType(Collection.class, false, false).values()
可以看到Exchange、Queue、Binding都是从Spring容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings三种容器中。
后续的源码中,也可以看出通过筛选Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服务器的连接。主要通过Spring以@Bean的方式,将配置加载到Spring容器之后,再从容器中获取相关信息,再去建立连接。
3. SpringAMQP声明
- 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列
-使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
3.1 代码演示
@Configuration @ComponentScan({"com.cp.spring.*"}) public class RabbitMQConfig { //相当于<Bean id="connectionFactory"></Bean> @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("user_cp"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhost_cp"); return connectionFactory; } //形参名称要与bean的方法名保持一致 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding003() { //同一个Exchange绑定了2个队列 return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } }
再次运行ApplicationTests类中testAdmin()方法,可以在控制台中,查看到一个Exchange绑定两个Queue。
4. RabbitTemplate
RabbitTemplate,即消息模板
- 我们在与SpringAMQP整合的时候进行发送消息的关键词
- 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用
- 在与SPring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可
4.1 代码演示
4.1.1 RabbitMQConfig类
在RabbitMQConfig类中写RabbitTemplate配置
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; }
4.1.2 ApplicationTests类
在ApplicationTests测试类中添加测试方法,进行测试。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); //消息体,与参数 Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); //转换并发送 //MessagePostProcessor 在消息发送完毕后再做一次转换进行再加工,匿名接口,需要重写方法 rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); }
4.1.3 查看管控台
运行前,可以看到queue001
中是没有消息的。
运行testSendMessage()方法。并获取消息。
4.1.4 简单写法
@Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
我们往topic001中发送了两条消息,topic002中发送了一条消息。运行testSendMessage2() 接下来再查看下管控台
。
可以看到topic001中已经有了三条消息,刚才发送的消息也还在。GetMessage并不是消费消息,而只是获取消息。
5. SimpleMessageListenerContainer
简单消息监听容器
- 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列)、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
- 设置消费者数量、最小最大数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等等。
注意:
- SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
- 很多机遇RabbitMQ的自制定话后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大
思考
SimpleMessageListenerContainer为什么可以动态感知配置变更?
5.1 代码演示
5.1.1 RabbitMQConfig类
配置中添加如下代码:
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //添加多个队列进行监听 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); //当前消费者数量 container.setConcurrentConsumers(1); //最大消费者数量 container.setMaxConcurrentConsumers(5); //设置重回队列,一般设置false container.setDefaultRequeueRejected(false); //设置自动签收机制 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //设置listener外露 container.setExposeListenerChannel(true); //消费端标签生成策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { //每个消费端都有自己独立的标签 return queue + "_" + UUID.randomUUID().toString(); } }); //消息监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); } }); return container; }
运行之前写的testSendMessage2()方法,查看管控台中的相关信息以及控制台打印信息
6. MessageListenerAdapter
MessageListenerAdapter 即消息监听适配器
6.1 代码演示
6.1.1 适配器使用方式1
我们把之前的消息监听代码注释,可以不用直接加消息监听,而是采用MessageListenerAdapter的方式,通过适配器方式1,我们来学习下如何使用默认的handleMessage,自定义方法名,自定义转换器。
使用默认handleMessage
//消息监听 /*container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); } });*/ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); container.setMessageListener(adapter);
MessageListenerAdapter 适配器类,熟悉适配器模式的朋友肯定了解适配器模式的话,可以通过适配器,适配自己的实现,这里我们适配自定义的MessageDelegate
类。我们就可以不采用监听的方式,采用适配的方式。
自定义MessageDelegate
public class MessageDelegate { public void handleMessage(byte[] messageBody) { System.err.println("默认方法, 消息内容:" + new String(messageBody)); } }
MessageDelegate类中,方法名与参数handleMessage(byte[] messageBody)
是固定的。为什么呢?
MessageListenerAdapter源码分析
我们来看下MessageListenerAdapter底层代码
MessageListenerAdapter类中
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
默认方法名就是叫handleMessage。当然也可以自己去指定设置。通过messageListenerAdapter的代码我们可以看出如下核心属性
- defaultListenerMethod默认监听方法名称:用于设置监听方法名称
- Delegate 委托对象:实际真实的委托对象,用于处理消息
- queueOrTagToMethodName 队列标识与方法名称组成集合
- 可以一一进行队列与方法名称的匹配
- 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
测试一下默认使用的handleMessage方法。启动ApplicationTests类,运行testSendMessage()测试方法。
自定义方法名
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); container.setMessageListener(adapter);
修改MessageDelegate()类
public class MessageDelegate { public void consumeMessage(byte[] messageBody) { System.err.println("字节数组方法, 消息内容:" + new String(messageBody)); } }
自定义TextMessageConverter转换器
public class TextMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
修改RabbitMQConfig类
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter);
修改MessageDelegate类
public class MessageDelegate { public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息内容:" + messageBody); } }
运行testSendMessage4Text()测试方法
@Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
注意:在发消息的时候,必须符合自己的转换器。
打印结果
6.1.2 适配器使用方式2
自定义队列名称和方法名称。
/** * 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配 * / MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setMessageConverter(new TextMessageConverter()); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter);
public class MessageDelegate { public void method1(String messageBody) { System.err.println("method1 收到消息内容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息内容:" + new String(messageBody)); } }
运行 测试方法
@Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }
运行结果:
7. MessageConverter消息转换器
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
- 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
- 重写下面两个方法:
toMessage:java对象转换为Message
fromMessage:Message对象转换为java对象 - Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能
- DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
- 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体
7.1 代码演示
其实我们在介绍MessageListenerAdapter的时候,中间就介绍到了TextMessageConverter转换器,将二进制数据转换成字符串数据。
7.1.1 添加json格式的转换器
修改RabbitMQConfig类
// 1.1 支持json格式的转换器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //重点,加入json格式的转换器 json对应Map对象 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { //json对应Map对象 public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息内容:" + messageBody); } }
定义一个Order对象
public class Order { private String id; private String name; private String content; ...省略get/set等方法 }
定义测试方法
@Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("消息订单"); order.setContent("描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
打印结果:
7.1.2 添加支持Java对象转换
修改RabbitMQConfig类
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //需要将javaTypeMapper放入到Jackson2JsonMessageConverter对象中 DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } }
定义测试方法
@Test public void testSendJavaMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); //添加typeid 与类的全路径 messageProperties.getHeaders().put("__TypeId__", "com.cp.spring.entity.Order"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
打印结果:
7.1.3 添加支持java对象多映射转换
修改RabbitMQConfig类
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); //key表示标签 对应一个类的具体全路径。类和标签绑定之后,标签是order,意思就是转换成order类 Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); idClassMapping.put("order", com.cp.spring.entity.Order.class); idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); //一层套一层 jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { //json对应Map对象 public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package对象, 消息内容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } }
定义一个Packaged对象
public class Packaged { private String id; private String name; private String description; ...省略get/set等方法 }
定义测试方法
@Test public void testSendMappingMessage() throws Exception { ObjectMapper mapper = new ObjectMapper(); Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); String json1 = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json1); MessageProperties messageProperties1 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties1.setContentType("application/json"); //设置的是标签,而不是全路径 messageProperties1.getHeaders().put("__TypeId__", "order"); Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); Packaged pack = new Packaged(); pack.setId("002"); pack.setName("包裹消息"); pack.setDescription("包裹描述信息"); String json2 = mapper.writeValueAsString(pack); System.err.println("pack 4 json: " + json2); MessageProperties messageProperties2 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties2.setContentType("application/json"); //设置的是标签,而不是全路径 messageProperties2.getHeaders().put("__TypeId__", "packaged"); Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); }
打印结果:
在通过单元测试运行testSendMappingMessage()方法时会存在一个问题:委派对象MessageDelegate可能会收不到对象。
因为单元测试spring容器在运行完毕之后就停止,不会等到消费者消费完消息之后再停止,所以需要通过正常启动springboot项目,可以看到正常消费消息。
7.1.4 添加全局转换器
修改RabbitMQConfig类
@Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } //1.4 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的转换器:所有小的Converter都可以放到这个大的Converter中 ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); //text走文本转换器 convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); //json走json转换器 Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); //图片走图片转换器 ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); //pdf走pdf转换器 PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { public void handleMessage(byte[] messageBody) { System.err.println("默认方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(byte[] messageBody) { System.err.println("字节数组方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息内容:" + messageBody); } public void method1(String messageBody) { System.err.println("method1 收到消息内容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息内容:" + new String(messageBody)); } //json对应Map对象 public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息内容:" + messageBody); } public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package对象, 消息内容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } public void consumeMessage(File file) { System.err.println("文件对象 方法, 消息内容:" + file.getName()); } }
添加PDFMessageConverter
public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/010_test/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
添加ImageMessageConverter
public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------Image MessageConverter----------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); //将接受到的图片放到该位置 String path = "d:/010_test/" + fileName + "." + extName; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
定义测试方法
@Test public void testSendExtConverterMessage() throws Exception { // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png")); // MessageProperties messageProperties = new MessageProperties(); // messageProperties.setContentType("image/png"); // messageProperties.getHeaders().put("extName", "png"); // Message message = new Message(body, messageProperties); // rabbitTemplate.send("", "image_queue", message); byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); }
可以自己测试下图片和pdf的保存。
源码地址:https://gitee.com/573059382/r...
文末
欢迎关注个人微信公众号:Coder编程
获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!
新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。文章收录至
Github: https://github.com/CoderMerli...
Gitee: https://gitee.com/573059382/c...
欢迎关注并star~
参考文章:
《RabbitMQ消息中间件精讲》
推荐文章:
消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!