RabbitMQ之发布/订阅
场景: 一个发送日志,一个接受者把接收到的数据写到磁盘.另一个接受者把接收到的消息,打在控制台上
Exchanges
先来回顾一下之前的概念
生产者:发送消息的用户应用程序
队列:消息存储缓冲区
消费者:接收消息的用户应用程序
对于rabbitmq来说,生产者不会将任何消息直接发送给队列的. 生产者甚至不知道消息是否会被传递到哪个队列,生产者,只是把消息发送给交换器(exchanges),交换器,用来接收生产者的消息,还把消息推送到队列.具体怎么处理生产者的消息, 是发给所有队列,还是特定队列等等,取决于交换器的类型
交换器类型:direct topic headers fanout
重点学习一下fanout,最简单最快的,发给所有的队列
创建一个交换器
//建立一个fanout类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- 1
- 2
匿名转发
//指定转发器名字:logs,空字符串表示模式或者匿名的转发器
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
- 1
- 2
- 3
临时队列
//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的
//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw
String queueName = channel.queueDeclare().getQueue();
- 1
- 2
- 3
Binding
上面创建了一个广播转发器和一个随机队列,现在需要告诉转发器转发消息到队列.这个就叫binding
//日志转发器附加到日志队列上
channel.queueBind(queueName, EXCHANGE_NAME, "");
- 1
- 2
看看代码吧
生产者
package com.tgb.kwy.publish;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Description
*
* @author kongwy [email protected]
* @version 1.0
* @date 2018-07-09-08 -41
*/
public class EmitLog {
private static final String EXCHANGE_name="logs";
public static void main(String[] args) throws java.io.IOException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/
/*指定用户名和密码*/
factory.setUsername("admin");
factory.setPassword("admin");
try{
Connection connection = factory.newConnection();
Channel channel=connection.createChannel();
//建立一个fanout类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = getMessage(args);
//指定转发器名字:logs,空字符串表示模式或者匿名的转发器
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}catch (Exception e){
}
}
private static String getMessage(String[] strings){
if(strings.length<1){
return "info: Hello World!";
}
return joinStrings(strings," ");
}
private static String joinStrings(String[] strings,String delimiter){
int length=strings.length;
if(length==0){
return "";
}
StringBuilder words=new StringBuilder(strings[0]);
for (int i = 0; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
消费者1(显示)
package com.tgb.kwy.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Description
*
* @author kongwy [email protected]
* @version 1.0
* @date 2018-07-09-08 -46
*/
public class ReceiveLogs {
private static final String EXCHANGE_name="logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/
/*指定用户名和密码*/
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//建立一个fanout类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的
String queueName = channel.queueDeclare().getQueue();
//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw
//日志转发器附加到日志队列上
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
消费者2(记录文件)
package com.tgb.kwy.publish;
import com.rabbitmq.client.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Description
*
* @author kongwy [email protected]
* @version 1.0
* @date 2018-07-09-11 -41
*/
public class ReceiveLogFile {
private static final String EXCHANGE_name="logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/
/*指定用户名和密码*/
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//建立一个fanout类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//无参queueDeclare(),创建一个非持久化,独立的,自动删除的队列,且名字是随机生成的
String queueName = channel.queueDeclare().getQueue();
//queueName随机生成, 类似这样的:amq.gen-57B3VSVx-l1jawmOoRBYLw
//日志转发器附加到日志队列上
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
printFile(message);
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
private static void printFile(String msg){
try{
String dir=ReceiveLogFile.class.getClassLoader().getResource("").getPath();
String logFileName=new SimpleDateFormat("yyyy-MM-dd").format(new Date());
File file = new File(dir, logFileName + ".log");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
当消费者启动之后,管理页面可以看到,随机队列
注: 运行的时候,先运行消费者,再运行生产者,否则获取不到实例
注:代码来自官网.官网对于概念性的内容,讲解的还是很清楚的