当前位置: 代码迷 >> 综合 >> java+mqtt 实现异步回调
  详细解决方案

java+mqtt 实现异步回调

热度:4   发布时间:2023-12-03 02:52:09.0

一、controller

 /**
  * Publishes a network configuration command to a device over MQTT and waits up to
  * three seconds for the device's reply.
  *
  * <p>The command is a JSON object with {@code funcType = "network_config"} published to
  * {@code <topic>/<device>}. The reply is delivered through a {@link Listen} registered on
  * {@code PushCallback.pushCallback}.
  *
  * <p>The reply is held in request-local state rather than a shared static field, so a value
  * left over from an earlier request can never be returned for this one.
  *
  * <p>NOTE(review): {@code PushCallback} keeps a single listener slot, so overlapping requests
  * still replace each other's listener; correlating replies per device would have to be done
  * in {@code PushCallback}.
  *
  * @param device       numeric device id; also used as the last topic segment
  * @param ssid         Wi-Fi SSID sent as {@code wifi_ssid}
  * @param password     Wi-Fi password sent as {@code wifi_password}
  * @param host         server host sent as {@code server_host}
  * @param port         numeric server port sent as {@code server_port}
  * @param mqttUser     MQTT user name sent as {@code mqtt_user}
  * @param mqttPassword MQTT password sent as {@code mqtt_password}
  * @return a map whose {@code "success"} entry is the device reply, or {@code ""} when no
  *         non-empty reply arrived within three seconds
  * @throws MqttException         if publishing fails
  * @throws InterruptedException  if the waiting thread is interrupted
  * @throws NumberFormatException if {@code device} or {@code port} is not a valid integer
  */
 @RequestMapping(value = "/updateNetWork", method = RequestMethod.POST)
 @ResponseBody
 public Object updateNetWork(String device, String ssid, String password, String host, String port,
         String mqttUser, String mqttPassword) throws MqttException, InterruptedException {
     JSONObject payload = new JSONObject();
     payload.put("device", Integer.parseInt(device));
     payload.put("funcType", "network_config");
     payload.put("wifi_ssid", ssid);
     payload.put("wifi_password", password);
     payload.put("server_host", host);
     payload.put("server_port", Integer.parseInt(port));
     payload.put("mqtt_user", mqttUser);
     payload.put("mqtt_password", mqttPassword);

     // Request-local reply holder and completion signal (fully qualified so that the
     // file's import block does not need to change).
     final java.util.concurrent.atomic.AtomicReference<String> reply =
             new java.util.concurrent.atomic.AtomicReference<>("");
     final java.util.concurrent.CountDownLatch replied = new java.util.concurrent.CountDownLatch(1);

     // Register the listener BEFORE publishing so a fast reply cannot be missed.
     PushCallback.pushCallback.listens(new Listen() {
         @Override
         public void netSuccess(String success) {
             if (success != null) {
                 reply.set(success);
                 // Only a non-empty reply ends the wait, matching the previous polling loop.
                 if (!success.isEmpty()) {
                     replied.countDown();
                 }
             }
         }
     });

     MqttConnectionUtils.publish(topic + "/" + device, payload.toJSONString());

     // Block for at most three seconds; returns as soon as a reply is signalled.
     replied.await(3, java.util.concurrent.TimeUnit.SECONDS);

     Map<String, Object> map = new HashMap<>();
     map.put("success", reply.get());
     return map;
 }

 /** Observer notified with the device's reply to a network configuration command. */
 public interface Listen {

     /**
      * Called when a reply is available.
      *
      * @param success the reply value; {@code null} values are ignored by the controller
      */
     void netSuccess(String success);
 }

二、MQTT 回调类(接收消息)

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zskx.deanswerquipment.controller.UploadQuestionController;
import com.zskx.deanswerquipment.dao.ApplicationRedis;
import com.zskx.deanswerquipment.dao.CommitUserInfo;
import com.zskx.deanswerquipment.dao.UserInfoDao;
import com.zskx.deanswerquipment.pojo.Equipmentinfo;
import com.zskx.deanswerquipment.service.JudgeEquipmentService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.*;public class PushCallback implements MqttCallback{private static final Logger LOG = LogManager.getLogger(PushCallback.class);public static PushCallback pushCallback;@Autowiredprivate UserInfoDao userInfoDao;  // 数据库CRUD接口@Autowiredprivate CommitUserInfo commitUserInfol;private UploadQuestionController.Listen listen;public PushCallback(UserInfoDao userInfoDao,ApplicationRedis applicationRedis,JudgeEquipmentService judgeEquipmentService, CommitUserInfo commitUserInfol){this.userInfoDao = userInfoDao;this.applicationRedis = applicationRedis;this.judgeEquipmentService = judgeEquipmentService;this.commitUserInfol = commitUserInfol;pushCallback=this;}@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连System.out.println("WIFI版======连接断开,可以做重连");MqttConnectionUtils.reConnect();}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面String messages = new String(message.getPayload());if(!messages.equals("close")){System.out.println("接收消息主题 : " + topic);System.out.println("接收消息Qos : " + message.getQos());System.out.println("接收消息内容 : " + new String(message.getPayload()));try {JSONObject json = JsonUtil.StringtoJson(new String(message.getPayload()));if(json.get("funcType") != null){}}catch (Exception e){}}}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}public  void listens(UploadQuestionController.Listen listens){this.listen=listens;}}

这里采用的是 Java 观察者模式:controller 向 MQTT 回调类注册一个监听器,当监听的变量发生变化(即收到设备响应)后,立即把状态返回给前端。为了等待设备响应,接口最多会阻塞三秒;三秒内没有收到响应则直接返回。

  相关解决方案