package org.jeecg.modules.iot.mqtt.callback;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.jeecg.common.util.DateUtils;
|
import org.jeecg.modules.iot.entity.MqttParameter;
|
import org.jeecg.modules.iot.entity.ServerDeploy;
|
import org.jeecg.modules.iot.mqtt.config.MqttConfiguration;
|
import org.jeecg.modules.iot.mqtt.config.MqttCustomerClient;
|
import org.jeecg.modules.iot.service.IServerDeployService;
|
import org.jeecg.modules.message.websocket.WebSocket;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Lazy;
|
import org.springframework.stereotype.Component;
|
|
import java.nio.charset.StandardCharsets;
|
import java.util.List;
|
|
|
/**
|
* 消费监听
|
*
|
* @author: cuikaidong
|
* @create: 2023-09-27 11:36
|
*/
|
@Slf4j
|
@Component
|
public class PushCallback implements MqttCallback {
|
|
private static MqttClient client;
|
@Lazy
|
@Autowired
|
private MqttConfiguration mqttConfiguration;
|
@Autowired
|
private IServerDeployService serverDeployService;
|
@Autowired
|
private WebSocket webSocket;
|
|
@Override
|
public void connectionLost(Throwable throwable) {
|
log.error("MQTT连接已断开", throwable);
|
|
int retryCount = 0;
|
final long baseDelay = 1000; // 初始延迟1秒
|
final long maxDelay = 60000; // 最大延迟60秒(避免延迟过长)
|
final double backoffFactor = 2.0; // 指数退避因子
|
|
while (true) { // 无限循环,直到连接成功
|
try {
|
// 计算退避延迟,使用Math.min确保不超过最大延迟
|
long delay = (long) Math.min(baseDelay * Math.pow(backoffFactor, retryCount), maxDelay);
|
if (retryCount > 0) {
|
log.info("第{}次重连尝试将在{}ms后进行", retryCount + 1, delay);
|
Thread.sleep(delay);
|
}
|
|
log.info("开始第{}次重连尝试", retryCount + 1);
|
|
MqttCustomerClient mqttCustomerClient = mqttConfiguration.getMqttCustomerClient();
|
|
// 尝试连接
|
mqttCustomerClient.connect(
|
mqttConfiguration.getHost(),
|
mqttConfiguration.getClientId(),
|
mqttConfiguration.getUsername(),
|
mqttConfiguration.getPassword(),
|
mqttConfiguration.getTimeout(),
|
mqttConfiguration.getKeepalive()
|
);
|
|
// 订阅主题
|
log.info("MQTT连接已恢复,正在重新订阅主题");
|
|
List<ServerDeploy> serverDeploys = serverDeployService.list();
|
String[] deviceTopics = serverDeploys.stream()
|
.map(deploy -> "IOT\\" + deploy.getServerCode() + "\\Mutually2")
|
.toArray(String[]::new);
|
|
if (deviceTopics.length > 0) {
|
mqttCustomerClient.subscribe(deviceTopics);
|
log.info("已重新订阅{}个设备主题", deviceTopics.length);
|
}
|
|
String[] systemTopics = {"$SYS/brokers/+/clients/#"};
|
mqttCustomerClient.subscribe(systemTopics);
|
log.info("已重新订阅系统主题");
|
|
log.info("MQTT连接已完全恢复,共尝试{}次", retryCount + 1);
|
return; // 连接和订阅成功,退出循环
|
} catch (Exception e) {
|
retryCount++;
|
log.error("第{}次重连尝试失败: {}", retryCount, e.getMessage(), e);
|
}
|
}
|
}
|
/**
|
* mqtt订阅数据
|
*
|
* @param topic
|
* @param message
|
* @throws Exception
|
*/
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
// 订阅数据
|
// 通过topic区分消息
|
// 关键修改:显式指定UTF-8编码
|
String content = new String(message.getPayload(), StandardCharsets.UTF_8);
|
JSONObject jsonObject = JSON.parseObject(content);
|
String clientId = String.valueOf(jsonObject.get("id"));
|
if (clientId.equals("null")) {
|
clientId = String.valueOf(jsonObject.get("clientid"));
|
}
|
ServerDeploy serverDeploy = serverDeployService.findByServerCode(clientId);
|
if (serverDeploy == null) return;
|
// 客户端已掉线
|
if (topic.endsWith("disconnected")) {
|
// 修改守护和采集连接状态
|
updateCollectState(topic, serverDeploy);
|
// 客户端已上线
|
} else if (topic.endsWith("connected")) {
|
// 修改守护和采集连接状态
|
updateCollectState(topic, serverDeploy);
|
} else if (topic.endsWith("Mutually2")) {
|
receiveCollectMessage(jsonObject, serverDeploy);
|
}
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
// log.info("deliveryComplete---------" + token.isComplete());
|
}
|
|
/**
|
* 修改守护和采集连接状态
|
*
|
* @param topic
|
* @param serverDeploy
|
*/
|
void updateCollectState(String topic, ServerDeploy serverDeploy) {
|
JSONObject obj = new JSONObject();
|
obj.put("cmd", "collect");//业务类型
|
obj.put("id", serverDeploy.getServerCode());//消息id
|
if (topic.endsWith("disconnected")) {
|
serverDeploy.setGuardState(0);
|
serverDeploy.setCollectState(0);
|
obj.put("message1", 0);
|
obj.put("message", 0);
|
} else {
|
serverDeploy.setGuardState(1);
|
obj.put("message1", 1);
|
}
|
webSocket.sendAllMessage(obj.toJSONString());
|
serverDeployService.updateById(serverDeploy);
|
}
|
|
/**
|
* 采集消息解析
|
*
|
* @param jsonObject
|
*/
|
void receiveCollectMessage(JSONObject jsonObject, ServerDeploy serverDeploy) {
|
String clientId = String.valueOf(jsonObject.get("id"));
|
String type = String.valueOf(jsonObject.get("type"));
|
String time = String.valueOf(jsonObject.get("time"));
|
MqttParameter mqttParameter = new MqttParameter();
|
String parameter1 = String.valueOf(jsonObject.get("parameter1"));
|
String parameter2 = String.valueOf(jsonObject.get("parameter2"));
|
String parameter3 = String.valueOf(jsonObject.get("parameter3"));
|
String parameter4 = String.valueOf(jsonObject.get("parameter4"));
|
String parameter5 = String.valueOf(jsonObject.get("parameter5"));
|
JSONObject obj = new JSONObject();
|
obj.put("cmd", type);//业务类型
|
obj.put("id", clientId);//消息id
|
switch (type) {
|
// 心跳接收
|
case "palpitate":
|
// log.info("心跳状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
|
break;
|
case "version":
|
// 展示采集软件配置文件版本 ,修改采集软件版本号
|
if (parameter1 != null && !parameter1.equals("null") && !parameter1.equals("0")) {
|
obj.put("state", parameter1);
|
// 采集软件
|
serverDeploy.setNewCollectVersion(parameter1);
|
}
|
if (parameter2 != null && !parameter2.equals("null") && !parameter2.equals("0")) {
|
obj.put("message", parameter2);
|
// 配置文件
|
serverDeploy.setNewDeployVersion(parameter2);
|
}
|
serverDeployService.updateById(serverDeploy);
|
//采集软件消息内容
|
//配置文件消息内容
|
log.info("守护服务连接成功! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
|
break;
|
case "log":
|
// 日志
|
obj.put("state", parameter4);//消息内容
|
obj.put("message", parameter5);//消息内容
|
log.info("日志上传接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
|
break;
|
case "start":
|
// 启动采集软件
|
obj.put("state", parameter2);
|
obj.put("message", parameter3);
|
int collectState = Integer.parseInt(parameter1);
|
serverDeploy.setCollectState(collectState);
|
if (parameter2.equals("1")) {
|
serverDeployService.updateById(serverDeploy);
|
}
|
log.info("启动采集软件接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
|
break;
|
case "collect":
|
// 采集状态
|
obj.put("message", parameter1);
|
if (!serverDeploy.getCollectState().toString().equals(parameter1)) {
|
serverDeploy.setCollectState(Integer.getInteger(parameter1));
|
serverDeployService.updateById(serverDeploy);
|
}
|
log.info("心跳♥采集状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
|
break;
|
default:
|
return;
|
}
|
if (obj.size() > 2) {
|
webSocket.sendAllMessage(obj.toJSONString());
|
}
|
}
|
}
|