| | |
| | | AtomicReference<String> end = new AtomicReference<>(""); |
| | | AtomicReference<Date> endDate = new AtomicReference<>(null); // 采集时间 |
| | | serverDeployList.forEach(s -> { |
| | | if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) { |
| | | start.set(sdf.format(s.getCollectTime())); |
| | | if (s.getId().equals(in.getServerDeployId())) { |
| | | if (s.getCollectTime() != null) { |
| | | start.set(sdf.format(s.getCollectTime())); |
| | | } else { |
| | | start.set(sdf.format(new Date())); |
| | | } |
| | | // 获取当前时间 |
| | | LocalDateTime now = LocalDateTime.now(); |
| | | // Subtract two seconds (previously one minute; the variable name oneMinuteAgo was kept) |
| | | // LocalDateTime oneMinuteAgo = now.minusMinutes(1); |
| | | LocalDateTime oneMinuteAgo = now.minusSeconds(2); |
| | | // 格式化为字符串(可选) |
| | | DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); |
| | |
| | | end.set(formattedTime); |
| | | } |
| | | }); |
| | | if (start.get().equals("")){ |
| | | if (start.get().equals("")) { |
| | | return; |
| | | } |
| | | LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter); |
| | | LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter); |
| | | // 定义日期时间格式器(ISO_LOCAL_DATE_TIME 对应 "yyyy-MM-ddTHH:mm:ss") |
| | | DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME; |
| | | |
| | | // 解析字符串为 LocalDateTime 对象 |
| | | LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 解析字符串为 LocalDateTime 对象 |
| | | LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1); |
| | | |
| | | // Subtract 8 hours (presumably converting UTC+8 local time to UTC for the "Z"-suffixed range — confirm) |
| | | LocalDateTime startNewDateTime = startDateTime.minusHours(8); |
| | | LocalDateTime endNewDateTime = endDateTime.minusHours(8); |
| | | // String query = "from(bucket: \"IOT\") \n" + |
| | | // "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" + |
| | | // "|> fill(usePrevious: true) \n" + |
| | | // "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; |
| | | String query = "from(bucket: \"" + influxBucket + "\") " + |
| | | "|> range(start: -5s) " + |
| | | "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")"; |
| | | |
| | | // 执行查询 |
| | | try { |
| | | List<FluxTable> tables = queryApi.query(query, influxOrg); |
| | |
| | | )), |
| | | Collectors.toList() |
| | | )); |
| | | |
| | | // 处理设备状态 |
| | | int lastIndex = table.lastIndexOf('.'); |
| | | String code = table.substring(lastIndex + 1); |
| | | EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); |
| | | EquipmentLog equipmentLog = null; |
| | | if (databaseType.equals("SqlServer")) { |
| | | equipmentLog = equipmentLogService.selectEquipmentOporationSqlServer(code); |
| | | } else if (databaseType.equals("MySql")) { |
| | | equipmentLog = equipmentLogService.selectEquipmentOporationMySql(code); |
| | | |
| | | } |
| | | // 设备状态 |
| | | Integer equipmentState = null; |
| | | for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) { |
| | |
| | | // 转换为 LocalDateTime |
| | | LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter); |
| | | values[influxdbList.size() + 1] = dateTime; |
| | | values[influxdbList.size()] = tableName.replace("-", "_"); |
| | | // 插入数据 |
| | | int index = tableName.indexOf("_"); |
| | | if (index != -1) { |
| | | values[influxdbList.size()] = tableName.substring(index + 1); |
| | | } |
| | | if (databaseType.equals("SqlServer")) { |
| | | equipmentService.insertSqlServerData(tableName, columns, values); |
| | | } else if (databaseType.equals("MySql")) { |