ActiveMQ5.2发送和接受BlobMessage
JMS消息生产者:
import java.io.File; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class BlobMessageSendTest { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"; private String subject = "Blob Queue"; private Destination destination = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = (ActiveMQConnection) connectionFactory.createConnection(); /* * !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true * (default) the uploader is lost in translation ;) * !!!!!!!!!!!!!!!!!!!!!!!!! */ connection.setCopyMessageOnSend(false); session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); // destination = session.createTopic(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(File file) throws JMSException, Exception { initialize(); BlobMessage msg = session.createBlobMessage(file); connection.start(); System.out.println("Producer:->Sending message: " + file.getName()); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
JMS消息消费者:
package iprai.ace.activemq; import java.io.File; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class BlobMessageSendTest { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"; private String subject = "Blob Queue"; private Destination destination = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = (ActiveMQConnection) connectionFactory.createConnection(); /* * !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true * (default) the uploader is lost in translation ;) * !!!!!!!!!!!!!!!!!!!!!!!!! */ connection.setCopyMessageOnSend(false); session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); // destination = session.createTopic(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(File file) throws JMSException, Exception { initialize(); BlobMessage msg = session.createBlobMessage(file); connection.start(); System.out.println("Producer:->Sending message: " + file.getName()); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
测试代码:
import java.io.File; public class BlobMessageTest { /** * topic方式,必须先启动消费者,然后是生产者,否则接收不到消息。 queue方式,最好先启动生产者,然后启动消费者,否则也容易收不到消息。 * * @param args */ public static void main(String[] args) throws Exception { BlobMessageSendTest producer = new BlobMessageSendTest(); BlobMessageReceiveTest consumer = new BlobMessageReceiveTest(); String fileName = "D:/装Win7后装XP.txt"; // String fileName = "d:/JAVA+开发视频会议系统详细设计.doc"; File file = new File(fileName); producer.produceMessage(file); producer.close(); // 延时500毫秒之后停止接受消息 Thread.sleep(2000); // 开始监听 consumer.consumeMessage(); // 延时500毫秒之后发送消息 Thread.sleep(2000); consumer.close(); } }
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30