第十章 ActiveMq代理的属性在实际中的应用
第十章 ActiveMq代理的属性在实际中的应用
章节导读
- 使用通配符和复合的目的地
- 利用advistory消息
- 理解虚拟的topic 以及追溯消费者
- 使用activeMq插件
- apache camel的介绍
10.1 通配符和复合的destination
下面会介绍使用通配符订阅多个目的地,使用复合目的地发布消息到多个目的地.
10.1.1 使用通配符从多个目的地中消费
activemq支持层级目的地,如果你有一个应用用来描述一项运动的最新结果,你可以使用 Sport.league.team来表示 .举例说明,描述Leed队伍在足球比赛中的的最新结果,可以这样表示:football.division1.leed.如果leed同时参与足球和橄榄球,方便起见,你想要看到Leed的所有比赛的所有结果,
以下三种通配符是很有用的:
- . -- 点,用于分割元素
- * -- 用于匹配一个元素
- > -- 用于匹配一个或所有的元素
描述leed队伍参加的所有项目的最新比分,你可以这样去表示:*.*.leed:
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic allLeeds = session.createTopic("*.*.Leeds"); MessageConsumer consumer = session.createConsumer(allLeeds); Message result = consumer.receive();
如果你想得到division1联盟中所有足球比赛的结果,你可以这样表示:football.division1.*,如果你想要得到所有橄榄球比赛的最新得分,你可以这样表示:rugby.>.
PS:通配符只对消费者起作用,如果你把消息发布到一个叫做rugby.>.的主题,消息只会被发送到rugby.>.而不是所有以rugby名字开头的主题.
10.1.2 发送消息到多个目的地
符合目的地使用一种名为"逗号分离"的类型的名字命名.例如:你创建一个名为store.order.backoffice,store.order.warehouse的队列,那么发送消息的时候就会同时发送到这两个队列.复合目的地支持队列和主题的组合.要么队列名是queue://,要么主题名是topic://.
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue ordersDestination = session.createQueue("store.orders, topic:// store.orders"); MessageProducer producer = session.createProducer(ordersDestination); Message order = session.createObjectMessage(); producer.send(order);
10.2 advisory消息
advisory消息是一种代理被改变的结果的通知类型的消息,Activemq内部也使用这种类型的消息,通知连接关于临时目的地的可用性以及通知网络消费者的可用性.每个advisory消息都有一种JMS类型的Advisory和预先定义好的JMS字符属性,以及代理的唯一标示:
- originBrokerId -- 生成advisory消息的代理的id
- originBrokerName -- 代理的名字
- originBrokerUrl -- 代理的第一个传输连接器的URL
public class Consumer { private static Logger logger = Logger.getLogger(Consumer.class); private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException, InterruptedException { Consumer consumer1 = new Consumer(); Topic connectionAdvisory = AdvisorySupport.CONNECTION_ADVISORY_TOPIC; MessageConsumer consumer = consumer1.getSession().createConsumer(connectionAdvisory); //监听broker的启动和停止 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { DataStructure data = (DataStructure) ((org.apache.activemq.command.Message) message).getDataStructure(); if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) { ConnectionInfo connectionInfo = (ConnectionInfo) data; System.out.println("Connection started: " + connectionInfo); } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { RemoveInfo removeInfo = (RemoveInfo) data; System.out.println("Connection stopped: " + removeInfo.getObjectId()); } else { System.err.println("Unknown message " + data); } } catch (Exception e) { e.printStackTrace(); } } }); } public Session getSession() { return session; } }
服务启动时会打印出下面的消息:
Connection started: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:YOS-01605061430-65527-1486105925973-1:1, clientId = ID:YOS-01605061430-65527-1486105925973-0:1, clientIp = tcp://127.0.0.1:49187, userName = null, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faultTolerant = true, failoverReconnect = false}
其它的advisory详见书(Activemq实战)282页.
10.3 虚拟topic
现在还无法实现把消息发送给主题,然后多个消费者按队列的形式从主题中消费.
虚拟主题允许生产者给一个正常的topic发送消息,同时消费者从队列中接收消息.所以消费者订阅到一个队列接送从主题中来的消息.具体结构看下图
为了能正确使用虚拟主题,必须要遵循一些规范.首先,主题名字应该以VirtualTopic.主题名来命名.所以如果你想要为一个名为order的主题创建虚拟主题,你可以这样命名:VirtualTopic.order.要从整个队列中消费,消费者必须订阅到一个名为(Consumer.消费者名称).VirtualTopic.(虚拟主题名称)的队列.
假设你要让消费者在一个队列中竞争消息,你可以创建两个队列接受者,都从名为Consumer.Foo.VirtualTopic.order中消费.
public static void main(String[] args) throws JMSException, InterruptedException { String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); String queueName = "Consumer.Foo.VirtualTopic.orders"; // Create the first consumer for Consumer.Foo.VirtualTopic.orders Session sessionA = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue fooQueueA = sessionA.createQueue(queueName); MessageConsumer consumerA = sessionA.createConsumer(fooQueueA); consumerA.setMessageListener(getMessageListener()); // Create the second consumer for Consumer.Foo.VirtualTopic.orders Session sessionB = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue fooQueueB = sessionB.createQueue(queueName); MessageConsumer consumerB = sessionB.createConsumer(fooQueueB); consumerB.setMessageListener(getMessageListener()); // Create the sender String topicName = "VirtualTopic.orders"; Connection senderConnection = connectionFactory.createConnection(); senderConnection.start(); Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic ordersDestination = senderSession.createTopic(topicName); MessageProducer producer = senderSession.createProducer(ordersDestination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // Send 2000 messages for (int i = 0; i < 20; ++i) { TextMessage message = senderSession.createTextMessage(); message.setText("i:" + i); producer.send(message); } } private static MessageListener getMessageListener() { return new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMsg = (TextMessage) message; try { System.out.println(textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }; }
虚拟主题提供了一种方便的机制来结合负载均衡以及队列的故障转移,消费者不需要担心创建唯一的JMS client Id和名称,也可以以负债均衡的形式实现消息竞争.如果其中一个消费者死亡,其它的消费者或继续接受队列中的消息.
10.4 可追溯的消费者
假设一个应用需要消费尽可能的快的被发送和消费,那么需要关闭消息的持久化.这样就有一个缺点,如果消费者在生产者启动之后启动,那么久有可能丢失消息.为了提供一个有效的追溯消息的方法,ActiveMq可以缓存一个可配置的大小或者数量的主题消息。这些由两部分组成--消息消费者需要通知代理它对追溯信息感兴趣,还需要配置代理的目的地中多少消息需要被缓存.为了标记消费者可追溯,你需要设置可追溯标志.
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
在代理端,你可以配置相关的策略,默认是叫做FixedSizedSubscriptionRecoveryPolicy,默认大小是64KB.具体请参考subscriptionRecoveryPolicy配置
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedSizedSubscriptionRecoveryPolicy maximumSize="8mb"/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
10.5 dead-letter(死信)队列
当代理上的消息过期或者不能被发送时,它们就会被移到dead-letter队列,它们可以在之后的某个时间段被管理员查看或者消费.
消息通常在以下场景中重发给客户端:
- 客户端使用事务并调用rollback方法
- 客户端使用事务却在提交事务之前关闭了.
- 客户端在session中使用了CLIENT_ACKNOWLEDGE,并调用了recover().
有时候客户端会因为消息错误的格式而不接受消息.这种情况下,一直重发消息是没有意义的.你可以配置Activemq代理重发消息之前的等待时间,在每次失败后是否应该增加时间,以及在被移到死信队列之前的最大重发次数.
以下使用程序配置的例子:
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);所有消息都有一个默认名为ActiveMQ.DLQ的死信队列.你可以配置死信队列的层次结构或者配置一个私有的目的地.
<broker...> <destinationPolicy> <policyMap> <policyEntries> <!— 设置所有队列,使用 '>' ,否则用队列名称 --> <policyEntry queue=">"> <deadLetterStrategy> <!-- queuePrefix:设置死信队列前缀 useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信 processExpired=false代表不处理过期的 processNonPersistent="false"代表不处理非持化的 --> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false" processNonPersistent="false" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> ... </broker>当一条消息被发送到死信队列,一条advisory消息就会生成.你可以监听ActiveMQ.Advisory.MessageDLQd.* topic来获取死信队列中的advisory消息.