cuikaidong
2025-06-12 44e283b774bb1168d0c17dfe5070a1ca8e2274cd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package org.jeecg.modules.iot.mqtt.config;
 
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jeecg.modules.iot.entity.MqttClient;
import org.jeecg.modules.iot.entity.ServerDeploy;
import org.jeecg.modules.iot.service.IServerDeployService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.List;
 
/**
 * @author: cuikaidong
 * @create: 2023-09-27 11:36
 */
@Component
@Configuration
@Data
public class MqttConfiguration {
 
    @Autowired
    private MqttCustomerClient mqttCustomerClient;
    @Autowired
    private IServerDeployService serverDeployService;
 
    @Value("${mqtt.host}")
    private String host;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    private String[] topic;
    @Value("${mqtt.timeout}")
    private int timeout;
    @Value("${mqtt.keepalive}")
    private int keepalive;
    @Value("${mqtt.apiKey}")
    private String apiKey;
    @Value("${mqtt.secretKey}")
    private String secretKey;
    @Value("${mqtt.clientUrl}")
    private String clientUrl;
 
    @Bean
    public MqttCustomerClient getMqttCustomerClient() {
        mqttCustomerClient.connect(host, clientId, username, password, timeout, keepalive);
        // 以/#结尾表示订阅所有以test开头的主题
        List<ServerDeploy> serverDeploys = serverDeployService.list();
        for (ServerDeploy serverDeploy : serverDeploys) {
            serverDeploy.setServerCode("IOT\\" + serverDeploy.getServerCode() + "\\Mutually2");
        }
        ServerDeploy serverDeploy = new ServerDeploy();
        serverDeploy.setServerCode("$SYS/brokers/+/clients/#");
        serverDeploys.add(serverDeploy);
        mqttCustomerClient.subscribe(serverDeploys.stream().map(ServerDeploy::getServerCode).distinct()
                .toArray(String[]::new));
        // 查询客户端列表
        try {
            OkHttpClient client = new OkHttpClient();
            Request request = new Request.Builder()
                    .url(clientUrl)
                    .header("Content-Type", "application/json")
                    .header("Authorization", Credentials.basic(apiKey, secretKey))
                    .build();
            Response response = client.newCall(request).execute();
            String json = response.body().string();
            JSONObject jsonObject = JSON.parseObject(json);
            String data = String.valueOf(jsonObject.get("data"));
            List<MqttClient> studentList = JSONObject.parseArray(data, MqttClient.class);
            serverDeployService.updateServerGuardState(studentList);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return mqttCustomerClient;
    }
 
}