ActiveMQ消息选择器与异步接收
一、消息选择器
消息选择器:过滤消息属性与设置条件相等的消息进行消费。语义与sql一致。
private final String selector_1 = "sex='w'"; this.consumer = session.createConsumer(destination, selector_1);
二、消息异步接收
消息异步接收:当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息
示例:
public class Producer { // 建立connectionFactory工厂对象 private ActiveMQConnectionFactory connectionFactory; // 连接对象 private Connection connection; // session对象 private Session session; // 生产者 private MessageProducer producer; public Producer() { this.connectionFactory = new ActiveMQConnectionFactory(); try { this.connection = connectionFactory.createConnection("fu", "fu"); this.connection.start(); //参一:开启事务,参二,手工签收 this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); this.producer = session.createProducer(null); } catch (Exception e) { e.printStackTrace(); } } public void send() throws Exception { Destination destination = this.session.createQueue("first"); MapMessage map = this.session.createMapMessage(); //设置消息 map.setString("name", "zs"); map.setString("age", "40"); //设置消息属性 map.setStringProperty("sex", "m"); MapMessage map1 = this.session.createMapMessage(); map1.setString("name", "ls"); map1.setString("age", "20"); map1.setStringProperty("sex", "w"); MapMessage map2 = this.session.createMapMessage(); map2.setString("name", "ww"); map2.setString("age", "35"); map2.setStringProperty("sex", "w"); //参一目标,参二数据,参三非持久化,参四做优先及,参五失效时间 this.producer.send(destination, map, DeliveryMode.PERSISTENT, 2, 1000*10); this.producer.send(destination, map1, DeliveryMode.PERSISTENT, 2, 1000*10); this.producer.send(destination, map2, DeliveryMode.PERSISTENT, 9, 1000*10); //提交事务 this.session.commit(); //关闭连接 this.connection.close(); } public static void main(String[] args) throws Exception { Producer p = new Producer(); p.send(); } }
public class Comsumer { // private final String selector_0 = "age>30"; // 消息过滤的不是消息本身,而是过滤消息附带的某些属性 private final String selector_1 = "sex='w'"; // 建立connectionFactory工厂对象 private ActiveMQConnectionFactory connectionFactory; // 连接对象 private Connection connection; // session对象 private Session session; // 生产者 private MessageConsumer consumer; // 目标地址 private Destination destination; public Comsumer() { this.connectionFactory = new ActiveMQConnectionFactory(); try { this.connection = connectionFactory.createConnection("fu", "fu"); this.connection.start(); this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); // 消息过滤的不是消息本身,而是过滤消息附带的某些属性 this.consumer = session.createConsumer(destination, selector_1); } catch (Exception e) { e.printStackTrace(); } } public void recever() throws Exception { // 消息异步接收:当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息 this.consumer.setMessageListener(new Listener()); } class Listener implements MessageListener { public void onMessage(Message message) { try { if (message instanceof TextMessage) { } else if (message instanceof MapMessage) { MapMessage m = (MapMessage) message; System.out.println(m.toString()); System.out.println(m.getString("name")); System.out.println(m.getString("age")); // 手工签收消息 m.acknowledge(); } } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Comsumer c = new Comsumer(); c.recever(); } }
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30