ActiveMQ
1、消息中间件
1、通讯方式
1、点对点
2、发布订阅
2、JMS
1、jms
JMS是java消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
2、消费模型
1、点对点
1、模型:
生产者---消息队列---消费者
2、发布订阅
1、模型:
生产者---主题---消费者
3、ActiveMQ使用
1、依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
2、生产者
3、消费者
4、ActiveMQ持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT)
5、JMS可靠消息
1、ActiveMQ消息签收机制
客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
2、ActiveMQ可靠消息保证
1、自动签收(拿到消息就告诉MQ签收成功)---无事务机制
2、事务消息
生产者:完成消息发送后,提交事务至队列。
消费者:获取到事务消息,如果消费者没有提交事务,默认表示没有进行消费。
3、手动签收
手动签收消息
3、代码
1、自动签收
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
2、手动签收
Session session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
消费者手动签收消息:
msg.acknowledge();
3、事务消息
Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
生产者:
session.commit()
消费者:
session.commit()
6、SpringBoot整合ActiveMQ
1、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2、生产者
1、配置文件
spring:
activemq:
broker-url: tcp://192.168.52.128:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8080
2、配置类
@Configuration @Bean // 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
|
3、生产者类
@Component } |
3、消费者
@Component |
7、ActiveMQ注意事项
1、消费者抛出异常,默认会自动重试
2、若消费者代码问题,记录日志及报文信息,采用人工补偿,不进行ActiveMQ自动重试
3、消费者幂等性问题:使用全局ID区分消息