| | |
| | | package org.jeecg.modules.quartz.job; |
| | | |
| | | import org.jeecg.common.util.DateUtils; |
| | | import com.influxdb.client.InfluxDBClient; |
| | | import com.influxdb.client.InfluxDBClientFactory; |
| | | import com.influxdb.client.QueryApi; |
| | | import com.influxdb.query.FluxTable; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.collections.CollectionUtils; |
| | | import org.jeecg.modules.iot.depository.Influxdb; |
| | | import org.jeecg.modules.iot.depository.InfluxdbTest; |
| | | import org.jeecg.modules.iot.entity.Equipment; |
| | | import org.jeecg.modules.iot.entity.InfluxdbDeploy; |
| | | import org.jeecg.modules.iot.entity.ServerDeploy; |
| | | import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm; |
| | | import org.jeecg.modules.iot.mdc.entity.EquipmentLog; |
| | | import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService; |
| | | import org.jeecg.modules.iot.mdc.service.IEquipmentLogService; |
| | | import org.jeecg.modules.iot.service.IEquipmentService; |
| | | import org.jeecg.modules.iot.service.IInfluxdbDeployService; |
| | | import org.quartz.Job; |
| | | import org.quartz.JobExecutionContext; |
| | | import org.quartz.JobExecutionException; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import javax.sql.DataSource; |
| | | import java.sql.Connection; |
| | | import java.sql.SQLException; |
| | | import java.text.SimpleDateFormat; |
| | | import java.time.LocalDateTime; |
| | | import java.time.ZoneId; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.time.temporal.TemporalAccessor; |
| | | import java.util.*; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * 示例带参定时任务 |
| | | * |
| | | * influxdb定时任务 |
| | | * |
| | | * @Author Scott |
| | | */ |
| | | @Slf4j |
| | | public class SampleParamJob implements Job { |
| | | |
| | | /** |
| | | * 若参数变量名修改 QuartzJobController中也需对应修改 |
| | | */ |
| | | private String parameter; |
| | | |
| | | public void setParameter(String parameter) { |
| | | this.parameter = parameter; |
| | | } |
| | | @Value("${databaseType}") |
| | | private String databaseType; |
| | | @Autowired |
| | | private IInfluxdbDeployService influxdbDeployService; |
| | | @Autowired |
| | | private DataSource dataSource; |
| | | @Autowired |
| | | private IEquipmentService equipmentService; |
| | | @Autowired |
| | | private IEquipmentLogService equipmentLogService; |
| | | @Autowired |
| | | private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService; |
| | | @Autowired |
| | | private IEquipmentAlarmService equipmentAlarmService; |
| | | @Autowired |
| | | private org.jeecg.modules.iot.service.IServerDeployService serverDeployService; |
| | | |
| | | @Override |
| | | public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { |
| | | log.info(" Job Execution key:"+jobExecutionContext.getJobDetail().getKey()); |
| | | log.info( String.format("welcome %s! Jeecg-Boot 带参数定时任务 SampleParamJob ! 时间:" + DateUtils.now(), this.parameter)); |
| | | List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list(); |
| | | List<ServerDeploy> serverDeployList = serverDeployService.list(); |
| | | |
| | | influxdbDeployList.forEach(in -> { |
| | | // InfluxDB 2.0 连接信息 |
| | | String influxUrl = "http://" + in.getAddress() + ":" + in.getPort(); |
| | | String influxToken = in.getAuthorizeCode(); |
| | | String influxOrg = in.getOrganization(); |
| | | String influxBucket = in.getBucket(); |
| | | // 连接 InfluxDB 2.0 |
| | | InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray()); |
| | | QueryApi queryApi = influxDBClient.getQueryApi(); |
| | | // 连接 SqlServer |
| | | try (Connection mysqlConnection = dataSource.getConnection()) { |
| | | // 处理时间 |
| | | DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); |
| | | SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| | | AtomicReference<String> start = new AtomicReference<>(""); |
| | | AtomicReference<String> end = new AtomicReference<>(""); |
| | | AtomicReference<Date> endDate = new AtomicReference<>(null); // 采集时间 |
| | | serverDeployList.forEach(s -> { |
| | | if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) { |
| | | start.set(sdf.format(s.getCollectTime())); |
| | | // 获取当前时间 |
| | | LocalDateTime now = LocalDateTime.now(); |
| | | // 减去一分钟 |
| | | // LocalDateTime oneMinuteAgo = now.minusMinutes(1); |
| | | LocalDateTime oneMinuteAgo = now.minusSeconds(2); |
| | | // 格式化为字符串(可选) |
| | | DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); |
| | | String formattedTime = oneMinuteAgo.format(formatter); |
| | | // 转换为 Date 类型(使用系统默认时区) |
| | | Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant()); |
| | | endDate.set(date); |
| | | end.set(formattedTime); |
| | | } |
| | | }); |
| | | if (start.get().equals("")){ |
| | | return; |
| | | } |
| | | LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter); |
| | | LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter); |
| | | // 定义日期时间格式器(ISO_LOCAL_DATE_TIME 对应 "yyyy-MM-ddTHH:mm:ss") |
| | | DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME; |
| | | |
| | | // 解析字符串为 LocalDateTime 对象 |
| | | LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 解析字符串为 LocalDateTime 对象 |
| | | LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1); |
| | | |
| | | // 加上 8 小时 |
| | | LocalDateTime startNewDateTime = startDateTime.minusHours(8); |
| | | LocalDateTime endNewDateTime = endDateTime.minusHours(8); |
| | | // String query = "from(bucket: \"IOT\") \n" + |
| | | // "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" + |
| | | // "|> fill(usePrevious: true) \n" + |
| | | // "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; |
| | | String query = "from(bucket: \"" + influxBucket + "\") " + |
| | | "|> range(start: -5s) " + |
| | | "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; |
| | | |
| | | // 执行查询 |
| | | try { |
| | | List<FluxTable> tables = queryApi.query(query, influxOrg); |
| | | // 处理查询结果并插入到 MySQL |
| | | List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection); |
| | | // 根据类型拆分 |
| | | Map<String, List<Influxdb>> fieldistMap = influxdbs.stream() |
| | | .collect(Collectors.groupingBy(Influxdb::getField)); |
| | | List<Influxdb> valueList = new ArrayList<>(); |
| | | List<Influxdb> timeStampList = new ArrayList<>(); |
| | | fieldistMap.forEach((field, timeList) -> { |
| | | if (field.equals("Value")) { |
| | | valueList.addAll(timeList); |
| | | } else { |
| | | timeStampList.addAll(timeList); |
| | | } |
| | | }); |
| | | for (int i = 0; i < valueList.size(); i++) { |
| | | valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue()); |
| | | } |
| | | // 根据设备进行分组 |
| | | Map<String, List<Influxdb>> measurementListMap = valueList.stream() |
| | | .collect(Collectors.groupingBy(Influxdb::getMeasurement)); |
| | | measurementListMap.forEach((table, timeList) -> { |
| | | DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); |
| | | Map<String, List<Influxdb>> timeListMap = timeList.stream() |
| | | .collect(Collectors.groupingBy( |
| | | Influxdb::getAcquisitionTime, |
| | | () -> new TreeMap<>(Comparator.comparing( |
| | | timeStr -> LocalDateTime.parse(timeStr, formatter) |
| | | )), |
| | | Collectors.toList() |
| | | )); |
| | | |
| | | // 处理设备状态 |
| | | int lastIndex = table.lastIndexOf('.'); |
| | | String code = table.substring(lastIndex + 1); |
| | | EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); |
| | | // 设备状态 |
| | | Integer equipmentState = null; |
| | | for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) { |
| | | String measurement = entry.getKey(); |
| | | List<Influxdb> influxdbList = entry.getValue(); |
| | | Equipment equipment = equipmentService.findEquipmentByCode(code); |
| | | String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode(); |
| | | // 获取表名 |
| | | String[] columns = new String[influxdbList.size() + 2]; |
| | | Object[] values = new Object[influxdbList.size() + 2]; |
| | | String state = ""; // 设备状态 |
| | | String isAlarm = ""; // 报警状态 |
| | | String alarm = ""; // 报警号 |
| | | String alarmContent = ""; // 报警内容 |
| | | // 遍历当前设备下的所有参数 |
| | | for (int i = 0; i < influxdbList.size(); i++) { |
| | | String parameter = influxdbList.get(i).getParameter(); |
| | | columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1); |
| | | values[i] = influxdbList.get(i).getValue(); |
| | | if (values[i] == null || values[i] == "") { |
| | | values[i] = ""; |
| | | } else { |
| | | if (columns[i].equals("Oporation")) { |
| | | state = values[i].toString(); |
| | | } else if (columns[i].equals("AlarmNo")) { |
| | | alarm = values[i].toString(); |
| | | } else if (columns[i].equals("AlarmContent")) { |
| | | alarmContent = values[i].toString(); |
| | | } else if (columns[i].equals("IsAlarm")) { |
| | | isAlarm = values[i].toString(); |
| | | } |
| | | } |
| | | |
| | | } |
| | | // 验证数据是否都为空 |
| | | if (!allEmptyStrings(values)) { |
| | | columns[influxdbList.size() + 1] = "CollectTime"; |
| | | columns[influxdbList.size()] = "EquipmentID"; |
| | | // 转换为 LocalDateTime |
| | | LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter); |
| | | values[influxdbList.size() + 1] = dateTime; |
| | | values[influxdbList.size()] = tableName.replace("-", "_"); |
| | | // 插入数据 |
| | | if (databaseType.equals("SqlServer")) { |
| | | equipmentService.insertSqlServerData(tableName, columns, values); |
| | | } else if (databaseType.equals("MySql")) { |
| | | equipmentService.insertMySqlData(tableName, columns, values); |
| | | |
| | | } |
| | | if (!state.equals("")) { |
| | | EquipmentLog log = new EquipmentLog(); |
| | | log.setOporation(Integer.parseInt(state)); |
| | | log.setEquipmentId(equipment.getEqptCode()); |
| | | log.setEquipmentName(equipment.getEqptName()); |
| | | log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant())); |
| | | // True 报警开始,False报警结束 |
| | | if (isAlarm.equals("True")) { |
| | | // 报警开始 |
| | | if (equipmentLog == null && equipmentState == null) { |
| | | // 验证是否报警 |
| | | log.setOporation(22); |
| | | log.setAlarm(alarm); |
| | | equipmentLogService.save(log); |
| | | equipmentState = 22; |
| | | equipmentStateModification(equipment.getEqptCode(), 22); |
| | | } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) { |
| | | log.setOporation(22); |
| | | log.setAlarm(alarm); |
| | | equipmentState = 22; |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), 22); |
| | | } else if (equipmentLog != null && equipmentState != null && equipmentState != 22) { |
| | | log.setOporation(22); |
| | | log.setAlarm(alarm); |
| | | equipmentState = 22; |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), 22); |
| | | } |
| | | } else if (isAlarm.equals("False")) { |
| | | // 报警结束 |
| | | if (equipmentLog == null && equipmentState == null) { |
| | | // 验证是否报警 |
| | | equipmentLogService.save(log); |
| | | equipmentState = Integer.parseInt(state); |
| | | equipmentStateModification(equipment.getEqptCode(), equipmentState); |
| | | } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) { |
| | | log.setOporation(23); |
| | | equipmentState = 23; |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), equipmentState); |
| | | } else if (equipmentLog != null && equipmentState != null && equipmentState == 22) { |
| | | log.setOporation(23); |
| | | equipmentState = 23; |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), 23); |
| | | } else { |
| | | if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) { |
| | | equipmentState = Integer.parseInt(state); |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), equipmentState); |
| | | //验证是否是报警,或者报警结束 |
| | | } else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) { |
| | | equipmentState = Integer.parseInt(state); |
| | | equipmentLogService.save(log); |
| | | equipmentStateModification(equipment.getEqptCode(), equipmentState); |
| | | } |
| | | } |
| | | } |
| | | // 处理报警信息 |
| | | if (isAlarm.equals("True")) { |
| | | EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode()); |
| | | if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) { |
| | | addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); |
| | | } else if (equipmentAlarm == null) { |
| | | addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | // 如果有查询数据,时间不为空 |
| | | |
| | | if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) { |
| | | for (ServerDeploy s : serverDeployList) { |
| | | if (s.getId().equals(in.getServerDeployId())) { |
| | | s.setCollectTime(endDate.get()); |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error(String.valueOf(e)); |
| | | } |
| | | } catch (SQLException e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | influxDBClient.close(); |
| | | } |
| | | }); |
| | | serverDeployService.updateBatchById(serverDeployList); |
| | | // 直接在输出语句中格式化 |
| | | log.info("实时数据任务执行,当前时间:" + |
| | | LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault()) |
| | | .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); |
| | | } |
| | | |
| | | public static boolean allEmptyStrings(Object[] values) { |
| | | if (values == null) { |
| | | return false; // 如果数组本身为null,返回false |
| | | } |
| | | for (Object str : values) { |
| | | if (str != null && !str.toString().equals("")) { |
| | | return false; // 如果元素不是String类型,返回false |
| | | } |
| | | } |
| | | |
| | | return true; // 所有元素都是空字符串 |
| | | } |
| | | |
| | | /** |
| | | * 报警信息处理 |
| | | * |
| | | * @param alarm 报警号 |
| | | * @param equipment 设备信息 |
| | | * @param dateTime 报警时间 |
| | | * @param alarmContent 报警内容 |
| | | */ |
| | | public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) { |
| | | // 验证是否报警 |
| | | EquipmentAlarm equipmentAlarm = new EquipmentAlarm(); |
| | | equipmentAlarm.setEquipmentid(equipment.getEqptCode()); |
| | | equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant())); |
| | | equipmentAlarm.setAlarmNo(alarm); |
| | | equipmentAlarm.setAlarmContent(alarmContent); |
| | | equipmentAlarmService.save(equipmentAlarm); |
| | | |
| | | } |
| | | |
| | | void equipmentStateModification(String eqptCode, Integer state) { |
| | | org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode); |
| | | if (mdcEquipment != null) { |
| | | mdcEquipment.setOporation(state); |
| | | mdcEquipmentService.updateById(mdcEquipment); |
| | | } |
| | | } |
| | | } |