ActiveMQ Message Storage: Caching Messages in the Broker

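Broker-side message caching is implemented through the subscription recovery policy, which determines which types of messages are cached, how they are cached, and for how long.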

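1. How messages are cached

    The ActiveMQ message broker caches messages in memory for every topic in use, with two exceptions: temporary topics and advisory topics. Messages on queues are not cached either.

    Cached messages are dispatched only to topic consumers that are retroactive; they are never dispatched to durable topic subscribers. A topic consumer marks itself as retroactive at creation time by setting the consumer.retroactive=true option on the destination, for example: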

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

// Wrapper class added so the method compiles; the class name is arbitrary.
public class RetroactiveConsumerExample {

	public void createRetroactiveConsumer() throws JMSException {
		// The no-argument factory connects to the default broker URL.
		ConnectionFactory fac = new ActiveMQConnectionFactory();
		Connection connection = fac.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// The destination option marks consumers of this topic as retroactive.
		Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");

		MessageConsumer consumer = session.createConsumer(topic);
	}
}
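
To make the dispatch rule concrete, here is a toy in-memory model. It is not ActiveMQ code: the class ToyTopic and its methods are hypothetical names invented for illustration. The sketch assumes a topic that caches every published message and replays that cache only to subscribers that ask for retroactive delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic with a recovery cache. Hypothetical names; this is
// not the ActiveMQ API and it ignores durable subscribers, size limits and expiry.
class ToyTopic {
    private final List<String> cache = new ArrayList<>();
    private final List<List<String>> inboxes = new ArrayList<>();

    // Cache the message, then deliver it to every current subscriber.
    void publish(String message) {
        cache.add(message);
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
    }

    // Register a subscriber and return its inbox. A retroactive subscriber
    // first receives a replay of the cached messages.
    List<String> subscribe(boolean retroactive) {
        List<String> inbox = new ArrayList<>();
        if (retroactive) {
            inbox.addAll(cache);
        }
        inboxes.add(inbox);
        return inbox;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("m1");
        topic.publish("m2");
        List<String> plain = topic.subscribe(false);
        List<String> retro = topic.subscribe(true);
        topic.publish("m3");
        System.out.println("plain = " + plain);
        System.out.println("retroactive = " + retro);
    }
}
```

Running main prints plain = [m3] and retroactive = [m1, m2, m3]: only the retroactive subscriber sees the messages published before it subscribed.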

2.  ActiveMQ subscription recovery policies

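     Fixed size subscription recovery policy: caches messages up to a fixed amount of memory.

     Fixed count subscription recovery policy: caches a fixed number of the most recent messages.

     Query-based subscription recovery policy: recovers messages by running a user-supplied query.

     Timed subscription recovery policy: caches messages for a fixed period of time.

     Last image subscription recovery policy: caches only the last message sent to the topic.

     No subscription recovery policy: disables message caching for the topic.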
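
The difference between these policies is mostly a question of what the cache retains. The sketch below models three of them (fixed count, timed, last image) as plain Java classes. All names here are hypothetical and none of this is ActiveMQ's own implementation; timestamps are passed in explicitly and assumed to be non-decreasing so the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy models of three caching strategies. Hypothetical names; not ActiveMQ classes.
interface ToyRecoveryPolicy {
    void add(String message, long nowMillis);   // called for each published message
    List<String> recover(long nowMillis);       // messages replayed to a new retroactive consumer
}

// Keeps only the newest maxCount messages.
class ToyFixedCountPolicy implements ToyRecoveryPolicy {
    private final int maxCount;
    private final Deque<String> buffer = new ArrayDeque<>();

    ToyFixedCountPolicy(int maxCount) { this.maxCount = maxCount; }

    public void add(String message, long nowMillis) {
        buffer.addLast(message);
        if (buffer.size() > maxCount) {
            buffer.removeFirst(); // evict the oldest message
        }
    }

    public List<String> recover(long nowMillis) {
        return new ArrayList<>(buffer);
    }
}

// Keeps messages that are at most durationMillis old (timestamps must not decrease).
class ToyTimedPolicy implements ToyRecoveryPolicy {
    private static final class Entry {
        final String message;
        final long time;
        Entry(String message, long time) { this.message = message; this.time = time; }
    }

    private final long durationMillis;
    private final Deque<Entry> buffer = new ArrayDeque<>();

    ToyTimedPolicy(long durationMillis) { this.durationMillis = durationMillis; }

    public void add(String message, long nowMillis) {
        buffer.addLast(new Entry(message, nowMillis));
    }

    public List<String> recover(long nowMillis) {
        // Drop entries that have aged out of the window, then return the rest.
        while (!buffer.isEmpty() && nowMillis - buffer.peekFirst().time > durationMillis) {
            buffer.removeFirst();
        }
        List<String> out = new ArrayList<>();
        for (Entry e : buffer) {
            out.add(e.message);
        }
        return out;
    }
}

// Keeps only the most recent message.
class ToyLastImagePolicy implements ToyRecoveryPolicy {
    private String last;

    public void add(String message, long nowMillis) { last = message; }

    public List<String> recover(long nowMillis) {
        List<String> out = new ArrayList<>();
        if (last != null) {
            out.add(last);
        }
        return out;
    }
}
```

A "no recovery" policy would simply return an empty list from recover, and a fixed size policy would evict by accumulated message size in bytes rather than by count.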

3.  Configuring subscription recovery policies

<?xml version="1.0" encoding="UTF-8"?>
<beans>
	<broker brokerName="test-broker" persistent="true"
		useShutdownHook="false" deleteAllMessagesOnStartup="true"
		xmlns="http://activemq.apache.org/schema/core">
		<transportConnectors>
			<transportConnector uri="tcp://localhost:61635" />
		</transportConnectors>
		<destinationPolicy>
			<policyMap>
				<policyEntries>
					<policyEntry topic="Topic.FixedSizedSubs.>">
						<subscriptionRecoveryPolicy>
							<fixedSizedSubscriptionRecoveryPolicy
								maximumSize="2000000" useSharedBuffer="false" />
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.LastImageSubs.>">
						<subscriptionRecoveryPolicy>
							<lastImageSubscriptionRecoveryPolicy />
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.NoSubs.>">
						<subscriptionRecoveryPolicy>
							<noSubscriptionRecoveryPolicy />
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.TimedSubs.>">
						<subscriptionRecoveryPolicy>
							<timedSubscriptionRecoveryPolicy
								recoverDuration="25000" />
						</subscriptionRecoveryPolicy>
					</policyEntry>
				</policyEntries>
			</policyMap>
		</destinationPolicy>
	</broker>
</beans>
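
Each policyEntry in this configuration selects topics through a destination pattern. In ActiveMQ destination names, "." separates name segments, "*" matches a single segment and ">" matches the remaining segments. The matcher below is a simplified sketch of that idea. ToyDestinationMatcher is a hypothetical name and this is not the broker's own matching code; in particular, the sketch requires at least one segment after a ">" prefix, which may differ from how the broker treats the bare prefix.

```java
// Simplified matcher for ActiveMQ-style destination patterns.
// Hypothetical helper for illustration; not the broker's implementation.
class ToyDestinationMatcher {

    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // In this sketch ">" must be the last pattern segment and
                // needs at least one destination segment left to match.
                return i == p.length - 1 && d.length > i;
            }
            if (i >= d.length) {
                return false; // destination is shorter than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal segment mismatch
            }
        }
        // Without ">" the pattern must cover every destination segment.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        String pattern = "Topic.FixedSizedSubs.>";
        System.out.println(matches(pattern, "Topic.FixedSizedSubs.Orders"));
        System.out.println(matches(pattern, "Topic.TimedSubs.Orders"));
    }
}
```

Under this sketch, a topic named Topic.FixedSizedSubs.Orders falls under the first policyEntry, while Topic.TimedSubs.Orders does not and is left for the entry whose pattern starts with Topic.TimedSubs.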
