RabbitMQ之发布/订阅

场景: 一个发送日志,一个接受者把接收到的数据写到磁盘.另一个接受者把接收到的消息,打在控制台上

Exchanges

先来回顾一下之前的概念

生产者:发送消息的用户应用程序

队列:消息存储缓冲区

消费者:接收消息的用户应用程序

对于rabbitmq来说,生产者不会将任何消息直接发送给队列的. 生产者甚至不知道消息是否会被传递到哪个队列,生产者,只是把消息发送给交换器(exchanges),交换器,用来接收生产者的消息,还把消息推送到队列.具体怎么处理生产者的消息, 是发给所有队列,还是特定队列等等,取决于交换器的类型

交换器类型:direct topic headers fanout

重点学习一下fanout,最简单最快的,发给所有的队列

创建一个交换器

//建立一个fanout类型转发器

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

  • 1
  • 2

匿名转发

//指定转发器名字:logs,空字符串表示模式或者匿名的转发器

channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");

  • 1
  • 2
  • 3

临时队列

//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的

//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw

String queueName = channel.queueDeclare().getQueue();

  • 1
  • 2
  • 3

Binding

上面创建了一个广播转发器和一个随机队列,现在需要告诉转发器转发消息到队列.这个就叫binding

//日志转发器附加到日志队列上

channel.queueBind(queueName, EXCHANGE_NAME, "");

  • 1
  • 2

看看代码吧

生产者

package com.tgb.kwy.publish;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* Description

*

* @author kongwy [email protected]

* @version 1.0

* @date 2018-07-09-08 -41

*/

public class EmitLog {

private static final String EXCHANGE_name="logs";

public static void main(String[] args) throws java.io.IOException{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/

/*指定用户名和密码*/

factory.setUsername("admin");

factory.setPassword("admin");

try{

Connection connection = factory.newConnection();

Channel channel=connection.createChannel();

//建立一个fanout类型转发器

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

String message = getMessage(args);

//指定转发器名字:logs,空字符串表示模式或者匿名的转发器

channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");

channel.close();

connection.close();

}catch (Exception e){

}

}

private static String getMessage(String[] strings){

if(strings.length<1){

return "info: Hello World!";

}

return joinStrings(strings," ");

}

private static String joinStrings(String[] strings,String delimiter){

int length=strings.length;

if(length==0){

return "";

}

StringBuilder words=new StringBuilder(strings[0]);

for (int i = 0; i < length; i++) {

words.append(delimiter).append(strings[i]);

}

return words.toString();

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

消费者1(显示)

package com.tgb.kwy.publish;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* Description

*

* @author kongwy [email protected]

* @version 1.0

* @date 2018-07-09-08 -46

*/

public class ReceiveLogs {

private static final String EXCHANGE_name="logs";

public static void main(String[] args) throws Exception{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/

/*指定用户名和密码*/

factory.setUsername("admin");

factory.setPassword("admin");

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

//建立一个fanout类型转发器

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的

String queueName = channel.queueDeclare().getQueue();

//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw

//日志转发器附加到日志队列上

channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

channel.basicConsume(queueName, true, consumer);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

消费者2(记录文件)

package com.tgb.kwy.publish;

import com.rabbitmq.client.*;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

/**

* Description

*

* @author kongwy [email protected]

* @version 1.0

* @date 2018-07-09-11 -41

*/

public class ReceiveLogFile {

private static final String EXCHANGE_name="logs";

public static void main(String[] args) throws Exception{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/

/*指定用户名和密码*/

factory.setUsername("admin");

factory.setPassword("admin");

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

//建立一个fanout类型转发器

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的

String queueName = channel.queueDeclare().getQueue();

//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw

//日志转发器附加到日志队列上

channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

printFile(message);

System.out.println(" [x] Received '" + message + "'");

}

};

channel.basicConsume(queueName, true, consumer);

}

private static void printFile(String msg){

try{

String dir=ReceiveLogFile.class.getClassLoader().getResource("").getPath();

String logFileName=new SimpleDateFormat("yyyy-MM-dd").format(new Date());

File file = new File(dir, logFileName + ".log");

FileOutputStream fos = new FileOutputStream(file, true);

fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "").getBytes());

fos.flush();

fos.close();

} catch (FileNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

当消费者启动之后,管理页面可以看到,随机队列

RabbitMQ之发布/订阅

注: 运行的时候,先运行消费者,再运行生产者,否则获取不到实例

注:代码来自官网.官网对于概念性的内容,讲解的还是很清楚的

相关推荐