Mqtt搭建代理服务器进行通信-浅析
本文基于Windows系统操作
MQTT简介:
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议!
MQTT消息的主要特点:
使用(publish/subscribe)消息模式,简称p/s模式,即发布/订阅!提供一对多的发送方式!
MQTT根据QoS定义的等级来传输消息:
- level 0:最多一次的传输
消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。
- level 1:至少一次的传输
服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。 如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。 当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。
- level 2: 只有一次的传输
在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。 QoS level 2在消息头有Message ID。
接下来开始我们的表演:
下载代理服务器
本文使用mqtt代理服务器是apache下的apollo代理服务器
下载地址:http://www.apache.org/dyn/closer.cgi?path=activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz
创建代理服务器
下载完成然后解压目录
打开dos窗口进入到apache-apollo-1.7.1\bin目录下
执行apollo create testbroker命令创建一个名称为testbroker的代理服务器
下面就是我们创建的代理服务器
启动代理服务器
使用dos进入testbroker目录中的bin目录下
执行apollo-broker run命令启动代理服务器
通过HTTP访问代理服务器
现在我们可以打开浏览器看下我们的代理服务器输入网址http://127.0.0.1:61680/
用户名密码可到配置文件中查看
进入testbroker目录下的etc目录
- users.properties中配置的用户名和密码
默认有个用户名为admin,密码为password的用户
我们也可以自己配置用户
现在就用默认用户登陆
OK登陆成功
接下来我们编写Android客户端
- 首先准备mqtt jar包
- 不容易呀,mqtt这个jar真难找
- https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/
一定保证客户端和服务端以及代理服务器所在的电脑在同一网段下
- 可以在电脑上生成wifi热点,手机客户端连接热点即可
- 接下来直接贴Android客户端代码
MqttService.java
package com.example.jingwc.mqtt_demo; import android.app.Service; import android.content.Intent; import android.os.Binder; import android.os.IBinder; import android.util.Log; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttService extends Service { /** * 代理服务器ip地址 */ public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613"; /** * 客户端唯一标识 */ public static final String MQTT_CLIENT_ID = "android-jingwc"; /** * 订阅标识 */ public static final String MQTT_TOPIC = "jingwc"; /** * 用户名 */ public static final String USERNAME = "admin"; /** * 密码 */ public static final String PASSWORD = "password"; private MqttClient mqttClient; public MqttService() { } @Override public IBinder onBind(Intent intent) { return binder; } /** * 连接mqtt */ public void connect(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(USERNAME); // 设置密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe(MQTT_TOPIC); // 设置回调 mqttClient.setCallback(new MqttCallback() { //连接丢失后,一般在这里面进行重连 @Override public void connectionLost(Throwable throwable) { Log.d("test","connectionLost"); } //subscribe后得到的消息会执行到这里面 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { Log.d("test","messageArrived"+mqttMessage.toString()); } //publish后会执行到这里 @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { Log.d("test","deliveryComplete"); } }); } catch (MqttException e) { e.printStackTrace(); }catch (Exception e) { e.printStackTrace(); } } /** * 断开连接 */ public void disconnect(){ if(mqttClient != null){ if(mqttClient.isConnected()){ try { mqttClient.disconnect(); mqttClient = null; } catch (MqttException e) { e.printStackTrace(); } } } } private final Binder binder = new MyBinder(); class MyBinder extends Binder{ public MqttService getService(){ return MqttService.this; } } }
MainActivity.java
package com.example.jingwc.mqtt_demo; import android.content.ComponentName; import android.content.Intent; import android.content.ServiceConnection; import android.os.IBinder; import android.support.v7.app.AppCompatActivity; import android.os.Bundle; import android.view.View; import android.widget.Button; import android.widget.EditText; public class MainActivity extends AppCompatActivity { MqttService service = null; private ServiceConnection mConnection = new ServiceConnection() { @Override public void onServiceConnected(ComponentName componentName, IBinder iBinder) { service = ((MqttService.MyBinder)iBinder).getService(); } @Override public void onServiceDisconnected(ComponentName componentName) { service = null; } }; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Button bt_connect = (Button) findViewById(R.id.bt_connect); Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect); bt_connect.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { // 连接 service.connect(); } }); bt_disconnect.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { // 断开连接 service.disconnect(); } }); bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE); } }
服务端代码
- 也可以在写一个android程序当作服务端
- 我这里写的是java项目
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttServer { // 代理服务器ip地址 private static String host = "tcp://192.168.1.107:61613"; private static String userName = "admin"; private static String password = "password"; private static MqttClient client; // 主题 private static MqttTopic topic; private static MqttMessage message; // 订阅标识 private static String topicStr = "jingwc"; public static void main(String[] args) throws MqttException{ client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); topic = client.getTopic(topicStr); message = new MqttMessage(); message.setQos(1); message.setRetained(true); message.setPayload("from server message".getBytes()); client.connect(options); MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("token:"+token.isComplete()); } }
服务端通过代理服务器发送客户端订阅的消息图 ( 个人理解 )
相关推荐
neverstopforcode 2020-06-18
lynxnative 2020-08-16
胡献根 2020-07-05
LandryBean 2020-06-14
xasdfg 2020-04-25
vanturman 2020-04-10
80981934 2020-03-05
xzkjgw 2020-02-03
fengyun 2020-01-18
昭君出塞 2019-12-27
jocleyn 2019-12-02
leodengzx 2019-11-10
forrestou 2019-11-04
veforever 2019-05-05
Dandelionlcp 2019-05-05
ICAOYS的搬砖日常 2019-06-28
wodetian 2019-06-28
Donutsapps 2019-06-26
cumtzdlxm 2019-06-21