Persisting ActiveMQ Messages to a MySQL Database

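1. Add the MySQL data source

Open conf/activemq.xml under the ActiveMQ installation directory and switch the persistence adapter to MySQL. By default ActiveMQ stores messages in KahaDB; comment out the KahaDB configuration and replace it with the MySQL configuration below: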

<!--
<persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>

This configuration tells the broker to use the bean named "MySQL-DS" as its MySQL data source.
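
The "#MySQL-DS" value is a reference to a Spring bean id, not a JDBC URL. As a minimal sketch of the same adapter with table creation written out explicitly: the createTablesOnStartup attribute is not part of the original configuration, and it is assumed here to default to true, so spelling it out should only make the default behavior visible.

```xml
<persistenceAdapter>
    <!-- dataSource="#MySQL-DS" points at the Spring bean whose id is "MySQL-DS" -->
    <!-- createTablesOnStartup is an addition for illustration; assumed to be true by default -->
    <jdbcPersistenceAdapter dataSource="#MySQL-DS" createTablesOnStartup="true"/>
</persistenceAdapter>
```

Setting the attribute to false would be the choice when the tables are created by hand and the broker's database account has no DDL privileges.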

2. Configure the MySQL data source

After the closing </broker> element, add the MySQL data source configuration:

<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="your-username"/>
    <property name="password" value="your-password"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

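This is an ordinary Spring bean definition. Its id must match the name referenced by the dataSource attribute of the persistence adapter in step 1 (written there as "#MySQL-DS").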
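
The class names in the data source bean depend on which jars are actually in the lib directory. As a hedged sketch for a newer setup, assuming MySQL Connector/J 8.x (driver class com.mysql.cj.jdbc.Driver) and Commons DBCP 2 (org.apache.commons.dbcp2.BasicDataSource) are the libraries in use; neither version is stated in the original, so check the jars you have before adopting it:

```xml
<!-- MySQL DataSource, variant for Connector/J 8.x with Commons DBCP 2 (assumed versions) -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <!-- Connector/J 8.x moved the driver class to the com.mysql.cj package -->
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
    <!-- placeholder credentials; replace with a real account -->
    <property name="username" value="your-username"/>
    <property name="password" value="your-password"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
```

The bean id stays "MySQL-DS", so the persistence adapter reference does not need to change.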

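3. Once the configuration is done, copy the MySQL driver jar into ActiveMQ's lib directory and start ActiveMQ. The activemq database named in the JDBC URL must already exist; the broker creates only the tables. After startup the database should contain three tables: activemq_acks, activemq_lock, and activemq_msgs.

The activemq_msgs table is where the persistent messages are stored.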

The complete activemq.xml for persisting messages to MySQL looks like this:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>classpath:/META-INF/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">

        <!--
            For better performances use VM cursor and small memory limit.
            For more information, see:
            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
                        <pendingSubscriberPolicy>
                            <vmCursor/>
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                        <!--
                            Use VM cursor for better latency
                            For more information, see:
                            http://activemq.apache.org/message-cursors.html

                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <!--
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>
        -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
        </persistenceAdapter>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>

    <!-- MySQL DataSource -->
    <bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="your-username"/>
        <property name="password" value="your-password"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
