ActiveMQ学习笔记----ActiveMQ和JBossMQ性能对比测试代码

本文描述了对ActiveMQ进行性能测试的代码。性能测试用源代码共包含3个文件,分别是:

JMS消息发送类:ActiveMQProducer.java

JMS消息接收类:ActiveMQConsumer.java

JMS消息收发测试主类:ActiveMQTest.java

下面分别介绍这三个类。

1. JMS消息发送类 ActiveMQProducer.java 的源码如下:

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

 */

import javax.jms.*;

import org.apache.activemq.*;

public class ActiveMQProducer { public final static int MAX_SEND_TIMES = 100;

 private String user = ActiveMQConnection.DEFAULT_USER;

 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 private String subject = "TOOL.DEFAULT";

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageProducer producer = null;

 /**

*初始化

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

privatevoidinitialize()throwsJMSException,Exception{

ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(

user,password,url);

connection=connectionFactory.createConnection();

session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue(subject);

producer=session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

connection.start();

 }

 /**

*发送消息

*

*@parammessage

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicvoidproduceMessage(Stringmessage)throwsJMSException,Exception{

initialize();

TextMessagemsg=session.createTextMessage(message);

  long beginTime = System.currentTimeMillis();

  System.out.println("Producer:->Sending message: ");

  for (int i = 0; i < MAX_SEND_TIMES; i++) {

producer.send(msg);

if((i+1)%1000==0){

System.out.println("Thisisthe"+i+"message!");

}

  }

  System.out.println("Producer:->Message sent complete!");

longendTime=System.currentTimeMillis();

longexecuteTime=endTime-beginTime;

System.out.println("ActiveMQsend"+MAX_SEND_TIMES+"messagesused:"

+executeTime+"ms");

 }

 /**

*关闭连接

*

*@throwsJMSException

*/

publicvoidclose()throwsJMSException{

System.out.println("Producer:->Closingconnection");

if(producer!=null){

producer.close();

}

if(session!=null){

session.close();

}

if(connection!=null){

connection.close();

}

}

}

2.JMS消息接收类ActiveMQConsumer.java的源码如下

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

 */

import javax.jms.*;

import org.apache.activemq.*;

public class ActiveMQConsumer implements MessageListener {

publicstaticintRECEIVED_MSG_NUM=0;

longbeginReceiveTime=0;

longendReceiveTime=0;

 long receiveDuringTime = 0;

 private String user = ActiveMQConnection.DEFAULT_USER;

 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 private String subject = "TOOL.DEFAULT";

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageConsumer consumer = null;

 /**

*初始化

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

privatevoidinitialize()throwsJMSException,Exception{

ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(

user,password,url);

connection=connectionFactory.createConnection();

session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue(subject);

consumer=session.createConsumer(destination);

 }

 /**

*接收消息

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicvoidconsumeMessage()throwsJMSException,Exception{

initialize();

connection.start();

System.out.println("Consumer:->Beginlistening...");

//设置消息监听

consumer.setMessageListener(this);

 }

 /**

*关闭连接

*

*@throwsJMSException

*/

publicvoidclose()throwsJMSException{

System.out.println("Consumer:->Closingconnection");

if(consumer!=null){

consumer.close();

}

if(session!=null){

session.close();

}

if(connection!=null){

connection.close();

}

 }

 /**

*收到消息的处理

*

*@parammessage

*/

publicvoidonMessage(Messagemessage){

try{

   if (message instanceof TextMessage) {

    TextMessage txtMsg = (TextMessage) message;    String msg = txtMsg.getText();

    // receive the first message

if(RECEIVED_MSG_NUM==0){

beginReceiveTime=System.currentTimeMillis();

    }

    RECEIVED_MSG_NUM++;

    // print one String when received 1000 message

if((RECEIVED_MSG_NUM+1)%1000==0){

System.out.println("Consumer:->Received:"

+RECEIVED_MSG_NUM);

    }

    // receive the last message

if(RECEIVED_MSG_NUM==ActiveMQProducer.MAX_SEND_TIMES-1){

endReceiveTime=System.currentTimeMillis();

receiveDuringTime=endReceiveTime-beginReceiveTime;

System.out.println("ActiveMQReceive"

+ActiveMQProducer.MAX_SEND_TIMES

+"messagesused:"+receiveDuringTime+"ms");

    }

   } else {

System.out.println(System.currentTimeMillis()

+"Consumer:->Received:"+message);

}

}catch(JMSExceptione){

e.printStackTrace();

}

}

}

