ActiveMQ分布式网络(Forward Bridge)

    当ActiveMQ面对大量消息存储和大量Client交互时,性能消耗将会达到单个broker极限,此时我们需要对ActiveMQ进行水平扩展。ActiveMQ提供了“network”机制,可以把多个broker实例“串联”一起,形成“Forward Bridge”模型(转发桥)。这些Broker通过有向网络(networkerConnector)链接在一起,组成broker集群,任何一个Borker都可以与Client数据交互,消息也将在Broker网络中“流动”直到被消费,之所以“流动”,因为“producer”和“consumer”会与不同的broker建立链接,我们认为“转发桥”架构模式是“较大”集群数据的解决方案。

    在master-slave模式中,消息将会在多个broker上备份,即集群中每个broker上消息都一样,在故障转移时不会发生消息丢失的问题(持久化消息)。“Forward Bridge”意味着消息可以在broker间“转发”直到消息被消费,在任意时刻一条消息只会被一个broker持有;Producer发送的消息,可能会经过多个Broker转发最终才会到达Consumer,如果其中某个Broker失效,那么其上的消息将不可用(当然也可以为每个节点采用M-S架构,以提高可用性),直到它再次加入集群,因此“Forward Brige”模式并不是ActiveMQ HA(高可用)方案,但是因为集群中每个Broker都可以独立为Client服务而且消息可以动态的在网络中迁移,所以“Forward Bridge”是解决海量消息的必备策略。



ActiveMQ分布式网络(Forward Bridge)
 
 

    当network中某个Broker故障失效,failover协议可以让Client迁移到其他正常的Broker上,从整体来看,并不会影响Client的数据交互。ActiveMQ还提供了动态平衡的机制,当某个Broker加入集群,可以让其他Broker上部分Client迁移到新的Broker上,从全局实现“负载均衡”。

    对于Queue而言,消息只会转发给有消费者存在的broker上,比如有三个Broker A,B,C,它们之间使用双向网络链接,如果A,B上有Consumer,C上没有,那么消息只会在A和B之间转发,C上将不会有任何消息。

    对于Topic而言,每个Broker都可能获取消息,即使broker上没有消费者;消息转发有配置决定,比如包含在staticallyIncludeDestinations中的Topic、在dynamicOnly为false情况下包含在durableDestinations中的Topic,这些Topic中的消息总是被转发给remote Broker。

    1)多个brokers通过网络连接组成分布式网络,消息可以在多个brokers之间流动,每个broker都可以存储消息,从而降低单个broker的存取压力,在存储层面类似于“sharding”。

    2)broker的离群只会导致此节点上的数据失效,不会影响集群的其他节点。

    3)Failover协议支持:broker失效后,其上的client将会迁移到其他broker上;当broker加入集群,其他brokers上的部分clients将有迁移到此broker上;最终在Client层面,实现存取的“负载均衡”。

    4)不需要太多的人工干预。

    5)producer和consumer可以与不同的broker建立连接,但是消息的消费不会受到影响;可以支撑较大规模的客户端个数,而且客户端的个数可以在多个brokers负载均衡。

    首先需要注意,分布式网络中的所有broker都不能有相同的brokerName和brokerId,且强烈建议开启“advisorySupport=true”。

<broker brokerName="broker-0" dataDirectory="${activemq.data}">

	<networkConnectors>
	  <networkConnector name="broker0-broker1" duplex="true" networkTTL="1" uri="static:(tcp://localhost:51616)"/>
	</networkConnectors>

	<transportConnectors>
		<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
		<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
	</transportConnectors>

</broker>

    网络发现:

    networkConnector中使用uri属性来指定远端的Broker地址(下游网络节点,可以有多个),我们可以使用“static:(broker1,broker2)”格式指定候选broker列表,“static”是一个逻辑协议,和Client端使用的“failover”协议一样,可以实现链接重连和列表遍历重试特性。通常“static”协议用在networkConnector中,“failover”协议用在Client端。broker将会依次尝试与uri中每个broker建立链接,直到与其一建立成功为止。通常uri中broker列表为master-slave模式,这也是broker高可用设计要求,我们可以使用masterslave协议:

<networkConnectors>
  <networkConnector uri="masterslave:(tcp://master:61616,tcp://slave:61616)"/>
