package org.jeecg.modules.iot.mqtt.config; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.Data; import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import org.jeecg.modules.iot.entity.MqttClient; import org.jeecg.modules.iot.entity.ServerDeploy; import org.jeecg.modules.iot.service.IServerDeployService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; /** * @author: cuikaidong * @create: 2023-09-27 11:36 */ @Component @Configuration @Data public class MqttConfiguration { @Autowired private MqttCustomerClient mqttCustomerClient; @Autowired private IServerDeployService serverDeployService; @Value("${mqtt.host}") private String host; @Value("${mqtt.clientId}") private String clientId; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; private String[] topic; @Value("${mqtt.timeout}") private int timeout; @Value("${mqtt.keepalive}") private int keepalive; @Value("${mqtt.apiKey}") private String apiKey; @Value("${mqtt.secretKey}") private String secretKey; @Value("${mqtt.clientUrl}") private String clientUrl; @Bean public MqttCustomerClient getMqttCustomerClient() { mqttCustomerClient.connect(host, clientId, username, password, timeout, keepalive); // 以/#结尾表示订阅所有以test开头的主题 List serverDeploys = serverDeployService.list(); for (ServerDeploy serverDeploy : serverDeploys) { serverDeploy.setServerCode("IOT\\" + serverDeploy.getServerCode() + "\\Mutually2"); } ServerDeploy serverDeploy = new ServerDeploy(); serverDeploy.setServerCode("$SYS/brokers/+/clients/#"); serverDeploys.add(serverDeploy); mqttCustomerClient.subscribe(serverDeploys.stream().map(ServerDeploy::getServerCode).distinct() .toArray(String[]::new)); // 查询客户端列表 try { OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url(clientUrl) .header("Content-Type", "application/json") .header("Authorization", Credentials.basic(apiKey, secretKey)) .build(); Response response = client.newCall(request).execute(); String json = response.body().string(); JSONObject jsonObject = JSON.parseObject(json); String data = String.valueOf(jsonObject.get("data")); List studentList = JSONObject.parseArray(data, MqttClient.class); serverDeployService.updateServerGuardState(studentList); } catch (IOException e) { e.printStackTrace(); } return mqttCustomerClient; } }