fix: 物联网平台,amqp数据接入并插入TD数据库相关逻辑编码。
This commit is contained in:
@@ -25,44 +25,44 @@ public class DeviceSensorData {
|
||||
private Double tfluorescence; // 荧光值
|
||||
private Double phaseDifference; // 相位差
|
||||
private Double battery; // 电池电量
|
||||
|
||||
|
||||
// Getter 方法,返回保留两位小数的值
|
||||
public Double getDissolvedOxygen() {
|
||||
return roundToTwoDecimals(dissolvedOxygen);
|
||||
}
|
||||
|
||||
|
||||
public Double getTemperature() {
|
||||
return roundToTwoDecimals(temperature);
|
||||
}
|
||||
|
||||
|
||||
public Double getSaturability() {
|
||||
return roundToTwoDecimals(saturability);
|
||||
}
|
||||
|
||||
|
||||
public Double getPh() {
|
||||
return roundToTwoDecimals(ph);
|
||||
}
|
||||
|
||||
|
||||
public Double getSalinity() {
|
||||
return roundToTwoDecimals(salinity);
|
||||
}
|
||||
|
||||
|
||||
public Double getTreference() {
|
||||
return roundToTwoDecimals(treference);
|
||||
}
|
||||
|
||||
|
||||
public Double getTfluorescence() {
|
||||
return roundToTwoDecimals(tfluorescence);
|
||||
}
|
||||
|
||||
|
||||
public Double getPhaseDifference() {
|
||||
return roundToTwoDecimals(phaseDifference);
|
||||
}
|
||||
|
||||
|
||||
public Double getBattery() {
|
||||
return roundToTwoDecimals(battery);
|
||||
}
|
||||
|
||||
|
||||
// 工具方法:保留两位小数
|
||||
private Double roundToTwoDecimals(Double value) {
|
||||
if (value == null) {
|
||||
@@ -74,11 +74,12 @@ public class DeviceSensorData {
|
||||
}
|
||||
|
||||
// 标签字段(元数据,不随时间频繁变化)
|
||||
private String tenantId = "111111"; // 租户ID(默认值)
|
||||
private String serialNum; // 设备序列号
|
||||
private long deviceId; // 设备ID(对应TDengine的BIGINT)
|
||||
private long userId; // 用户ID(对应TDengine的BIGINT)
|
||||
private Long deviceId; // 设备ID(对应TDengine的BIGINT)
|
||||
private Long userId; // 用户ID(对应TDengine的BIGINT)
|
||||
private String userName; // 用户名
|
||||
private String mobilePhone; // 手机号
|
||||
private String deviceName; // 设备名称
|
||||
private int deviceType; // 设备类型
|
||||
private Integer deviceType; // 设备类型
|
||||
}
|
||||
|
||||
@@ -40,6 +40,16 @@ public interface DeviceSensorDataMapper {
|
||||
* @return 结果
|
||||
*/
|
||||
void createTable(DeviceSensorData deviceSensorData);
|
||||
|
||||
/**
|
||||
* 检查子表是否存在
|
||||
*
|
||||
* @param serialNum 设备序列号
|
||||
* @return 存在返回1,不存在返回0
|
||||
*/
|
||||
@DS("taos")
|
||||
@InterceptorIgnore(tenantLine = "true")
|
||||
Integer checkTableExists(@Param("serialNum") String serialNum);
|
||||
|
||||
/**
|
||||
* 批量插入数据
|
||||
|
||||
@@ -17,6 +17,74 @@ import java.util.List;
|
||||
public class DeviceSensorDataService implements IDeviceSensorDataService {
|
||||
@Resource
|
||||
private DeviceSensorDataMapper deviceSensorDataMapper;
|
||||
|
||||
/**
|
||||
* 已创建的子表缓存(设备序列号)
|
||||
* 使用 ConcurrentHashMap 保证线程安全
|
||||
*/
|
||||
private static final java.util.concurrent.ConcurrentHashMap<String, Boolean> CREATED_TABLES = new java.util.concurrent.ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 数据缓冲队列,用于批量插入优化
|
||||
*/
|
||||
private static final java.util.concurrent.BlockingQueue<DeviceSensorData> DATA_BUFFER = new java.util.concurrent.LinkedBlockingQueue<>(10000);
|
||||
|
||||
/**
|
||||
* 批量插入配置
|
||||
*/
|
||||
private static final int BATCH_SIZE = 100; // 每批次插入数量
|
||||
private static final long BATCH_TIMEOUT_MS = 1000; // 批次超时时间(毫秒)
|
||||
|
||||
/**
|
||||
* 启动批量插入线程
|
||||
*/
|
||||
@jakarta.annotation.PostConstruct
|
||||
public void startBatchInsertThread() {
|
||||
Thread batchThread = new Thread(() -> {
|
||||
List<DeviceSensorData> batch = new ArrayList<>(BATCH_SIZE);
|
||||
long lastInsertTime = System.currentTimeMillis();
|
||||
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
// 从队列中取数据,最多等待100ms
|
||||
DeviceSensorData data = DATA_BUFFER.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
|
||||
|
||||
if (data != null) {
|
||||
batch.add(data);
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
boolean shouldInsert = batch.size() >= BATCH_SIZE ||
|
||||
(batch.size() > 0 && (now - lastInsertTime) >= BATCH_TIMEOUT_MS);
|
||||
|
||||
if (shouldInsert) {
|
||||
batchInsertDeviceSensorDataInternal(new ArrayList<>(batch));
|
||||
batch.clear();
|
||||
lastInsertTime = now;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("批量插入线程被中断");
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
log.error("批量插入线程异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 线程结束前,插入剩余数据
|
||||
if (!batch.isEmpty()) {
|
||||
try {
|
||||
batchInsertDeviceSensorDataInternal(batch);
|
||||
} catch (Exception e) {
|
||||
log.error("插入剩余数据失败", e);
|
||||
}
|
||||
}
|
||||
}, "TDengine-BatchInsert-Thread");
|
||||
|
||||
batchThread.setDaemon(true);
|
||||
batchThread.start();
|
||||
log.info("✅ TDengine 批量插入线程已启动");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
@@ -31,14 +99,95 @@ public class DeviceSensorDataService implements IDeviceSensorDataService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量插入数据
|
||||
* 批量插入数据(异步,支持批量优化)
|
||||
*
|
||||
* @param dataList 数据列表
|
||||
* @return 影响行数
|
||||
*/
|
||||
@Override
|
||||
public void batchInsertDeviceSensorData(List<DeviceSensorData> dataList) {
|
||||
deviceSensorDataMapper.batchInsertDeviceSensorData(dataList);
|
||||
if (dataList == null || dataList.isEmpty()) {
|
||||
log.warn("批量插入数据列表为空,跳过插入");
|
||||
return;
|
||||
}
|
||||
|
||||
// 将数据加入缓冲队列,由后台线程批量插入
|
||||
for (DeviceSensorData data : dataList) {
|
||||
boolean added = DATA_BUFFER.offer(data);
|
||||
if (!added) {
|
||||
log.warn("⚠️ 数据缓冲队列已满,直接同步插入");
|
||||
// 队列满了,直接同步插入
|
||||
batchInsertDeviceSensorDataInternal(dataList);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 内部批量插入方法(同步执行)
|
||||
*
|
||||
* @param dataList 数据列表
|
||||
*/
|
||||
private void batchInsertDeviceSensorDataInternal(List<DeviceSensorData> dataList) {
|
||||
if (dataList == null || dataList.isEmpty()) {
|
||||
log.warn("批量插入数据列表为空,跳过插入");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 打印第一条数据的详细信息
|
||||
if (!dataList.isEmpty()) {
|
||||
DeviceSensorData firstData = dataList.get(0);
|
||||
// 检查子表是否存在(使用缓存优化)
|
||||
String serialNum = firstData.getSerialNum();
|
||||
String tableName = "t_" + serialNum;
|
||||
|
||||
// 先检查缓存,避免重复查询数据库
|
||||
if (!CREATED_TABLES.containsKey(serialNum)) {
|
||||
try {
|
||||
Integer exists = deviceSensorDataMapper.checkTableExists(serialNum);
|
||||
if (exists == null || exists == 0) {
|
||||
deviceSensorDataMapper.createTable(firstData);
|
||||
log.debug("子表 {} 已创建", tableName);
|
||||
}
|
||||
// 加入缓存
|
||||
CREATED_TABLES.put(serialNum, true);
|
||||
} catch (Exception e) {
|
||||
// 检查表不存在是正常情况,使用 USING 语法会自动创建
|
||||
String errorMsg = e.getMessage();
|
||||
if (errorMsg != null && errorMsg.contains("Table does not exist")) {
|
||||
log.debug("子表 {} 不存在,将自动创建", tableName);
|
||||
} else {
|
||||
log.warn("检查子表失败: {}", errorMsg != null && errorMsg.length() > 100 ? errorMsg.substring(0, 100) : errorMsg);
|
||||
}
|
||||
// 即使失败也加入缓存,避免重复尝试
|
||||
CREATED_TABLES.put(serialNum, true);
|
||||
}
|
||||
} else {
|
||||
log.debug("子表 {} 已在缓存中", tableName);
|
||||
}
|
||||
}
|
||||
|
||||
deviceSensorDataMapper.batchInsertDeviceSensorData(dataList);
|
||||
|
||||
log.debug("插入 {} 条数据", dataList.size());
|
||||
} catch (Exception e) {
|
||||
log.error("批量插入失败: {}", e.getMessage());
|
||||
|
||||
// 判断错误类型
|
||||
String errorMsg = e.getMessage();
|
||||
if (errorMsg != null) {
|
||||
if (errorMsg.contains("超级表") || errorMsg.contains("stable") || errorMsg.contains("not exist")) {
|
||||
log.error("⚠️ 可能原因:超级表 'fishery.device_sensor_data' 不存在!");
|
||||
log.error("请执行建表 SQL 创建超级表");
|
||||
} else if (errorMsg.contains("数据库") || errorMsg.contains("database")) {
|
||||
log.error("⚠️ 可能原因:数据库 'fishery' 不存在!");
|
||||
} else if (errorMsg.contains("字段") || errorMsg.contains("column")) {
|
||||
log.error("⚠️ 可能原因:字段不匹配或类型错误");
|
||||
}
|
||||
}
|
||||
|
||||
throw new RuntimeException("插入 TDengine 数据失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -32,23 +32,29 @@
|
||||
|
||||
create table if not exists
|
||||
|
||||
`fishery`.t_#{serialNum}
|
||||
`fishery`.`t_${serialNum}`
|
||||
|
||||
using fishery.device_sensor_data
|
||||
|
||||
tags(#{tenantId},#{serialNum},#{deviceId},#{userId},#{userName},#{mobilePhone},#{deviceName},#{deviceType})
|
||||
</update>
|
||||
|
||||
<select id="checkTableExists" resultType="java.lang.Integer">
|
||||
SELECT COUNT(*) FROM information_schema.ins_tables
|
||||
WHERE db_name = 'fishery' AND stable_name = 'device_sensor_data' AND table_name = 't_${serialNum}'
|
||||
</select>
|
||||
|
||||
<insert id="batchInsertDeviceSensorData">
|
||||
insert into
|
||||
<foreach collection="dataList" item="data" open="" close="" separator=" ">
|
||||
`fishery`.t_#{data.serialNum}
|
||||
`fishery`.`t_${data.serialNum}`
|
||||
|
||||
using fishery.device_sensor_data
|
||||
|
||||
tags(#{data.tenantId},#{data.serialNum},#{data.deviceId},#{data.userId},#{data.userName},#{data.mobilePhone},#{data.deviceName},#{data.deviceType})
|
||||
|
||||
(time, createTime, dissolvedOxygen, temperature, saturability, ph, salinity, treference, tfluorescence, phaseDifference, battery) values (#{data.time}, ${data.createTime}, ${data.dissolvedOxygen}, ${data.temperature}, ${data.saturability}, ${data.ph}, ${data.salinity}, ${data.treference}, ${data.tfluorescence}, ${data.phaseDifference}, ${data.battery})
|
||||
(time, createTime, dissolvedOxygen, temperature, saturability, ph, salinity, treference, tfluorescence, phaseDifference, battery)
|
||||
values (#{data.time}, #{data.createTime}, #{data.dissolvedOxygen}, #{data.temperature}, #{data.saturability}, #{data.ph}, #{data.salinity}, #{data.treference}, #{data.tfluorescence}, #{data.phaseDifference}, #{data.battery})
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
|
||||
Reference in New Issue
Block a user