消息队列学习(二)
今天我们以ActiveMQ为例进行一系列的学习
Apache ActiveMQ是Apache软件基金会所研发的开放源码消息中间件;由于ActiveMQ是一个纯Java框架,那么我们先要了解下java中的JMS。
JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS基本构成
1、连接工厂
连接工厂是客户用来创建连接的对象,例如 ActiveMQ 提供的
ActiveMQConnectionFactory。
2、 连接
JMS Connection 封装了客户与 JMS 提供者之间的一个虚拟的连接。
3、会话
JMS Session 是生产和消费消息的一个单线程上下文。会话用于创建消息生
产者(producer)、消息消费者(consumer)和消息(message)等。会话提供
了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子
操作中。
4、 目的地
目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。
JMS1.0.2 规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅
消息传递域。
5、消息生产者
消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
6、消息消费者
消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。
7、消息
JMS 消息由以下三部分组成:
• 消息头。每个消息头字段都有相应的 getter 和 setter 方法。
• 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。
• 消息体。JMS 定义的消息类型有 TextMessage、MapMessage、BytesMessage、
StreamMessage 和 ObjectMessage。
来看下具体代码:
生产者
// 1.初始化connection工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKE_URL); // 2.创建Connection connection = factory.createConnection(); // 3.打开连接 connection.start(); // 4.创建session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目的地 Destination destination = session.createQueue(DESTINATION); //6.创建生产者 MessageProducer producer = session.createProducer(destination); //7.配置消息持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //8.发送消息 sendMessage(session, producer); //9.事务提交 session.commit();
消费者
// 1.初始化connection工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 2.创建Connection connection = connectionFactory.createConnection(); // 3.打开连接 connection.start(); // 4.创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目标 Destination destination = session.createTopic("YD"); //6.创建消费者 MessageConsumer consumer = session.createConsumer(destination); //7.配置监听 consumer.setMessageListener(this);