package org.jeecg.modules.quartz.job;
|
|
import com.influxdb.client.InfluxDBClient;
|
import com.influxdb.client.InfluxDBClientFactory;
|
import com.influxdb.client.QueryApi;
|
import com.influxdb.query.FluxTable;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.collections.CollectionUtils;
|
import org.jeecg.modules.iot.depository.Influxdb;
|
import org.jeecg.modules.iot.depository.InfluxdbTest;
|
import org.jeecg.modules.iot.entity.Equipment;
|
import org.jeecg.modules.iot.entity.InfluxdbDeploy;
|
import org.jeecg.modules.iot.entity.ServerDeploy;
|
import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm;
|
import org.jeecg.modules.iot.mdc.entity.EquipmentLog;
|
import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService;
|
import org.jeecg.modules.iot.mdc.service.IEquipmentLogService;
|
import org.jeecg.modules.iot.service.IEquipmentService;
|
import org.jeecg.modules.iot.service.IInfluxdbDeployService;
|
import org.quartz.Job;
|
import org.quartz.JobExecutionContext;
|
import org.quartz.JobExecutionException;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
|
import javax.sql.DataSource;
|
import java.sql.Connection;
|
import java.sql.SQLException;
|
import java.text.SimpleDateFormat;
|
import java.time.LocalDateTime;
|
import java.time.ZoneId;
|
import java.time.format.DateTimeFormatter;
|
import java.time.temporal.TemporalAccessor;
|
import java.util.*;
|
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.stream.Collectors;
|
|
/**
|
* influxdb定时任务
|
*
|
* @Author Scott
|
*/
|
@Slf4j
|
public class SampleParamJob implements Job {
|
@Value("${databaseType}")
|
private String databaseType;
|
@Autowired
|
private IInfluxdbDeployService influxdbDeployService;
|
@Autowired
|
private DataSource dataSource;
|
@Autowired
|
private IEquipmentService equipmentService;
|
@Autowired
|
private IEquipmentLogService equipmentLogService;
|
@Autowired
|
private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService;
|
@Autowired
|
private IEquipmentAlarmService equipmentAlarmService;
|
@Autowired
|
private org.jeecg.modules.iot.service.IServerDeployService serverDeployService;
|
|
@Override
|
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list();
|
List<ServerDeploy> serverDeployList = serverDeployService.list();
|
|
influxdbDeployList.forEach(in -> {
|
// InfluxDB 2.0 连接信息
|
String influxUrl = "http://" + in.getAddress() + ":" + in.getPort();
|
String influxToken = in.getAuthorizeCode();
|
String influxOrg = in.getOrganization();
|
String influxBucket = in.getBucket();
|
// 连接 InfluxDB 2.0
|
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray());
|
QueryApi queryApi = influxDBClient.getQueryApi();
|
// 连接 SqlServer
|
try (Connection mysqlConnection = dataSource.getConnection()) {
|
// 处理时间
|
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
AtomicReference<String> start = new AtomicReference<>("");
|
AtomicReference<String> end = new AtomicReference<>("");
|
AtomicReference<Date> endDate = new AtomicReference<>(null); // 采集时间
|
serverDeployList.forEach(s -> {
|
if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) {
|
start.set(sdf.format(s.getCollectTime()));
|
// 获取当前时间
|
LocalDateTime now = LocalDateTime.now();
|
// 减去一分钟
|
// LocalDateTime oneMinuteAgo = now.minusMinutes(1);
|
LocalDateTime oneMinuteAgo = now.minusSeconds(2);
|
// 格式化为字符串(可选)
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
String formattedTime = oneMinuteAgo.format(formatter);
|
// 转换为 Date 类型(使用系统默认时区)
|
Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant());
|
endDate.set(date);
|
end.set(formattedTime);
|
}
|
});
|
if (start.get().equals("")){
|
return;
|
}
|
LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter);
|
LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter);
|
// 定义日期时间格式器(ISO_LOCAL_DATE_TIME 对应 "yyyy-MM-ddTHH:mm:ss")
|
DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
|
|
// 解析字符串为 LocalDateTime 对象
|
LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 解析字符串为 LocalDateTime 对象
|
LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1);
|
|
// 加上 8 小时
|
LocalDateTime startNewDateTime = startDateTime.minusHours(8);
|
LocalDateTime endNewDateTime = endDateTime.minusHours(8);
|
// String query = "from(bucket: \"IOT\") \n" +
|
// "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" +
|
// "|> fill(usePrevious: true) \n" +
|
// "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
|
String query = "from(bucket: \"" + influxBucket + "\") " +
|
"|> range(start: -5s) " +
|
"|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
|
|
// 执行查询
|
try {
|
List<FluxTable> tables = queryApi.query(query, influxOrg);
|
// 处理查询结果并插入到 MySQL
|
List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection);
|
// 根据类型拆分
|
Map<String, List<Influxdb>> fieldistMap = influxdbs.stream()
|
.collect(Collectors.groupingBy(Influxdb::getField));
|
List<Influxdb> valueList = new ArrayList<>();
|
List<Influxdb> timeStampList = new ArrayList<>();
|
fieldistMap.forEach((field, timeList) -> {
|
if (field.equals("Value")) {
|
valueList.addAll(timeList);
|
} else {
|
timeStampList.addAll(timeList);
|
}
|
});
|
for (int i = 0; i < valueList.size(); i++) {
|
valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue());
|
}
|
// 根据设备进行分组
|
Map<String, List<Influxdb>> measurementListMap = valueList.stream()
|
.collect(Collectors.groupingBy(Influxdb::getMeasurement));
|
measurementListMap.forEach((table, timeList) -> {
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
Map<String, List<Influxdb>> timeListMap = timeList.stream()
|
.collect(Collectors.groupingBy(
|
Influxdb::getAcquisitionTime,
|
() -> new TreeMap<>(Comparator.comparing(
|
timeStr -> LocalDateTime.parse(timeStr, formatter)
|
)),
|
Collectors.toList()
|
));
|
|
// 处理设备状态
|
int lastIndex = table.lastIndexOf('.');
|
String code = table.substring(lastIndex + 1);
|
EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code);
|
// 设备状态
|
Integer equipmentState = null;
|
for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) {
|
String measurement = entry.getKey();
|
List<Influxdb> influxdbList = entry.getValue();
|
Equipment equipment = equipmentService.findEquipmentByCode(code);
|
String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode();
|
// 获取表名
|
String[] columns = new String[influxdbList.size() + 2];
|
Object[] values = new Object[influxdbList.size() + 2];
|
String state = ""; // 设备状态
|
String isAlarm = ""; // 报警状态
|
String alarm = ""; // 报警号
|
String alarmContent = ""; // 报警内容
|
// 遍历当前设备下的所有参数
|
for (int i = 0; i < influxdbList.size(); i++) {
|
String parameter = influxdbList.get(i).getParameter();
|
columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1);
|
values[i] = influxdbList.get(i).getValue();
|
if (values[i] == null || values[i] == "") {
|
values[i] = "";
|
} else {
|
if (columns[i].equals("Oporation")) {
|
state = values[i].toString();
|
} else if (columns[i].equals("AlarmNo")) {
|
alarm = values[i].toString();
|
} else if (columns[i].equals("AlarmContent")) {
|
alarmContent = values[i].toString();
|
} else if (columns[i].equals("IsAlarm")) {
|
isAlarm = values[i].toString();
|
}
|
}
|
|
}
|
// 验证数据是否都为空
|
if (!allEmptyStrings(values)) {
|
columns[influxdbList.size() + 1] = "CollectTime";
|
columns[influxdbList.size()] = "EquipmentID";
|
// 转换为 LocalDateTime
|
LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter);
|
values[influxdbList.size() + 1] = dateTime;
|
values[influxdbList.size()] = equipment.getEqptCode();
|
// 插入数据
|
if (databaseType.equals("SqlServer")) {
|
equipmentService.insertSqlServerData(tableName, columns, values);
|
} else if (databaseType.equals("MySql")) {
|
equipmentService.insertMySqlData(tableName, columns, values);
|
|
}
|
if (!state.equals("")) {
|
EquipmentLog log = new EquipmentLog();
|
log.setOporation(Integer.parseInt(state));
|
log.setEquipmentId(equipment.getEqptCode());
|
log.setEquipmentName(equipment.getEqptName());
|
log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()));
|
// True 报警开始,False报警结束
|
if (isAlarm.equals("True")) {
|
// 报警开始
|
if (equipmentLog == null && equipmentState == null) {
|
// 验证是否报警
|
log.setOporation(22);
|
log.setAlarm(alarm);
|
equipmentLogService.save(log);
|
equipmentState = 22;
|
equipmentStateModification(equipment.getEqptCode(), 22);
|
} else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) {
|
log.setOporation(22);
|
log.setAlarm(alarm);
|
equipmentState = 22;
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), 22);
|
} else if (equipmentLog != null && equipmentState != null && equipmentState != 22) {
|
log.setOporation(22);
|
log.setAlarm(alarm);
|
equipmentState = 22;
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), 22);
|
}
|
} else if (isAlarm.equals("False")) {
|
// 报警结束
|
if (equipmentLog == null && equipmentState == null) {
|
// 验证是否报警
|
equipmentLogService.save(log);
|
equipmentState = Integer.parseInt(state);
|
equipmentStateModification(equipment.getEqptCode(), equipmentState);
|
} else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) {
|
log.setOporation(23);
|
equipmentState = 23;
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), equipmentState);
|
} else if (equipmentLog != null && equipmentState != null && equipmentState == 22) {
|
log.setOporation(23);
|
equipmentState = 23;
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), 23);
|
} else {
|
if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) {
|
equipmentState = Integer.parseInt(state);
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), equipmentState);
|
//验证是否是报警,或者报警结束
|
} else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) {
|
equipmentState = Integer.parseInt(state);
|
equipmentLogService.save(log);
|
equipmentStateModification(equipment.getEqptCode(), equipmentState);
|
}
|
}
|
}
|
// 处理报警信息
|
if (isAlarm.equals("True")) {
|
EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode());
|
if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) {
|
addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
|
} else if (equipmentAlarm == null) {
|
addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
|
}
|
}
|
}
|
}
|
}
|
});
|
// 如果有查询数据,时间不为空
|
|
if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) {
|
for (ServerDeploy s : serverDeployList) {
|
if (s.getId().equals(in.getServerDeployId())) {
|
s.setCollectTime(endDate.get());
|
return;
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error(String.valueOf(e));
|
}
|
} catch (SQLException e) {
|
e.printStackTrace();
|
} finally {
|
influxDBClient.close();
|
}
|
});
|
serverDeployService.updateBatchById(serverDeployList);
|
// 直接在输出语句中格式化
|
log.info("实时数据任务执行,当前时间:" +
|
LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault())
|
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
|
}
|
|
public static boolean allEmptyStrings(Object[] values) {
|
if (values == null) {
|
return false; // 如果数组本身为null,返回false
|
}
|
for (Object str : values) {
|
if (str != null && !str.toString().equals("")) {
|
return false; // 如果元素不是String类型,返回false
|
}
|
}
|
|
return true; // 所有元素都是空字符串
|
}
|
|
/**
|
* 报警信息处理
|
*
|
* @param alarm 报警号
|
* @param equipment 设备信息
|
* @param dateTime 报警时间
|
* @param alarmContent 报警内容
|
*/
|
public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) {
|
// 验证是否报警
|
EquipmentAlarm equipmentAlarm = new EquipmentAlarm();
|
equipmentAlarm.setEquipmentid(equipment.getEqptCode());
|
equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()));
|
equipmentAlarm.setAlarmNo(alarm);
|
equipmentAlarm.setAlarmContent(alarmContent);
|
equipmentAlarmService.save(equipmentAlarm);
|
|
}
|
|
void equipmentStateModification(String eqptCode, Integer state) {
|
org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode);
|
if (mdcEquipment != null) {
|
mdcEquipment.setOporation(state);
|
mdcEquipmentService.updateById(mdcEquipment);
|
}
|
}
|
}
|