package org.jeecg.modules.iot.service.impl;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import org.jeecg.common.api.vo.Result;
|
import org.jeecg.modules.iot.depository.DbConfig;
|
import org.jeecg.modules.iot.depository.InfluxdbTest;
|
import org.jeecg.modules.iot.depository.MysqlDataWriter;
|
import org.jeecg.modules.iot.entity.Equipment;
|
import org.jeecg.modules.iot.entity.ParameterGroup;
|
import org.jeecg.modules.iot.entity.ServerDeploy;
|
import org.jeecg.modules.iot.entity.xmlEntity.*;
|
import org.jeecg.modules.iot.entity.xmlEntity.script.FunctionEntity;
|
import org.jeecg.modules.iot.entity.xmlEntity.script.Functions;
|
import org.jeecg.modules.iot.mapper.EquipmentMapper;
|
import org.jeecg.modules.iot.service.IEquipmentService;
|
import org.jeecg.modules.iot.service.IParameterGroupService;
|
import org.jeecg.modules.iot.service.IServerDeployService;
|
import org.jeecg.modules.iot.util.HttpClientUtil;
|
import org.jeecg.modules.iot.util.JaxbUtil;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Lazy;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import javax.sql.DataSource;
|
import java.sql.*;
|
import java.text.ParseException;
|
import java.text.SimpleDateFormat;
|
import java.time.LocalDate;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.*;
|
import java.util.Date;
|
import java.util.stream.Collectors;
|
|
/**
 * Equipment (device) service implementation for the IoT module.
 *
 * <p>Date: 2024-12-23
 *
 * @author cuikaidong
 * @version V1.0
 */
