RabbitMQ——Java客户端访问
RabbitMQ是一个轻量级的消息代理中间件,它支持多种消息通信协议,支持分布式部 署,同时也支持运行于多个操作系统,具有灵活、高可用等特性
RabbitMQ支持多协议其中最为重要的是高级消息队列协议(AMQP), AMQP 是Advanced Message Queuing Protocol 的缩写 它定义了“消息客户端”与“消息代理中间件”之间的通信协议。基于该协议,消息客户端与消息代理中间件可以不受开发语言具体产品的约束
java客户端访问
1.maven工程的pom文件中添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
2.新建消息生产者
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close();//关闭通道和链接 conn.close(); } }
在生产者中,先创建服务器链接,再建立通道和队列,最后向服务器发布一条"hola"的消息。
3.编写消费者
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,通过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }
5.消费者与生产者类似,先创建链接、通道、再声明接收的队列,最后使用一个Consumer来接收处理的消息。(程序会一直阻塞,一旦接收到服务器发来的消息就会进行处理)
6.接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
相关推荐
shenzhenzsw 2020-10-09
shyoldboy 2020-09-27
ljcsdn 2020-07-27
waitzkj 2020-07-25
woaishanguosha 2020-07-18
shenzhenzsw 2020-06-21
OnMyHeart 2020-06-20
waitzkj 2020-06-20
OnMyHeart 2020-06-16
cj0 2020-06-07
Soongp 2020-06-07
cj0 2020-06-06
cj0 2020-06-01
OnMyHeart 2020-05-27
xx0cw 2020-05-16
liym 2020-05-16
leihui00 2020-09-16