</networkConnectors>

    在production环境中,我们通常不能使用“network动态发现”,所以在此不在赘言。

    当networkerConnector与remote Broker建立链接之后,那么remote Broker将会向local Broker交付订阅信息,包括remote broker持有的destinations、Consumers、持久订阅者列表等;那么此后local Broker将把remote Broker做一个消息“订阅者”,local Broker接收到的所有消息都会有条件(根据remote Broker订阅情况,以及本地的destination配置)的转发给remote Broker。如果集群中有新的Consumer、Destination创建,都会通过“Advisory Topic”发送通知,这些通知消息,和普通的消息一样也会在集群网络中转发,那么所有的broker都可以通过“Advisory”通知来获取集群动态,并调整消息转发。

    networkConnector核心属性:

    1. name:每个networkConnector都需要一个名称,而且在本地的networkConnectors列表中不能重复。

    2. dynamicOnly:用来表明connector是否只支持动态激活“durable订阅者”,默认为false,表示networkConnector在创建成功后,将立即激活所有的“durable Topic”,即使本地没有Topic的消费者(如果本地Broker已经有消费者,肯定会激活),本地Broker也会创建一个“典型的”订阅者,并将订阅信息发送给remote Broker,此后remote Broker接收到的Topic消息也将转发给本地Broker。如果为true,那么netwokrConnector将不会立即激活它们(本地没有消费者的durable Topic),直到从“Advisory”通知获取到激活消息(有Durable Consumer创建)时,才会做上述操作。它将对Durable Topic中消息在网络中转发的时机有重要影响。通常设定为false,以避免broker大面积失效时,Client迁移到那些尚没有接收到任何“Advisory”的新Broker节点上;不过如果网络比较稳定,设定dymanicOnly为true,可以有效的提升Topic消息的转发效率。

    比如A,B两个Broker,它们之间使用了“duplex”双向网络链接,并设置“dynamicOnly = true”;我们首先启动A并在其上创建一个durable订阅者,并由producer创建几个消息;然后启动B,producer继续在A上创建消息,此时我们会发现B上其实没有接受到任何消息(因为B没有接收到任何Advisory);此后我们重新在A上创建durable订阅者,这时我们会发现B上尽管没有任何消费者信息,但其也接收到了Topic消息的Copy。(如果此值为false,那么当B启动后,即使其上没有消费者,A仍然需要将durableDestinations中的Topic的消息转发给B)

