RabbitMQ——Java客户端访问

RabbitMQ是一个轻量级的消息代理中间件,它支持多种消息通信协议,支持分布式部 署,同时也支持运行于多个操作系统,具有灵活、高可用等特性

RabbitMQ支持多协议其中最为重要的是高级消息队列协议(AMQP), AMQP 是Advanced Message Queuing Protocol 的缩写 它定义了“消息客户端”与“消息代理中间件”之间的通信协议。基于该协议,消息客户端与消息代理中间件可以不受开发语言具体产品的约束

java客户端访问

1.maven工程的pom文件中添加依赖

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>4.1.0</version>
</dependency>

2.新建消息生产者

package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
 public static void main(String[] args) throws IOException, TimeoutException {
 //创建连接工厂
 ConnectionFactory factory = new ConnectionFactory();
 factory.setUsername("guest");
 factory.setPassword("guest");
 //设置 RabbitMQ 地址
 factory.setHost("localhost");
 //建立到代理服务器到连接
 Connection conn = factory.newConnection();
 //获得信道
 Channel channel = conn.createChannel();
 //声明交换器
 String exchangeName = "hello-exchange";
 channel.exchangeDeclare(exchangeName, "direct", true);
 String routingKey = "hola";
 //发布消息
 byte[] messageBodyBytes = "quit".getBytes();
 channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
 channel.close();//关闭通道和链接
 conn.close();
 }
}

在生产者中,先创建服务器链接,再建立通道和队列,最后向服务器发布一条"hola"的消息。

3.编写消费者

package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
 public static void main(String[] args) throws IOException, TimeoutException {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setUsername("guest");
 factory.setPassword("guest");
 factory.setHost("localhost");
 //建立到代理服务器到连接
 Connection conn = factory.newConnection();
 //获得信道
 final Channel channel = conn.createChannel();
 //声明交换器
 String exchangeName = "hello-exchange";
 channel.exchangeDeclare(exchangeName, "direct", true);
 //声明队列
 String queueName = channel.queueDeclare().getQueue();
 String routingKey = "hola";
 //绑定队列,通过键 hola 将队列和交换器绑定起来
 channel.queueBind(queueName, exchangeName, routingKey);
 while(true) {
 //消费消息
 boolean autoAck = false;
 String consumerTag = "";
 channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag,
 Envelope envelope,
 AMQP.BasicProperties properties,
 byte[] body) throws IOException {
 String routingKey = envelope.getRoutingKey();
 String contentType = properties.getContentType();
 System.out.println("消费的路由键:" + routingKey);
 System.out.println("消费的内容类型:" + contentType);
 long deliveryTag = envelope.getDeliveryTag();
 //确认消息
 channel.basicAck(deliveryTag, false);
 System.out.println("消费的消息体内容:");
 String bodyStr = new String(body, "UTF-8");
 System.out.println(bodyStr);
 }
 });
 }
 }
}

5.消费者与生产者类似,先创建链接、通道、再声明接收的队列,最后使用一个Consumer来接收处理的消息。(程序会一直阻塞,一旦接收到服务器发来的消息就会进行处理)

6.接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:

RabbitMQ——Java客户端访问

相关推荐