| | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.jeecg.modules.iot.depository.Influxdb; |
| | | import org.jeecg.modules.iot.depository.InfluxdbTest; |
| | | import org.jeecg.modules.iot.depository.MysqlDataWriter; |
| | | import org.jeecg.modules.iot.entity.Equipment; |
| | | import org.jeecg.modules.iot.entity.InfluxdbDeploy; |
| | | import org.jeecg.modules.iot.mdc.entity.EquipmentLog; |
| | | import org.jeecg.modules.iot.mdc.service.IEquipmentLogService; |
| | | import org.jeecg.modules.iot.service.IEquipmentService; |
| | | import org.jeecg.modules.iot.service.IInfluxdbDeployService; |
| | | import org.quartz.Job; |
| | |
| | | import java.time.ZoneId; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.*; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | |
| | | private DataSource dataSource; |
| | | @Autowired |
| | | private IEquipmentService equipmentService; |
| | | @Autowired |
| | | private IEquipmentLogService equipmentLogService; |
| | | |
| | | @Override |
| | | public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { |
| | |
| | | // 根据时间进行分组 |
| | | Map<String, List<Influxdb>> timeListMap = timeList.stream() |
| | | .collect(Collectors.groupingBy(Influxdb::getAcquisitionTime)); |
| | | timeListMap.forEach((measurement, influxdbList) -> { |
| | | int lastIndex = table.lastIndexOf('.'); |
| | | String code = table.substring(lastIndex + 1); |
| | | // 处理设备状态 |
| | | int lastIndex = table.lastIndexOf('.'); |
| | | String code = table.substring(lastIndex + 1); |
| | | EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code); |
| | | // 设备状态 |
| | | Integer equipmentState = null; |
| | | for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) { |
| | | String measurement = entry.getKey(); |
| | | List<Influxdb> influxdbList = entry.getValue(); |
| | | Equipment equipment = equipmentService.findEquipmentByCode(code); |
| | | String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode(); |
| | | // 获取表名 |
| | | String[] columns = new String[influxdbList.size() + 2]; |
| | | Object[] values = new Object[influxdbList.size() + 2]; |
| | | String state = ""; |
| | | // 遍历当前设备下的所有参数 |
| | | for (int i = 0; i < influxdbList.size(); i++) { |
| | | String parameter = influxdbList.get(i).getParameter(); |
| | |
| | | values[i] = influxdbList.get(i).getValue(); |
| | | if (values[i] == null || values[i] == "") { |
| | | values[i] = ""; |
| | | } else { |
| | | if (columns[i].equals("Oporation")) { |
| | | state = values[i].toString(); |
| | | } |
| | | } |
| | | |
| | | } |
| | | // 验证数据是否都为空 |
| | | if (!allEmptyStrings(values)) { |
| | |
| | | values[influxdbList.size()] = tableName; |
| | | // 插入数据 |
| | | equipmentService.insertMysqlData(tableName, columns, values); |
| | | if (!state.equals("")) { |
| | | EquipmentLog log = new EquipmentLog(); |
| | | log.setOporation(Integer.parseInt(state)); |
| | | log.setEquipmentId(equipment.getEqptCode()); |
| | | log.setEquipmentName(equipment.getEqptName()); |
| | | log.setCollectTime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant())); |
| | | // 验证设备状态是否需要存储 |
| | | if (equipmentLog == null) { |
| | | equipmentLogService.save(log); |
| | | equipmentState = Integer.parseInt(state); |
| | | } else if (equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) { |
| | | equipmentState = Integer.parseInt(state); |
| | | equipmentLogService.save(log); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | }; |
| | | }); |
| | | } catch (Exception e) { |
| | | log.error(String.valueOf(e)); |