ActiveMQ学习笔记----ActiveMQ和JBossMQ性能对比测试代码
本文描述了对ActiveMQ进行性能测试的代码。性能测试用源代码共包含3个文件,分别是:
JMS消息发送类:ActiveMQProducer.java
JMS消息接收类:ActiveMQConsumer.java
JMS消息收发测试主类:ActiveMQTest.java
下面分别介绍这三个类。1. JMS消息发送类 ActiveMQProducer.java 的源码如下:
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/import javax.jms.*;
import org.apache.activemq.*;
public class ActiveMQProducer { public final static int MAX_SEND_TIMES = 100;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
/**
*初始化
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
privatevoidinitialize()throwsJMSException,Exception{
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(
user,password,url);
connection=connectionFactory.createConnection();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue(subject);
producer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
}/**
*发送消息
*
*@parammessage
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicvoidproduceMessage(Stringmessage)throwsJMSException,Exception{
initialize();
TextMessagemsg=session.createTextMessage(message);
long beginTime = System.currentTimeMillis();System.out.println("Producer:->Sending message: ");
for (int i = 0; i < MAX_SEND_TIMES; i++) {
producer.send(msg);
if((i+1)%1000==0){
System.out.println("Thisisthe"+i+"message!");
}
}System.out.println("Producer:->Message sent complete!");
longendTime=System.currentTimeMillis();
longexecuteTime=endTime-beginTime;
System.out.println("ActiveMQsend"+MAX_SEND_TIMES+"messagesused:"
+executeTime+"ms");
}/**
*关闭连接
*
*@throwsJMSException
*/
publicvoidclose()throwsJMSException{
System.out.println("Producer:->Closingconnection");
if(producer!=null){
producer.close();
}
if(session!=null){
session.close();
}
if(connection!=null){
connection.close();
}
}
}2.JMS消息接收类ActiveMQConsumer.java的源码如下
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/import javax.jms.*;
import org.apache.activemq.*;
public class ActiveMQConsumer implements MessageListener {
publicstaticintRECEIVED_MSG_NUM=0;
longbeginReceiveTime=0;
longendReceiveTime=0;
long receiveDuringTime = 0;private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
/**
*初始化
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
privatevoidinitialize()throwsJMSException,Exception{
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(
user,password,url);
connection=connectionFactory.createConnection();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue(subject);
consumer=session.createConsumer(destination);
}/**
*接收消息
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicvoidconsumeMessage()throwsJMSException,Exception{
initialize();
connection.start();
System.out.println("Consumer:->Beginlistening...");
//设置消息监听
consumer.setMessageListener(this);
}/**
*关闭连接
*
*@throwsJMSException
*/
publicvoidclose()throwsJMSException{
System.out.println("Consumer:->Closingconnection");
if(consumer!=null){
consumer.close();
}
if(session!=null){
session.close();
}
if(connection!=null){
connection.close();
}
}/**
*收到消息的处理
*
*@parammessage
*/
publicvoidonMessage(Messagemessage){
try{
if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText();
// receive the first message
if(RECEIVED_MSG_NUM==0){
beginReceiveTime=System.currentTimeMillis();
}RECEIVED_MSG_NUM++;
// print one String when received 1000 message
if((RECEIVED_MSG_NUM+1)%1000==0){
System.out.println("Consumer:->Received:"
+RECEIVED_MSG_NUM);
}// receive the last message
if(RECEIVED_MSG_NUM==ActiveMQProducer.MAX_SEND_TIMES-1){
endReceiveTime=System.currentTimeMillis();
receiveDuringTime=endReceiveTime-beginReceiveTime;
System.out.println("ActiveMQReceive"
+ActiveMQProducer.MAX_SEND_TIMES
+"messagesused:"+receiveDuringTime+"ms");
}} else {
System.out.println(System.currentTimeMillis()
+"Consumer:->Received:"+message);
}
}catch(JMSExceptione){
e.printStackTrace();
}
}
}3. JMS消息收发测试主类:ActiveMQTest.java
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/
import javax.jms.*;public class ActiveMQTest {
/**
*
*@paramargs
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicstaticvoidmain(String[]args)throwsJMSException,Exception{
ActiveMQConsumerconsumer=newActiveMQConsumer();
ActiveMQProducerproducer=newActiveMQProducer();
char[]tempChars=newchar[1024];
for(inti=0;i<1024;i++){
tempChars[i]='a';
}
StringtempMsg=String.valueOf(tempChars);
//开始监听
consumer.consumeMessage();
producer.produceMessage(tempMsg);
producer.close();// 延时5000ms后关闭连接
Thread.sleep(5000);
consumer.close();
}
}为了与JBossMQ进行性能对比测试,下面把JBossMQ收发消息的测试代码也一并附上。
为了保证测试代码的一致性,JBossMQ性能测试的代码也包含3个文件,分别是:
JMS消息发送类:JBossMQProducer.java
JMS消息接收类:JBossMQConsumer.java
JMS消息收发测试主类:JBossMQTest.java
下面分别介绍这三个类。1. JMS消息发送类 JBossMQProducer.java 的源码如下:
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/import java.util.Properties;
import javax.jms.*;
importjavax.naming.Context;
import javax.naming.InitialContext;public class JbossMQProducer { public final static int MAX_SEND_TIMES = 100;
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
/**
*初始化
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
privatevoidinitialize()throwsJMSException,Exception{
Propertiesprops=newProperties();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
props.put(Context.URL_PKG_PREFIXES,"org.jboss.naming");
props.put(Context.PROVIDER_URL, "localhost:1099");InitialContext jmsContext = new InitialContext(props);
ConnectionFactoryconnectionFactory=(ConnectionFactory)jmsContext
.lookup("ConnectionFactory");
connection=connectionFactory.createConnection();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=(Queue)jmsContext.lookup("queue/A");
producer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
}/**
*发送消息
*
*@parammessage
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicvoidproduceMessage(Stringmessage)throwsJMSException,Exception{
initialize();
TextMessage msg = session.createTextMessage(message);long beginTime = System.currentTimeMillis();
System.out.println("Producer:->Sending message: ");
for (int i = 0; i < MAX_SEND_TIMES; i++) {
if((i+1)%1000==0){
System.out.println("Thisisthe"+i+"message!");
}
producer.send(msg);
}System.out.println("Producer:->Message sent complete!");
longendTime=System.currentTimeMillis();
longexecuteTime=endTime-beginTime;
System.out.println("JbossMQsend"+MAX_SEND_TIMES+"messagesused:"
+executeTime+"ms");
}/**
*关闭连接
*
*@throwsJMSException
*/
publicvoidclose()throwsJMSException{
System.out.println("Producer:->Closingconnection");
if(producer!=null){
producer.close();
}
if(session!=null){
session.close();
}
if(connection!=null){
connection.close();
}
}
}2. JMS消息接收类 JBossMQConsumer.java 的源码如下
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/import java.util.Properties;
import javax.jms.*;import javax.naming.*;
public class JbossMQConsumer implements MessageListener {
publicstaticintRECEIVED_MSG_NUM=0;
longbeginReceiveTime=0;
longendReceiveTime=0;
long receiveDuringTime = 0;private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
/**
*初始化
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
privatevoidinitialize()throwsJMSException,Exception{
Propertiesprops=newProperties();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
props.put(Context.URL_PKG_PREFIXES,"org.jboss.naming");
props.put(Context.PROVIDER_URL, "localhost:1099");// get naming context
InitialContextjmsContext=newInitialContext(props);
ConnectionFactoryconnectionFactory=(ConnectionFactory)jmsContext
.lookup("ConnectionFactory");connection = connectionFactory.createConnection();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=(Queue)jmsContext.lookup("queue/A");
consumer=session.createConsumer(destination);
}/**
*接收消息
*
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicvoidconsumeMessage()throwsJMSException,Exception{
initialize();
connection.start();
System.out.println("Consumer:->Beginlistening...");
//设置监听
consumer.setMessageListener(this);
}/**
*关闭连接
*
*@throwsJMSException
*/
publicvoidclose()throwsJMSException{
System.out.println("Consumer:->Closingconnection");
if(consumer!=null){
consumer.close();
}
if(session!=null){
session.close();
}
if(connection!=null){
connection.close();
}
}/**
*处理消息
*
*@parammessage
*/
publicvoidonMessage(Messagemessage){
try{
if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText();
// receive the first message
if(RECEIVED_MSG_NUM==0){
beginReceiveTime=System.currentTimeMillis();
}RECEIVED_MSG_NUM++;
//print one String when received 1000 message
if((RECEIVED_MSG_NUM+1)%1000==0){
System.out.println("Consumer:->Received:"
+RECEIVED_MSG_NUM);
}// receive the last message
if(RECEIVED_MSG_NUM==JbossMQProducer.MAX_SEND_TIMES-1){
endReceiveTime=System.currentTimeMillis();
receiveDuringTime=endReceiveTime-beginReceiveTime;
System.out.println("JbossMQreceive"
+JbossMQProducer.MAX_SEND_TIMES
+"messagesused:"+receiveDuringTime+"ms");
}
}else{
System.out.println(System.currentTimeMillis()
+"Consumer:->Received:"+message);
}
}catch(JMSExceptione){
e.printStackTrace();
}
}
}3. JMS消息收发测试主类:JBossMQTest.javaimport javax.jms.JMSException;
/**
*<p>Title:</p>
*<p>Description:</p>
*<p>Copyright:Copyright(c)2007</p>
*<p>Company:</p>
*@authormqboss
*@version1.0
*/public class JbossMQTest {
/**
*
*@paramargs
*@throwsJMSException
*@throwsjava.lang.Exception
*/
publicstaticvoidmain(String[]args)throwsJMSException,Exception{
JbossMQConsumerconsumer=newJbossMQConsumer();
JbossMQProducerproducer=newJbossMQProducer();
char[]tempChars=newchar[1024];
for(inti=0;i<1024;i++){
tempChars[i]='a';
}
StringtempMsg=String.valueOf(tempChars);
//启动消息监听
consumer.consumeMessage();
producer.produceMessage(tempMsg);
producer.close();
//5000ms后关闭连接
Thread.sleep(5000);
consumer.close();
}
}以上是把ActiveMQ集成到JBoss以后,对ActiveMQ和JBossMQ进行性能对比测试的源代码。很显然,以上源代码基本一致,所以性能测试结果可以很好的说明ActiveMQ和JBossMQ的性能对比情况。
参考文献:
[1]ActiveMQ的一个简单示例