ActiveMQ中的NetworkConnector(网络连接器)详解
注:本文以ActiveMQ5.10版本为基础。
我们知道,ActiveMQ中的TransportConnector(传输连接器)主要用于配置ActiveMQ服务端和客户端之间的通信方式,NetworkConnector(网络连接器)则主要用来配置ActiveMQ服务端与服务端之间的通信。在某些场景,网络拓扑中我们可能会需要大量的生产者和消费者,也就是说我们会有大量的ActiveMQ客户端,这样,我们就需要在网络环境中有多个MQ服务器,服务器之间是可以透传消息的,这是,我们就需要用到NetworkConnector,我们先从简单的分析起,假设我们需要部署两台MQ服务器,如下图:
使用NetworkConnector的简单场景
如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到,要使用NewworkConnector的功能,需要在服务器S1的activemq.xml中的broker节点下添加如下配置(注:10.79.11.172:61617为S2的地址):
<networkConnectors> <br> <networkConnector uri="
static:(tcp://10.79.11.172:61617)
"/> <br></networkConnectors>
<networkConnectors> <br> <networkConnector uri="
static:(tcp://10.79.11.171
:61617)
"/> <br></networkConnectors>
<networkConnectors> <br> <networkConnector uri="
static:(tcp://10.79.11.171
:61617,tcp://10.79.11.173<code class="xml spaces">:61617,tcp://10.79.11.174<code class="xml spaces">:61617
)"/> <br></networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
decreaseNetworkConsumerPriority:默认值为false,如果该值为false,网络上的消费者和本地的消费者有相同的优先级,就是说一个消息发送到本地的消费者和其他的网络消费者有相同的概率。如果该值为true,则其他网络消费者的优先级从-5开始算起,根据网络跳数(network hops)来算优先级,当然是越“远”优先级越低。
networkTTL:默认值为1,该值表示消息或订阅可以通过的消息提供者(brokers)的数量。拿上图来说,假设P1向S1发了条消息,另外有个ActiveMQ服务器S3(上图未画出)与S2相互桥接,但不直接与S1桥接,因为该值为1,即使S3上有该队列的消费者或主题的订阅者,但对于S1来说S3这些网络消费者和订阅者是不可见的,所以P1发的这条消息就不能被S3上的消费者给消费,如果想要让S3上的消费者也有机会消费,可以让S3和S1桥接,或者将S1的networkTTL参数配置为2.
messageTTL:默认为1,从5.9版本开始有效,表示消息能通过的消息提供者数量,该值可以覆盖掉networkTTL中对消息通过数量的限制。
consumerTTL:默认为1,从5.9版本开始有效,表示订阅能通过的消息提供者数量,该值可以覆盖掉networkTTL中对订阅通过数量的限制。
conduitSubscriptions:默认为true。该值表示多个消费者订阅一个相同的目的地是否在网络上被对待为一个消费者。以上图为例,如果采用默认的分配策略,P1连续发120条消息,结果应该是C1和C2都收到40条,而C3和C4共收到40条(C3和C4平分这40条),为什么不是C1、C2、C3和C4都收到大概30条呢?因为该值为true,C3和C4对于S1来说只是一个消费者,而不是两个。注意,如果你的多个网络提供者中的消费者使用了消息选择器(selector),你又设置了conduitSubscriptions为true,则有可能会导致消息不会被发送到正确的网络消费者从而消息不被消费,因为网络消费者中的消息选择器对生产者的提供者来说是透明的,所以此种情况下,请将conduitSubscriptions设置为false。
excludedDestinations:默认为空,用来配置不会在网络中转发的目的地(Destination)。
dynamicallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,和上面的excludedDestinations的功能相反。例如:
<networkConnector uri="static:(tcp://host)">
<dynamicallyIncludedDestinations>
<queue physicalname="queue1"/>
<topic physicalname="topic1"/>
</dynamicallyIncludedDestinations>
</networkConnector>
staticallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,但它和dynamicallyIncludedDestinations不同的是dynamicallyIncludedDestinations只在对方有该目的地的消费者时才将消息转发给对方,staticallyIncludedDestinations则不管对方有没有该目的地都将消息转发给对方。
举例配置如下:
<networkConnectors>
<networkConnector uri="static:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)" >
<staticallyIncludedDestinations>
<queue physicalname="TopicTestQueue1"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
duplex:默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接),如果该值设置为true,则一个连接上可以双向流动消息(双工连接)。可以想象,单工连接比双工连接吞吐量要高,但增加了连接数量方面的开销。如果在使用双工连接时为了增加吞吐量,可以建立多个双工连接,但每个连接必须起不同的名字,例如:
<networkConnectors>
<networkConnector name="SYSTEM1" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
<dynamicallyIncludedDestinations>
<topic physicalname="outgoing.System1" />
</dynamicallyIncludedDestinations>
</networkConnector>
<networkConnector name="SYSTEM2" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
<dynamicallyIncludedDestinations>
<topic physicalname="outgoing.System2"/>
</dynamicallyIncludedDestinations>
</networkConnector>
</networkConnectors>
prefetchSize:默认是1000,指网络连接中的消费者的预读数量,该值必须大于0,因为网络消费者不能采用poll来获取消息(轮询消息)。
suppressDuplicateQueueSubscriptions:(从5.3版本开始有效)默认为false,如果该参数为true,会防止提供者之间相互桥接时的队列重复订阅的问题,其实该参数为true和false只是ActiveMQ代码中的微妙的性能变化,对实际的集群结果并不会带来什么影响。
bridgeTempDestinations:默认值为true,该值用来指定在MQ服务器在网络中创建临时目的地(temp destinations)时是否需要广播公告消息(broadcast advisory messages )。如果设置为false,表示禁用此功能,则当生产者和消费者不是连接到同一个MQ服务器时,可以减少网络开销。
alwaysSyncSend:(从5.6版本开始有效)默认为false,表示是否总是异步发送。默认时,非持久化消息被发送到远程的MQ服务器时将采用请求/应答方式而不是oneway方式。此参数对待持久化和非持久化消息相同。
staticBridge:(从5.6版本开始有效)默认为false,如果设置为true,MQ服务器将不会动态应答新的消费者,也意味着该提供者将对其他远程提供者的主题不感兴趣。当设置为true是,我们可以使用staticallyIncludedDestinations来创建配置需要订阅的主题,例如:
<networkConnector uri="static:(tcp://host)" staticBridge="true">
<staticallyIncludedDestinations>
<queue physicalname="always.include.queue"/>
</staticallyIncludedDestinations>
</networkConnector>
消息如果一开始就是持久化的化的或者是被持久订阅的,则在网络经纪人中传播时也会保持这种特性,然而,如果消息一开始就不是持久化的或者不是持久订阅的,则在网络的MQ服务器中传播时也一定不会有这些特性。
总的消息顺序在网络经纪人中是不保证的,当然,即使不是网络经纪人,在单个经纪人环境下,有多个消费者时,消息被消费的顺序也是不保证的。
如果你希望提供者不受任何远程提供者上消费者的影响,或者希望不管远程提供者上是否有消费者都将所有消息转发到远程提供者,那么你可以考虑使用静态网络(static networks)。
就如前面你说看到的,如果你想增加吞吐量,或者想采用不同的通信方式(如tcp或者nio),又或者你想采用更灵活的配置,两个提供者之间可以由多个NetworkConnector相连, 例如,如果使用分布式队列(distributed queues),你可能希望在通过网络对队列接收者能采用等效加权(equivalent weighting),但只针对活跃的接收者,你可以如下配置:
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<topic physicalname=">"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<policyMap>
<policyEntries>
<policyEntry queue="TEST.>" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
感兴趣的童鞋可以去看看官方文档:http://activemq.apache.org/networks-of-brokers.html