activeMQ集群的使用与配置

Clustering(集群)

ActiveMQ从多种不同的方面提供了集群的支持。

1、Queueconsumerclusters

ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。

需要注意的是,笔者发现AcitveMQ5.0版本的Queueconsumerclusters存在一个bug:采用AMQMessageStore,运行一个producer,两个consumer,并采用如下的配置文件:

Xml代码

<beans>

<brokerxmlns="http://activemq.org/config/1.0"brokername="BugBroker1"useJmx="true">

<transportConnectors>

<transportConnectoruri="tcp://localhost:61616"/>

</transportConnectors>

<persistenceAdapter>

<amqPersistenceAdapterdirectory="activemq-data/BugBroker1"maxFileLength="32mb"/>

</persistenceAdapter>

</broker>

</beans>

那么经过一段时间后可能会报出如下错误:

ERROR[ActiveMQTransport:tcp:///127.0.0.1:1843-RecoveryListenerAdapter.java:58-RecoveryListenerAdapter]MessageidID:versus-1837-1203915536609-0:2:1:1:419couldnotberecoveredfromthedatastore!

Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。

2、Brokerclusters

一个常见的场景是有多个JMSbroker,有一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover://协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://。

如果某个网络上有多个brokers而且客户使用静态发现(使用StaticTransport或FailoverTransport)或动态发现(使用DiscoveryTransport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,standalonebrokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Networkofbrokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。

从ActiveMQ1.1版本起,ActiveMQ支持networksofbrokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Networkofbrokers,一种是使用statictransport,如下:

Xml代码

1.<brokerbrokername="receiver"persistent="false"useJmx="false">

2.<transportConnectors>

3.<transportConnectoruri="tcp://localhost:62002"/>

4.</transportConnectors>

5.<networkConnectors>

6.<networkConnectoruri="static:(tcp://localhost:61616,tcp://remotehost:61616)"/>

7.</networkConnectors>

8.…

9.</broker>

另外一种是使用multicastdiscovery,如下:

Xml代码

1.<brokername="sender"persistent="false"useJmx="false">

2.<transportConnectors>

3.<transportConnectoruri="tcp://localhost:0"discoveryUri="multicast://default"/>

4.</transportConnectors>

5.<networkConnectors>

6.<networkConnectoruri="multicast://default"/>

7.</networkConnectors>

8....

9.</broker>

NetworkConnector有以下属性:

PropertyDefaultValueDescription

namebridgenameofthenetwork-formorethanonenetworkconnectorbetweenthesametwobrokers-usedifferentnamesdynamicOnlyfalseiftrue,onlyforwardmessagesifaconsumerisactiveontheconnectedbrokerdecreaseNetworkConsumerPriorityfalsedecreasethepriorityfordispatchingtoaQueueconsumerthefurtherawayitis(innetworkhops)fromtheproducernetworkTTL1thenumberofbrokersinthenetworkthatmessagesandsubscriptionscanpassthroughconduitSubscriptionstruemultipleconsumerssubscribingtothesamedestinationaretreatedasoneconsumerbythenetworkexcludedDestinationsemptydestinationsmatchingthislistwon'tbeforwardedacrossthenetworkdynamicallyIncludedDestinationsemptydestinationsthatmatchthislistwillbeforwardedacrossthenetworkn.b.anemptylistmeansalldestinationsnotintheexcludedlistwillbeforwardedstaticallyIncludedDestinationsemptydestinationsthatmatchwillalwaysbepassedacrossthenetwork-evenifnoconsumershaveeverregisteredaninterestduplexfalseiftrue,anetworkconnectionwillbeusedtobothproduceANDConsumemessages.Thisisusefulforhubandspokescenarioswhenthehubisbehindafirewalletc.

关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用forwardingbridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer,它发送了30条消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer会得到15条消息,另外15条消息会发送给brokerB。此时负载并不均衡,因为此时brokerA将brokerB上的两个consumers视为一个;如果conduitSubscriptions=false,那么每个consumer上都会收到10条消息。以下是关于NetworkConnector属性的一个例子:

Xml代码

1.<networkConnectors>

2.<networkConnectoruri="static://(tcp://localhost:61617)"

3.name="bridge"dynamicOnly="false"conduitSubscriptions="true"

4.decreaseNetworkConsumerPriority="false">

5.<excludedDestinations>

6.<queuephysicalname="exclude.test.foo"/>

7.<topicphysicalname="exclude.test.bar"/>

8.</excludedDestinations>

9.<dynamicallyIncludedDestinations>

10.<queuephysicalname="include.test.foo"/>

11.<topicphysicalname="include.test.bar"/>

12.</dynamicallyIncludedDestinations>

13.<staticallyIncludedDestinations>

14.<queuephysicalname="always.include.queue"/>

15.<topicphysicalname="always.include.topic"/>

16.</staticallyIncludedDestinations>

17.</networkConnector>

18.</networkConnectors>

3、MasterSlave

在一个网络内运行多个brokers或者standalonebrokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。MasterSlave背后的想法是,消息被复制到slavebroker,因此即使masterbroker遇到了像硬件故障之类的错误,你也可以立即切换到slavebroker而不丢失任何消息。

MasterSlave是目前ActiveMQ推荐的高可靠性和容错的解决方案。以下是几种不同的类型:

MasterSlaveTypeRequirementsProsCons

PureMasterSlaveNoneNocentralpointoffailureRequiresmanualrestarttobringbackafailedmasterandcanonlysupport1slaveSharedFileSystemMasterSlaveASharedFilesystemsuchasaSANRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiressharedfilesystemJDBCMasterSlaveAShareddatabaseRunasmanyslavesasrequired.AutomaticrecoveryofoldmastersRequiresashareddatabase.Alsorelativelyslowasitcannotusethehighperformancejournal

3.1PureMasterSlave

PureMasterSlave的工作方式如下:

•Slavebroker消费masterbroker上所有的消息状态,例如消息、确认和事务状态等。只要slavebroker连接到了masterbroker,它不会(也不被允许)启动任何networkconnectors或者transportconnectors,所以唯一的目的就是复制masterbroker的状态。

•Masterbroker只有在消息成功被复制到slavebroker之后才会响应客户。例如,客户的commit请求只有在masterbroker和slavebroker都处理完毕commit请求之后才会结束。

•当masterbroker失效的时候,slavebroker有两种选择,一种是slavebroker启动所有的networkconnectors和transportconnectors,这允许客户端切换到slavebroker;另外一种是slavebroker停止。这种情况下,slavebroker只是复制了masterbroker的状态。

•客户应该使用failovertransport并且应该首先尝试连接masterbroker。例如:

failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

设置randomize为false就可以让客户总是首先尝试连接masterbroker(slavebroker并不会接受任何连接,直到它成为了masterbroker)。

PureMasterSlave具有以下限制:

•只能有一个slavebroker连接到masterbroker。

•在因masterbroker失效而导致slavebroker成为master之后,之前的masterbroker只有在当前的masterbroker(原slavebroker)停止后才能重新生效。

•Masterbroker失效后而切换到slavebroker后,最安全的恢复masterbroker的方式是人工处理。首先要停止slavebroker(这意味着所有的客户也要停止)。然后把slavebroker的数据目录中所有的数据拷贝到masterbroker的数据目录中。然后重启masterbroker和slavebroker。

Masterbroker不需要特殊的配置。Slavebroker需要进行以下配置

Xml代码:

1.<brokermasterConnectorURI="tcp://masterhost:62001"shutdownOnMasterFailure="false">

2....

3.<transportConnectors>

4.<transportConnectoruri="tcp://slavehost:61616"/>

5.</transportConnectors>

6.</broker>

其中的masterConnectorURI用于指向masterbroker,shutdownOnMasterFailure用于指定slavebroker在masterbroker失效的时候是否需要停止。此外,也可以使用如下配置:

Xml代码

1.<brokerbrokername="slave"useJmx="false"deleteAllMessagesOnStartup="true"xmlns="http://activemq.org/config/1.0">

2....

3.<services>

4.<masterConnectorremoteURI="tcp://localhost:62001"username="user"

password="password"/>

5.</services>

6.</broker>

需要注意的是,笔者认为ActiveMQ5.0版本的PureMasterSlave仍然不够稳定。

3.2SharedFileSystemMasterSlave

如果你使用SAN或者共享文件系统,那么你可以使用SharedFileSystemMasterSlave。基本上,你可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。客户端使用failovertransport来连接到可用的broker。当masterbroker失效的时候会释放这把锁,这时候其中一个slavebroker会得到这把锁从而成为masterbroker。以下是ActiveMQ配置的一个例子:

Xml代码

1.<brokeruseJmx="false"xmlns="http://activemq.org/config/1.0">

2.<persistenceAdapter>

3.<journaledJDBCdataDirectory="/sharedFileSystem/broker"/>

4.</persistenceAdapter>

5.…

6.</broker>

3.3JDBCMasterSlave

JDBCMasterSlave的工作原理跟SharedFileSystemMasterSlave类似,只是采用了数据库作为持久化存储。以下是ActiveMQ配置的一个例子:

Xml代码

1.<beans>

2.<brokerxmlns="http://activemq.org/config/1.0"brokername="JdbcMasterBroker">

3....

4.<persistenceAdapter>

5.<jdbcPersistenceAdapterdataSource="#mysql-ds"/>

6.</persistenceAdapter>

8.</broker>

10.<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">

11.<propertyname="driverClassName"value="com.mysql.jdbc.Driver"/>

12.<propertyname="url"value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>

13.<propertyname="username"value="username"/>

14.<propertyname="password"value="passward"/>

15.<propertyname="poolPreparedStatements"value="true"/>

16.</bean>

17.</beans>

需要注意的是,如果你使用MySQL数据库,需要首先执行以下三条语句:(Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现)

Sql代码

1.ALTERTABLEactivemq_acksENGINE=InnoDB;

2.ALTERTABLEactivemq_lockENGINE=InnoDB;

3.ALTERTABLEactivemq_msgsENGINE=InnoDB;

相关推荐