activeMQ集群的使用与配置
Clustering(集群)
ActiveMQ从多种不同的方面提供了集群的支持。
1、Queueconsumerclusters
ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
需要注意的是,笔者发现AcitveMQ5.0版本的Queueconsumerclusters存在一个bug:采用AMQMessageStore,运行一个producer,两个consumer,并采用如下的配置文件:
Xml代码
<beans>
<brokerxmlns="http://activemq.org/config/1.0"brokername="BugBroker1"useJmx="true">
<transportConnectors>
<transportConnectoruri="tcp://localhost:61616"/>
</transportConnectors>
<persistenceAdapter>
<amqPersistenceAdapterdirectory="activemq-data/BugBroker1"maxFileLength="32mb"/>
</persistenceAdapter>
</broker>
</beans>
那么经过一段时间后可能会报出如下错误:
ERROR[ActiveMQTransport:tcp:///127.0.0.1:1843-RecoveryListenerAdapter.java:58-RecoveryListenerAdapter]MessageidID:versus-1837-1203915536609-0:2:1:1:419couldnotberecoveredfromthedatastore!
Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。
2、Brokerclusters
一个常见的场景是有多个JMSbroker,有一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover://协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://。
如果某个网络上有多个brokers而且客户使用静态发现(使用StaticTransport或FailoverTransport)或动态发现(使用DiscoveryTransport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,standalonebrokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Networkofbrokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。
从ActiveMQ1.1版本起,ActiveMQ支持networksofbrokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Networkofbrokers,一种是使用statictransport,如下:
Xml代码
1.<brokerbrokername="receiver"persistent="false"useJmx="false">
2.<transportConnectors>
3.<transportConnectoruri="tcp://localhost:62002"/>
4.</transportConnectors>
5.<networkConnectors>
6.<networkConnectoruri="static:(tcp://localhost:61616,tcp://remotehost:61616)"/>
7.</networkConnectors>
8.…
9.</broker>
另外一种是使用multicastdiscovery,如下:
Xml代码
1.<brokername="sender"persistent="false"useJmx="false">
2.<transportConnectors>
3.<transportConnectoruri="tcp://localhost:0"discoveryUri="multicast://default"/>
4.</transportConnectors>
5.<networkConnectors>
6.<networkConnectoruri="multicast://default"/>
7.</networkConnectors>
8....
9.</broker>
NetworkConnector有以下属性:
PropertyDefaultValueDescription
namebridgenameofthenetwork-formorethanonenetworkconnectorbetweenthesametwobrokers-usedifferentnamesdynamicOnlyfalseiftrue,onlyforwardmessagesifaconsumerisactiveontheconnectedbrokerdecreaseNetworkConsumerPriorityfalsedecreasethepriorityfordispatchingtoaQueueconsumerthefurtherawayitis(innetworkhops)fromtheproducernetworkTTL1thenumberofbrokersinthenetworkthatmessagesandsubscriptionscanpassthroughconduitSubscriptionstruemultipleconsumerssubscribingtothesamedestinationaretreatedasoneconsumerbythenetworkexcludedDestinationsemptydestinationsmatchingthislistwon'tbeforwardedacrossthenetworkdynamicallyIncludedDestinationsemptydestinationsthatmatchthislistwillbeforwardedacrossthenetworkn.b.anemptylistmeansalldestinationsnotintheexcludedlistwillbeforwardedstaticallyIncludedDestinationsemptydestinationsthatmatchwillalwaysbepassedacrossthenetwork-evenifnoconsumershaveeverregisteredaninterestduplexfalseiftrue,anetworkconnectionwillbeusedtobothproduceANDConsumemessages.Thisisusefulforhubandspokescenarioswhenthehubisbehindafirewalletc.
关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用forwardingbridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer,它发送了30条消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer会得到15条消息,另外15条消息会发送给brokerB。此时负载并不均衡,因为此时brokerA将brokerB上的两个consumers视为一个;如果conduitSubscriptions=false,那么每个consumer上都会收到10条消息。以下是关于NetworkConnector属性的一个例子:
Xml代码
1.<networkConnectors>
2.<networkConnectoruri="static://(tcp://localhost:61617)"
3.name="bridge"dynamicOnly="false"conduitSubscriptions="true"
4.decreaseNetworkConsumerPriority="false">
5.<excludedDestinations>
6.<queuephysicalname="exclude.test.foo"/>
7.<topicphysicalname="exclude.test.bar"/>
8.</excludedDestinations>
9.<dynamicallyIncludedDestinations>
10.<queuephysicalname="include.test.foo"/>
11.<topicphysicalname="include.test.bar"/>
12.</dynamicallyIncludedDestinations>
13.<staticallyIncludedDestinations>
14.<queuephysicalname="always.include.queue"/>
15.<topicphysicalname="always.include.topic"/>
16.</staticallyIncludedDestinations>
17.</networkConnector>
18.</networkConnectors>
3、MasterSlave
在一个网络内运行多个brokers或者standalonebrokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。MasterSlave背后的想法是,消息被复制到slavebroker,因此即使masterbroker遇到了像硬件故障之类的错误,你也可以立即切换到slavebroker而不丢失任何消息。
MasterSlave是目前ActiveMQ推荐的高可靠性和容错的解决方案。以下是几种不同的类型:
MasterSlaveTypeRequirementsProsCons
PureMasterSlaveNoneNocentralpointoffailureRequiresmanualrestarttobringbackafailedmasterandcanonlysupport1slaveSharedFileSystemMasterSlaveASharedFilesystemsuchasaSANRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiressharedfilesystemJDBCMasterSlaveAShareddatabaseRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiresashareddatabase.Alsorelativelyslowasitcannotusethehighperformancejournal
3.1PureMasterSlave
PureMasterSlave的工作方式如下:
•Slavebroker消费masterbroker上所有的消息状态,例如消息、确认和事务状态等。只要slavebroker连接到了masterbroker,它不会(也不被允许)启动任何networkconnectors或者transportconnectors,所以唯一的目的就是复制masterbroker的状态。
•Masterbroker只有在消息成功被复制到slavebroker之后才会响应客户。例如,客户的commit请求只有在masterbroker和slavebroker都处理完毕commit请求之后才会结束。
•当masterbroker失效的时候,slavebroker有两种选择,一种是slavebroker启动所有的networkconnectors和transportconnectors,这允许客户端切换到slavebroker;另外一种是slavebroker停止。这种情况下,slavebroker只是复制了masterbroker的状态。
•客户应该使用failovertransport并且应该首先尝试连接masterbroker。例如:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
设置randomize为false就可以让客户总是首先尝试连接masterbroker(slavebroker并不会接受任何连接,直到它成为了masterbroker)。
PureMasterSlave具有以下限制:
•只能有一个slavebroker连接到masterbroker。
•在因masterbroker失效而导致slavebroker成为master之后,之前的masterbroker只有在当前的masterbroker(原slavebroker)停止后才能重新生效。
•Masterbroker失效后而切换到slavebroker后,最安全的恢复masterbroker的方式是人工处理。首先要停止slavebroker(这意味着所有的客户也要停止)。然后把slavebroker的数据目录中所有的数据拷贝到masterbroker的数据目录中。然后重启masterbroker和slavebroker。
Masterbroker不需要特殊的配置。Slavebroker需要进行以下配置
Xml代码:
1.<brokermasterConnectorURI="tcp://masterhost:62001"shutdownOnMasterFailure="false">
2....
3.<transportConnectors>
4.<transportConnectoruri="tcp://slavehost:61616"/>
5.</transportConnectors>
6.</broker>
其中的masterConnectorURI用于指向masterbroker,shutdownOnMasterFailure用于指定slavebroker在masterbroker失效的时候是否需要停止。此外,也可以使用如下配置:
Xml代码
1.<brokerbrokername="slave"useJmx="false"deleteAllMessagesOnStartup="true"xmlns="http://activemq.org/config/1.0">
2....
3.<services>
4.<masterConnectorremoteURI="tcp://localhost:62001"username="user"
password="password"/>
5.</services>
6.</broker>
需要注意的是,笔者认为ActiveMQ5.0版本的PureMasterSlave仍然不够稳定。
3.2SharedFileSystemMasterSlave
如果你使用SAN或者共享文件系统,那么你可以使用SharedFileSystemMasterSlave。基本上,你可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。客户端使用failovertransport来连接到可用的broker。当masterbroker失效的时候会释放这把锁,这时候其中一个slavebroker会得到这把锁从而成为masterbroker。以下是ActiveMQ配置的一个例子:
Xml代码
1.<brokeruseJmx="false"xmlns="http://activemq.org/config/1.0">
2.<persistenceAdapter>
3.<journaledJDBCdataDirectory="/sharedFileSystem/broker"/>
4.</persistenceAdapter>
5.…
6.</broker>
3.3JDBCMasterSlave
JDBCMasterSlave的工作原理跟SharedFileSystemMasterSlave类似,只是采用了数据库作为持久化存储。以下是ActiveMQ配置的一个例子:
Xml代码
1.<beans>
2.<brokerxmlns="http://activemq.org/config/1.0"brokername="JdbcMasterBroker">
3....
4.<persistenceAdapter>
5.<jdbcPersistenceAdapterdataSource="#mysql-ds"/>
6.</persistenceAdapter>
8.</broker>
10.<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
11.<propertyname="driverClassName"value="com.mysql.jdbc.Driver"/>
12.<propertyname="url"value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
13.<propertyname="username"value="username"/>
14.<propertyname="password"value="passward"/>
15.<propertyname="poolPreparedStatements"value="true"/>
16.</bean>
17.</beans>
需要注意的是,如果你使用MySQL数据库,需要首先执行以下三条语句:(Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现)
Sql代码
1.ALTERTABLEactivemq_acksENGINE=InnoDB;
2.ALTERTABLEactivemq_lockENGINE=InnoDB;
3.ALTERTABLEactivemq_msgsENGINE=InnoDB;