cuikaidong
2025-06-12 066063ed92fdd40da4dfe21770557f3adba3e1af
jeecg-boot-module-system/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java
@@ -7,9 +7,10 @@
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.iot.depository.Influxdb;
import org.jeecg.modules.iot.depository.InfluxdbTest;
import org.jeecg.modules.iot.depository.MysqlDataWriter;
import org.jeecg.modules.iot.entity.Equipment;
import org.jeecg.modules.iot.entity.InfluxdbDeploy;
import org.jeecg.modules.iot.mdc.entity.EquipmentLog;
import org.jeecg.modules.iot.mdc.service.IEquipmentLogService;
import org.jeecg.modules.iot.service.IEquipmentService;
import org.jeecg.modules.iot.service.IInfluxdbDeployService;
import org.quartz.Job;
@@ -24,6 +25,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -39,6 +41,8 @@
    private DataSource dataSource;
    @Autowired
    private IEquipmentService equipmentService;
    @Autowired
    private IEquipmentLogService equipmentLogService;
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@@ -89,14 +93,21 @@
                        // 根据时间进行分组
                        Map<String, List<Influxdb>> timeListMap = timeList.stream()
                                .collect(Collectors.groupingBy(Influxdb::getAcquisitionTime));
                        timeListMap.forEach((measurement, influxdbList) -> {
                            int lastIndex = table.lastIndexOf('.');
                            String code = table.substring(lastIndex + 1);
                        // 处理设备状态
                        int lastIndex = table.lastIndexOf('.');
                        String code = table.substring(lastIndex + 1);
                        EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code);
                        // 设备状态
                        Integer equipmentState = null;
                        for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) {
                            String measurement = entry.getKey();
                            List<Influxdb> influxdbList = entry.getValue();
                            Equipment equipment = equipmentService.findEquipmentByCode(code);
                            String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode();
                            // 获取表名
                            String[] columns = new String[influxdbList.size() + 2];
                            Object[] values = new Object[influxdbList.size() + 2];
                            String state = "";
                            // 遍历当前设备下的所有参数
                            for (int i = 0; i < influxdbList.size(); i++) {
                                String parameter = influxdbList.get(i).getParameter();
@@ -104,7 +115,12 @@
                                values[i] = influxdbList.get(i).getValue();
                                if (values[i] == null || values[i] == "") {
                                    values[i] = "";
                                } else {
                                    if (columns[i].equals("Oporation")) {
                                        state = values[i].toString();
                                    }
                                }
                            }
                            // 验证数据是否都为空
                            if (!allEmptyStrings(values)) {
@@ -119,8 +135,23 @@
                                values[influxdbList.size()] = tableName;
                                // 插入数据
                                equipmentService.insertMysqlData(tableName, columns, values);
                                if (!state.equals("")) {
                                    EquipmentLog log = new EquipmentLog();
                                    log.setOporation(Integer.parseInt(state));
                                    log.setEquipmentId(equipment.getEqptCode());
                                    log.setEquipmentName(equipment.getEqptName());
                                    log.setCollectTime(Date.from(dateTime.atZone(java.time.ZoneId.systemDefault()).toInstant()));
                                    // 验证设备状态是否需要存储
                                    if (equipmentLog == null) {
                                        equipmentLogService.save(log);
                                        equipmentState = Integer.parseInt(state);
                                    } else if (equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) {
                                        equipmentState = Integer.parseInt(state);
                                        equipmentLogService.save(log);
                                    }
                                }
                            }
                        });
                        };
                    });
                } catch (Exception e) {
                    log.error(String.valueOf(e));