3. JMS消息收发测试主类:ActiveMQTest.java

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

*/

import javax.jms.*;

public class ActiveMQTest {

 /**

*

*@paramargs

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicstaticvoidmain(String[]args)throwsJMSException,Exception{

ActiveMQConsumerconsumer=newActiveMQConsumer();

ActiveMQProducerproducer=newActiveMQProducer();

char[]tempChars=newchar[1024];

for(inti=0;i<1024;i++){

tempChars[i]='a';

}

StringtempMsg=String.valueOf(tempChars);

//开始监听

consumer.consumeMessage();

producer.produceMessage(tempMsg);

  producer.close();

  // 延时5000ms后关闭连接

Thread.sleep(5000);

consumer.close();

}

}

为了与JBossMQ进行性能对比测试,下面把JBossMQ收发消息的测试代码也一并附上。

为了保证测试代码的一致性,JBossMQ性能测试的代码也包含3个文件,分别是:

JMS消息发送类:JBossMQProducer.java

JMS消息接收类:JBossMQConsumer.java

JMS消息收发测试主类:JBossMQTest.java

下面分别介绍这三个类。

1. JMS消息发送类 JBossMQProducer.java 的源码如下:

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

 */

import java.util.Properties;

import javax.jms.*;

importjavax.naming.Context;

import javax.naming.InitialContext;

public class JbossMQProducer { public final static int MAX_SEND_TIMES = 100;

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageProducer producer = null;

 /**

*初始化

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

privatevoidinitialize()throwsJMSException,Exception{

Propertiesprops=newProperties();

props.put(Context.INITIAL_CONTEXT_FACTORY,

"org.jnp.interfaces.NamingContextFactory");

props.put(Context.URL_PKG_PREFIXES,"org.jboss.naming");

  props.put(Context.PROVIDER_URL, "localhost:1099");

  InitialContext jmsContext = new InitialContext(props);

ConnectionFactoryconnectionFactory=(ConnectionFactory)jmsContext

.lookup("ConnectionFactory");

connection=connectionFactory.createConnection();

session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

destination=(Queue)jmsContext.lookup("queue/A");

producer=session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

connection.start();

 }

 /**

*发送消息

*

*@parammessage

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicvoidproduceMessage(Stringmessage)throwsJMSException,Exception{

initialize();

  TextMessage msg = session.createTextMessage(message);

  long beginTime = System.currentTimeMillis();

  System.out.println("Producer:->Sending message: ");

  for (int i = 0; i < MAX_SEND_TIMES; i++) {

if((i+1)%1000==0){

System.out.println("Thisisthe"+i+"message!");

}

producer.send(msg);

  }

  System.out.println("Producer:->Message sent complete!");

longendTime=System.currentTimeMillis();

longexecuteTime=endTime-beginTime;

System.out.println("JbossMQsend"+MAX_SEND_TIMES+"messagesused:"

+executeTime+"ms");

 }

 /**

*关闭连接

*

*@throwsJMSException

*/

publicvoidclose()throwsJMSException{

System.out.println("Producer:->Closingconnection");

if(producer!=null){

producer.close();

}

if(session!=null){

session.close();

}

if(connection!=null){

connection.close();

}

}

}

2. JMS消息接收类 JBossMQConsumer.java 的源码如下

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

 */

import java.util.Properties;

import javax.jms.*;import javax.naming.*;

public class JbossMQConsumer implements MessageListener {

publicstaticintRECEIVED_MSG_NUM=0;

longbeginReceiveTime=0;

longendReceiveTime=0;

