ActiveMQ 5.2 发送和接收 BlobMessage

JMS消息生产者:

import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

/**
 * JMS producer that sends a file to an ActiveMQ queue as a {@link BlobMessage}.
 *
 * <p>The broker URL carries a {@code jms.blobTransferPolicy.defaultUploadUrl}
 * option pointing at an HTTP file server; the queue message itself is created
 * from the {@link File} handed to {@link #produceMessage(File)}.
 *
 * <p>JMS resources are created lazily on the first send and released by
 * {@link #close()}. Instances are not designed for concurrent use.
 */
public class BlobMessageSendTest {
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
	private String subject = "Blob Queue";
	private Destination destination = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private MessageProducer producer = null;

	/**
	 * Creates the connection, session, queue and producer.
	 *
	 * <p>Does nothing when a connection already exists, so repeated sends
	 * reuse the same resources instead of leaking a new connection per call.
	 * If setup fails part-way, whatever was created is released before the
	 * exception is rethrown.
	 *
	 * @throws JMSException if any JMS resource cannot be created
	 */
	private void initialize() throws JMSException, Exception {
		if (connection != null) {
			return;
		}
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		connection = (ActiveMQConnection) connectionFactory.createConnection();
		try {
			// Very important: per the original author's note, leaving this at
			// true (the default) causes the blob uploader to be lost when the
			// message is sent.
			connection.setCopyMessageOnSend(false);
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// A topic can be used instead via session.createTopic(subject).
			destination = session.createQueue(subject);
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		} catch (JMSException e) {
			try {
				releaseResources();
			} catch (JMSException ignored) {
				// The setup failure is the error worth reporting to the caller.
			}
			throw e;
		}
	}

	/**
	 * Sends the given file as a blob message.
	 *
	 * @param file the file to wrap in a {@link BlobMessage}
	 * @throws JMSException if initialization or the send fails
	 */
	public void produceMessage(File file) throws JMSException, Exception {
		initialize();
		BlobMessage msg = session.createBlobMessage(file);
		connection.start();
		System.out.println("Producer:->Sending message: " + file.getName());
		producer.send(msg);
		System.out.println("Producer:->Message sent complete!");
	}

	/**
	 * Closes the producer, session and connection.
	 *
	 * <p>Every resource gets a close attempt even if an earlier one throws.
	 * Safe to call when nothing was ever opened.
	 *
	 * @throws JMSException if closing a resource fails
	 */
	public void close() throws JMSException {
		System.out.println("Producer:->Closing connection");
		releaseResources();
	}

	/** Closes producer, then session, then connection, and clears the fields. */
	private void releaseResources() throws JMSException {
		try {
			try {
				if (producer != null) {
					producer.close();
				}
			} finally {
				if (session != null) {
					session.close();
				}
			}
		} finally {
			try {
				if (connection != null) {
					connection.close();
				}
			} finally {
				// Reset so a later produceMessage() starts from a clean state.
				producer = null;
				session = null;
				destination = null;
				connection = null;
			}
		}
	}
}

JMS消息消费者(注意:以下贴出的代码与上面的生产者 BlobMessageSendTest 完全重复,测试代码中使用的消费者类 BlobMessageReceiveTest 并未在本文给出):

package iprai.ace.activemq;

import java.io.File;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

/**
 * JMS producer that sends a file to an ActiveMQ queue as a {@link BlobMessage}.
 *
 * <p>NOTE(review): this listing appears under the "consumer" heading, but it
 * declares the producer class {@code BlobMessageSendTest}; no consumer
 * implementation is visible in this file. Confirm against the original
 * project before relying on it as the receiving side.
 *
 * <p>JMS resources are created lazily on the first send and released by
 * {@link #close()}. Instances are not designed for concurrent use.
 */
public class BlobMessageSendTest {

	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
	private String subject = "Blob Queue";
	private Destination destination = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private MessageProducer producer = null;