<networkConnectors>
  <networkConnector name="broker0-broker1" duplex="true" dynamicOnly="true" networkTTL="1" uri="static:(tcp://localhost:51616)">
	<!-- dynamicOnly只对durableDestinations有效 -->
	<durableDestinations>
		<topic physicalName="order.dbcenter"/>
	</durableDestinations>
 </networkConnector>

    3. decreaseNetworkConsumerPriority:是否降低网络消费者(Queue)的权重,默认为false,即remote Broker中的Consumers与本地Consumer具有相同的权重(默认值=0,我们可以为每个消费者指定权重),消费者“权重”决定了它获取消息的优先级,较高权重的消费者将优先消费消息。如果此值为true,那么remote Broker中的Consumer将具有最低的优先级(-7),那么只有当Queue在Local Broker中没有Consumer,或者所有的Consumers都满负荷(prefetch Buffe已满)时,才会将消息转发给remote Broker中的Consumer。这是一种较好的调优策略,不仅可以降低消息转发的开支,而且还能较好的确保消息顺序,通常我们设置为true,唯一的缺点就是不利于Queue consumers在全局范围内负载均衡。 (解决的问题:首先保证消息被当前broker上的consumer消费,只有当本地所有的消费者都繁忙时才转发给remote broker上的consumer。)

    4. networkTTL:消息在网络中转发的最大生命周期,默认为1,即消息只会在网络中转发一次。消息在网络中每转发一次,都会将TTL-1,并且将当前的brokerId添加到路由信息中,当TTL=0或者当前brokerId已经在路由表中(此broker已经转发过此消息,避免loop),那么消息将不会继续转发下去。通常此值小与broker节点的个数,如果集群中brokers有回环网络,则建议为1,比如:A->B,B->C,C->A。(避免networkConnector是链状,且有回环情况,良好的架构下,应该是星状结构,即每个broker都与其他brokers建立连接,而不是链状结构--pipleline)

    5. conduitSubscriptions: 是否将remote Broker下同一个Destination中的多个Consumer,作为一个Consumer对待。默认为true。(conduit:管线)

    6. duplex:网络是否为双向的,默认为false。如果为true,则remote Broker不仅是“订阅者”,还可以作为消息的“生产者”,即remote Broker也会向Local Broker转发消息。我们通常是,每个broker均与其他broker建立单向的networkConnector,比如有三个broker A/B/C,那么在A上配置2个单向的:A->B,A->C,那么在B上配置:B->A,B->C,以此论推。

    7. alwaysSyncSend: 向remote Broker转发消息时,是否总是同步发送。默认为false,即只对持久化消息采用同步发送;如果为true,那么对于非持久化消息也将采用同步发送。所谓同步发送,就是local Broker通过networkerConnector将消息发送给remote Broker后,阻塞并等到remote Broker返回ACK指令。这是消息存储担保的方式。

    8. staticBridge:是否为“静态”Bridge,默认false,即networkConnector可以通过Advisory通知来获取Consumer的变更,并调整转发策略。如果为true,那么broker将不关注任何Advisory(Consumer变更),只会使用“staticallyIncludeDestination”配置创建必要的订阅。

    9. brigeTempDestinations:当Temp通道创建时,是否发送Advisory通知,并支持其消息的转发,默认为true。在Queue中,在使用request-reply(请求--应答)模式的消息消费时,reply通道通常是一个temp destination(与Connection同生命周期,其他Connection可见,不可跨Broker,其他特性和正常的Destination一样),这种destination在分布式网络中会有些局限性,有可能Producer和Consumer分布在不同的broker上, 如果禁用此属性,那么Consumer的reply消息将无法到达Producer(temp 通道无法跨越多个broker或者Connection,Consumer将会因为找不到Temp Destination而发生异常)。建议此值为true,分布式网络将通过Advisory来感知temp通道的创建(producer端创建),并在Consumer端消息转发时支持Temp通道中的消息(判定当前Broker是否已经接收到了Advisory,如果接收到此通知,那么broker将会在本地保留temp destination信息)。参见源码【AdvisoryBroker】

    可靠性:

    网络中的Broker总是首先保存消息,然后转发;消息的可靠性依赖于消息本身是否为持久化类型或者订阅者是否为duarable,非持久化消息和临时订阅者的消息仍然在broker失效时有丢失的风险;消息的属性不会在转发时修改。

    消息顺序(Queue):

    即使单个Broker,也无法保证消费者获取消息的顺序是严格一致的,除非只有一个Consumer(其实即使只有一个Consumer,如果发生消息重发的情况,顺序性会打破);那么对于分布式网络中的Broker,这种顺序性就更加难以保证,因为remote Broker相对于Local Broker,它就是一个逻辑上的Consumer,Local Broker从Queue的头部移除消息,转发给Remote Broker,那么remote Broker接受到消息之后,将消息添加到本地Queue的尾部,很明显,这个转发过程已经打破了Queue中消息的有序性(双端都有一个本地Queue),如果有多个Producer且分布在多个Broker上,而且还指定了Consumer Proirity,那么情况可能就更加复杂。

    Conduit Subscriptions:

    上述已经提到networkConnector中有个属性为“conduitSubscriptions”,当networkConnector创建成功后,local Broker已经知道remote Broker所持有的consumer列表(包括此后在remote broker上新加入的Consumer),当Local Broker接收到新消息时,也会将有关的消息转发给remote Broker。对于Topic订阅而言,如果remote Broker上有多个订阅者时,local broker将会把消息转发给每个订阅者,事实上同一条消息多次转发给了remote Broker,remote Broker接收到消息之后将会重新整理,并发送到本地订阅者时,就会出现重复消息的问题。所以在Topic中,我们通常设定“conduitSubscriptions=true”,同时为了避免重复消息的检测对Client造成影响,我们还需要在每个broker上的Topic策略上添加“enableAudit=false”。

<policyEntry topic=">" enableAudit="false">
....

    对于Queue而言,“conduitSubscriptions”却有不同的解释,如果此值为true,那么remote Broker上有多个consumer时也将按照看做一个consumer,这对消息的全局负载均衡是不利的。比如A,B两个Broker,如果A上有一个Consumer,B上有2个Consumers,如果Queue中有30条消息且Consumer的权重都一样的情况下,在conduitSubscriptions=true时,那么对于A而言它只认为B上有一个Consnumer,所以A和B各得到15条消息;如果conduitSubscriptions为false,那么对于A而言它认为B上有2个Consumer,即全局有3个Consumers,那么将会转发给B共20条消息, 那么三个消费者均会获得10条消息。(源码参见:DurableConduitBridge,DemandForwardingBridge)

    不过对于Queue而言,还有一个非常麻烦的事情,就是“selector”问题,网络Broker在转发消息时不会关注selector,即有可能不符合remote Broker中consumer selector的消息也会转发,此时这些消息无法被remote consumer所消费,而且也不会再次回传;如果remote consumer中使用了selector,暂时的策略就是让conduitSubscriptions=false;事实上,ActiveMQ是不希望消息不断回传,而是希望使用“reblanceClient”。

    通常,我们对Topic和Queue采用单独配置的原则:

<networkConnector name="broker0-broker1-topic" uri="static:(tcp://broker1:61616)" conduitSubscriptions="true">
	<excludedDestinations>
		<queue physicalName=">"/>
	</excludedDestinations>
</networkConnector>

<networkConnector name="broker0-broker1-queue" uri="static:(tcp://broker1:61616)" conduitSubscriptions="false">
	<excludedDestinations>
		<topic physicalName=">"/>
	</excludedDestinations>
