package org.jeecg.modules.iot.mqtt.callback; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.util.DateUtils; import org.jeecg.modules.iot.entity.MqttParameter; import org.jeecg.modules.iot.entity.ServerDeploy; import org.jeecg.modules.iot.mqtt.config.MqttConfiguration; import org.jeecg.modules.iot.mqtt.config.MqttCustomerClient; import org.jeecg.modules.iot.service.IServerDeployService; import org.jeecg.modules.message.websocket.WebSocket; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.List; /** * 消费监听 * * @author: cuikaidong * @create: 2023-09-27 11:36 */ @Slf4j @Component public class PushCallback implements MqttCallback { private static MqttClient client; @Lazy @Autowired private MqttConfiguration mqttConfiguration; @Autowired private IServerDeployService serverDeployService; @Autowired private WebSocket webSocket; @Override public void connectionLost(Throwable throwable) { log.error("MQTT连接已断开", throwable); int retryCount = 0; final long baseDelay = 1000; // 初始延迟1秒 final long maxDelay = 60000; // 最大延迟60秒(避免延迟过长) final double backoffFactor = 2.0; // 指数退避因子 while (true) { // 无限循环,直到连接成功 try { // 计算退避延迟,使用Math.min确保不超过最大延迟 long delay = (long) Math.min(baseDelay * Math.pow(backoffFactor, retryCount), maxDelay); if (retryCount > 0) { log.info("第{}次重连尝试将在{}ms后进行", retryCount + 1, delay); Thread.sleep(delay); } log.info("开始第{}次重连尝试", retryCount + 1); MqttCustomerClient mqttCustomerClient = mqttConfiguration.getMqttCustomerClient(); // 尝试连接 mqttCustomerClient.connect( mqttConfiguration.getHost(), mqttConfiguration.getClientId(), mqttConfiguration.getUsername(), mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive() ); // 订阅主题 log.info("MQTT连接已恢复,正在重新订阅主题"); List serverDeploys = serverDeployService.list(); String[] deviceTopics = serverDeploys.stream() .map(deploy -> "IOT\\" + deploy.getServerCode() + "\\Mutually2") .toArray(String[]::new); if (deviceTopics.length > 0) { mqttCustomerClient.subscribe(deviceTopics); log.info("已重新订阅{}个设备主题", deviceTopics.length); } String[] systemTopics = {"$SYS/brokers/+/clients/#"}; mqttCustomerClient.subscribe(systemTopics); log.info("已重新订阅系统主题"); log.info("MQTT连接已完全恢复,共尝试{}次", retryCount + 1); return; // 连接和订阅成功,退出循环 } catch (Exception e) { retryCount++; log.error("第{}次重连尝试失败: {}", retryCount, e.getMessage(), e); } } } /** * mqtt订阅数据 * * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 订阅数据 // 通过topic区分消息 // 关键修改:显式指定UTF-8编码 String content = new String(message.getPayload(), StandardCharsets.UTF_8); JSONObject jsonObject = JSON.parseObject(content); String clientId = String.valueOf(jsonObject.get("id")); if (clientId.equals("null")) { clientId = String.valueOf(jsonObject.get("clientid")); } ServerDeploy serverDeploy = serverDeployService.findByServerCode(clientId); if (serverDeploy == null) return; // 客户端已掉线 if (topic.endsWith("disconnected")) { // 修改守护和采集连接状态 updateCollectState(topic, serverDeploy); // 客户端已上线 } else if (topic.endsWith("connected")) { // 修改守护和采集连接状态 updateCollectState(topic, serverDeploy); } else if (topic.endsWith("Mutually2")) { receiveCollectMessage(jsonObject, serverDeploy); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // log.info("deliveryComplete---------" + token.isComplete()); } /** * 修改守护和采集连接状态 * * @param topic * @param serverDeploy */ void updateCollectState(String topic, ServerDeploy serverDeploy) { JSONObject obj = new JSONObject(); obj.put("cmd", "collect");//业务类型 obj.put("id", serverDeploy.getServerCode());//消息id if (topic.endsWith("disconnected")) { serverDeploy.setGuardState(0); serverDeploy.setCollectState(0); obj.put("message1", 0); obj.put("message", 0); } else { serverDeploy.setGuardState(1); obj.put("message1", 1); } webSocket.sendAllMessage(obj.toJSONString()); serverDeployService.updateById(serverDeploy); } /** * 采集消息解析 * * @param jsonObject */ void receiveCollectMessage(JSONObject jsonObject, ServerDeploy serverDeploy) { String clientId = String.valueOf(jsonObject.get("id")); String type = String.valueOf(jsonObject.get("type")); String time = String.valueOf(jsonObject.get("time")); MqttParameter mqttParameter = new MqttParameter(); String parameter1 = String.valueOf(jsonObject.get("parameter1")); String parameter2 = String.valueOf(jsonObject.get("parameter2")); String parameter3 = String.valueOf(jsonObject.get("parameter3")); String parameter4 = String.valueOf(jsonObject.get("parameter4")); String parameter5 = String.valueOf(jsonObject.get("parameter5")); JSONObject obj = new JSONObject(); obj.put("cmd", type);//业务类型 obj.put("id", clientId);//消息id switch (type) { // 心跳接收 case "palpitate": // log.info("心跳状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp()); break; case "version": // 展示采集软件配置文件版本 ,修改采集软件版本号 if (parameter1 != null && !parameter1.equals("null") && !parameter1.equals("0")) { obj.put("state", parameter1); // 采集软件 serverDeploy.setNewCollectVersion(parameter1); } if (parameter2 != null && !parameter2.equals("null") && !parameter2.equals("0")) { obj.put("message", parameter2); // 配置文件 serverDeploy.setNewDeployVersion(parameter2); } serverDeployService.updateById(serverDeploy); //采集软件消息内容 //配置文件消息内容 log.info("守护服务连接成功! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp()); break; case "log": // 日志 obj.put("state", parameter4);//消息内容 obj.put("message", parameter5);//消息内容 log.info("日志上传接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp()); break; case "start": // 启动采集软件 obj.put("state", parameter2); obj.put("message", parameter3); int collectState = Integer.parseInt(parameter1); serverDeploy.setCollectState(collectState); if (parameter2.equals("1")) { serverDeployService.updateById(serverDeploy); } log.info("启动采集软件接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp()); break; case "collect": // 采集状态 obj.put("message", parameter1); if (!serverDeploy.getCollectState().toString().equals(parameter1)) { serverDeploy.setCollectState(Integer.getInteger(parameter1)); serverDeployService.updateById(serverDeploy); } log.info("心跳♥采集状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp()); break; default: return; } if (obj.size() > 2) { webSocket.sendAllMessage(obj.toJSONString()); } } }