RabbitMQ初体验

这里官方使用的Pom是4.0.2版本

1 <dependencies>
 2   <dependency>
 3     <groupId>com.rabbitmq</groupId>
 4     <artifactId>amqp-client</artifactId>
 5     <version>4.0.2</version>
 6   </dependency>
 7    <dependency>
 8     <groupId>org.slf4j</groupId>
 9     <artifactId>slf4j-api</artifactId>
10     <version>1.7.10</version>
11   </dependency>
12   <dependency>
13     <groupId>org.slf4j</groupId>
14     <artifactId>slf4j-log4j12</artifactId>
15     <version>1.7.5</version>
16   </dependency>
17   <dependency>
18     <groupId>log4j</groupId>
19     <artifactId>log4j</artifactId>
20     <version>1.2.17</version>
21   </dependency>
22   <dependency>
23     <groupId>junit</groupId>
24     <artifactId>junit</artifactId>
25     <version>4.11</version>
26   </dependency>
27 </dependencies>

简单队列 hello word

RabbitMQ初体验

P:消息的生产者

C:消息的消费者

红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

那么我们根据以上的模型,咱们抽取出 3 个对象 生产者(用户发送消息) 队列(中间件):类似于容器(存储消息) 消费者(获取队列中的消息)

JAVA 操作 获取 MQ 连接

类似于我们在操作数据库的时候的要获取到连接然后才对数据进行

1 package cn.wh.util;
 2 
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 
 6 import java.io.IOException;
 7 import java.util.concurrent.TimeoutException;
 8 
 9 public class RabbitMqConnectionUtil {
10 /**
11 * 获取mq的连接
12 * @return
13 */
14 public static Connection getConnection() throws IOException, TimeoutException {
15 //定义一个连接工厂
16 ConnectionFactory factory=new ConnectionFactory();
17 //设置服务器的地址
18 factory.setHost("192.168.152.5");
19 //AMQP 5672
20 factory.setPort(5672);
21 //设置哪一个数据库 vhost
22 factory.setVirtualHost("/vhost_wh");
23 //设置用户名
24 factory.setUsername("wh");
25 factory.setPassword("123");
26 
27 return factory.newConnection();
28 }
29 }

生产者发送数据到消息队列

1 package cn.wh.simple;
 2 
 3 import cn.wh.util.RabbitMqConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Send {
11 private static final String QUEVE_NAME = "test_simple_queue";
12 
13 public static void main(String[] args) throws IOException {
14 //获取一个连接
15 Connection connection = null;
16 try {
17 
18 connection = RabbitMqConnectionUtil.getConnection();
19 } catch (IOException e) {
20 e.printStackTrace();
21 } catch (TimeoutException e) {
22 e.printStackTrace();
23 }
24 
25 //创建一个通道
26 Channel channel = connection.createChannel();
27 // 创建队列声明
28 channel.queueDeclare(QUEVE_NAME, false, false, false, null);
29 
30 //发送的消息
31 String msg="hello simple";
32 channel.basicPublish("",QUEVE_NAME,null,msg.getBytes());
33 System.out.println("发送成功===============");
34 try {
35 channel.close();
36 connection.close();
37 } catch (TimeoutException e) {
38 e.printStackTrace();
39 }
40 }
41 }

查看消消费者消费消费者消费消费者消费消费者消消费者消费

RabbitMQ初体验

消费者消费

1 package cn.wh.simple;
 2 
 3 import cn.wh.util.RabbitMqConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 import java.io.IOException;
 9 import java.util.concurrent.TimeoutException;
10 
11 public class Accept {
12 private static final java.lang.String QUEVE_NAME = "test_simple_queue";
13 public static void main(String[] args) throws IOException, InterruptedException {
14 
15 //获取一个连接
16 Connection connection=null;
17 {
18 try {
19 connection = RabbitMqConnectionUtil.getConnection();
20 } catch (IOException e) {
21 e.printStackTrace();
22 } catch (TimeoutException e) {
23 e.printStackTrace();
24 }
25 //定义管道
26 Channel channel = connection.createChannel();
27 //定义队列的消费者
28 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
29 channel.basicConsume(QUEVE_NAME,true,queueingConsumer);
30 while (true){
31 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
32 String msg = new String(delivery.getBody());
33 
34 
35 System.out.println("msg"+ msg);
36 }
37 }
38 }
39 }

简单队列的不足

耦合性高 生产消费一一对应(如果有多个消费者想都消费这个消息,就不行了) 队列名称变更时需要同时更改

关注作者

私信回复关键词:“ 架构 ” 获取更多关于RabbitMQ的知识!

相关推荐