package org.jeecg.modules.quartz.job; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.QueryApi; import com.influxdb.query.FluxTable; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.jeecg.modules.iot.depository.Influxdb; import org.jeecg.modules.iot.depository.InfluxdbTest; import org.jeecg.modules.iot.entity.Equipment; import org.jeecg.modules.iot.entity.InfluxdbDeploy; import org.jeecg.modules.iot.entity.ServerDeploy; import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm; import org.jeecg.modules.iot.mdc.entity.EquipmentLog; import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService; import org.jeecg.modules.iot.mdc.service.IEquipmentLogService; import org.jeecg.modules.iot.service.IEquipmentService; import org.jeecg.modules.iot.service.IInfluxdbDeployService; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * influxdb定时任务 * * @Author Scott */ @Slf4j public class SampleParamJob implements Job { @Value("${databaseType}") private String databaseType; @Autowired private IInfluxdbDeployService influxdbDeployService; @Autowired private DataSource dataSource; @Autowired private IEquipmentService equipmentService; @Autowired private IEquipmentLogService equipmentLogService; @Autowired private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService; @Autowired private IEquipmentAlarmService equipmentAlarmService; @Autowired private org.jeecg.modules.iot.service.IServerDeployService serverDeployService; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { List influxdbDeployList = influxdbDeployService.list(); List serverDeployList = serverDeployService.list(); influxdbDeployList.forEach(in -> { // InfluxDB 2.0 连接信息 String influxUrl = "http://" + in.getAddress() + ":" + in.getPort(); String influxToken = in.getAuthorizeCode(); String influxOrg = in.getOrganization(); String influxBucket = in.getBucket(); // 连接 InfluxDB 2.0 InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray()); QueryApi queryApi = influxDBClient.getQueryApi(); // 连接 SqlServer try (Connection mysqlConnection = dataSource.getConnection()) { // 处理时间 DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); AtomicReference start = new AtomicReference<>(""); AtomicReference end = new AtomicReference<>(""); AtomicReference endDate = new AtomicReference<>(null); // 采集时间 serverDeployList.forEach(s -> { if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) { start.set(sdf.format(s.getCollectTime())); // 获取当前时间 LocalDateTime now = LocalDateTime.now(); // 减去一分钟 // LocalDateTime oneMinuteAgo = now.minusMinutes(1); LocalDateTime oneMinuteAgo = now.minusSeconds(2); // 格式化为字符串(可选) DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); String formattedTime = oneMinuteAgo.format(formatter); // 转换为 Date 类型(使用系统默认时区) Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant()); endDate.set(date); end.set(formattedTime); } }); if (start.get().equals("")){ return; } LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter); LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter); // 定义日期时间格式器(ISO_LOCAL_DATE_TIME 对应 "yyyy-MM-ddTHH:mm:ss") DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME; // 解析字符串为 LocalDateTime 对象 LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 解析字符串为 LocalDateTime 对象 LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1); // 加上 8 小时 LocalDateTime startNewDateTime = startDateTime.minusHours(8); LocalDateTime endNewDateTime = endDateTime.minusHours(8); // String query = "from(bucket: \"IOT\") \n" + // "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" + // "|> fill(usePrevious: true) \n" + // "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; String query = "from(bucket: \"" + influxBucket + "\") " + "|> range(start: -5s) " + "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; // 执行查询 try { List tables = queryApi.query(query, influxOrg); // 处理查询结果并插入到 MySQL List influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection); // 根据类型拆分 Map> fieldistMap = influxdbs.stream() .collect(Collectors.groupingBy(Influxdb::getField)); List valueList = new ArrayList<>(); List timeStampList = new ArrayList<>(); fieldistMap.forEach((field, timeList) -> { if (field.equals("Value")) { valueList.addAll(timeList); } else { timeStampList.addAll(timeList); } }); for (int i = 0; i < valueList.size(); i++) { valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue()); } // 根据设备进行分组 Map> measurementListMap = valueList.stream() .collect(Collectors.groupingBy(Influxdb::getMeasurement)); measurementListMap.forEach((table, timeList) -> { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); Map> timeListMap = timeList.stream() .collect(Collectors.groupingBy( Influxdb::getAcquisitionTime, () -> new TreeMap<>(Comparator.comparing( timeStr -> LocalDateTime.parse(timeStr, formatter) )), Collectors.toList() )); // 处理设备状态 int lastIndex = table.lastIndexOf('.'); String code = table.substring(lastIndex + 1); EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); // 设备状态 Integer equipmentState = null; for (Map.Entry> entry : timeListMap.entrySet()) { String measurement = entry.getKey(); List influxdbList = entry.getValue(); Equipment equipment = equipmentService.findEquipmentByCode(code); String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode(); // 获取表名 String[] columns = new String[influxdbList.size() + 2]; Object[] values = new Object[influxdbList.size() + 2]; String state = ""; // 设备状态 String isAlarm = ""; // 报警状态 String alarm = ""; // 报警号 String alarmContent = ""; // 报警内容 // 遍历当前设备下的所有参数 for (int i = 0; i < influxdbList.size(); i++) { String parameter = influxdbList.get(i).getParameter(); columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1); values[i] = influxdbList.get(i).getValue(); if (values[i] == null || values[i] == "") { values[i] = ""; } else { if (columns[i].equals("Oporation")) { state = values[i].toString(); } else if (columns[i].equals("AlarmNo")) { alarm = values[i].toString(); } else if (columns[i].equals("AlarmContent")) { alarmContent = values[i].toString(); } else if (columns[i].equals("IsAlarm")) { isAlarm = values[i].toString(); } } } // 验证数据是否都为空 if (!allEmptyStrings(values)) { columns[influxdbList.size() + 1] = "CollectTime"; columns[influxdbList.size()] = "EquipmentID"; // 转换为 LocalDateTime LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter); values[influxdbList.size() + 1] = dateTime; values[influxdbList.size()] = equipment.getEqptCode(); // 插入数据 if (databaseType.equals("SqlServer")) { equipmentService.insertSqlServerData(tableName, columns, values); } else if (databaseType.equals("MySql")) { equipmentService.insertMySqlData(tableName, columns, values); } if (!state.equals("")) { EquipmentLog log = new EquipmentLog(); log.setOporation(Integer.parseInt(state)); log.setEquipmentId(equipment.getEqptCode()); log.setEquipmentName(equipment.getEqptName()); log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant())); // True 报警开始,False报警结束 if (isAlarm.equals("True")) { // 报警开始 if (equipmentLog == null && equipmentState == null) { // 验证是否报警 log.setOporation(22); log.setAlarm(alarm); equipmentLogService.save(log); equipmentState = 22; equipmentStateModification(equipment.getEqptCode(), 22); } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) { log.setOporation(22); log.setAlarm(alarm); equipmentState = 22; equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), 22); } else if (equipmentLog != null && equipmentState != null && equipmentState != 22) { log.setOporation(22); log.setAlarm(alarm); equipmentState = 22; equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), 22); } } else if (isAlarm.equals("False")) { // 报警结束 if (equipmentLog == null && equipmentState == null) { // 验证是否报警 equipmentLogService.save(log); equipmentState = Integer.parseInt(state); equipmentStateModification(equipment.getEqptCode(), equipmentState); } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) { log.setOporation(23); equipmentState = 23; equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), equipmentState); } else if (equipmentLog != null && equipmentState != null && equipmentState == 22) { log.setOporation(23); equipmentState = 23; equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), 23); } else { if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) { equipmentState = Integer.parseInt(state); equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), equipmentState); //验证是否是报警,或者报警结束 } else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) { equipmentState = Integer.parseInt(state); equipmentLogService.save(log); equipmentStateModification(equipment.getEqptCode(), equipmentState); } } } // 处理报警信息 if (isAlarm.equals("True")) { EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode()); if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) { addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); } else if (equipmentAlarm == null) { addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); } } } } } }); // 如果有查询数据,时间不为空 if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) { for (ServerDeploy s : serverDeployList) { if (s.getId().equals(in.getServerDeployId())) { s.setCollectTime(endDate.get()); return; } } } } catch (Exception e) { log.error(String.valueOf(e)); } } catch (SQLException e) { e.printStackTrace(); } finally { influxDBClient.close(); } }); serverDeployService.updateBatchById(serverDeployList); // 直接在输出语句中格式化 log.info("实时数据任务执行,当前时间:" + LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault()) .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); } public static boolean allEmptyStrings(Object[] values) { if (values == null) { return false; // 如果数组本身为null,返回false } for (Object str : values) { if (str != null && !str.toString().equals("")) { return false; // 如果元素不是String类型,返回false } } return true; // 所有元素都是空字符串 } /** * 报警信息处理 * * @param alarm 报警号 * @param equipment 设备信息 * @param dateTime 报警时间 * @param alarmContent 报警内容 */ public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) { // 验证是否报警 EquipmentAlarm equipmentAlarm = new EquipmentAlarm(); equipmentAlarm.setEquipmentid(equipment.getEqptCode()); equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant())); equipmentAlarm.setAlarmNo(alarm); equipmentAlarm.setAlarmContent(alarmContent); equipmentAlarmService.save(equipmentAlarm); } void equipmentStateModification(String eqptCode, Integer state) { org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode); if (mdcEquipment != null) { mdcEquipment.setOporation(state); mdcEquipmentService.updateById(mdcEquipment); } } }