cuikaidong
2025-06-12 44e283b774bb1168d0c17dfe5070a1ca8e2274cd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package org.jeecg.modules.iot.mqtt.config;
 
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jeecg.modules.iot.entity.MqttParameter;
import org.jeecg.modules.iot.mqtt.callback.PushCallback;
import org.jeecg.modules.iot.util.DispatchUtil;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
 
/**
 * mqtt客户端
 *
 * @author: cuikaidong
 * @create: 2023-09-27 11:36
 */
@Slf4j
@Component
public class MqttCustomerClient {
 
    @Resource
    private PushCallback pushCallback;
 
 
    private static MqttClient client;
 
    public static MqttClient getClient() {
        return client;
    }
 
    public static void setClient(MqttClient client) {
        MqttCustomerClient.client = client;
    }
 
    /**
     * 客户端连接
     *
     * @param host     ip+端口
     * @param clientID 客户端Id
     * @param username 用户名
     * @param password 密码
     * @param timeout  超时时间
     * @param keeplive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keeplive) {
        MqttClient client;
 
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keeplive);
            MqttCustomerClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param mqttParameter
     */
    public void pushlish(String topic, MqttParameter mqttParameter) {
        pushlish(0, false, topic, mqttParameter);
    }
 
    /**
     * 发布
     *
     * @param qos           连接方式
     * @param retained      是否保留
     * @param topic         主题
     * @param mqttParameter 参数对象
     */
    public void pushlish(int qos, boolean retained, String topic, MqttParameter mqttParameter) {
        topic = "IOT\\" + topic + "\\Mutually1";
        Map<String, String> map = new LinkedHashMap<>();
        map.put("id", mqttParameter.getId());
        map.put("type", mqttParameter.getType());
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        map.put("time", now.format(formatter2));
        map.put("parameter1", mqttParameter.getParameter1());
        map.put("parameter2", mqttParameter.getParameter2());
        map.put("parameter3", mqttParameter.getParameter3());
        map.put("parameter4", mqttParameter.getParameter4());
        map.put("parameter5", mqttParameter.getParameter5());
        Object json = JSONObject.toJSON(map);
        String pushMessage = DispatchUtil.replaceBlank(json.toString());
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic = MqttCustomerClient.getClient().getTopic(topic);
        if (null == mqttTopic) {
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
            // 重连
            pushCallback.connectionLost(null);
        }
    }
 
    /**
     * 订阅某个主题,qos为0
     *
     * @param topic
     */
    public void subscribe(String[] topic) {
        log.info("开始订阅主题" + Arrays.toString(topic));
        try {
            int[] qos = new int[topic.length];
            for (int i = 0; i < topic.length; i++) {
                qos[i] = 2;
            }
            MqttCustomerClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
}