package org.jeecg.modules.quartz.job; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.jeecg.modules.iot.entity.Equipment; import org.jeecg.modules.iot.entity.EquipmentCsvData; import org.jeecg.modules.iot.service.IEquipmentService; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceUtils; import javax.sql.DataSource; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.Method; import java.sql.*; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.time.temporal.ChronoUnit; import java.util.*; /** * 定时生成csv文件 * * @Author Scott */ @Slf4j public class ParameterCsvJob implements Job { @Autowired private IEquipmentService equipmentService; @Autowired private DataSource dataSource; // CSV文件保存路径 private static final String CSV_FILE_PATH = "D:/iot/"; private static final Map FIELD_MAPPING = new HashMap<>(); static { // 初始化字段映射:key是数据库字段名,value是CSV/实体类字段名 FIELD_MAPPING.put("CollectTime", "k_ts"); FIELD_MAPPING.put("EquipmentID", "k_device"); FIELD_MAPPING.put("ProgramNo", "ProgramNumber"); // 可以添加其他需要映射的字段 } private static final List INPUT_FORMATTERS = Arrays.asList( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"), // 标准格式(目标格式) DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"), // 斜杠分隔 DateTimeFormatter.ofPattern("yyyy-M-d H:m:s"), // 不补零格式 DateTimeFormatter.ofPattern("yyyy/MM/d HH:mm:ss"), // 混合格式 DateTimeFormatter.ISO_LOCAL_DATE_TIME // 带T的格式(如2025-09-15T10:21:11) ); // 输出格式:严格指定为yyyy-MM-dd HH:mm:ss(补零+连字符) private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { // 查询所有虚设备 List equipmentByTypeList = equipmentService.findEquipmentByTypeList(); if (CollectionUtils.isNotEmpty(equipmentByTypeList)) { // 遍历虚设备 equipmentByTypeList.forEach(eq -> { String tableName = eq.getControlSystem() + "_" + eq.getEqptCode().replace('-', '_'); if (doesTheTableExist(tableName)) { queryLastHourDataAndExportToCsv(tableName); } }); } } // 验证是否存在 Boolean doesTheTableExist(String tableName) { boolean tableExists = false; // 获取数据库连接 try (Connection connection = dataSource.getConnection()) { // 查询虚设备是否有单表 // 获取数据库产品名称以区分数据库类型 String databaseProductName = connection.getMetaData().getDatabaseProductName(); // 表名转换 if (databaseProductName.contains("MySQL")) { // MySQL: 查询information_schema库 String sql = "SELECT COUNT(*) FROM information_schema.tables " + "WHERE table_schema = DATABASE() AND table_name = ?"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setString(1, tableName); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { tableExists = rs.getInt(1) > 0; } } } } else if (databaseProductName.contains("SQL Server")) { // SQL Server: 查询sys.tables String sql = "SELECT COUNT(*) FROM sys.tables WHERE name = ?"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setString(1, tableName); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { tableExists = rs.getInt(1) > 0; } } } } else { log.info("不支持的数据库类型: " + databaseProductName); } } catch (SQLException e) { e.printStackTrace(); } return tableExists; } public void queryLastHourDataAndExportToCsv(String tableName) { LocalDateTime now = LocalDateTime.now(); LocalDateTime lastHourStart = now.truncatedTo(ChronoUnit.HOURS).minusHours(1); LocalDateTime lastHourEnd = lastHourStart.plusHours(1).minusSeconds(1); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); String startTimeStr = lastHourStart.format(formatter); String endTimeStr = lastHourEnd.format(formatter); log.info("查询时间范围:{} ~ {}", startTimeStr, endTimeStr); String sql = String.format("SELECT * FROM %s WHERE CollectTime BETWEEN ? AND ?", tableName); List dataList = new ArrayList<>(); try (Connection conn = DataSourceUtils.getConnection(dataSource); PreparedStatement pstmt = conn.prepareStatement(sql)) { pstmt.setString(1, startTimeStr); pstmt.setString(2, endTimeStr); try (ResultSet rs = pstmt.executeQuery()) { ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); log.debug("数据库返回字段总数:{},字段列表:{}", columnCount, getColumnNames(metaData)); // 验证关键字段是否存在 boolean hasCollectTime = false; boolean hasEquipmentID = false; for (int i = 1; i <= columnCount; i++) { String colName = metaData.getColumnName(i); if ("CollectTime".equals(colName)) hasCollectTime = true; if ("EquipmentID".equals(colName)) hasEquipmentID = true; } log.info("数据库是否包含CollectTime:{},是否包含EquipmentID:{}", hasCollectTime, hasEquipmentID); int totalCount = 0; while (rs.next()) { totalCount++; EquipmentCsvData csvData = new EquipmentCsvData(); // 手动处理关键字段(确保映射生效) handleCriticalFields(csvData, rs); // 处理其他字段 for (int i = 1; i <= columnCount; i++) { String dbColName = metaData.getColumnName(i); // 跳过已手动处理的字段 if ("CollectTime".equals(dbColName) || "EquipmentID".equals(dbColName)) { continue; } Object value = rs.getObject(i); setFieldValue(csvData, dbColName, value); } dataList.add(csvData); log.debug("第{}条数据:k_device={}, k_ts={}", totalCount, csvData.getK_device(), csvData.getK_ts()); } log.info("查询完成,共{}条数据", totalCount); } } catch (SQLException e) { log.error("查询失败", e); return; } if (!dataList.isEmpty()) { String fileName = CSV_FILE_PATH + tableName + "_" + lastHourStart.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ".csv"; exportToCsv(dataList, fileName); } else { log.info("无数据导出"); } } // 手动处理关键字段(避免反射失败) private void handleCriticalFields(EquipmentCsvData csvData, ResultSet rs) throws SQLException { // 机组状态 Object status = rs.getObject("Oporation"); if (status != null) { String statusStr = status.toString().trim(); // 状态值转换映射 switch (statusStr) { case "0": csvData.setStatus("3"); break; case "2": csvData.setStatus("6"); break; case "3": csvData.setStatus("10"); break; } // 机组状态 Object isAlarm = rs.getObject("lsAlarm"); if (isAlarm != null && status.toString().trim().equals("True")) { csvData.setStatus("16"); } } // 设备状态 Object oporation = rs.getObject("Oporation"); if (oporation != null) { csvData.setThreeColorIndicatorState(oporation.toString().trim()); } // 处理EquipmentID→k_device(保持不变) Object equipmentId = rs.getObject("EquipmentID"); if (equipmentId != null) { csvData.setK_device(equipmentId.toString().trim()); } // 程序编号 Object programNumber = rs.getObject("ProgramNumber"); if (programNumber != null) { csvData.setProgramNo(programNumber.toString().trim()); } // 进给倍率 Object feedOverride = rs.getObject("FeedOverride"); if (feedOverride != null) { csvData.setFeedrateOverride(feedOverride.toString().trim()); } // 报警代码 Object alarmNo = rs.getObject("AlarmNo"); if (alarmNo != null) { csvData.setAlarmCode(alarmNo.toString().trim()); } // 报警内容 Object alarmInfo = rs.getObject("AlarmInfo"); if (alarmInfo != null) { csvData.setAlarmMessage(alarmInfo.toString().trim()); } // 主轴转速 Object actualSpindleSpeed = rs.getObject("ActualSpindleSpeed"); if (actualSpindleSpeed != null) { csvData.setSpindleSpeed(actualSpindleSpeed.toString().trim()); } // 进给速度 Object actualFeedRate = rs.getObject("ActualFeedRate"); if (actualSpindleSpeed != null) { csvData.setFeedRate(actualFeedRate.toString().trim()); } // 当前执行行号 Object executingCode = rs.getObject("ExecutingCode"); if (executingCode != null) { csvData.setCurrentExecutionLine(executingCode.toString().trim()); } // 机械坐标 Object xmachine = rs.getObject("Xmachine"); if (xmachine != null) { csvData.setXmechanicalCoordinate(xmachine.toString().trim()); } // 机械坐标 Object ymachine = rs.getObject("Ymachine"); if (ymachine != null) { csvData.setYmechanicalCoordinate(ymachine.toString().trim()); } // 机械坐标 Object zmachine = rs.getObject("Zmachine"); if (zmachine != null) { csvData.setZmechanicalCoordinate(zmachine.toString().trim()); } // 机械坐标 Object amachine = rs.getObject("Amachine"); if (amachine != null) { csvData.setAmechanicalCoordinateA(amachine.toString().trim()); } // 机械坐标 Object bmachine = rs.getObject("Bmachine"); if (bmachine != null) { csvData.setBmechanicalCoordinateB(bmachine.toString().trim()); } // 绝对坐标 Object aabsolute = rs.getObject("Aabsolute"); if (aabsolute != null) { csvData.setAabsoluteCoordinate(aabsolute.toString().trim()); } // 绝对坐标 Object babsolute = rs.getObject("Babsolute"); if (babsolute != null) { csvData.setBabsoluteCoordinate(babsolute.toString().trim()); } // 绝对坐标 Object xabsolute = rs.getObject("Xabsolute"); if (xabsolute != null) { csvData.setXabsoluteCoordinate(xabsolute.toString().trim()); } // 绝对坐标 Object yabsolute = rs.getObject("Yabsolute"); if (babsolute != null) { csvData.setYabsoluteCoordinate(yabsolute.toString().trim()); } // 绝对坐标 Object zabsolute = rs.getObject("Zabsolute"); if (zabsolute != null) { csvData.setZabsoluteCoordinate(zabsolute.toString().trim()); } // 相对坐标 Object arelative = rs.getObject("Arelative"); if (zabsolute != null) { csvData.setZabsoluteCoordinate(arelative.toString().trim()); } // 处理CollectTime→k_ts(使用改进的转换方法) Object collectTimeObj = rs.getObject("CollectTime"); if (collectTimeObj != null) { String formattedTime = convertTimeFormat(collectTimeObj.toString()); csvData.setK_ts(formattedTime); } } // 处理其他字段的通用方法 private void setFieldValue(EquipmentCsvData csvData, String dbColName, Object value) { if (value == null) return; String fieldName = FIELD_MAPPING.getOrDefault(dbColName, dbColName); fieldName = fieldName.replaceAll("[^a-zA-Z0-9]", ""); try { String setterName = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1); Method setter = EquipmentCsvData.class.getMethod(setterName, String.class); setter.invoke(csvData, value.toString()); } catch (Exception e) { log.debug("字段{}设置失败(可忽略)", fieldName); } } private void exportToCsv(List dataList, String filePath) { File file = new File(filePath); File parent = file.getParentFile(); if (parent != null && !parent.exists()) { parent.mkdirs(); } try (FileWriter writer = new FileWriter(file); CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT .withHeader("k_device", "k_ts", "IsConnected", "Status", "ThreeColorIndicatorState", "ProgramName", "ProgramNo", "SubProgramNo", "SpindleOverride", "FeedrateOverride", "AlarmCode", "AlarmMessage", "SpindleLoad", "SpindleSpeed", "FeedRate", "CurrentExecutionLine", "ExecutingCode", "ToolNo", "XmechanicalCoordinate", "YmechanicalCoordinate", "ZmechanicalCoordinate", "AmechanicalCoordinateA", "BmechanicalCoordinateB", "XabsoluteCoordinate", "YabsoluteCoordinate", "ZabsoluteCoordinate", "AabsoluteCoordinate", "BabsoluteCoordinate", "VTVersion", "SerialNumber", "Temperature", "XaccelerationRMS", "YaccelerationRMS", "ZaccelerationRMS", "XvelocityRMS", "YvelocityRMS", "ZvelocityRMS", "XdisplacementRMS", "YdisplacementRMS", "ZdisplacementRMS", "XvelocityMax", "YvelocityMax", "ZvelocityMax", "XaccelerationMax", "YaccelerationMax", "ZaccelerationMax", "XdisplacementMax", "YdisplacementMax", "ZdisplacementMax", "AccelerationHrate", "AccelerationLrate", "VelocityHrate", "VelocityLrate", "DisplacementHrate", "DisplacementLrate"))) { for (EquipmentCsvData data : dataList) { csvPrinter.printRecord( data.getK_device(), data.getK_ts(), data.getIsConnected(), data.getStatus(), data.getThreeColorIndicatorState(), data.getProgramName(), data.getProgramNo(), data.getSubProgramNo(), data.getSpindleOverride(), data.getFeedrateOverride(), data.getAlarmCode(), data.getAlarmMessage(), data.getSpindleLoad(), data.getSpindleSpeed(), data.getFeedRate(), data.getCurrentExecutionLine(), data.getExecutingCode(), data.getToolNo(), data.getXmechanicalCoordinate(), data.getYmechanicalCoordinate(), data.getZmechanicalCoordinate(), data.getAmechanicalCoordinateA(), data.getBmechanicalCoordinateB(), data.getXabsoluteCoordinate(), data.getYabsoluteCoordinate(), data.getZabsoluteCoordinate(), data.getAabsoluteCoordinate(), data.getBabsoluteCoordinate(), data.getVTVersion(), data.getSerialNumber(), data.getTemperature(), data.getXaccelerationRMS(), data.getYaccelerationRMS(), data.getZaccelerationRMS(), data.getXvelocityRMS(), data.getYvelocityRMS(), data.getZvelocityRMS(), data.getXdisplacementRMS(), data.getYdisplacementRMS(), data.getZdisplacementRMS(), data.getXvelocityMax(), data.getYvelocityMax(), data.getZvelocityMax(), data.getXaccelerationMax(), data.getYaccelerationMax(), data.getZaccelerationMax(), data.getXdisplacementMax(), data.getYdisplacementMax(), data.getZdisplacementMax(), data.getAccelerationHrate(), data.getAccelerationLrate(), data.getVelocityHrate(), data.getVelocityLrate(), data.getDisplacementHrate(), data.getDisplacementLrate() ); } log.info("CSV导出成功:{}", filePath); } catch (IOException e) { log.error("CSV导出失败", e); } } private String getColumnNames(ResultSetMetaData metaData) throws SQLException { StringBuilder sb = new StringBuilder(); for (int i = 1; i <= metaData.getColumnCount(); i++) { sb.append(metaData.getColumnName(i)).append(","); } return sb.toString(); } // 时间格式转换核心方法 private String convertTimeFormat(String originalTime) { if (originalTime == null || originalTime.trim().isEmpty()) { log.warn("原始时间为空"); return ""; } // 去除原始时间中的多余空格 String trimmedTime = originalTime.trim().replaceAll("\\s+", " "); // 尝试所有可能的输入格式进行解析 for (DateTimeFormatter formatter : INPUT_FORMATTERS) { try { LocalDateTime dateTime = LocalDateTime.parse(trimmedTime, formatter); // 强制转换为目标格式 String formattedTime = dateTime.format(OUTPUT_FORMATTER); log.debug("时间转换成功:{} → {}", trimmedTime, formattedTime); return formattedTime; } catch (DateTimeParseException e) { // 尝试下一种格式 continue; } } // 所有格式都解析失败时,记录警告并返回原始值(避免破坏数据) log.warn("无法转换时间格式为yyyy-MM-dd HH:mm:ss,原始值:{}", trimmedTime); return trimmedTime; } }