ActiveMQ+Mysql持久化存储
本文简单介绍ActiveMQ使用Mysql数据库实现消息的持久化存储
一、ActiveMQ配置修改
二、代码示例
一、ActiveMQ配置修改
对于ActiveMQ需要保证消息的可靠性,需要持久化进行存储,默认情况下使用kahadb进行数据的默认持久化存储技术,同时也可以使用leveldb、mysql、oracle
此次,使用mysql对消息进行持久化操作。
1、active.xml文件的修改
(1)数据源配置信息
<persistenceAdapter> <!-- 默认使用kahadb进行持久化操作,保证消息的可靠性 --> <kahaDB directory="${activemq.data}/kahadb"/> --> <!-- 当前使用mysql进行数据的持久化操作 --> <jdbcPersistenceAdapter dataSource="#mysql-ds" /> </persistenceAdapter>
<!-- 设置mysql数据源的配置信息 --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://localhost:3306/test" /> <property name="username" value="root" /> <property name="password" value="***"/> <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" /> </bean>
(2)消息队列中消息的优先级
<policyEntries> <policyEntry topic=">" > <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> <!-- 指定队列中消息的优先级 queue 自定义的队列的名称--> <policyEntry queue="persistMysql" prioritizedMessages="true"/> </policyEntries>
消息优先级说明:
消息的优先级有0-9十个级别的优先级,0-4为普通的消息,5-9为加急消息,如果不指定优先级,默认为4。JMS不严格按照这十个优先级发送消息,但必须保证单次加急消息要先于普通消息到达,并不能保证顺序消费机制。
2、ActiveMQ安转目录lib下需要添加如下jar
mysql-connector-java-5.1.45.jar
commons-dbcp-1.4.jar
commons-pool-1.5.4.jar
二、代码示例
(1)生产者
package com.chinasoft.activemqv1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ消息的生产者 * * 存储技术使用Mysql保证数据的可靠性 * * @author Freedom * */ public class Producer { private ConnectionFactory f = null; private Connection c = null; private Session session = null; private Destination d = null; private MessageProducer p = null; public Producer() { try { f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); c = f.createConnection(); // 创建连接对象 // 创建好连接对象后要打开连接 c.start(); session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); d = session.createQueue("persistMysql"); p = session.createProducer(null); // 发送消息时指定消息的目的地 } catch (JMSException e) { e.printStackTrace(); } } public void sender() { // 创建一个 MapMessage类型的消息 try { MapMessage msg = session.createMapMessage(); msg.setStringProperty("name", "cc");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg.setIntProperty("age", 26); msg.setIntProperty("salary", 5600); MapMessage msg1 = session.createMapMessage(); msg1.setStringProperty("name", "zs");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg1.setIntProperty("age", 22); msg1.setIntProperty("salary", 4000); MapMessage msg2 = session.createMapMessage(); msg2.setStringProperty("name", "lsi");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg2.setIntProperty("age", 21); msg2.setIntProperty("salary", 9100); MapMessage msg3 = session.createMapMessage(); msg3.setStringProperty("name", "nb");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg3.setIntProperty("age", 19); msg3.setIntProperty("salary", 3600); MapMessage msg4 = session.createMapMessage(); msg4.setStringProperty("name", "ww");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg4.setIntProperty("age", 27); msg4.setIntProperty("salary", 7600); // 生产者发送消息 // 默认情况,数据时需要进行持久化操作,可以指定DeliveryMode不进行初始化操作 // 2指定消息的优先级,需要active.xml配置 <policyEntity /> p.send(d, msg1, DeliveryMode.PERSISTENT, 2, 1000 * 60 * 1L); p.send(d, msg2); p.send(d, msg3); p.send(d, msg4); } catch (JMSException e) { e.printStackTrace(); } finally { if (c != null) { try { c.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) { Producer pro = new Producer(); pro.sender(); } }
(2)消费者
消费者中使用了消息监听机制,监听MQ上的消息,并处理消息
package com.chinasoft.activemqv1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ消费者 * * @author Freedom * */ public class Consumer { private static final String SELECTOR = "age>21 and salary>4000"; private ConnectionFactory f = null; private Connection c = null; private Session session = null; private Destination d = null; private MessageConsumer mc = null; public Consumer() { try { f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); c = f.createConnection(); c.start(); session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); d = session.createQueue("persistMysql"); mc = session.createConsumer(d, SELECTOR);// 第二个参数为一个selctor选择器用于筛选数据,满足SQL92规范 } catch (JMSException e) { e.printStackTrace(); } } public void recevice() { // 消费端创建一个监听类,监听MQ上消息并读取消息 try { mc.setMessageListener(new MessageListener() { @Override public void onMessage(Message m) { if (m instanceof MapMessage) { System.out.println("消费者接受到的消息**** " + m); } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Consumer c = new Consumer(); c.recevice(); } }
执行完成,打开数据库表,会生成三个表
xxx_msgs表记录生产者发送的消息信息,如果消费者消费完成,则会清空表中的消息
相关推荐
胡献根 2020-04-10
姚强 2020-03-01
Java高知 2020-02-15
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20