cuikaidong
5 天以前 b36d4c15a5a7e5ed983217cc98623fb3de3bda16
lxzn-module-system/lxzn-system-biz/src/main/java/org/jeecg/modules/quartz/job/SampleParamJob.java
@@ -1,32 +1,348 @@
package org.jeecg.modules.quartz.job;
import org.jeecg.common.util.DateUtils;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.query.FluxTable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.jeecg.modules.iot.depository.Influxdb;
import org.jeecg.modules.iot.depository.InfluxdbTest;
import org.jeecg.modules.iot.entity.Equipment;
import org.jeecg.modules.iot.entity.InfluxdbDeploy;
import org.jeecg.modules.iot.entity.ServerDeploy;
import org.jeecg.modules.iot.mdc.entity.EquipmentAlarm;
import org.jeecg.modules.iot.mdc.entity.EquipmentLog;
import org.jeecg.modules.iot.mdc.service.IEquipmentAlarmService;
import org.jeecg.modules.iot.mdc.service.IEquipmentLogService;
import org.jeecg.modules.iot.service.IEquipmentService;
import org.jeecg.modules.iot.service.IInfluxdbDeployService;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
 * 示例带参定时任务
 *
 * influxdb定时任务
 *
 * @Author Scott
 */
@Slf4j
public class SampleParamJob implements Job {
   /**
    * 若参数变量名修改 QuartzJobController中也需对应修改
    */
   private String parameter;
   public void setParameter(String parameter) {
      this.parameter = parameter;
   }
   @Value("${databaseType}")
   private String databaseType;
   @Autowired
   private IInfluxdbDeployService influxdbDeployService;
   @Autowired
   private DataSource dataSource;
   @Autowired
   private IEquipmentService equipmentService;
   @Autowired
   private IEquipmentLogService equipmentLogService;
   @Autowired
   private org.jeecg.modules.iot.mdc.service.IEquipmentService mdcEquipmentService;
   @Autowired
   private IEquipmentAlarmService equipmentAlarmService;
   @Autowired
   private org.jeecg.modules.iot.service.IServerDeployService serverDeployService;
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
      log.info(" Job Execution key:"+jobExecutionContext.getJobDetail().getKey());
      log.info( String.format("welcome %s! Jeecg-Boot 带参数定时任务 SampleParamJob !   时间:" + DateUtils.now(), this.parameter));
      List<InfluxdbDeploy> influxdbDeployList = influxdbDeployService.list();
      List<ServerDeploy> serverDeployList = serverDeployService.list();
      influxdbDeployList.forEach(in -> {
         // InfluxDB 2.0 连接信息
         String influxUrl = "http://" + in.getAddress() + ":" + in.getPort();
         String influxToken = in.getAuthorizeCode();
         String influxOrg = in.getOrganization();
         String influxBucket = in.getBucket();
         // 连接 InfluxDB 2.0
         InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxUrl, influxToken.toCharArray());
         QueryApi queryApi = influxDBClient.getQueryApi();
         // 连接 SqlServer
         try (Connection mysqlConnection = dataSource.getConnection()) {
            // 处理时间
            DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            AtomicReference<String> start = new AtomicReference<>("");
            AtomicReference<String> end = new AtomicReference<>("");
            AtomicReference<Date> endDate = new AtomicReference<>(null); // 采集时间
            serverDeployList.forEach(s -> {
               if (s.getId().equals(in.getServerDeployId()) && s.getCollectTime() != null) {
                  start.set(sdf.format(s.getCollectTime()));
                  // 获取当前时间
                  LocalDateTime now = LocalDateTime.now();
                  // 减去一分钟
//                        LocalDateTime oneMinuteAgo = now.minusMinutes(1);
                  LocalDateTime oneMinuteAgo = now.minusSeconds(2);
                  // 格式化为字符串(可选)
                  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                  String formattedTime = oneMinuteAgo.format(formatter);
                  // 转换为 Date 类型(使用系统默认时区)
                  Date date = Date.from(oneMinuteAgo.atZone(ZoneId.systemDefault()).toInstant());
                  endDate.set(date);
                  end.set(formattedTime);
               }
            });
            if (start.get().equals("")){
               return;
            }
            LocalDateTime startTime = LocalDateTime.parse(start.get(), inputFormatter);
            LocalDateTime endTime = LocalDateTime.parse(end.get(), inputFormatter);
            // 定义日期时间格式器(ISO_LOCAL_DATE_TIME 对应 "yyyy-MM-ddTHH:mm:ss")
            DateTimeFormatter formatter1 = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
            // 解析字符串为 LocalDateTime 对象
            LocalDateTime startDateTime = LocalDateTime.parse(startTime.toString(), formatter1);// 解析字符串为 LocalDateTime 对象
            LocalDateTime endDateTime = LocalDateTime.parse(endTime.toString(), formatter1);
            // 加上 8 小时
            LocalDateTime startNewDateTime = startDateTime.minusHours(8);
            LocalDateTime endNewDateTime = endDateTime.minusHours(8);
//                String query = "from(bucket: \"IOT\") \n" +
//                        "|> range(start: "+startNewDateTime+"Z, stop: "+endNewDateTime+"Z) \n" +
//                        "|> fill(usePrevious: true) \n" +
//                        "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
            String query = "from(bucket: \"" + influxBucket + "\") " +
                  "|> range(start: -5s) " +
                  "|> filter(fn: (r) => r[\"_field\"] == \"TimeStamp\" or r[\"_field\"] == \"Value\")";
            // 执行查询
            try {
               List<FluxTable> tables = queryApi.query(query, influxOrg);
               // 处理查询结果并插入到 MySQL
               List<Influxdb> influxdbs = InfluxdbTest.processAndInsert(tables, mysqlConnection);
               // 根据类型拆分
               Map<String, List<Influxdb>> fieldistMap = influxdbs.stream()
                     .collect(Collectors.groupingBy(Influxdb::getField));
               List<Influxdb> valueList = new ArrayList<>();
               List<Influxdb> timeStampList = new ArrayList<>();
               fieldistMap.forEach((field, timeList) -> {
                  if (field.equals("Value")) {
                     valueList.addAll(timeList);
                  } else {
                     timeStampList.addAll(timeList);
                  }
               });
               for (int i = 0; i < valueList.size(); i++) {
                  valueList.get(i).setAcquisitionTime(timeStampList.get(i).getValue());
               }
               // 根据设备进行分组
               Map<String, List<Influxdb>> measurementListMap = valueList.stream()
                     .collect(Collectors.groupingBy(Influxdb::getMeasurement));
               measurementListMap.forEach((table, timeList) -> {
                  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                  Map<String, List<Influxdb>> timeListMap = timeList.stream()
                        .collect(Collectors.groupingBy(
                              Influxdb::getAcquisitionTime,
                              () -> new TreeMap<>(Comparator.comparing(
                                    timeStr -> LocalDateTime.parse(timeStr, formatter)
                              )),
                              Collectors.toList()
                        ));
                  // 处理设备状态
                  int lastIndex = table.lastIndexOf('.');
                  String code = table.substring(lastIndex + 1);
                  EquipmentLog equipmentLog = equipmentLogService.selectEquipmentOporation(code);
                  // 设备状态
                  Integer equipmentState = null;
                  for (Map.Entry<String, List<Influxdb>> entry : timeListMap.entrySet()) {
                     String measurement = entry.getKey();
                     List<Influxdb> influxdbList = entry.getValue();
                     Equipment equipment = equipmentService.findEquipmentByCode(code);
                     String tableName = equipment.getControlSystem() + '_' + equipment.getEqptCode();
                     // 获取表名
                     String[] columns = new String[influxdbList.size() + 2];
                     Object[] values = new Object[influxdbList.size() + 2];
                     String state = ""; // 设备状态
                     String isAlarm = ""; // 报警状态
                     String alarm = ""; // 报警号
                     String alarmContent = ""; // 报警内容
                     // 遍历当前设备下的所有参数
                     for (int i = 0; i < influxdbList.size(); i++) {
                        String parameter = influxdbList.get(i).getParameter();
                        columns[i] = parameter.substring(parameter.lastIndexOf('-') + 1);
                        values[i] = influxdbList.get(i).getValue();
                        if (values[i] == null || values[i] == "") {
                           values[i] = "";
                        } else {
                           if (columns[i].equals("Oporation")) {
                              state = values[i].toString();
                           } else if (columns[i].equals("AlarmNo")) {
                              alarm = values[i].toString();
                           } else if (columns[i].equals("AlarmContent")) {
                              alarmContent = values[i].toString();
                           } else if (columns[i].equals("IsAlarm")) {
                              isAlarm = values[i].toString();
                           }
                        }
                     }
                     // 验证数据是否都为空
                     if (!allEmptyStrings(values)) {
                        columns[influxdbList.size() + 1] = "CollectTime";
                        columns[influxdbList.size()] = "EquipmentID";
                        // 转换为 LocalDateTime
                        LocalDateTime dateTime = LocalDateTime.parse(influxdbList.get(0).getAcquisitionTime(), formatter);
                        values[influxdbList.size() + 1] = dateTime;
                        values[influxdbList.size()] = equipment.getEqptCode();
                        // 插入数据
                        if (databaseType.equals("SqlServer")) {
                           equipmentService.insertSqlServerData(tableName, columns, values);
                        } else if (databaseType.equals("MySql")) {
                           equipmentService.insertMySqlData(tableName, columns, values);
                        }
                        if (!state.equals("")) {
                           EquipmentLog log = new EquipmentLog();
                           log.setOporation(Integer.parseInt(state));
                           log.setEquipmentId(equipment.getEqptCode());
                           log.setEquipmentName(equipment.getEqptName());
                           log.setCollectTime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()));
                           // True 报警开始,False报警结束
                           if (isAlarm.equals("True")) {
                              // 报警开始
                              if (equipmentLog == null && equipmentState == null) {
                                 // 验证是否报警
                                 log.setOporation(22);
                                 log.setAlarm(alarm);
                                 equipmentLogService.save(log);
                                 equipmentState = 22;
                                 equipmentStateModification(equipment.getEqptCode(), 22);
                              } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != 22) {
                                 log.setOporation(22);
                                 log.setAlarm(alarm);
                                 equipmentState = 22;
                                 equipmentLogService.save(log);
                                 equipmentStateModification(equipment.getEqptCode(), 22);
                              } else if (equipmentLog != null && equipmentState != null && equipmentState != 22) {
                                 log.setOporation(22);
                                 log.setAlarm(alarm);
                                 equipmentState = 22;
                                 equipmentLogService.save(log);
                                 equipmentStateModification(equipment.getEqptCode(), 22);
                              }
                           } else if (isAlarm.equals("False")) {
                              // 报警结束
                              if (equipmentLog == null && equipmentState == null) {
                                 // 验证是否报警
                                 equipmentLogService.save(log);
                                 equipmentState = Integer.parseInt(state);
                                 equipmentStateModification(equipment.getEqptCode(), equipmentState);
                              } else if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() == 22) {
                                 log.setOporation(23);
                                 equipmentState = 23;
                                 equipmentLogService.save(log);
                                 equipmentStateModification(equipment.getEqptCode(), equipmentState);
                              } else if (equipmentLog != null && equipmentState != null && equipmentState == 22) {
                                 log.setOporation(23);
                                 equipmentState = 23;
                                 equipmentLogService.save(log);
                                 equipmentStateModification(equipment.getEqptCode(), 23);
                              } else {
                                 if (equipmentLog != null && equipmentState == null && equipmentLog.getOporation() != Integer.parseInt(state)) {
                                    equipmentState = Integer.parseInt(state);
                                    equipmentLogService.save(log);
                                    equipmentStateModification(equipment.getEqptCode(), equipmentState);
                                    //验证是否是报警,或者报警结束
                                 } else if (equipmentLog != null && equipmentState != null && equipmentState != Integer.parseInt(state)) {
                                    equipmentState = Integer.parseInt(state);
                                    equipmentLogService.save(log);
                                    equipmentStateModification(equipment.getEqptCode(), equipmentState);
                                 }
                              }
                           }
                           // 处理报警信息
                           if (isAlarm.equals("True")) {
                              EquipmentAlarm equipmentAlarm = equipmentAlarmService.selectEquipmentAlarmByEquipmentId(equipment.getEqptCode());
                              if (equipmentAlarm != null && !equipmentAlarm.getAlarmNo().equals(alarm)) {
                                 addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
                              } else if (equipmentAlarm == null) {
                                 addEquipmentAlarm(alarm, equipment, dateTime, alarmContent);
                              }
                           }
                        }
                     }
                  }
               });
               // 如果有查询数据,时间不为空
               if (endDate.get() != null && CollectionUtils.isNotEmpty(tables)) {
                  for (ServerDeploy s : serverDeployList) {
                     if (s.getId().equals(in.getServerDeployId())) {
                        s.setCollectTime(endDate.get());
                        return;
                     }
                  }
               }
            } catch (Exception e) {
               log.error(String.valueOf(e));
            }
         } catch (SQLException e) {
            e.printStackTrace();
         } finally {
            influxDBClient.close();
         }
      });
      serverDeployService.updateBatchById(serverDeployList);
      // 直接在输出语句中格式化
      log.info("实时数据任务执行,当前时间:" +
            LocalDateTime.ofInstant(new Date().toInstant(), ZoneId.systemDefault())
                  .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
   }
   public static boolean allEmptyStrings(Object[] values) {
      if (values == null) {
         return false; // 如果数组本身为null,返回false
      }
      for (Object str : values) {
         if (str != null && !str.toString().equals("")) {
            return false; // 如果元素不是String类型,返回false
         }
      }
      return true; // 所有元素都是空字符串
   }
   /**
    * 报警信息处理
    *
    * @param alarm        报警号
    * @param equipment    设备信息
    * @param dateTime     报警时间
    * @param alarmContent 报警内容
    */
   public void addEquipmentAlarm(String alarm, Equipment equipment, LocalDateTime dateTime, String alarmContent) {
      // 验证是否报警
      EquipmentAlarm equipmentAlarm = new EquipmentAlarm();
      equipmentAlarm.setEquipmentid(equipment.getEqptCode());
      equipmentAlarm.setCollecttime(Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()));
      equipmentAlarm.setAlarmNo(alarm);
      equipmentAlarm.setAlarmContent(alarmContent);
      equipmentAlarmService.save(equipmentAlarm);
   }
   void equipmentStateModification(String eqptCode, Integer state) {
      org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = mdcEquipmentService.findEquipmentByEquipmentId(eqptCode);
      if (mdcEquipment != null) {
         mdcEquipment.setOporation(state);
         mdcEquipmentService.updateById(mdcEquipment);
      }
   }
}