package org.jeecg.modules.quartz.job;
|
|
import com.influxdb.client.InfluxDBClient;
|
import com.influxdb.client.InfluxDBClientFactory;
|
import com.influxdb.client.QueryApi;
|
import com.influxdb.query.FluxTable;
|
import lombok.extern.slf4j.Slf4j;
|
import org.jeecg.modules.iot.depository.Influxdb;
|
import org.jeecg.modules.iot.depository.InfluxdbTest;
|
import org.jeecg.modules.iot.entity.Equipment;
|
import org.jeecg.modules.iot.entity.InfluxdbDeploy;
|
import org.jeecg.modules.iot.mdc.entity.EquipmentLog;
|
import org.jeecg.modules.iot.mdc.service.IEquipmentLogService;
|
import org.jeecg.modules.iot.service.IEquipmentService;
|
import org.jeecg.modules.iot.service.IInfluxdbDeployService;
|
import org.quartz.Job;
|
import org.quartz.JobExecutionContext;
|
import org.quartz.JobExecutionException;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import javax.sql.DataSource;
|
import java.sql.Connection;
|
import java.sql.SQLException;
|
import java.time.LocalDateTime;
|
import java.time.ZoneId;
|
import java.time.format.DateTimeFormatter;
|
import java.util.*;
|
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.stream.Collectors;
|
|
/**
|
* 实时数据定时任务
|
*
|
* @Author Scott
|
*/
|
@Slf4j
|
public class SampleParamJob implements Job {
|
@Autowired
|
private IInfluxdbDeployService influxdbDeployService;
|
@Autowired
|
private DataSource dataSource;
|
@Autowired
|
private IEquipmentService equipmentService;
|
@Autowired
|
private IEquipmentLogService equipmentLogService;
|
|
@Override
|
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list();
|
|
influxdbDeployList.forEach(in -> {
|
// InfluxDB 2.0 连接信息
|
String influxUrl = "http://" + in.getAddress() + ":" + in.getPort();
|
String influxToken = in.getAuthorizeCode();
|
String influxOrg = in.getOrganization();
|
String influxBucket = in.getBucket();
|
// 连接 InfluxDB 2.0
|
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray());
|
QueryApi queryApi = influxDBClient.getQueryApi();
|
// 连接 SqlServer
|
try (Connection mysqlConnection = dataSource.getConnection()) {
|
// 创建 MySQL 表(示例表结构,可按需调整)
|
// InfluxdbTest.createTable(mysqlConnection);
|
// 定义 InfluxDB 查询语句
|
String query = "from(bucket: \"" + influxBucket + "\") " +
|
"|> range(start: -58s) " +
|
"|> fill(usePrevious: true) " +
|
"|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
|
// 执行查询
|
try {
|
List<FluxTable> tables = queryApi.query(query, influxOrg);
|
// 处理查询结果并插入到 MySQL
|
List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection);
|
// 根据类型拆分
|
Map<String, List<Influxdb>> fieldistMap = influxdbs.stream()
|
.collect(Collectors.groupingBy(Influxdb::getField));
|
List<Influxdb> valueList = new ArrayList<>();
|
List<Influxdb> timeStampList = new ArrayList<>();
|
fieldistMap.forEach((field, timeList) -> {
|
if (field.equals("Value")) {
|
valueList.addAll(timeList);
|
} else {
|
timeStampList.addAll(timeList);
|
}
|
});
|
for (int i = 0; i < valueList.size(); i++) {
|
valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue());
|
}
|
// 根据设备进行分组
|
Map<String, List<Influxdb>> measurementListMap = valueList.stream()
|
.collect(Collectors.groupingBy(Influxdb::getMeasurement));
|
measurementListMap.forEach((table, timeList) -> {
|
// 根据时间进行分组
|
Map<String, List<Influxdb>> timeListMap = timeList.stream()
|
.collect(Collectors.groupingBy(Influxdb::getAcquisitionTime));
|
// 处理设备状态
|
int lastIndex = table.lastIndexOf('.');
|
String code = table.substring(lastIndex + 1);
|
EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code);
|
// 设备状态
|
Integer equipmentState = null;
|
for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) {
|
String measurement = entry.getKey();
|
List<Influxdb> influxdbList = entry.getValue();
|
Equipment equipment = equipmentService.findEquipmentByCode(code);
|
String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode();
|
// 获取表名
|
String[] columns = new String[influxdbList.size() + 2];
|
Object[] values = new Object[influxdbList.size() + 2];
|
String state = "";
|
// 遍历当前设备下的所有参数
|
for (int i = 0; i < influxdbList.size(); i++) {
|
String parameter = influxdbList.get(i).getParameter();
|
columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1);
|
values[i] = influxdbList.get(i).getValue();
|
if (values[i] == null || values[i] == "") {
|
values[i] = "";
|
} else {
|
if (columns[i].equals("Oporation")) {
|
state = values[i].toString();
|
}
|
}
|
|
}
|
// 验证数据是否都为空
|
if (!allEmptyStrings(values)) {
|
columns[influxdbList.size() + 1] = "CollectTime";
|
columns[influxdbList.size()] = "EquipmentID";
|
// 定义日期格式
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
// 转换为 LocalDateTime
|
LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter);
|
values[influxdbList.size() + 1] = dateTime;
|
values[influxdbList.size()] = tableName;
|
// 插入数据
|
equipmentService.insertMysqlData(tableName, columns, values);
|
if (!state.equals("")) {
|
EquipmentLog log = new EquipmentLog();
|
log.setOporation(Integer.parseInt(state));
|
log.setEquipmentId(equipment.getEqptCode());
|
log.setEquipmentName(equipment.getEqptName());
|
log.setCollectTime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant()));
|
// 验证设备状态是否需要存储
|
if (equipmentLog == null) {
|
equipmentLogService.save(log);
|
equipmentState = Integer.parseInt(state);
|
} else if (equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) {
|
equipmentState = Integer.parseInt(state);
|
equipmentLogService.save(log);
|
}
|
}
|
}
|
};
|
});
|
} catch (Exception e) {
|
log.error(String.valueOf(e));
|
}
|
} catch (SQLException e) {
|
e.printStackTrace();
|
} finally {
|
influxDBClient.close();
|
}
|
});
|
// 直接在输出语句中格式化
|
log.info("实时数据任务执行,当前时间:" +
|
LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault())
|
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
|
}
|
|
public static boolean allEmptyStrings(Object[] values) {
|
if (values == null) {
|
return false; // 如果数组本身为null,返回false
|
}
|
for (Object str : values) {
|
if (str != null && !str.toString().equals("")) {
|
return false; // 如果元素不是String类型,返回false
|
}
|
}
|
|
return true; // 所有元素都是空字符串
|
}
|
}
|