package org.jeecg.modules.iot.depository;
|
|
|
import com.influxdb.query.FluxRecord;
|
import com.influxdb.query.FluxTable;
|
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.SQLException;
|
import java.time.OffsetDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
|
public class InfluxdbTest {
|
/**
|
* @param connection
|
* @throws SQLException
|
*/
|
public static void createTable(Connection connection) throws SQLException {
|
String createTableQuery = "CREATE TABLE IF NOT EXISTS influxdb_data (" +
|
"id INT AUTO_INCREMENT PRIMARY KEY, " +
|
"time VARCHAR(255), " +
|
"measurement VARCHAR(255), " +
|
"field_key VARCHAR(255), " +
|
"field_value VARCHAR(255))";
|
try (PreparedStatement statement = connection.prepareStatement(createTableQuery)) {
|
statement.execute();
|
}
|
}
|
|
public static List<Influxdb> processAndInsert(List<FluxTable> tables, Connection mysqlConnection) throws SQLException {
|
// String insertQuery = "INSERT INTO influxdb_data (time, measurement, field_key, field_value, field) " +
|
// "VALUES (?, ?, ?, ?, ?)";
|
List<Influxdb> influxdbList = new ArrayList<>();
|
// try (PreparedStatement insertStatement = mysqlConnection.prepareStatement(insertQuery)) {
|
|
for (FluxTable table : tables) {
|
List<FluxRecord> records = table.getRecords();
|
for (FluxRecord record : records) {
|
Influxdb influxdb = new Influxdb();
|
for (Map.Entry<String, Object> entry : record.getValues().entrySet()) {
|
String key = entry.getKey();
|
Object value = entry.getValue();
|
// System.out.println("Key: " + key + ", Value: " + value);
|
// 拆分表数据
|
if (key.equals("_time")) {
|
// 解析 ISO 8601 字符串
|
OffsetDateTime odt = OffsetDateTime.parse(value.toString());
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
String customFormat = odt.format(formatter);
|
influxdb.setTime(customFormat);
|
// insertStatement.setString(1, customFormat);
|
} else if (key.equals("_value")) {
|
if (value != null) {
|
influxdb.setValue(value.toString());
|
// insertStatement.setString(4, value.toString());
|
} else {
|
// insertStatement.setString(4, "");
|
influxdb.setValue("");
|
}
|
} else if (key.equals("Parameter")) {
|
influxdb.setParameter(value.toString());
|
// insertStatement.setString(3, value.toString());
|
|
} else if (key.equals("_measurement")) {
|
influxdb.setMeasurement(value.toString());
|
// insertStatement.setString(2, value.toString());
|
} else if (key.equals("_field")) {
|
influxdb.setField(value.toString());
|
// insertStatement.setString(5, value.toString());
|
}
|
}
|
influxdbList.add(influxdb);
|
// insertStatement.executeUpdate();
|
}
|
}
|
// }
|
return influxdbList;
|
}
|
}
|