package org.jeecg.modules.iot.mqtt.config;
|
|
|
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jeecg.modules.iot.entity.MqttClient;
import org.jeecg.modules.iot.entity.ServerDeploy;
import org.jeecg.modules.iot.service.IServerDeployService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
|
|
/**
|
* @author: cuikaidong
|
* @create: 2023-09-27 11:36
|
*/
|
@Component
|
@Configuration
|
@Data
|
public class MqttConfiguration {
|
|
@Autowired
|
private MqttCustomerClient mqttCustomerClient;
|
@Autowired
|
private IServerDeployService serverDeployService;
|
|
@Value("${mqtt.host}")
|
private String host;
|
@Value("${mqtt.clientId}")
|
private String clientId;
|
@Value("${mqtt.username}")
|
private String username;
|
@Value("${mqtt.password}")
|
private String password;
|
private String[] topic;
|
@Value("${mqtt.timeout}")
|
private int timeout;
|
@Value("${mqtt.keepalive}")
|
private int keepalive;
|
@Value("${mqtt.apiKey}")
|
private String apiKey;
|
@Value("${mqtt.secretKey}")
|
private String secretKey;
|
@Value("${mqtt.clientUrl}")
|
private String clientUrl;
|
|
@Bean
|
public MqttCustomerClient getMqttCustomerClient() {
|
mqttCustomerClient.connect(host, clientId, username, password, timeout, keepalive);
|
// 以/#结尾表示订阅所有以test开头的主题
|
List<ServerDeploy> serverDeploys = serverDeployService.list();
|
for (ServerDeploy serverDeploy : serverDeploys) {
|
serverDeploy.setServerCode("IOT\\" + serverDeploy.getServerCode() + "\\Mutually2");
|
}
|
ServerDeploy serverDeploy = new ServerDeploy();
|
serverDeploy.setServerCode("$SYS/brokers/+/clients/#");
|
serverDeploys.add(serverDeploy);
|
mqttCustomerClient.subscribe(serverDeploys.stream().map(ServerDeploy::getServerCode).distinct()
|
.toArray(String[]::new));
|
// 查询客户端列表
|
try {
|
OkHttpClient client = new OkHttpClient();
|
Request request = new Request.Builder()
|
.url(clientUrl)
|
.header("Content-Type", "application/json")
|
.header("Authorization", Credentials.basic(apiKey, secretKey))
|
.build();
|
Response response = client.newCall(request).execute();
|
String json = response.body().string();
|
JSONObject jsonObject = JSON.parseObject(json);
|
String data = String.valueOf(jsonObject.get("data"));
|
List<MqttClient> studentList = JSONObject.parseArray(data, MqttClient.class);
|
serverDeployService.updateServerGuardState(studentList);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
return mqttCustomerClient;
|
}
|
|
}
|