MQTT服务器的搭建与测试pub/sub通信过程
MQTT是一个即时通讯协议,采用轻量级发布和订阅消息传输机制。专门设计用于低带宽或者高昂的网络费用的通信过程中。以及提供三种不同质量的消息服务:
- 1.”至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- 2.”至少一次”,确保消息到达,但消息重复可能会发生。
- 3.”只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
对于实现了MQTT协议的消息代理软件有众多。Mosquitto,npm社区的mosca,Apache社区的ActivityMQ等等
分别尝试了这三种的搭建过程,最Mosquitto容易上手。
安装Mosquitto
#下载源代码包 wget http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz # 解压 tar zxfv mosquitto-1.4.5.tar.gz # 进入目录 cd mosquitto-1.4.5 # 编译 make # 安装 sudo make install
运行Mosquitto
Mosquitto的配置文件存放在/etc/mosquitto/mosquitto.conf
配置文件具体的配置内容为:
# ================================================================= # General configuration # ================================================================= # 客户端心跳的间隔时间 #retry_interval 20 # 系统状态的刷新时间 #sys_interval 10 # 系统资源的回收时间,0表示尽快处理 #store_clean_interval 10 # 服务进程的PID #pid_file /var/run/mosquitto.pid # 服务进程的系统用户 #user mosquitto # 客户端心跳消息的最大并发数 #max_inflight_messages 10 # 客户端心跳消息缓存队列 #max_queued_messages 100 # 用于设置客户端长连接的过期时间,默认永不过期 #persistent_client_expiration # ================================================================= # Default listener # ================================================================= # 服务绑定的IP地址 #bind_address # 服务绑定的端口号 #port 1883 # 允许的最大连接数,-1表示没有限制 #max_connections -1 # cafile:CA证书文件 # capath:CA证书目录 # certfile:PEM证书文件 # keyfile:PEM密钥文件 #cafile #capath #certfile #keyfile # 必须提供证书以保证数据安全性 #require_certificate false # 若require_certificate值为true,use_identity_as_username也必须为true #use_identity_as_username false # 启用PSK(Pre-shared-key)支持 #psk_hint # SSL/TSL加密算法,可以使用“openssl ciphers”命令获取 # as the output of that command. #ciphers # ================================================================= # Persistence # ================================================================= # 消息自动保存的间隔时间 #autosave_interval 1800 # 消息自动保存功能的开关 #autosave_on_changes false # 持久化功能的开关 persistence true # 持久化DB文件 #persistence_file mosquitto.db # 持久化DB文件目录 #persistence_location /var/lib/mosquitto/ # ================================================================= # Logging # ================================================================= # 4种日志模式:stdout、stderr、syslog、topic # none 则表示不记日志,此配置可以提升些许性能 log_dest none # 选择日志的级别(可设置多项) #log_type error #log_type warning #log_type notice #log_type information # 是否记录客户端连接信息 #connection_messages true # 是否记录日志时间 #log_timestamp true # ================================================================= # Security # ================================================================= # 客户端ID的前缀限制,可用于保证安全性 #clientid_prefixes # 允许匿名用户 #allow_anonymous true # 用户/密码文件,默认格式:username:password #password_file # PSK格式密码文件,默认格式:identity:key #psk_file # pattern write sensor/%u/data # ACL权限配置,常用语法如下: # 用户限制:user <username> # 话题限制:topic [read|write] <topic> # 正则限制:pattern write sensor/%u/data #acl_file # ================================================================= # Bridges # ================================================================= # 允许服务之间使用“桥接”模式(可用于分布式部署) #connection <name> #address <host>[:<port>] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] # 设置桥接的客户端ID #clientid # 桥接断开时,是否清除远程服务器中的消息 #cleansession false # 是否发布桥接的状态信息 #notifications true # 设置桥接模式下,消息将会发布到的话题地址 # $SYS/broker/connection/<clientid>/state #notification_topic # 设置桥接的keepalive数值 #keepalive_interval 60 # 桥接模式,目前有三种:automatic、lazy、once #start_type automatic # 桥接模式automatic的超时时间 #restart_timeout 30 # 桥接模式lazy的超时时间 #idle_timeout 60 # 桥接客户端的用户名 #username # 桥接客户端的密码 #password # bridge_cafile:桥接客户端的CA证书文件 # bridge_capath:桥接客户端的CA证书目录 # bridge_certfile:桥接客户端的PEM证书文件 # bridge_keyfile:桥接客户端的PEM密钥文件 #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile # 自己的配置可以放到以下目录中 include_dir /etc/mosquitto/conf.d
启动Mosquitto服务:
mosquitto -c /etc/mosquitto/mosquitto.conf -d (MQTT协议使用1883端口,查看该端口验��是否启动成功)
java使用MQTT的订阅发布
使用 https://github.com/fusesource/mqtt-client 来实现java客户端的调用。
Maven依赖为:
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.12</version> </dependency>
订阅(Sub)端HelloWorld:
订阅一个名为foo的主题,消息级别为 AT_LEAST_ONCE
MQTT mqtt = new MQTT(); try { mqtt.setHost("tcp://192.168.2.112:1883"); System.out.println("start"); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); System.out.println(connection == null); connection.subscribe(new Topic[] { new Topic("foo", QoS.AT_LEAST_ONCE) }); while (true) { Message message = connection.receive(); System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic() + " context :" + String.valueOf(message.getPayloadBuffer())); } } catch (Exception e) { } System.out.println("end");
发布(pub)端:
给一个名为foo的主题推送消息,消息级别为 AT_LEAST_ONCE
MQTT mqtt = new MQTT(); try { mqtt.setHost("tcp://192.168.2.112:1883"); System.out.println("start"); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); System.out.println(connection == null); for (int i = 0; i <100000; i++) { connection.publish("foo", "HelloWQEQWEQ".getBytes(), QoS.AT_LEAST_ONCE, false); } } catch (Exception e) { } System.out.println("end");
通过运行便可以看到消息成功pub到sub端。
至于连接类型,该客户端提供了三种
- BlockingConnection 阻塞式
- CallbackConnection 回调函数式
- FutureConnection 异步式
java客户端还能制定更多的通讯细节
// 连接前清空会话信息 ,若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true mqtt.setCleanSession(CLEAN_START); // 设置心跳时间 // ,定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待 mqtt.setKeepAlive(KEEP_ALIVE); // 设置客户端id,用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。 // 此ID应少于23个字符,默认根据本机地址、端口和时间自动生成 mqtt.setClientId(CLIENT_ID); //设置“遗嘱”消息的内容,默认是长度为零的消息 mqtt.setWillMessage("willMessage"); * //设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE * mqtt.setWillQos(QoS.AT_LEAST_ONCE); * //若想要在发布“遗嘱”消息时拥有retain选项,则为true mqtt.setWillRetain(true); * //设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 * mqtt.setWillTopic("willTopic"); */ // ==失败重连接设置说明 // 设置重新连接的次数 // ,客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); // 设置重连的间隔时间 ,首次重连接间隔毫秒数,默认为10ms mqtt.setReconnectDelay(RECONNECTION_DELAY); // 客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 // mqtt.setConnectAttemptsMax(10L);
MQTT提供的pub/sub确实比redis的pub/sub机制强大些。
相关推荐
neverstopforcode 2020-06-18
拓网科技 2020-11-23
85433664 2020-11-17
拓网科技 2020-11-13
mspgqrs 2020-10-19
xiaotutu0000 2020-10-15
kjyiyi 2020-10-10
大白机器人 2020-09-30
lifan0 2020-09-25
kunyus 2020-09-25
移动互联技术酒歌 2020-09-18
何砝 2020-09-16
anyvip 2020-09-15
zrhCSDN 2020-09-11
myCat 2020-09-09
lantingyue 2020-08-15
SanBa 2020-08-14
hiarxiaoliang 2020-08-05
urmsone 2020-08-03