</networkConnector>

    duplex网络:

    默认Duplex为false,即单向网络,此时remote Broker作为订阅者,Local Broker负责向remote Broker转发消息;如果duplex为true,那么Local Broker也作为订阅者,此时remote Broker也可以通过链接转发消息,相当于在A上配置了A->B和在B上配置了B->A效果是一样的,只使用了不过2个单向的网络链接;如果2个broker之间数据传输量较大或者开发者希望个性化配置时,我们可以配置2个单向网络。

    此外需要提醒,不要在2个broker之间重复配置duplex=true的网络链接。

<!-- broker0-broker1 -->
<networkConnector name="broker0-broker1" uri="static:(tcp://broker1:61616)" duplex="false" />

<!-- broker1-broker0 -->
<networkConnector name="broker1-broker0" uri="static:(tcp://broker0:61616)" duplex="false" />

    Advisory:

    ActiveMQ提供了“Advisory”机制,通常ActiveMQ内部将某些事件作为“advisory”在全局广播,比如destination的创建、consumer的加入、DLQ的产生等,这将额外的消耗极小的性能;我们可以在ActiveMQ的监控页面上看到影响的消息,开发者也可以View这些消息(通道名称以“ActiveMQ.Advisory.”开头)。对于分布式网络中的broker,将严重依赖“Advisory”,特别是“dynamic network”,我们建议无论如何都要开启它。(在一个broker上发生事件,都会以“通知”的方式发送给配置文件中指定的所有networkConnector)

<!-- 默认为true -->
<broker advisorySupport="true" ...>

    Dynamic networks:

    “动态网络”表明当remote Broker持有通道的消费者时,local Broker才会转发相应的消息;此时我们需要开启advisorySupport。当remote broker上有Consumer创建时,Advisory中将会广播消息,消息为ConsumerInfo类型,它将包括consumer所在的broker path,如果local broker与此path建立了networkConnector,那么此后local Broker将会启动响应的消息转发。

<networkConnector uri="static:(tcp://broker1:61616)">
  <dynamicallyIncludedDestinations>
    <queue physicalName="include.test.foo"/>
    <topic physicalName="include.test.bar"/>
  </dynamicallyIncludedDestinations>
</networkConnector>

    Static networks:

    相对于“动态网络”而言,“静态网络”将不依赖Advisory,在任何时候,即使remote Broker中没有相应的consumer,消息也将转发给remote Broker。为了支持这一特性,我们还需要“staticBridge=true”,此参数限定当前networkConnector将不使用advisory感知consumer变更,只关注staticallyIncludeDestinations中的通道。

<networkConnector uri="static:(tcp://broker1:61616)" staticBridge="true">
	<staticallyIncludedDestinations>
		<queue physicalName="always.include.queue"/>
	</staticallyIncludedDestinations>
</networkConnector>

    excludeDestinations:

    “排除” 指定的通道,这些通道中的消息将不会转发给remote Broker。在消息转发时或者Advisory中有consumer创建时,都会首先检测其通道是否在excludeDestinations中,如果在,则将不会与remote Broker建立此通道上的任何订阅关系;即使它也存在于staticallyIncludeDestinations或者dynamicIncludeDestinations中。

    Failover协议与分布式网络:

    我们可以在Client端使用failover协议,Client可以随机选择指定uri中的任何一个broker进行交互,当broker链接失效后,Client可以尝试重连或者尝试与其他Broker建立链接,failover协议与分布式网络搭配,不仅可以解决Client端故障转移,而且还能配合Broker端消息来实现Client全局负载均衡,让多个Client均衡的分布在多个Broker上。

<broker>
  ...
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" rebalanceClusterClients="true" />
  </<transportConnectors>
  ...
</broker>

    在broker端,transportConnector上支持如下几个属性用来动态调整Client与Broker的链接情况:

    1) updateClusterClients:当网络中,broker节点失效或者broker加入集群,是否通知Client,默认为false;如果希望开启Client动态平衡,首先开启此参数。

    2) rebalanceClusterClients:当有新的Broker加入集群时,是否重新平衡Client链接,默认为false。建议开启为true,此后如果有新Broker加入,那么将有一部分Client断开链接并与新的Broker建立链接。

    3) updateClusterClientsOnRemove:当集群中有broker失效时,是否通知Client,默认为false;建议为true。

    当集群中Broker变动时,Client将会在链接上接收到通知,Client可以从通知中获取最新的broker列表。所以,Client甚至不需要配置所有的broker uri,只需要配置一个能够发送通知的broker uri即可(前提是这个broker首次链接必须可用)。

failover:(tcp://primary:61616)

    不过,我们还是希望尽可能的多配置几个broker url,以避免primary失效的问题。此外,关于failover协议的更多配置参数,可以参看:【failover协议