cuikaidong
2025-06-12 066063ed92fdd40da4dfe21770557f3adba3e1af
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package org.jeecg.modules.iot.mqtt.callback;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.util.DateUtils;
import org.jeecg.modules.iot.entity.MqttParameter;
import org.jeecg.modules.iot.entity.ServerDeploy;
import org.jeecg.modules.iot.mqtt.config.MqttConfiguration;
import org.jeecg.modules.iot.mqtt.config.MqttCustomerClient;
import org.jeecg.modules.iot.service.IServerDeployService;
import org.jeecg.modules.message.websocket.WebSocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
 
/**
 * 消费监听
 *
 * @author: cuikaidong
 * @create: 2023-09-27 11:36
 */
@Slf4j
@Component
public class PushCallback implements MqttCallback {
 
    private static MqttClient client;
    @Lazy
    @Autowired
    private MqttConfiguration mqttConfiguration;
    @Autowired
    private IServerDeployService serverDeployService;
    @Autowired
    private WebSocket webSocket;
 
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("MQTT连接已断开", throwable);
 
        int retryCount = 0;
        final long baseDelay = 1000;     // 初始延迟1秒
        final long maxDelay = 60000;     // 最大延迟60秒(避免延迟过长)
        final double backoffFactor = 2.0; // 指数退避因子
 
