package org.jeecg.modules.iot.mqtt.config;
|
|
import com.alibaba.fastjson.JSONObject;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.jeecg.modules.iot.entity.MqttParameter;
|
import org.jeecg.modules.iot.mqtt.callback.PushCallback;
|
import org.jeecg.modules.iot.util.DispatchUtil;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.Arrays;
|
import java.util.LinkedHashMap;
|
import java.util.Map;
|
|
/**
|
* mqtt客户端
|
*
|
* @author: cuikaidong
|
* @create: 2023-09-27 11:36
|
*/
|
@Slf4j
|
@Component
|
public class MqttCustomerClient {
|
|
@Resource
|
private PushCallback pushCallback;
|
|
|
private static MqttClient client;
|
|
public static MqttClient getClient() {
|
return client;
|
}
|
|
public static void setClient(MqttClient client) {
|
MqttCustomerClient.client = client;
|
}
|
|
/**
|
* 客户端连接
|
*
|
* @param host ip+端口
|
* @param clientID 客户端Id
|
* @param username 用户名
|
* @param password 密码
|
* @param timeout 超时时间
|
* @param keeplive 保留数
|
*/
|
public void connect(String host, String clientID, String username, String password, int timeout, int keeplive) {
|
MqttClient client;
|
|
try {
|
client = new MqttClient(host, clientID, new MemoryPersistence());
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setCleanSession(true);
|
options.setUserName(username);
|
options.setPassword(password.toCharArray());
|
options.setConnectionTimeout(timeout);
|
options.setKeepAliveInterval(keeplive);
|
MqttCustomerClient.setClient(client);
|
try {
|
client.setCallback(pushCallback);
|
client.connect(options);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 发布,默认qos为0,非持久化
|
*
|
* @param topic
|
* @param mqttParameter
|
*/
|
public void pushlish(String topic, MqttParameter mqttParameter) {
|
pushlish(0, false, topic, mqttParameter);
|
}
|
|
/**
|
* 发布
|
*
|
* @param qos 连接方式
|
* @param retained 是否保留
|
* @param topic 主题
|
* @param mqttParameter 参数对象
|
*/
|
public void pushlish(int qos, boolean retained, String topic, MqttParameter mqttParameter) {
|
topic = "IOT\\" + topic + "\\Mutually1";
|
Map<String, String> map = new LinkedHashMap<>();
|
map.put("id", mqttParameter.getId());
|
map.put("type", mqttParameter.getType());
|
LocalDateTime now = LocalDateTime.now();
|
DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
map.put("time", now.format(formatter2));
|
map.put("parameter1", mqttParameter.getParameter1());
|
map.put("parameter2", mqttParameter.getParameter2());
|
map.put("parameter3", mqttParameter.getParameter3());
|
map.put("parameter4", mqttParameter.getParameter4());
|
map.put("parameter5", mqttParameter.getParameter5());
|
Object json = JSONObject.toJSON(map);
|
String pushMessage = DispatchUtil.replaceBlank(json.toString());
|
MqttMessage message = new MqttMessage();
|
message.setQos(qos);
|
message.setRetained(retained);
|
message.setPayload(pushMessage.getBytes());
|
MqttTopic mqttTopic = MqttCustomerClient.getClient().getTopic(topic);
|
if (null == mqttTopic) {
|
log.error("topic not exist");
|
}
|
MqttDeliveryToken token;
|
try {
|
token = mqttTopic.publish(message);
|
token.waitForCompletion();
|
} catch (MqttPersistenceException e) {
|
e.printStackTrace();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
// 重连
|
pushCallback.connectionLost(null);
|
}
|
}
|
|
/**
|
* 订阅某个主题,qos为0
|
*
|
* @param topic
|
*/
|
public void subscribe(String[] topic) {
|
log.info("开始订阅主题" + Arrays.toString(topic));
|
try {
|
int[] qos = new int[topic.length];
|
for (int i = 0; i < topic.length; i++) {
|
qos[i] = 2;
|
}
|
MqttCustomerClient.getClient().subscribe(topic, qos);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
}
|