package org.jeecg.modules.quartz.job; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.QueryApi; import com.influxdb.query.FluxTable; import lombok.extern.slf4j.Slf4j; import org.jeecg.modules.iot.depository.Influxdb; import org.jeecg.modules.iot.depository.InfluxdbTest; import org.jeecg.modules.iot.entity.Equipment; import org.jeecg.modules.iot.entity.InfluxdbDeploy; import org.jeecg.modules.iot.mdc.entity.EquipmentLog; import org.jeecg.modules.iot.mdc.service.IEquipmentLogService; import org.jeecg.modules.iot.service.IEquipmentService; import org.jeecg.modules.iot.service.IInfluxdbDeployService; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * 实时数据定时任务 * * @Author Scott */ @Slf4j public class SampleParamJob implements Job { @Autowired private IInfluxdbDeployService influxdbDeployService; @Autowired private DataSource dataSource; @Autowired private IEquipmentService equipmentService; @Autowired private IEquipmentLogService equipmentLogService; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { List influxdbDeployList = influxdbDeployService.list(); influxdbDeployList.forEach(in -> { // InfluxDB 2.0 连接信息 String influxUrl = "http://" + in.getAddress() + ":" + in.getPort(); String influxToken = in.getAuthorizeCode(); String influxOrg = in.getOrganization(); String influxBucket = in.getBucket(); // 连接 InfluxDB 2.0 InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray()); QueryApi queryApi = influxDBClient.getQueryApi(); // 连接 SqlServer try (Connection mysqlConnection = dataSource.getConnection()) { // 创建 MySQL 表(示例表结构,可按需调整) // InfluxdbTest.createTable(mysqlConnection); // 定义 InfluxDB 查询语句 String query = "from(bucket: \"" + influxBucket + "\") " + "|> range(start: -58s) " + "|> fill(usePrevious: true) " + "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; // 执行查询 try { List tables = queryApi.query(query, influxOrg); // 处理查询结果并插入到 MySQL List influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection); // 根据类型拆分 Map> fieldistMap = influxdbs.stream() .collect(Collectors.groupingBy(Influxdb::getField)); List valueList = new ArrayList<>(); List timeStampList = new ArrayList<>(); fieldistMap.forEach((field, timeList) -> { if (field.equals("Value")) { valueList.addAll(timeList); } else { timeStampList.addAll(timeList); } }); for (int i = 0; i < valueList.size(); i++) { valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue()); } // 根据设备进行分组 Map> measurementListMap = valueList.stream() .collect(Collectors.groupingBy(Influxdb::getMeasurement)); measurementListMap.forEach((table, timeList) -> { // 根据时间进行分组 Map> timeListMap = timeList.stream() .collect(Collectors.groupingBy(Influxdb::getAcquisitionTime)); // 处理设备状态 int lastIndex = table.lastIndexOf('.'); String code = table.substring(lastIndex + 1); EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); // 设备状态 Integer equipmentState = null; for (Map.Entry> entry : timeListMap.entrySet()) { String measurement = entry.getKey(); List influxdbList = entry.getValue(); Equipment equipment = equipmentService.findEquipmentByCode(code); String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode(); // 获取表名 String[] columns = new String[influxdbList.size() + 2]; Object[] values = new Object[influxdbList.size() + 2]; String state = ""; // 遍历当前设备下的所有参数 for (int i = 0; i < influxdbList.size(); i++) { String parameter = influxdbList.get(i).getParameter(); columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1); values[i] = influxdbList.get(i).getValue(); if (values[i] == null || values[i] == "") { values[i] = ""; } else { if (columns[i].equals("Oporation")) { state = values[i].toString(); } } } // 验证数据是否都为空 if (!allEmptyStrings(values)) { columns[influxdbList.size() + 1] = "CollectTime"; columns[influxdbList.size()] = "EquipmentID"; // 定义日期格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 转换为 LocalDateTime LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter); values[influxdbList.size() + 1] = dateTime; values[influxdbList.size()] = tableName; // 插入数据 equipmentService.insertMysqlData(tableName, columns, values); if (!state.equals("")) { EquipmentLog log = new EquipmentLog(); log.setOporation(Integer.parseInt(state)); log.setEquipmentId(equipment.getEqptCode()); log.setEquipmentName(equipment.getEqptName()); log.setCollectTime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant())); // 验证设备状态是否需要存储 if (equipmentLog == null) { equipmentLogService.save(log); equipmentState = Integer.parseInt(state); } else if (equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) { equipmentState = Integer.parseInt(state); equipmentLogService.save(log); } } } }; }); } catch (Exception e) { log.error(String.valueOf(e)); } } catch (SQLException e) { e.printStackTrace(); } finally { influxDBClient.close(); } }); // 直接在输出语句中格式化 log.info("实时数据任务执行,当前时间:" + LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault()) .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); } public static boolean allEmptyStrings(Object[] values) { if (values == null) { return false; // 如果数组本身为null,返回false } for (Object str : values) { if (str != null && !str.toString().equals("")) { return false; // 如果元素不是String类型,返回false } } return true; // 所有元素都是空字符串 } }