        while (true) { // 无限循环,直到连接成功
            try {
                // 计算退避延迟,使用Math.min确保不超过最大延迟
                long delay = (long) Math.min(baseDelay * Math.pow(backoffFactor, retryCount), maxDelay);
                if (retryCount > 0) {
                    log.info("第{}次重连尝试将在{}ms后进行", retryCount + 1, delay);
                    Thread.sleep(delay);
                }
 
                log.info("开始第{}次重连尝试", retryCount + 1);
 
                MqttCustomerClient mqttCustomerClient = mqttConfiguration.getMqttCustomerClient();
 
                // 尝试连接
                mqttCustomerClient.connect(
                        mqttConfiguration.getHost(),
                        mqttConfiguration.getClientId(),
                        mqttConfiguration.getUsername(),
                        mqttConfiguration.getPassword(),
                        mqttConfiguration.getTimeout(),
                        mqttConfiguration.getKeepalive()
                );
 
                // 订阅主题
                log.info("MQTT连接已恢复,正在重新订阅主题");
 
                List<ServerDeploy> serverDeploys = serverDeployService.list();
                String[] deviceTopics = serverDeploys.stream()
                        .map(deploy -> "IOT\\" + deploy.getServerCode() + "\\Mutually2")
                        .toArray(String[]::new);
 
                if (deviceTopics.length > 0) {
                    mqttCustomerClient.subscribe(deviceTopics);
                    log.info("已重新订阅{}个设备主题", deviceTopics.length);
                }
 
                String[] systemTopics = {"$SYS/brokers/+/clients/#"};
                mqttCustomerClient.subscribe(systemTopics);
                log.info("已重新订阅系统主题");
 
                log.info("MQTT连接已完全恢复,共尝试{}次", retryCount + 1);
                return; // 连接和订阅成功,退出循环
            } catch (Exception e) {
                retryCount++;
                log.error("第{}次重连尝试失败: {}", retryCount, e.getMessage(), e);
            }
        }
    }
    /**
     * mqtt订阅数据
     *
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // 订阅数据
        // 通过topic区分消息
        // 关键修改:显式指定UTF-8编码
        String content = new String(message.getPayload(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSON.parseObject(content);
        String clientId = String.valueOf(jsonObject.get("id"));
        if (clientId.equals("null")) {
            clientId = String.valueOf(jsonObject.get("clientid"));
        }
        ServerDeploy serverDeploy = serverDeployService.findByServerCode(clientId);
        if (serverDeploy == null) return;
        // 客户端已掉线
        if (topic.endsWith("disconnected")) {
            // 修改守护和采集连接状态
            updateCollectState(topic, serverDeploy);
            // 客户端已上线
        } else if (topic.endsWith("connected")) {
            // 修改守护和采集连接状态
            updateCollectState(topic, serverDeploy);
        } else if (topic.endsWith("Mutually2")) {
            receiveCollectMessage(jsonObject, serverDeploy);
        }
    }
 
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
//        log.info("deliveryComplete---------" + token.isComplete());
    }
 
    /**
     * 修改守护和采集连接状态
     *
     * @param topic
     * @param serverDeploy
     */
    void updateCollectState(String topic, ServerDeploy serverDeploy) {
        JSONObject obj = new JSONObject();
        obj.put("cmd", "collect");//业务类型
        obj.put("id", serverDeploy.getServerCode());//消息id
        if (topic.endsWith("disconnected")) {
            serverDeploy.setGuardState(0);
            serverDeploy.setCollectState(0);
            obj.put("message1", 0);
            obj.put("message", 0);
        } else {
            serverDeploy.setGuardState(1);
            obj.put("message1", 1);
        }
        webSocket.sendAllMessage(obj.toJSONString());
        serverDeployService.updateById(serverDeploy);
    }
 
    /**
     * 采集消息解析
     *
     * @param jsonObject
     */
    void receiveCollectMessage(JSONObject jsonObject, ServerDeploy serverDeploy) {
        String clientId = String.valueOf(jsonObject.get("id"));
        String type = String.valueOf(jsonObject.get("type"));
        String time = String.valueOf(jsonObject.get("time"));
        MqttParameter mqttParameter = new MqttParameter();
        String parameter1 = String.valueOf(jsonObject.get("parameter1"));
        String parameter2 = String.valueOf(jsonObject.get("parameter2"));
        String parameter3 = String.valueOf(jsonObject.get("parameter3"));
        String parameter4 = String.valueOf(jsonObject.get("parameter4"));
        String parameter5 = String.valueOf(jsonObject.get("parameter5"));
        JSONObject obj = new JSONObject();
        obj.put("cmd", type);//业务类型
        obj.put("id", clientId);//消息id
        switch (type) {
            // 心跳接收
            case "palpitate":
//                log.info("心跳状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
                break;
            case "version":
                // 展示采集软件配置文件版本 ,修改采集软件版本号
                if (parameter1 != null && !parameter1.equals("null") && !parameter1.equals("0")) {
                    obj.put("state", parameter1);
                    // 采集软件
                    serverDeploy.setNewCollectVersion(parameter1);
                }
                if (parameter2 != null && !parameter2.equals("null") && !parameter2.equals("0")) {
                    obj.put("message", parameter2);
                    // 配置文件
                    serverDeploy.setNewDeployVersion(parameter2);
                }
                serverDeployService.updateById(serverDeploy);
                //采集软件消息内容
                //配置文件消息内容
                log.info("守护服务连接成功! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
                break;
            case "log":
                // 日志
                obj.put("state", parameter4);//消息内容
                obj.put("message", parameter5);//消息内容
                log.info("日志上传接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
                break;
            case "start":
                // 启动采集软件
                obj.put("state", parameter2);
                obj.put("message", parameter3);
                int collectState = Integer.parseInt(parameter1);
                serverDeploy.setCollectState(collectState);
                if (parameter2.equals("1")) {
                    serverDeployService.updateById(serverDeploy);
                }
                log.info("启动采集软件接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
                break;
            case "collect":
                // 采集状态
                obj.put("message", parameter1);
                if (!serverDeploy.getCollectState().toString().equals(parameter1)) {
                    serverDeploy.setCollectState(Integer.getInteger(parameter1));
                    serverDeployService.updateById(serverDeploy);
                }
                log.info("心跳♥采集状态接收! 服务器名称:" + serverDeploy.getServerName() + " 时间:" + DateUtils.getTimestamp());
                break;
            default:
                return;
        }
        if (obj.size() > 2) {
            webSocket.sendAllMessage(obj.toJSONString());
        }
    }
}