Activemq持久化消息到MySql数据库中
1.添加MySql数据源
打开Activemq安装目录下的conf/activemq.xml文件,添加MySql数据源。默认情况下Activemq使用KahaDB存储,注解掉KahaDB配置,改为mysql配置如下:
<!-- <persistenceAdapter> <kahaDBdirectory="${activemq.base}/data/kahadb"/> </persistenceAdapter> --> <persistenceAdapter> <jdbcPersistenceAdapterdataSource="#MySQL-DS"/> </persistenceAdapter> |
该配置表示,我们将要使用名称为“MySQL-DS”的作为mysql数据源。
2. 配置MySql数据源
在</broker>节点后面,增加MySQL数据源配置:
<!-- MySQL DataSource --> <beanid="MySQL-DS"class="org.apache.commons.dbcp.BasicDataSource"destroy-ethod="close"> <propertyname="driverClassName"value="com.mysql.jdbc.Driver"/> <propertyname="url"value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/> <propertyname="username"value="用户名"/> <propertyname="password"value="登录密码"/> <propertyname="poolPreparedStatements"value="true"/> </bean> |
此处配置类似于spring的Bean配置,id 要与上面的保持一致。
3. 配置完成后,将mysql驱动包加入到 Activemq 的 lib目录下,启动Activemq。查看activemq数据库会有三张表:activemq_acks ,activemq_lock ,activemq_msgs
数据表activemq_msgs即为持久化消息表;
整个activemq持久消息到mysql数据库配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beansxmlns="http://www.springframework.org/schema/beans"xmlns:amq="http://activemq.apache.org/schema/core"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core.xsd"> <!--Allowsustousesystempropertiesasvariablesinthisconfigurationfile--> <beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <propertyname="locations"> <value>classpath:/META-INF/credentials.properties</value> </property> </bean> <!-- The<broker>elementisusedtoconfiguretheActiveMQbroker. --> <brokerxmlns="http://activemq.apache.org/schema/core"brokername="localhost"> <!-- ForbetterperformancesuseVMcursorandsmallmemorylimit.Formoreinformation,see:http://activemq.apache.org/message-cursors.htmlAlso,ifyourproduceris"hanging",it'sprobablyduetoproducerflowcontrol.Formoreinformation,see: http://activemq.apache.org/producer-flow-control.html --> <destinationPolicy> <policyMap> <policyEntries> <policyEntrytopic=">"producerFlowControl="true"memoryLimit="1mb"> <pendingSubscriberPolicy> <vmCursor/> </pendingSubscriberPolicy> </policyEntry> <policyEntryqueue=">"producerFlowControl="true"memoryLimit="1mb"> <!-- UseVMcursorforbetterlatencyFormoreinformation,see:http://activemq.apache.org/message-cursors.html<pendingQueuePolicy><vmQueueCursor/></pendingQueuePolicy> --> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- ThemanagementContextisusedtoconfigurehowActiveMQisexposedin JMX.Bydefault,ActiveMQusestheMBeanserverthatisstartedby theJVM.Formoreinformation,see: http://activemq.apache.org/jmx.html --> <managementContext> <managementContextcreateConnector="false"/> </managementContext> <!-- Configuremessagepersistenceforthebroker.Thedefaultpersistence mechanismistheKahaDBstore(identifiedbythekahaDBtag). Formoreinformation,see: http://activemq.apache.org/persistence.html --> <!-- <persistenceAdapter> <kahaDBdirectory="${activemq.base}/data/kahadb"/> </persistenceAdapter> --> <persistenceAdapter> <jdbcPersistenceAdapterdataSource="#MySQL-DS"/> </persistenceAdapter> <!-- ThetransportconnectorsexposeActiveMQoveragivenprotocolto clientsandotherbrokers.Formoreinformation,see: http://activemq.apache.org/configuring-transports.html --> <transportConnectors> <transportConnectorname="openwire"uri="tcp://0.0.0.0:61616"/> </transportConnectors> </broker> <!--MySQLDataSource--> <beanid="MySQL-DS"class="org.apache.commons.dbcp.BasicDataSource"destroy-thod="close"> <propertyname="driverClassName"value="com.mysql.jdbc.Driver"/> <propertyname="url"value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/> <propertyname="username"value="用户名"/> <propertyname="password"value="登录密码"/> <propertyname="poolPreparedStatements"value="true"/> </bean> <!-- Enablewebconsoles,RESTandAjaxAPIsanddemos ItalsoincludesCamel(withitswebconsole),see${ACTIVEMQ_HOME}/conf/camel.xmlformoreinfo Takealookat${ACTIVEMQ_HOME}/conf/jetty.xmlformoredetails --> <importresource="jetty.xml"/> </beans> |