From 68d4ddc476cbaf61432eec15f8f04dbe838296a6 Mon Sep 17 00:00:00 2001 From: cuikaidong <ckd2942379034@163.com> Date: 星期二, 26 八月 2025 13:17:20 +0800 Subject: [PATCH] iot定日任务sql修改 --- lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java | 346 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 331 insertions(+), 15 deletions(-) diff --git a/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java b/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java index db2a71c..572c6e3 100644 --- a/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java +++ b/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java @@ -1,32 +1,348 @@ package org.jeecg.modules.quartz.job; -import org.jeecg.common.util.DateUtils; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.QueryApi; +import com.influxdb.query.FluxTable; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.jeecg.modules.iot.depository.Influxdb; +import org.jeecg.modules.iot.depository.InfluxdbTest; +import org.jeecg.modules.iot.entity.Equipment; +import org.jeecg.modules.iot.entity.InfluxdbDeploy; +import org.jeecg.modules.iot.entity.ServerDeploy; +import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm; +import org.jeecg.modules.iot.mdc.entity.EquipmentLog; +import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService; +import org.jeecg.modules.iot.mdc.service.IEquipmentLogService; +import org.jeecg.modules.iot.service.IEquipmentService; +import org.jeecg.modules.iot.service.IInfluxdbDeployService; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; -import lombok.extern.slf4j.Slf4j; +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** - * 绀轰緥甯﹀弬瀹氭椂浠诲姟 - * + * influxdb瀹氭椂浠诲姟 + * * @Author Scott */ @Slf4j public class SampleParamJob implements Job { - - /** - * 鑻ュ弬鏁板彉閲忓悕淇敼 QuartzJobController涓篃闇�瀵瑰簲淇敼 - */ - private String parameter; - - public void setParameter(String parameter) { - this.parameter = parameter; - } + @Value("${databaseType}") + private String databaseType; + @Autowired + private IInfluxdbDeployService influxdbDeployService; + @Autowired + private DataSource dataSource; + @Autowired + private IEquipmentService equipmentService; + @Autowired + private IEquipmentLogService equipmentLogService; + @Autowired + private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService; + @Autowired + private IEquipmentAlarmService equipmentAlarmService; + @Autowired + private org.jeecg.modules.iot.service.IServerDeployService serverDeployService; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - log.info(" Job Execution key锛�"+jobExecutionContext.getJobDetail().getKey()); - log.info( String.format("welcome %s! Jeecg-Boot 甯﹀弬鏁板畾鏃朵换鍔� SampleParamJob ! 鏃堕棿:" + DateUtils.now(), this.parameter)); + List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list(); + List<ServerDeploy> serverDeployList = serverDeployService.list(); + + influxdbDeployList.forEach(in -> { + // InfluxDB 2.0 杩炴帴淇℃伅 + String influxUrl = "http://" + in.getAddress() + ":" + in.getPort(); + String influxToken = in.getAuthorizeCode(); + String influxOrg = in.getOrganization(); + String influxBucket = in.getBucket(); + // 杩炴帴 InfluxDB 2.0 + InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray()); + QueryApi queryApi = influxDBClient.getQueryApi(); + // 杩炴帴 SqlServer + try (Connection mysqlConnection = dataSource.getConnection()) { + // 澶勭悊鏃堕棿 + DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + AtomicReference<String> start = new AtomicReference<>(""); + AtomicReference<String> end = new AtomicReference<>(""); + AtomicReference<Date> endDate = new AtomicReference<>(null); // 閲囬泦鏃堕棿 + serverDeployList.forEach(s -> { + if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) { + start.set(sdf.format(s.getCollectTime())); + // 鑾峰彇褰撳墠鏃堕棿 + LocalDateTime now = LocalDateTime.now(); + // 鍑忓幓涓�鍒嗛挓 +// LocalDateTime oneMinuteAgo = now.minusMinutes(1); + LocalDateTime oneMinuteAgo = now.minusSeconds(2); + // 鏍煎紡鍖栦负瀛楃涓诧紙鍙�夛級 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String formattedTime = oneMinuteAgo.format(formatter); + // 杞崲涓� Date 绫诲瀷锛堜娇鐢ㄧ郴缁熼粯璁ゆ椂鍖猴級 + Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant()); + endDate.set(date); + end.set(formattedTime); + } + }); + if (start.get().equals("")){ + return; + } + LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter); + LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter); + // 瀹氫箟鏃ユ湡鏃堕棿鏍煎紡鍣紙ISO_LOCAL_DATE_TIME 瀵瑰簲 "yyyy-MM-ddTHH:mm:ss"锛� + DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + + // 瑙f瀽瀛楃涓蹭负 LocalDateTime 瀵硅薄 + LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 瑙f瀽瀛楃涓蹭负 LocalDateTime 瀵硅薄 + LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1); + + // 鍔犱笂 8 灏忔椂 + LocalDateTime startNewDateTime = startDateTime.minusHours(8); + LocalDateTime endNewDateTime = endDateTime.minusHours(8); +// String query = "from(bucket: \"IOT\") \n" + +// "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" + +// "|> fill(usePrevious: true) \n" + +// "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; + String query = "from(bucket: \"" + influxBucket + "\") " + + "|> range(start: -5s) " + + "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; + + // 鎵ц鏌ヨ + try { + List<FluxTable> tables = queryApi.query(query, influxOrg); + // 澶勭悊鏌ヨ缁撴灉骞舵彃鍏ュ埌 MySQL + List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection); + // 鏍规嵁绫诲瀷鎷嗗垎 + Map<String, List<Influxdb>> fieldistMap = influxdbs.stream() + .collect(Collectors.groupingBy(Influxdb::getField)); + List<Influxdb> valueList = new ArrayList<>(); + List<Influxdb> timeStampList = new ArrayList<>(); + fieldistMap.forEach((field, timeList) -> { + if (field.equals("Value")) { + valueList.addAll(timeList); + } else { + timeStampList.addAll(timeList); + } + }); + for (int i = 0; i < valueList.size(); i++) { + valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue()); + } + // 鏍规嵁璁惧杩涜鍒嗙粍 + Map<String, List<Influxdb>> measurementListMap = valueList.stream() + .collect(Collectors.groupingBy(Influxdb::getMeasurement)); + measurementListMap.forEach((table, timeList) -> { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + Map<String, List<Influxdb>> timeListMap = timeList.stream() + .collect(Collectors.groupingBy( + Influxdb::getAcquisitionTime, + () -> new TreeMap<>(Comparator.comparing( + timeStr -> LocalDateTime.parse(timeStr, formatter) + )), + Collectors.toList() + )); + + // 澶勭悊璁惧鐘舵�� + int lastIndex = table.lastIndexOf('.'); + String code = table.substring(lastIndex + 1); + EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); + // 璁惧鐘舵�� + Integer equipmentState = null; + for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) { + String measurement = entry.getKey(); + List<Influxdb> influxdbList = entry.getValue(); + Equipment equipment = equipmentService.findEquipmentByCode(code); + String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode(); + // 鑾峰彇琛ㄥ悕 + String[] columns = new String[influxdbList.size() + 2]; + Object[] values = new Object[influxdbList.size() + 2]; + String state = ""; // 璁惧鐘舵�� + String isAlarm = ""; // 鎶ヨ鐘舵�� + String alarm = ""; // 鎶ヨ鍙� + String alarmContent = ""; // 鎶ヨ鍐呭 + // 閬嶅巻褰撳墠璁惧涓嬬殑鎵�鏈夊弬鏁� + for (int i = 0; i < influxdbList.size(); i++) { + String parameter = influxdbList.get(i).getParameter(); + columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1); + values[i] = influxdbList.get(i).getValue(); + if (values[i] == null || values[i] == "") { + values[i] = ""; + } else { + if (columns[i].equals("Oporation")) { + state = values[i].toString(); + } else if (columns[i].equals("AlarmNo")) { + alarm = values[i].toString(); + } else if (columns[i].equals("AlarmContent")) { + alarmContent = values[i].toString(); + } else if (columns[i].equals("IsAlarm")) { + isAlarm = values[i].toString(); + } + } + + } + // 楠岃瘉鏁版嵁鏄惁閮戒负绌� + if (!allEmptyStrings(values)) { + columns[influxdbList.size() + 1] = "CollectTime"; + columns[influxdbList.size()] = "EquipmentID"; + // 杞崲涓� LocalDateTime + LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter); + values[influxdbList.size() + 1] = dateTime; + values[influxdbList.size()] = tableName.replace("-", "_"); + // 鎻掑叆鏁版嵁 + if (databaseType.equals("SqlServer")) { + equipmentService.insertSqlServerData(tableName, columns, values); + } else if (databaseType.equals("MySql")) { + equipmentService.insertMySqlData(tableName, columns, values); + + } + if (!state.equals("")) { + EquipmentLog log = new EquipmentLog(); + log.setOporation(Integer.parseInt(state)); + log.setEquipmentId(equipment.getEqptCode()); + log.setEquipmentName(equipment.getEqptName()); + log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant())); + // True 鎶ヨ寮�濮嬶紝False鎶ヨ缁撴潫 + if (isAlarm.equals("True")) { + // 鎶ヨ寮�濮� + if (equipmentLog == null && equipmentState == null) { + // 楠岃瘉鏄惁鎶ヨ + log.setOporation(22); + log.setAlarm(alarm); + equipmentLogService.save(log); + equipmentState = 22; + equipmentStateModification(equipment.getEqptCode(), 22); + } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) { + log.setOporation(22); + log.setAlarm(alarm); + equipmentState = 22; + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), 22); + } else if (equipmentLog != null && equipmentState != null && equipmentState != 22) { + log.setOporation(22); + log.setAlarm(alarm); + equipmentState = 22; + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), 22); + } + } else if (isAlarm.equals("False")) { + // 鎶ヨ缁撴潫 + if (equipmentLog == null && equipmentState == null) { + // 楠岃瘉鏄惁鎶ヨ + equipmentLogService.save(log); + equipmentState = Integer.parseInt(state); + equipmentStateModification(equipment.getEqptCode(), equipmentState); + } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) { + log.setOporation(23); + equipmentState = 23; + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), equipmentState); + } else if (equipmentLog != null && equipmentState != null && equipmentState == 22) { + log.setOporation(23); + equipmentState = 23; + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), 23); + } else { + if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) { + equipmentState = Integer.parseInt(state); + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), equipmentState); + //楠岃瘉鏄惁鏄姤璀︼紝鎴栬�呮姤璀︾粨鏉� + } else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) { + equipmentState = Integer.parseInt(state); + equipmentLogService.save(log); + equipmentStateModification(equipment.getEqptCode(), equipmentState); + } + } + } + // 澶勭悊鎶ヨ淇℃伅 + if (isAlarm.equals("True")) { + EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode()); + if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) { + addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); + } else if (equipmentAlarm == null) { + addEquipmentAlarm(alarm, equipment, dateTime, alarmContent); + } + } + } + } + } + }); + // 濡傛灉鏈夋煡璇㈡暟鎹紝鏃堕棿涓嶄负绌� + + if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) { + for (ServerDeploy s : serverDeployList) { + if (s.getId().equals(in.getServerDeployId())) { + s.setCollectTime(endDate.get()); + return; + } + } + } + } catch (Exception e) { + log.error(String.valueOf(e)); + } + } catch (SQLException e) { + e.printStackTrace(); + } finally { + influxDBClient.close(); + } + }); + serverDeployService.updateBatchById(serverDeployList); + // 鐩存帴鍦ㄨ緭鍑鸿鍙ヤ腑鏍煎紡鍖� + log.info("瀹炴椂鏁版嵁浠诲姟鎵ц锛屽綋鍓嶆椂闂达細" + + LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault()) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); + } + + public static boolean allEmptyStrings(Object[] values) { + if (values == null) { + return false; // 濡傛灉鏁扮粍鏈韩涓簄ull锛岃繑鍥瀎alse + } + for (Object str : values) { + if (str != null && !str.toString().equals("")) { + return false; // 濡傛灉鍏冪礌涓嶆槸String绫诲瀷锛岃繑鍥瀎alse + } + } + + return true; // 鎵�鏈夊厓绱犻兘鏄┖瀛楃涓� + } + + /** + * 鎶ヨ淇℃伅澶勭悊 + * + * @param alarm 鎶ヨ鍙� + * @param equipment 璁惧淇℃伅 + * @param dateTime 鎶ヨ鏃堕棿 + * @param alarmContent 鎶ヨ鍐呭 + */ + public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) { + // 楠岃瘉鏄惁鎶ヨ + EquipmentAlarm equipmentAlarm = new EquipmentAlarm(); + equipmentAlarm.setEquipmentid(equipment.getEqptCode()); + equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant())); + equipmentAlarm.setAlarmNo(alarm); + equipmentAlarm.setAlarmContent(alarmContent); + equipmentAlarmService.save(equipmentAlarm); + + } + + void equipmentStateModification(String eqptCode, Integer state) { + org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode); + if (mdcEquipment != null) { + mdcEquipment.setOporation(state); + mdcEquipmentService.updateById(mdcEquipment); + } } } -- Gitblit v1.9.3