	/**
	 * Creates the connection, session, queue and producer.
	 *
	 * <p>Returns immediately when a connection already exists, so repeated
	 * sends do not leak a fresh connection each time. On a partial failure
	 * the resources created so far are released before rethrowing.
	 *
	 * @throws JMSException if any JMS resource cannot be created
	 */
	private void initialize() throws JMSException, Exception {
		if (connection != null) {
			return;
		}
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		connection = (ActiveMQConnection) connectionFactory.createConnection();
		try {
			// Very important: per the original author's note, leaving this at
			// true (the default) causes the blob uploader to be lost when the
			// message is sent.
			connection.setCopyMessageOnSend(false);
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// A topic can be used instead via session.createTopic(subject).
			destination = session.createQueue(subject);
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		} catch (JMSException e) {
			try {
				releaseResources();
			} catch (JMSException ignored) {
				// The setup failure is the error worth reporting to the caller.
			}
			throw e;
		}
	}

	/**
	 * Sends the given file as a blob message.
	 *
	 * @param file the file to wrap in a {@link BlobMessage}
	 * @throws JMSException if initialization or the send fails
	 */
	public void produceMessage(File file) throws JMSException, Exception {
		initialize();
		BlobMessage msg = session.createBlobMessage(file);
		connection.start();
		System.out.println("Producer:->Sending message: " + file.getName());
		producer.send(msg);
		System.out.println("Producer:->Message sent complete!");
	}

	/**
	 * Closes the producer, session and connection.
	 *
	 * <p>Every resource gets a close attempt even if an earlier one throws.
	 * Safe to call when nothing was ever opened.
	 *
	 * @throws JMSException if closing a resource fails
	 */
	public void close() throws JMSException {
		System.out.println("Producer:->Closing connection");
		releaseResources();
	}

	/** Closes producer, then session, then connection, and clears the fields. */
	private void releaseResources() throws JMSException {
		try {
			try {
				if (producer != null) {
					producer.close();
				}
			} finally {
				if (session != null) {
					session.close();
				}
			}
		} finally {
			try {
				if (connection != null) {
					connection.close();
				}
			} finally {
				// Reset so a later produceMessage() starts from a clean state.
				producer = null;
				session = null;
				destination = null;
				connection = null;
			}
		}
	}
}

测试代码:

import java.io.File;

/**
 * Manual end-to-end driver: sends one file as a blob message, then runs the
 * consumer to pick it up.
 */
public class BlobMessageTest {

	/** File sent when no path is given on the command line. */
	private static final String DEFAULT_FILE_NAME = "D:/装Win7后装XP.txt";

	/**
	 * Sends a file through the producer and then starts the consumer.
	 *
	 * <p>Ordering note from the original author: with a topic the consumer
	 * must be started before the producer or the message is not received;
	 * with a queue it is best to start the producer first and the consumer
	 * afterwards. This driver follows the queue ordering.
	 *
	 * @param args optional; {@code args[0]} is the path of the file to send,
	 *             defaulting to {@link #DEFAULT_FILE_NAME}
	 * @throws Exception if sending, receiving or closing fails
	 */
	public static void main(String[] args) throws Exception {

		BlobMessageSendTest producer = new BlobMessageSendTest();
		BlobMessageReceiveTest consumer = new BlobMessageReceiveTest();

		String fileName = args.length > 0 ? args[0] : DEFAULT_FILE_NAME;
		File file = new File(fileName);
		try {
			producer.produceMessage(file);
		} finally {
			// Release the producer's connection even when the send fails.
			producer.close();
		}

		// Wait 2 seconds after sending before the consumer starts.
		Thread.sleep(2000);
		try {
			// Start listening for the message.
			consumer.consumeMessage();
			// Wait 2 more seconds before shutting the consumer down.
			Thread.sleep(2000);
		} finally {
			consumer.close();
		}
	}
}

相关推荐