From 68d4ddc476cbaf61432eec15f8f04dbe838296a6 Mon Sep 17 00:00:00 2001
From: cuikaidong <ckd2942379034@163.com>
Date: 星期二, 26 八月 2025 13:17:20 +0800
Subject: [PATCH] iot定日任务sql修改

---
 lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java |  346 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 331 insertions(+), 15 deletions(-)

diff --git a/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java b/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java
index db2a71c..572c6e3 100644
--- a/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java
+++ b/lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java
@@ -1,32 +1,348 @@
 package org.jeecg.modules.quartz.job;
 
-import org.jeecg.common.util.DateUtils;
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.QueryApi;
+import com.influxdb.query.FluxTable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.jeecg.modules.iot.depository.Influxdb;
+import org.jeecg.modules.iot.depository.InfluxdbTest;
+import org.jeecg.modules.iot.entity.Equipment;
+import org.jeecg.modules.iot.entity.InfluxdbDeploy;
+import org.jeecg.modules.iot.entity.ServerDeploy;
+import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm;
+import org.jeecg.modules.iot.mdc.entity.EquipmentLog;
+import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService;
+import org.jeecg.modules.iot.mdc.service.IEquipmentLogService;
+import org.jeecg.modules.iot.service.IEquipmentService;
+import org.jeecg.modules.iot.service.IInfluxdbDeployService;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 
-import lombok.extern.slf4j.Slf4j;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
- * 绀轰緥甯﹀弬瀹氭椂浠诲姟
- * 
+ * influxdb瀹氭椂浠诲姟
+ *
  * @Author Scott
  */
 @Slf4j
 public class SampleParamJob implements Job {
-
-	/**
-	 * 鑻ュ弬鏁板彉閲忓悕淇敼 QuartzJobController涓篃闇�瀵瑰簲淇敼
-	 */
-	private String parameter;
-
-	public void setParameter(String parameter) {
-		this.parameter = parameter;
-	}
+	@Value("${databaseType}")
+	private String databaseType;
+	@Autowired
+	private IInfluxdbDeployService influxdbDeployService;
+	@Autowired
+	private DataSource dataSource;
+	@Autowired
+	private IEquipmentService equipmentService;
+	@Autowired
+	private IEquipmentLogService equipmentLogService;
+	@Autowired
+	private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService;
+	@Autowired
+	private IEquipmentAlarmService equipmentAlarmService;
+	@Autowired
+	private org.jeecg.modules.iot.service.IServerDeployService serverDeployService;
 
 	@Override
 	public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
-		log.info(" Job Execution key锛�"+jobExecutionContext.getJobDetail().getKey());
-		log.info( String.format("welcome %s! Jeecg-Boot 甯﹀弬鏁板畾鏃朵换鍔� SampleParamJob !   鏃堕棿:" + DateUtils.now(), this.parameter));
+		List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list();
+		List<ServerDeploy> serverDeployList = serverDeployService.list();
+
+		influxdbDeployList.forEach(in -> {
+			// InfluxDB 2.0 杩炴帴淇℃伅
+			String influxUrl = "http://" + in.getAddress() + ":" + in.getPort();
+			String influxToken = in.getAuthorizeCode();
+			String influxOrg = in.getOrganization();
+			String influxBucket = in.getBucket();
+			// 杩炴帴 InfluxDB 2.0
+			InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray());
+			QueryApi queryApi = influxDBClient.getQueryApi();
+			// 杩炴帴 SqlServer
+			try (Connection mysqlConnection = dataSource.getConnection()) {
+				// 澶勭悊鏃堕棿
+				DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+				AtomicReference<String> start = new AtomicReference<>("");
+				AtomicReference<String> end = new AtomicReference<>("");
+				AtomicReference<Date> endDate = new AtomicReference<>(null); // 閲囬泦鏃堕棿
+				serverDeployList.forEach(s -> {
+					if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) {
+						start.set(sdf.format(s.getCollectTime()));
+						// 鑾峰彇褰撳墠鏃堕棿
+						LocalDateTime now = LocalDateTime.now();
+						// 鍑忓幓涓�鍒嗛挓
+//                        LocalDateTime oneMinuteAgo = now.minusMinutes(1);
+						LocalDateTime oneMinuteAgo = now.minusSeconds(2);
+						// 鏍煎紡鍖栦负瀛楃涓诧紙鍙�夛級
+						DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+						String formattedTime = oneMinuteAgo.format(formatter);
+						// 杞崲涓� Date 绫诲瀷锛堜娇鐢ㄧ郴缁熼粯璁ゆ椂鍖猴級
+						Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant());
+						endDate.set(date);
+						end.set(formattedTime);
+					}
+				});
+				if (start.get().equals("")){
+					return;
+				}
+				LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter);
+				LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter);
+				// 瀹氫箟鏃ユ湡鏃堕棿鏍煎紡鍣紙ISO_LOCAL_DATE_TIME 瀵瑰簲 "yyyy-MM-ddTHH:mm:ss"锛�
+				DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+
+				// 瑙f瀽瀛楃涓蹭负 LocalDateTime 瀵硅薄
+				LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 瑙f瀽瀛楃涓蹭负 LocalDateTime 瀵硅薄
+				LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1);
+
+				// 鍔犱笂 8 灏忔椂
+				LocalDateTime startNewDateTime = startDateTime.minusHours(8);
+				LocalDateTime endNewDateTime = endDateTime.minusHours(8);
+//                String query = "from(bucket: \"IOT\") \n" +
+//                        "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" +
+//                        "|> fill(usePrevious: true) \n" +
+//                        "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
+				String query = "from(bucket: \"" + influxBucket + "\") " +
+						"|> range(start: -5s) " +
+						"|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
+
+				// 鎵ц鏌ヨ
+				try {
+					List<FluxTable> tables = queryApi.query(query, influxOrg);
+					// 澶勭悊鏌ヨ缁撴灉骞舵彃鍏ュ埌 MySQL
+					List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection);
+					// 鏍规嵁绫诲瀷鎷嗗垎
+					Map<String, List<Influxdb>> fieldistMap = influxdbs.stream()
+							.collect(Collectors.groupingBy(Influxdb::getField));
+					List<Influxdb> valueList = new ArrayList<>();
+					List<Influxdb> timeStampList = new ArrayList<>();
+					fieldistMap.forEach((field, timeList) -> {
+						if (field.equals("Value")) {
+							valueList.addAll(timeList);
+						} else {
+							timeStampList.addAll(timeList);
+						}
+					});
+					for (int i = 0; i < valueList.size(); i++) {
+						valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue());
+					}
+					// 鏍规嵁璁惧杩涜鍒嗙粍
+					Map<String, List<Influxdb>> measurementListMap = valueList.stream()
+							.collect(Collectors.groupingBy(Influxdb::getMeasurement));
+					measurementListMap.forEach((table, timeList) -> {
+						DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+						Map<String, List<Influxdb>> timeListMap = timeList.stream()
+								.collect(Collectors.groupingBy(
+										Influxdb::getAcquisitionTime,
+										() -> new TreeMap<>(Comparator.comparing(
+												timeStr -> LocalDateTime.parse(timeStr, formatter)
+										)),
+										Collectors.toList()
+								));
+
+						// 澶勭悊璁惧鐘舵��
+						int lastIndex = table.lastIndexOf('.');
+						String code = table.substring(lastIndex + 1);
+						EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code);
+						// 璁惧鐘舵��
+						Integer equipmentState = null;
+						for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) {
+							String measurement = entry.getKey();
+							List<Influxdb> influxdbList = entry.getValue();
+							Equipment equipment = equipmentService.findEquipmentByCode(code);
+							String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode();
+							// 鑾峰彇琛ㄥ悕
+							String[] columns = new String[influxdbList.size() + 2];
+							Object[] values = new Object[influxdbList.size() + 2];
+							String state = ""; // 璁惧鐘舵��
+							String isAlarm = ""; // 鎶ヨ鐘舵��
+							String alarm = ""; // 鎶ヨ鍙�
+							String alarmContent = ""; // 鎶ヨ鍐呭
+							// 閬嶅巻褰撳墠璁惧涓嬬殑鎵�鏈夊弬鏁�
+							for (int i = 0; i < influxdbList.size(); i++) {
+								String parameter = influxdbList.get(i).getParameter();
+								columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1);
+								values[i] = influxdbList.get(i).getValue();
+								if (values[i] == null || values[i] == "") {
+									values[i] = "";
+								} else {
+									if (columns[i].equals("Oporation")) {
+										state = values[i].toString();
+									} else if (columns[i].equals("AlarmNo")) {
+										alarm = values[i].toString();
+									} else if (columns[i].equals("AlarmContent")) {
+										alarmContent = values[i].toString();
+									} else if (columns[i].equals("IsAlarm")) {
+										isAlarm = values[i].toString();
+									}
+								}
+
+							}
+							// 楠岃瘉鏁版嵁鏄惁閮戒负绌�
+							if (!allEmptyStrings(values)) {
+								columns[influxdbList.size() + 1] = "CollectTime";
+								columns[influxdbList.size()] = "EquipmentID";
+								// 杞崲涓� LocalDateTime
+								LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter);
+								values[influxdbList.size() + 1] = dateTime;
+								values[influxdbList.size()] = tableName.replace("-", "_");
+								// 鎻掑叆鏁版嵁
+								if (databaseType.equals("SqlServer")) {
+									equipmentService.insertSqlServerData(tableName, columns, values);
+								} else if (databaseType.equals("MySql")) {
+									equipmentService.insertMySqlData(tableName, columns, values);
+
+								}
+								if (!state.equals("")) {
+									EquipmentLog log = new EquipmentLog();
+									log.setOporation(Integer.parseInt(state));
+									log.setEquipmentId(equipment.getEqptCode());
+									log.setEquipmentName(equipment.getEqptName());
+									log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()));
+									// True 鎶ヨ寮�濮嬶紝False鎶ヨ缁撴潫
+									if (isAlarm.equals("True")) {
+										// 鎶ヨ寮�濮�
+										if (equipmentLog == null && equipmentState == null) {
+											// 楠岃瘉鏄惁鎶ヨ
+											log.setOporation(22);
+											log.setAlarm(alarm);
+											equipmentLogService.save(log);
+											equipmentState = 22;
+											equipmentStateModification(equipment.getEqptCode(), 22);
+										} else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) {
+											log.setOporation(22);
+											log.setAlarm(alarm);
+											equipmentState = 22;
+											equipmentLogService.save(log);
+											equipmentStateModification(equipment.getEqptCode(), 22);
+										} else if (equipmentLog != null && equipmentState != null && equipmentState != 22) {
+											log.setOporation(22);
+											log.setAlarm(alarm);
+											equipmentState = 22;
+											equipmentLogService.save(log);
+											equipmentStateModification(equipment.getEqptCode(), 22);
+										}
+									} else if (isAlarm.equals("False")) {
+										// 鎶ヨ缁撴潫
+										if (equipmentLog == null && equipmentState == null) {
+											// 楠岃瘉鏄惁鎶ヨ
+											equipmentLogService.save(log);
+											equipmentState = Integer.parseInt(state);
+											equipmentStateModification(equipment.getEqptCode(), equipmentState);
+										} else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) {
+											log.setOporation(23);
+											equipmentState = 23;
+											equipmentLogService.save(log);
+											equipmentStateModification(equipment.getEqptCode(), equipmentState);
+										} else if (equipmentLog != null && equipmentState != null && equipmentState == 22) {
+											log.setOporation(23);
+											equipmentState = 23;
+											equipmentLogService.save(log);
+											equipmentStateModification(equipment.getEqptCode(), 23);
+										} else {
+											if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) {
+												equipmentState = Integer.parseInt(state);
+												equipmentLogService.save(log);
+												equipmentStateModification(equipment.getEqptCode(), equipmentState);
+												//楠岃瘉鏄惁鏄姤璀︼紝鎴栬�呮姤璀︾粨鏉�
+											} else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) {
+												equipmentState = Integer.parseInt(state);
+												equipmentLogService.save(log);
+												equipmentStateModification(equipment.getEqptCode(), equipmentState);
+											}
+										}
+									}
+									// 澶勭悊鎶ヨ淇℃伅
+									if (isAlarm.equals("True")) {
+										EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode());
+										if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) {
+											addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
+										} else if (equipmentAlarm == null) {
+											addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
+										}
+									}
+								}
+							}
+						}
+					});
+					// 濡傛灉鏈夋煡璇㈡暟鎹紝鏃堕棿涓嶄负绌�
+
+					if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) {
+						for (ServerDeploy s : serverDeployList) {
+							if (s.getId().equals(in.getServerDeployId())) {
+								s.setCollectTime(endDate.get());
+								return;
+							}
+						}
+					}
+				} catch (Exception e) {
+					log.error(String.valueOf(e));
+				}
+			} catch (SQLException e) {
+				e.printStackTrace();
+			} finally {
+				influxDBClient.close();
+			}
+		});
+		serverDeployService.updateBatchById(serverDeployList);
+		// 鐩存帴鍦ㄨ緭鍑鸿鍙ヤ腑鏍煎紡鍖�
+		log.info("瀹炴椂鏁版嵁浠诲姟鎵ц锛屽綋鍓嶆椂闂达細" +
+				LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault())
+						.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
+	}
+
+	public static boolean allEmptyStrings(Object[] values) {
+		if (values == null) {
+			return false; // 濡傛灉鏁扮粍鏈韩涓簄ull锛岃繑鍥瀎alse
+		}
+		for (Object str : values) {
+			if (str != null && !str.toString().equals("")) {
+				return false; // 濡傛灉鍏冪礌涓嶆槸String绫诲瀷锛岃繑鍥瀎alse
+			}
+		}
+
+		return true; // 鎵�鏈夊厓绱犻兘鏄┖瀛楃涓�
+	}
+
+	/**
+	 * 鎶ヨ淇℃伅澶勭悊
+	 *
+	 * @param alarm        鎶ヨ鍙�
+	 * @param equipment    璁惧淇℃伅
+	 * @param dateTime     鎶ヨ鏃堕棿
+	 * @param alarmContent 鎶ヨ鍐呭
+	 */
+	public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) {
+		// 楠岃瘉鏄惁鎶ヨ
+		EquipmentAlarm equipmentAlarm = new EquipmentAlarm();
+		equipmentAlarm.setEquipmentid(equipment.getEqptCode());
+		equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant()));
+		equipmentAlarm.setAlarmNo(alarm);
+		equipmentAlarm.setAlarmContent(alarmContent);
+		equipmentAlarmService.save(equipmentAlarm);
+
+	}
+
+	void equipmentStateModification(String eqptCode, Integer state) {
+		org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode);
+		if (mdcEquipment != null) {
+			mdcEquipment.setOporation(state);
+			mdcEquipmentService.updateById(mdcEquipment);
+		}
 	}
 }

--
Gitblit v1.9.3