package org.jeecg.modules.iot.mqtt.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jeecg.modules.iot.entity.MqttParameter;
import org.jeecg.modules.iot.mqtt.callback.PushCallback;
import org.jeecg.modules.iot.util.DispatchUtil;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * MQTT client wrapper: connects to a broker, publishes {@link MqttParameter}
 * payloads as JSON and subscribes to topics.
 *
 * <p>The underlying {@link MqttClient} is held in a static field, so every
 * instance of this component shares the same connection.
 *
 * @author: cuikaidong
 * @create: 2023-09-27 11:36
 */
@Slf4j
@Component
public class MqttCustomerClient {

    /** Timestamp layout of the {@code time} field in published payloads. */
    private static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Resource
    private PushCallback pushCallback;

    /** Shared client; {@code null} until {@link #connect} has created one. */
    private static MqttClient client;

    /**
     * Returns the shared MQTT client.
     *
     * @return the client, or {@code null} if none has been set yet
     */
    public static MqttClient getClient() {
        return client;
    }

    /**
     * Replaces the shared MQTT client.
     *
     * @param client the client to share across all instances
     */
    public static void setClient(MqttClient client) {
        MqttCustomerClient.client = client;
    }

    /**
     * Creates a client with in-memory persistence and a clean session, stores
     * it as the shared client and connects it to the broker.
     *
     * <p>Failures are logged and not rethrown.
     *
     * @param host     broker URI (ip + port)
     * @param clientID client identifier
     * @param username user name; not set on the options when {@code null}
     * @param password password; not set on the options when {@code null}
     * @param timeout  connection timeout, passed to
     *                 {@link MqttConnectOptions#setConnectionTimeout(int)}
     * @param keeplive keep-alive interval, passed to
     *                 {@link MqttConnectOptions#setKeepAliveInterval(int)}
     */
    public void connect(String host, String clientID, String username, String password,
                        int timeout, int keeplive) {
        try {
            MqttClient mqttClient = new MqttClient(host, clientID, new MemoryPersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            if (username != null) {
                options.setUserName(username);
            }
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keeplive);

            // Published before connecting (as before) so the client is
            // reachable through getClient() even if the connect attempt fails.
            MqttCustomerClient.setClient(mqttClient);

            mqttClient.setCallback(pushCallback);
            mqttClient.connect(options);
        } catch (MqttException | RuntimeException e) {
            log.error("MQTT connect failed, host={}, clientID={}", host, clientID, e);
        }
    }

    /**
     * Publishes with QoS 0 and without the retained flag.
     *
     * @param topic         topic segment, see
     *                      {@link #pushlish(int, boolean, String, MqttParameter)}
     * @param mqttParameter payload source
     */
    public void pushlish(String topic, MqttParameter mqttParameter) {
        pushlish(0, false, topic, mqttParameter);
    }

    /**
     * Serializes {@code mqttParameter} to JSON and publishes it, blocking until
     * delivery completes.
     *
     * <p>The payload contains, in order: {@code id}, {@code type}, {@code time}
     * (current local time, {@code yyyy-MM-dd HH:mm:ss}) and
     * {@code parameter1}..{@code parameter5}. It is encoded as UTF-8.
     *
     * <p>Nothing is published when no client is available or the topic cannot
     * be resolved; both cases are logged. A non-persistence
     * {@link MqttException} during publishing triggers
     * {@code pushCallback.connectionLost(null)} to reconnect.
     *
     * @param qos           quality of service level
     * @param retained      whether the broker should retain the message
     * @param topic         topic segment; the message goes to
     *                      {@code IOT\<topic>\Mutually1}
     * @param mqttParameter payload source
     */
    public void pushlish(int qos, boolean retained, String topic, MqttParameter mqttParameter) {
        // NOTE(review): backslashes are not MQTT level separators ('/' is);
        // kept as-is because devices presumably rely on this exact topic name.
        topic = "IOT\\" + topic + "\\Mutually1";

        // LinkedHashMap keeps the insertion order of the keys.
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("id", mqttParameter.getId());
        map.put("type", mqttParameter.getType());
        map.put("time", LocalDateTime.now().format(TIME_FORMATTER));
        map.put("parameter1", mqttParameter.getParameter1());
        map.put("parameter2", mqttParameter.getParameter2());
        map.put("parameter3", mqttParameter.getParameter3());
        map.put("parameter4", mqttParameter.getParameter4());
        map.put("parameter5", mqttParameter.getParameter5());

        Object json = JSONObject.toJSON(map);
        String pushMessage = DispatchUtil.replaceBlank(json.toString());

        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset: the platform default would make the payload
        // encoding depend on the host configuration.
        message.setPayload(pushMessage.getBytes(StandardCharsets.UTF_8));

        MqttClient mqttClient = MqttCustomerClient.getClient();
        if (mqttClient == null) {
            log.error("MQTT client is not initialized, message to topic {} dropped", topic);
            return;
        }

        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        if (null == mqttTopic) {
            log.error("topic not exist");
            // Previously execution continued and failed with a NullPointerException.
            return;
        }

        try {
            MqttDeliveryToken token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            log.error("MQTT publish failed (persistence), topic={}", topic, e);
        } catch (MqttException e) {
            log.error("MQTT publish failed, topic={}", topic, e);
            // Reconnect
            pushCallback.connectionLost(null);
        }
    }

    /**
     * Subscribes to the given topics, each with QoS 2.
     *
     * <p>Failures are logged and not rethrown. Nothing happens when
     * {@code topic} is {@code null} or empty, or when no client is available.
     *
     * @param topic topic filters to subscribe to
     */
    public void subscribe(String[] topic) {
        if (topic == null || topic.length == 0) {
            log.warn("MQTT subscribe skipped: no topics given");
            return;
        }
        log.info("开始订阅主题{}", Arrays.toString(topic));

        MqttClient mqttClient = MqttCustomerClient.getClient();
        if (mqttClient == null) {
            log.error("MQTT client is not initialized, cannot subscribe");
            return;
        }

        int[] qos = new int[topic.length];
        Arrays.fill(qos, 2);
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            log.error("MQTT subscribe failed, topics={}", Arrays.toString(topic), e);
        }
    }
}