 long receiveDuringTime = 0;

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageConsumer consumer = null;

 /**

*初始化

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

privatevoidinitialize()throwsJMSException,Exception{

Propertiesprops=newProperties();

props.put(Context.INITIAL_CONTEXT_FACTORY,

"org.jnp.interfaces.NamingContextFactory");

props.put(Context.URL_PKG_PREFIXES,"org.jboss.naming");

  props.put(Context.PROVIDER_URL, "localhost:1099");

  // get naming context

InitialContextjmsContext=newInitialContext(props);

ConnectionFactoryconnectionFactory=(ConnectionFactory)jmsContext

    .lookup("ConnectionFactory");

  connection = connectionFactory.createConnection();

session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

destination=(Queue)jmsContext.lookup("queue/A");

consumer=session.createConsumer(destination);

 }

 /**

*接收消息

*

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicvoidconsumeMessage()throwsJMSException,Exception{

initialize();

connection.start();

System.out.println("Consumer:->Beginlistening...");

//设置监听

consumer.setMessageListener(this);

 }

 /**

*关闭连接

*

*@throwsJMSException

*/

publicvoidclose()throwsJMSException{

System.out.println("Consumer:->Closingconnection");

if(consumer!=null){

consumer.close();

}

if(session!=null){

session.close();

}

if(connection!=null){

connection.close();

}

 }

 /**

*处理消息

*

*@parammessage

*/

publicvoidonMessage(Messagemessage){

try{

   if (message instanceof TextMessage) {

    TextMessage txtMsg = (TextMessage) message;    String msg = txtMsg.getText();

    // receive the first message

if(RECEIVED_MSG_NUM==0){

beginReceiveTime=System.currentTimeMillis();

    }

    RECEIVED_MSG_NUM++;

    //print one String when received 1000 message

if((RECEIVED_MSG_NUM+1)%1000==0){

System.out.println("Consumer:->Received:"

+RECEIVED_MSG_NUM);

    }

    // receive the last message

if(RECEIVED_MSG_NUM==JbossMQProducer.MAX_SEND_TIMES-1){

endReceiveTime=System.currentTimeMillis();

receiveDuringTime=endReceiveTime-beginReceiveTime;

System.out.println("JbossMQreceive"

+JbossMQProducer.MAX_SEND_TIMES

+"messagesused:"+receiveDuringTime+"ms");

}

}else{

System.out.println(System.currentTimeMillis()

+"Consumer:->Received:"+message);

}

}catch(JMSExceptione){

e.printStackTrace();

}

}

}

3. JMS消息收发测试主类:JBossMQTest.javaimport javax.jms.JMSException;

/**

*<p>Title:</p>

*<p>Description:</p>

*<p>Copyright:Copyright(c)2007</p>

*<p>Company:</p>

*@authormqboss

*@version1.0

 */

public class JbossMQTest {

 /**

*

*@paramargs

*@throwsJMSException

*@throwsjava.lang.Exception

*/

publicstaticvoidmain(String[]args)throwsJMSException,Exception{

JbossMQConsumerconsumer=newJbossMQConsumer();

JbossMQProducerproducer=newJbossMQProducer();

char[]tempChars=newchar[1024];

for(inti=0;i<1024;i++){

tempChars[i]='a';

}

StringtempMsg=String.valueOf(tempChars);

//启动消息监听

consumer.consumeMessage();

producer.produceMessage(tempMsg);

producer.close();

//5000ms后关闭连接

Thread.sleep(5000);

consumer.close();

}

}

以上是把ActiveMQ集成到JBoss以后,对ActiveMQ和JBossMQ进行性能对比测试的源代码。很显然,以上源代码基本一致,所以性能测试结果可以很好的说明ActiveMQ和JBossMQ的性能对比情况。

参考文献:

[1]ActiveMQ的一个简单示例

相关推荐