// Apollo MQTT client example
package com.intbull.mqtt.client;

import java.awt.BorderLayout;
import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.nio.charset.StandardCharsets;

import javax.swing.JButton;
import javax.swing.JEditorPane;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
publicclassServerextendsJFrame{
privatestaticfinallongserialVersionUID=1L;
privateJPanelpanelNorth,panelSouth;
privateJButtonbutton,buttonSend;
privateJEditorPaneeditUserName,editPassword,editTopic,editPublish,editMessage;
privateJTextAreaarea;
privateMqttClientclient;
privateStringhost="tcp://127.0.0.1:61613";
//privateStringhost="tcp://localhost:1883";
publicServer(){
Containercontainer=this.getContentPane();
panelNorth=newJPanel();
button=newJButton("连接&绑定主题");
button.addActionListener(newActionListener(){
@Override
publicvoidactionPerformed(ActionEventae){
connect(editUserName.getText(),editPassword.getText(),editTopic.getText());
}
});
panelNorth.add(button);
editUserName=newJEditorPane();
editUserName.setText("用户名");
panelNorth.add(editUserName);
editPassword=newJEditorPane();
editPassword.setText("密码");
panelNorth.add(editPassword);
editTopic=newJEditorPane();
editTopic.setText("订阅主题");
panelNorth.add(editTopic);
editPublish=newJEditorPane();
editPublish.setText("发布主题");
panelNorth.add(editPublish);
container.add(panelNorth,"North");
panelSouth=newJPanel();
editMessage=newJEditorPane();
buttonSend=newJButton("发送");
buttonSend.addActionListener(newActionListener(){
@Override
publicvoidactionPerformed(ActionEventae){
try{
Stringcontent=editMessage.getText();
if(content!=null){
MqttTopictopic=client.getTopic(editPublish.getText());
MqttMessagemessage=newMqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+"------ratained状态");
message.setPayload(content.getBytes());
MqttDeliveryTokentoken=topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+"========");
}
}catch(Exceptione){
e.printStackTrace();
}
}
});
panelSouth.add(editMessage);
panelSouth.add(buttonSend);
container.add(panelSouth,"South");
area=newJTextArea();
area.setText("record");
container.add(area,"Center");
}
privatevoidconnect(StringuserName,StringpassWord,Stringtopic){
MqttConnectOptionsoptions=newMqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
//设置超时时间
options.setConnectionTimeout(10);
//设置会话心跳时间
options.setKeepAliveInterval(20);
try{
client=newMqttClient(host,userName,
newMemoryPersistence());
client.setCallback(newMqttCallback(){
@Override
publicvoidconnectionLost(Throwablecause){
System.out.println("connectionLost-----------");
area.append("\nconnectionLost-----------");
}
@Override
publicvoiddeliveryComplete(IMqttDeliveryTokentoken){
System.out.println("deliveryComplete---------"+token.isComplete());
area.append("\ndeliveryComplete---------"+token.isComplete());
}
@Override
publicvoidmessageArrived(Stringtopic,MqttMessagearg1)
throwsException{
System.out.println("messageArrived----------"+topic+"----------"+arg1.toString());
area.append("\nmessageArrived----------"+topic+"----------"+arg1.toString());
}
});
client.connect(options);
client.subscribe(topic,1);
}catch(Exceptione){
e.printStackTrace();
}
}
publicstaticvoidmain(String[]args){
Servers=newServer();
s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
s.setSize(600,370);
s.setLocationRelativeTo(null);
s.setVisible(true);
}
}