|
@Service("iotEquipmentServiceImpl")
|
public class EquipmentServiceImpl extends ServiceImpl<EquipmentMapper, Equipment> implements IEquipmentService {
|
@Value("${script}")
|
private String script;
|
@Value("${databaseType}")
|
private String databaseType;
|
@Value("${ftp.address}")
|
private String ftpAddress;
|
@Autowired
|
private IParameterGroupService parameterGroupService;
|
@Lazy
|
@Autowired
|
private IServerDeployService serverDeployService;
|
@Autowired
|
private org.jeecg.modules.iot.mdc.service.IEquipmentService equipmentService;
|
@Autowired
|
private DataSource dataSource;
|
|
@Value("${drive}")
|
private String drive;
|
|
@Override
|
public List<SystemType> findProjectCode() {
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
return entity.getSystemTypeList();
|
}
|
|
@Override
|
public Object[] queryEquipmentSubscribeList(String id) {
|
Object[] strings = new Object[0];
|
List<Equipment> equipmentList = new LambdaQueryChainWrapper<>(baseMapper)
|
.eq(Equipment::getServerId, id).list();
|
if (CollectionUtil.isNotEmpty(equipmentList)) {
|
Object[] objects = new Object[equipmentList.size()];
|
for (int i = 0; i < equipmentList.size(); i++) {
|
objects[i] = equipmentList.get(i).getReadTopic();
|
}
|
return objects;
|
}
|
return strings;
|
}
|
|
@Override
|
public List<ControlSystem> findControlSystems(String type) {
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
List<ControlSystem> controlSystemList = entity.getControlSystemList();
|
return controlSystemList.stream().filter(c -> c.getType().equals(type)).collect(Collectors.toList());
|
}
|
|
@Override
|
public List<ByteOrder> findByteOrder() {
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
return entity.getByteOrderList();
|
}
|
|
@Override
|
public List<SystemDataType> findSystemDataTypeList() {
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
return entity.getSystemDataTypeList();
|
}
|
|
@Override
|
public List<Parameter> findParameterById(String id) {
|
ParameterGroup parameterGroup = parameterGroupService.getById(id);
|
Equipment equipment = this.baseMapper.selectById(parameterGroup.getEquipmentId());
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
List<ControlSystem> controlSystemList = entity.getControlSystemList();
|
List<ControlSystem> list1 = controlSystemList.stream().filter(c -> c.getType().equals(equipment.getEqptType()) && c.getName().equals(equipment.getControlSystem())).collect(Collectors.toList());
|
List<Protocol> controlSystem = list1.get(0).protocolList;
|
List<Protocol> list2 = controlSystem.stream().filter(c -> c.getName().equals(equipment.getProtocol())).collect(Collectors.toList());
|
List<DriveType> driveTypeList = list2.get(0).driveTypeList;
|
List<DriveType> list3 = driveTypeList.stream().filter(c -> c.getName().equals(equipment.getDriveType())).collect(Collectors.toList());
|
return list3.get(0).registerInfoList.get(0).defaultParameterList.get(0).parameterList;
|
}
|
|
@Override
|
public List<Parameter> findParameterCustomizeById(String id) {
|
ParameterGroup parameterGroup = parameterGroupService.getById(id);
|
Equipment equipment = this.baseMapper.selectById(parameterGroup.getEquipmentId());
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
List<ControlSystem> controlSystemList = entity.getControlSystemList();
|
List<ControlSystem> list1 = controlSystemList.stream().filter(c -> c.getType().equals(equipment.getEqptType()) && c.getName().equals(equipment.getControlSystem())).collect(Collectors.toList());
|
List<Protocol> controlSystem = list1.get(0).protocolList;
|
List<Protocol> list2 = controlSystem.stream().filter(c -> c.getName().equals(equipment.getProtocol())).collect(Collectors.toList());
|
List<DriveType> driveTypeList = list2.get(0).driveTypeList;
|
List<DriveType> list3 = driveTypeList.stream().filter(c -> c.getName().equals(equipment.getDriveType())).collect(Collectors.toList());
|
return list3.get(0).registerInfoList.get(0).registerTypeList.get(0).parameterList;
|
}
|
|
@Override
|
public List<Parameter> findDataTypeById(String id) {
|
ParameterGroup parameterGroup = parameterGroupService.getById(id);
|
Equipment equipment = this.baseMapper.selectById(parameterGroup.getEquipmentId());
|
Config entity = (Config) JaxbUtil.xmlToBean(drive, Config.class);
|
List<ControlSystem> controlSystemList = entity.getControlSystemList();
|
List<ControlSystem> list1 = controlSystemList.stream().filter(c -> c.getType().equals(equipment.getEqptType()) && c.getName().equals(equipment.getControlSystem())).collect(Collectors.toList());
|
List<Protocol> controlSystem = list1.get(0).protocolList;
|
List<Protocol> list2 = controlSystem.stream().filter(c -> c.getName().equals(equipment.getProtocol())).collect(Collectors.toList());
|
List<DriveType> driveTypeList = list2.get(0).driveTypeList;
|
List<DriveType> list3 = driveTypeList.stream().filter(c -> c.getName().equals(equipment.getDriveType())).collect(Collectors.toList());
|
return list3.get(0).registerInfoList.get(0).dataTypeList.get(0).parameterList;
|
}
|
|
@Override
|
public List<Equipment> findEquipmentByServerId(String id) {
|
return new LambdaQueryChainWrapper<>(baseMapper).eq(Equipment::getServerId, id).list();
|
}
|
|
@Override
|
public Boolean findEquipmentByName(Equipment equipment, Integer type) {
|
List<Equipment> equipmentList = new LambdaQueryChainWrapper<>(baseMapper)
|
.eq(Equipment::getServerId, equipment.getServerId())
|
.eq(Equipment::getEquipmentType, type)
|
.eq(Equipment::getEqptName, equipment.getEqptName())
|
.list();
|
return equipmentList.size() > 0;
|
}
|
|
@Override
|
public Boolean findEquipmentByCode(Equipment equipment) {
|
List<Equipment> equipmentList = new LambdaQueryChainWrapper<>(baseMapper)
|
.eq(Equipment::getServerId, equipment.getServerId())
|
.eq(Equipment::getEqptCode, equipment.getEqptCode())
|
.list();
|
return equipmentList.size() > 0;
|
}
|
|
@Override
|
public List<FunctionEntity> findFunctionInformationList(String functionType) {
|
Functions entity = (Functions) JaxbUtil.xmlToBean(script, Functions.class);
|
switch (functionType) {
|
case "Convert":
|
return entity.getConvertEntity().getFunctionList();
|
case "Math":
|
return entity.getMathEntity().getFunctionEntityList();
|
case "String":
|
return entity.getStringEntity().getFunctionEntityList();
|
case "File":
|
return entity.getFileEntity().getFunctionEntityList();
|
case "Time":
|
return entity.getTimeEntity().getFunctionEntityList();
|
case "Database":
|
return entity.getDatabaseEntity().getFunctionEntityList();
|
case "Custom":
|
return entity.getCustomEntity().getFunctionEntityList();
|
default:
|
return null;
|
}
|
}
|
|
@Override
|
public void createEmptyEquipmentTable(Date deployDate, String serverId) {
|
List<Equipment> equipmentList = new LambdaQueryChainWrapper<>(baseMapper)
|
.eq(Equipment::getServerId, serverId)
|
.eq(Equipment::getEquipmentType, 0)
|
.ge(Equipment::getCreateTime, deployDate)
|
.list();
|
equipmentList.forEach(equipment -> {
|
// 将所有的 '-' 替换为 '_'
|
equipment.setEqptCode(equipment.getEqptCode().replace("-", "_"));
|
if (databaseType.equals("SqlServer")) {
|
createSqlServerTable(equipment.getControlSystem() + '_' + equipment.getEqptCode(),
|
"CollectTime DATETIME DEFAULT GETDATE() NOT NULL",
|
"EquipmentID NVARCHAR(50) NOT NULL",
|
"PRIMARY KEY (CollectTime)"
|
);
|
} else if (databaseType.equals("MySql")) {
|
createMySqlTable(equipment.getControlSystem() + '_' + equipment.getEqptCode(),
|
"CollectTime DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL",
|
"EquipmentID VARCHAR(50) CHARACTER SET utf8mb4 NOT NULL",
|
"PRIMARY KEY (CollectTime)"
|
);
|
}
|
// 给mdc设备表增加设备信息
|
org.jeecg.modules.iot.mdc.entity.Equipment mdcEquipment = new org.jeecg.modules.iot.mdc.entity.Equipment();
|
mdcEquipment.setEquipmentid(equipment.getEqptCode());
|
mdcEquipment.setEquipmentname(equipment.getEqptName());
|
mdcEquipment.setSavetablename(equipment.getControlSystem() + '_' + equipment.getEqptCode());
|
mdcEquipment.setDrivetype(equipment.getControlSystem());
|
equipmentService.save(mdcEquipment);
|
});
|
}
|
|
/**
|
* 创建实时数据表
|
*
|
* @param tableName
|
* @param columns
|
*/
|
private void createSqlServerTable(String tableName, String... columns) {
|
String tableSql = String.format(
|
"IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = '%s' ) " +
|
"CREATE TABLE [%s] (%s)",
|
tableName, tableName, String.join(", ", columns)
|
);
|
try (Connection conn = dataSource.getConnection()) {
|
Statement stmt = conn.createStatement();
|
stmt.executeUpdate(tableSql);
|
System.out.println("表创建成功: " + tableName);
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 创建MySQL数据表
|
*
|
* @param tableName 表名
|
* @param columns 列定义,格式如 "column_name DATATYPE [CONSTRAINT]"
|
*/
|
private void createMySqlTable(String tableName, String... columns) {
|
String tableSql = String.format(
|
"CREATE TABLE IF NOT EXISTS `%s` (%s)",
|
tableName, String.join(", ", columns)
|
);
|
try (Connection conn = dataSource.getConnection()) {
|
Statement stmt = conn.createStatement();
|
stmt.executeUpdate(tableSql);
|
System.out.println("表创建成功: " + tableName);
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public Result<?> addEmpty(Equipment equipment) {
|
// 验证编码名称是否重复
|
if (this.findEquipmentByName(equipment, 0)) {
|
return Result.error("设备名称已存在!");
|
}
|
if (this.findEquipmentByCode(equipment)) {
|
return Result.error("设备编号已存在!");
|
}
|
// 分类
|
equipment.setEquipmentType(0);
|
ServerDeploy serverDeploy = serverDeployService.getById(equipment.getServerId());
|
this.save(equipment);
|
equipment.setReadTopic("IOT/" + serverDeploy.getServerCode() + "/VirtualDevices/" + equipment.getEqptCode() + "/Read");
|
equipment.setWriteTopic("IOT." + serverDeploy.getServerCode() + ".VirtualDevices." + equipment.getEqptCode());
|
this.updateById(equipment);
|
// 新增虚设备时调用
|
// 新建项目目录时或者项目载入时调用
|
Map<String, String> param = new HashMap<>();
|
param.put("path", ftpAddress + serverDeploy.getServerCode() + "/script");
|
HttpClientUtil.doGet("http://localhost:3002/ScriptCompiler/ProjectPath", param);
|
param.clear();
|
param.put("id", equipment.getEqptCode());
|
HttpClientUtil.doGet("http://localhost:3002/ScriptCompiler/AddDevicescript", param);
|
return Result.ok("添加成功!");
|
}
|
|
@Override
|
public Equipment findEquipmentByCode(String code) {
|
return new LambdaQueryChainWrapper<>(baseMapper)
|
.eq(Equipment::getEqptCode, code)
|
.one();
|
}
|
|
@Override
|
public void insertSqlServerData(String tableName, String[] columns, Object[] values) {
|
if (columns.length != values.length) {
|
throw new IllegalArgumentException("字段与值的数量不匹配");
|
}
|
|
// 转义表名和列名(SQL Server使用方括号[])
|
String escapedTableName = "[" + tableName.replace("]", "]]") + "]";
|
|
// 构建列名和占位符
|
StringBuilder columnBuilder = new StringBuilder();
|
StringBuilder placeholderBuilder = new StringBuilder();
|
|
for (int i = 0; i < columns.length; i++) {
|
if (i > 0) {
|
columnBuilder.append(", ");
|
placeholderBuilder.append(", ");
|
}
|
// 转义列名,处理可能包含的特殊字符
|
String escapedColumn = "[" + columns[i].replace("]", "]]") + "]";
|
columnBuilder.append(escapedColumn);
|
placeholderBuilder.append("?");
|
}
|
|
String sql = String.format("INSERT INTO %s (%s) VALUES (%s)",
|
escapedTableName,
|
columnBuilder.toString(),
|
placeholderBuilder.toString());
|
|
// log.debug("构建SQL: {}", sql);
|
|
try (Connection conn = dataSource.getConnection();
|
PreparedStatement pstmt = conn.prepareStatement(sql)) {
|
|
// 设置参数,根据值类型选择合适的set方法
|
for (int i = 0; i < values.length; i++) {
|
Object value = values[i];
|
String columnName = columns[i];
|
|
// log.debug("设置参数 {}: 列名={}, 值={}, 类型={}",
|
// i+1, columnName, value, value != null ? value.getClass().getName() : "NULL");
|
|
if (value == null) {
|
// 对于NULL值,需要指定类型
|
if (columnName.toLowerCase().contains("bool")) {
|
pstmt.setNull(i + 1, Types.BIT);
|
} else if (columnName.toLowerCase().contains("int")) {
|
pstmt.setNull(i + 1, Types.INTEGER);
|
} else if (columnName.toLowerCase().contains("time") ||
|
columnName.toLowerCase().contains("date")) {
|
pstmt.setNull(i + 1, Types.TIMESTAMP);
|
} else {
|
pstmt.setNull(i + 1, Types.VARCHAR);
|
}
|
} else if (value instanceof String) {
|
pstmt.setString(i + 1, (String) value);
|
} else if (value instanceof Integer) {
|
pstmt.setInt(i + 1, (Integer) value);
|
} else if (value instanceof Long) {
|
pstmt.setLong(i + 1, (Long) value);
|
} else if (value instanceof Boolean) {
|
pstmt.setBoolean(i + 1, (Boolean) value);
|
} else if (value instanceof Date) {
|
// 将java.util.Date转换为java.sql.Timestamp
|
pstmt.setTimestamp(i + 1, new Timestamp(((Date) value).getTime()));
|
} else if (value instanceof Calendar) {
|
pstmt.setTimestamp(i + 1, new Timestamp(
|
((Calendar) value).getTimeInMillis()));
|
} else if (value instanceof byte[]) {
|
pstmt.setBytes(i + 1, (byte[]) value);
|
} else if (value instanceof boolean[]) {
|
// 布尔数组处理:转换为逗号分隔的字符串
|
boolean[] boolArray = (boolean[]) value;
|
StringBuilder sb = new StringBuilder();
|
for (int j = 0; j < boolArray.length; j++) {
|
if (j > 0) sb.append(",");
|
sb.append(boolArray[j] ? "1" : "0");
|
}
|
pstmt.setString(i + 1, sb.toString());
|
} else if (value instanceof int[]) {
|
// 整数数组处理:转换为逗号分隔的字符串
|
int[] intArray = (int[]) value;
|
StringBuilder sb = new StringBuilder();
|
for (int j = 0; j < intArray.length; j++) {
|
if (j > 0) sb.append(",");
|
sb.append(intArray[j]);
|
}
|
pstmt.setString(i + 1, sb.toString());
|
} else {
|
LocalDateTime localDateTime = LocalDateTime.parse(values[i].toString());
|
pstmt.setTimestamp(i + 1, Timestamp.valueOf(localDateTime));
|
}
|
}
|
|
pstmt.executeUpdate();
|
// log.debug("数据插入成功: {}", tableName);
|
} catch (SQLException e) {
|
log.error("SQL Server插入数据失败: " + sql, e);
|
throw new RuntimeException("SQL Server插入数据失败", e);
|
}
|
}
|
|
/**
|
* 向MySQL表插入数据
|
*
|
* @param tableName 表名
|
* @param columns 列名数组
|
* @param values 对应的值数组
|
*/
|
public void insertMySqlData(String tableName, String[] columns, Object[] values) {
|
if (columns.length != values.length) {
|
throw new IllegalArgumentException("字段与值的数量不匹配");
|
}
|
|
// 转义表名(MySQL使用反引号``)
|
String escapedTableName = "`" + tableName.replace("`", "``") + "`";
|
|
// 构建列名和占位符
|
StringBuilder columnBuilder = new StringBuilder();
|
StringBuilder placeholderBuilder = new StringBuilder();
|
|
for (int i = 0; i < columns.length; i++) {
|
if (i > 0) {
|
columnBuilder.append(", ");
|
placeholderBuilder.append(", ");
|
}
|
// 转义列名,处理特殊字符
|
String escapedColumn = "`" + columns[i].replace("`", "``") + "`";
|
columnBuilder.append(escapedColumn);
|
placeholderBuilder.append("?");
|
}
|
|
String sql = String.format("INSERT INTO %s (%s) VALUES (%s)",
|
escapedTableName,
|
columnBuilder.toString(),
|
placeholderBuilder.toString());
|
|
try (Connection conn = dataSource.getConnection();
|
PreparedStatement pstmt = conn.prepareStatement(sql)) {
|
|
// 设置参数,适配MySQL类型
|
for (int i = 0; i < values.length; i++) {
|
Object value = values[i];
|
String columnName = columns[i];
|
|
if (value == null) {
|
// MySQL对NULL值的类型处理更灵活,但仍建议指定
|
if (columnName.toLowerCase().contains("bool")) {
|
pstmt.setNull(i + 1, Types.BOOLEAN); // MySQL使用BOOLEAN而非BIT
|
} else if (columnName.toLowerCase().contains("int")) {
|
pstmt.setNull(i + 1, Types.INTEGER);
|
} else if (columnName.toLowerCase().contains("time") ||
|
columnName.toLowerCase().contains("date")) {
|
pstmt.setNull(i + 1, Types.TIMESTAMP);
|
} else {
|
pstmt.setNull(i + 1, Types.VARCHAR);
|
}
|
} else if (value instanceof String) {
|
// MySQL的VARCHAR有长度限制,这里可根据需要截断或校验
|
String strValue = (String) value;
|
pstmt.setString(i + 1, strValue);
|
} else if (value instanceof Integer) {
|
pstmt.setInt(i + 1, (Integer) value);
|
} else if (value instanceof Long) {
|
pstmt.setLong(i + 1, (Long) value);
|
} else if (value instanceof Boolean) {
|
// MySQL中布尔值存储为1(true)和0(false)
|
pstmt.setBoolean(i + 1, (Boolean) value);
|
} else if (value instanceof Date) {
|
// MySQL推荐使用Timestamp类型
|
pstmt.setTimestamp(i + 1, new Timestamp(((Date) value).getTime()));
|
} else if (value instanceof Calendar) {
|
pstmt.setTimestamp(i + 1, new Timestamp(
|
((Calendar) value).getTimeInMillis()));
|
} else if (value instanceof byte[]) {
|
pstmt.setBytes(i + 1, (byte[]) value);
|
} else if (value instanceof boolean[]) {
|
// 布尔数组转为逗号分隔字符串(同SQL Server逻辑)
|
boolean[] boolArray = (boolean[]) value;
|
StringBuilder sb = new StringBuilder();
|
for (int j = 0; j < boolArray.length; j++) {
|
if (j > 0) sb.append(",");
|
sb.append(boolArray[j] ? "1" : "0");
|
}
|
pstmt.setString(i + 1, sb.toString());
|
} else if (value instanceof int[]) {
|
// 整数数组转为逗号分隔字符串
|
int[] intArray = (int[]) value;
|
StringBuilder sb = new StringBuilder();
|
for (int j = 0; j < intArray.length; j++) {
|
if (j > 0) sb.append(",");
|
sb.append(intArray[j]);
|
}
|
pstmt.setString(i + 1, sb.toString());
|
} else if (value instanceof LocalDateTime) {
|
// 直接处理Java 8时间类型(MySQL 8.0+推荐)
|
pstmt.setTimestamp(i + 1, Timestamp.valueOf((LocalDateTime) value));
|
} else {
|
// 未知类型转为字符串
|
pstmt.setString(i + 1, value.toString());
|
}
|
}
|
|
pstmt.executeUpdate();
|
System.out.println("数据插入成功: " + tableName);
|
} catch (SQLException e) {
|
System.err.println("MySQL插入数据失败: " + sql);
|
e.printStackTrace();
|
throw new RuntimeException("MySQL插入数据失败", e);
|
}
|
}
|
}
|