本文的ActiveMQ都基于5.10版本,参考了ActiveMQ官方文档:http://activemq.apache.org/failover-transport-reference.html。
集群是个比较广泛的概念,它有多种形式,关于消息服务的集群,大概分为Consumer集群(消费者集群)和Broker集群(消息服务器集群)两种。
对于消费者集群,对于队列消费者,主要是:1.保证如果某一个消费者死亡了,任何它没有确认完的消息会被重传别的正常的消费者来消费;2.如果一个消费者消费消息过快,就可以比别的消费者得到更多的消息;3.如果一个消费者消费消息过慢,它就会被少得到消息。第1点几乎是所有JMS提供者都有的功能——消息重传机制(可以参考我的其他ActiveMQ博文)。第2点和第3点也是很正常的,因为大多消费者和线程是一一对应的关系,你消费速率快,当然可以自己去服务器拉取更多的消息。当然ActiveMQ在队列上给消费者提供了高性能的负载均衡策略。对于主题订阅者,由于每个订阅者接受到被推送的消息都和其他订阅者无关,所以处理相对简单,JMS也有持久订阅者这一概念,这里不多说。
对于消息服务器集群,主要是指:1.如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者能否自动连接到其他正常工作的消息服务器。2.如果集群中的某一台消息服务器宕机,该台服务器上未消费的消息能否在该台服务器恢复正常之前由其他服务器转发。3.集群环境中会不会导致某台消息服务器上只有消费者或者某台消息服务器上只有生产者。对于1,ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。使用它很简单,只需要在uri中配置就行了,语法如下:
failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN
例子:
failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 (
如果这样使用报错你可以试试这个:failover://(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 (this way works in ActiveMQ 4.1.1 the one above does not)
)如果某个ActiveMQ客户端发现uri1地址失效了,它会立即转向uri地址列表中其他可以连接的消息服务器进行重连,以保证继续正常工作,请注意,并不是uri1失效了就会选则uri2重连,这种选择其他地址的方式默认是随机的,以保证负载均衡,如果你想关闭随机,可以transportOptions中加入randomize=false。
transportOptions有多种参数可以选择,如下:
initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。
maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。
useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。
reconnectDelayExponent:默认为2.0,重连时使用的避让指数。
maxReconnectAttempts:5.6版本之前默认为-1,5.6版本及其以后,默认为0,0表示重连的次数无限,配置大于0可以指定最大重连次数。
startupMaxReconnectAttempts:默认为0,如果该值不为0,表示客户端接收到消息服务器发送来的错误消息之前尝试连接服务器的最大次数,一旦成功连接后,maxReconnectAttempts值开始生效,如果该值为0,则默认采用maxReconnectAttempts。详见FailoverTransport.java代码:
private int calculateReconnectAttemptLimit() { int maxReconnectValue = this.maxReconnectAttempts; if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { maxReconnectValue = this.startupMaxReconnectAttempts; } return maxReconnectValue; }
randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略,记住,这种随机策略在第一次选择URI列表中的地址时就开始生效,所以,如果为true的话,一个生产者和一个消费者的Failover连接地址都是两个URI的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者的尴尬境地。
backup:默认为false,表示是否在连接初始化时将URI列表中的所有地址都初始化连接,以便快速的失效转移,默认是不开启。
timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。这样说你肯定不是很明白,直接看代码吧,下面给出FailoverTransport.java类中oneway方法中的一段代码给你看你就明白了:
// Keep trying until the message is sent. for (int i = 0; !disposed; i++) { try { // Wait for transport to be connected. Transport transport = connectedTransport.get(); long start = System.currentTimeMillis(); boolean timedout = false; while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) { if (LOG.isTraceEnabled()) { LOG.trace("Waiting for transport to reconnect..: " + command); } long end = System.currentTimeMillis(); if (command.isMessage() && timeout > 0 && (end - start > timeout)) { timedout = true; if (LOG.isInfoEnabled()) { LOG.info("Failover timed out after " + (end - start) + "ms"); } break; } try { reconnectMutex.wait(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); if (LOG.isDebugEnabled()) { LOG.debug("Interupted: " + e, e); } } transport = connectedTransport.get(); } // 其余的代码略
trackMessages:默认值为false,是否缓存在发送中(in-flight messages)的消息,以便重连时让新的Transport继续发送。默认是不开启。
maxCacheSize:默认131072,如果trackMessages为true,该值表示缓存消息的最大尺寸,单位byte。
updateURIsSupported:默认值为true,表示重连时客户端新的连接器(Transport)是否从消息服务接受接受原来的URI列表的更新,5.4及其以后的版本可用。如果关闭的话,会导致重连后连接器没有其他的URI地址可以Failover。
updateURIsURL:默认为null,从5.4及其以后的版本,ActiveMQ支持从文件中加载Failover的URI地址列表,URI还是以逗号分隔,updateURIsURL为文件路径。详见FailoverTransport.java中代码:
private void doUpdateURIsFromDisk() { // If updateURIsURL is specified, read the file and add any new // transport URI's to this FailOverTransport. // Note: Could track file timestamp to avoid unnecessary reading. String fileURL = getUpdateURIsURL(); if (fileURL != null) { BufferedReader in = null; String newUris = null; StringBuffer buffer = new StringBuffer(); try { in = new BufferedReader(getURLStream(fileURL)); while (true) { String line = in.readLine(); if (line == null) { break; } buffer.append(line); } newUris = buffer.toString(); } catch (IOException ioe) { LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); } finally { if (in != null) { try { in.close(); } catch (IOException ioe) { // ignore } } } processNewTransports(isRebalanceUpdateURIs(), newUris); } }
nested.*:默认为null,5.9及其以后版本可用,表示给嵌套的URL添加额外的选项。 以前,如果你想检测让死连接速度更快,你必须在wireFormat.maxInactivityDuration= 1000选项添加到失效转移列表中的所有嵌套的URL。例如:
failover:(tcp://host01:61616?wireFormat.maxInactivityDuration=1000,tcp://host02:61616?wireFormat.maxInactivityDuration=1000,tcp://host03:61616?wireFormat.maxInactivityDuration=1000)
而现在,你只需要这样:
failover:(tcp://host01:61616,tcp://host02:61616,tcp://host03:61616)?nested.wireFormat.maxInactivityDuration=1000
warnAfterReconnectAttempts.*:默认为10,5.10及其以后的版本可用,表示每次重连该次数后会打印日志告警,设置<=0的值表示禁用,FailoverTransport.java类的doReconnect()部分相关代码如下
int warnInterval = getWarnAfterReconnectAttempts(); if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", uris, connectFailures); }reconnectSupported:默认为true,表示客户端是否应响应经纪人 ConnectionControl事件与重新连接(参见:rebalanceClusterClients)。
如果你使用Failover失效转移,则消息服务器在死亡的那一刻,你的生产者发送消息时默认将阻塞,但你可以设置发送消息阻塞的超时时间(注:timeout参数前面已经讲过了):
failover:(tcp://primary:61616)?timeout=3000
上面的设置将导致如果3秒后连接还未建立,将导致消息发送失败,但这并不会导致该连接被kill,所以你可以过一阵子后再使用这一个连接来尝试发送消息。
如果用户希望能追踪到重连过程,可以在ActiveMQConnectionFactory设置一个TransportListener,如下所示:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false"); factory.setTransportListener(new TransportListener() { @Override public void transportResumed() { System.out.println("连接器已经恢复完毕!"); } @Override public void transportInterupted() { System.out.println("连接器被中断了!"); } @Override public void onException(IOException error) { System.out.println(error); } @Override public void onCommand(Object command) { System.out.println(command); } });
下面我们看看如何在消息服务器(broker)上配置失效转移Failover。
在消息服务器这边,有一些选项可以将客户端更新到新的消息服务器,如下所示:
updateClusterClients:默认为false,如果为true,则会将broker集群的拓扑结构的改变信息传递给连接的客户端。
rebalanceClusterClients:默认为false,如果为true,则如果有新的消息服务器加入到消息服务器集群中,则连接的客户端将被要求重新平衡(asked to rebalance)。注意, priorityBackup=true能覆盖。
updateClusterClientsOnRemove:默认为false,如果为true,则当一个集群从网络中移除的时候将更新客户端。有了这个选项,可以在消息服务器移除时更新客户端,而不是仅仅只是新增消息服务器时更新。(难道官方文档有问题:if true, will update clients when a cluster is removed from the network. Having this as separate option enables clients to be updated when new brokers join, but not when brokers leave.)
updateClusterFilter:默认为null,如果有值,将会是逗号分隔的正则表达式列表,用来过滤掉失效转移时的消息服务器集群中的服务器名称。
举例:
<broker>
...
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterFilter="*A*,*B*" />
</<transportConnectors>
...
</broker>
如果面配置所示,如果updateClusterClients设置为true,则连接到该服务器的客户端的连接器地址Failover的URI列表只需要写这个服务器地址就行,如:failover://tcp://primary:61616,当有新的消息服务器加入时,这些客户端将自动被更新添加该新消息服务器的地址,如果发生网络或消息服务器宕机的事件,就可以重连接到新的消息服务器上。
有时候我们希望客户端能优先选择某些消息服务器地址,比如既有本地服务器,又有远程服务器,我们希望本地的应用程序优先选择本地服务器进行连接,从5.6版本开始,ActiveMQ提供了优先级备份(priority backup )的特性,所以你可以让客户端自动重连到所谓的“优先级”URI,你可以在客户端如下配置URI地址:
failover:(tcp://local:61616,tcp://remote:61616)?randomize=false&priorityBackup=true
如果上面这个地址被用于客户端使用,客户端将尝试并保持连接到本地(上面local表示的地址)的消息服务器,当然,如果本地的服务器故障,将转移到远程(remote代表的地址)服务器。默认情况下,只有URI列表中的第一个被视为优先(本地)URI,在某些情况下,你希望不止一个URI地址优先,则你可以使用priorityURIs参数:
failover:(tcp://local1:61616,tcp://local2:61616,tcp://remote:61616)?randomize=false&priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616
这样,客户端将视local1和local2都为优先URI。