fix: 数据接入,控制器需要判断开关数据和溶解氧数据。

This commit is contained in:
tianyongbao
2026-01-21 18:21:21 +08:00
parent 5267dc01af
commit 212e596237

View File

@@ -12,16 +12,19 @@ import com.intc.iot.mapper.IotDeviceMapper;
import com.intc.iot.service.VmsNoticeService;
import com.intc.fishery.domain.Device;
import com.intc.fishery.mapper.DeviceMapper;
import com.intc.fishery.mapper.DeviceSwitchMapper;
import com.intc.tdengine.domain.DeviceSensorData;
import com.intc.tdengine.service.IDeviceSensorDataService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -50,16 +53,17 @@ public class DeviceDataHandler {
*/
private final AquAlarmHistoryMapper alarmHistoryMapper;
/**
* IoT设备 Mapper
*/
private final IotDeviceMapper iotDeviceMapper;
/**
* 设备 Mapper
*/
private final DeviceMapper deviceMapper;
/**
* 设备开关 Mapper
*/
private final DeviceSwitchMapper deviceSwitchMapper;
/**
* VMS 语音通知服务
*/
@@ -67,6 +71,12 @@ public class DeviceDataHandler {
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 控制器产品编码(从配置文件读取)
*/
@Value("${device-type.control-integrated:a1Xj9dagTIx}")
private String controllerProductKey;
/**
* 呼叫状态:未呼叫
*/
@@ -132,29 +142,52 @@ public class DeviceDataHandler {
// 保存到 TDengine 时序数据库
DeviceSensorData sensorData = null;
if (params != null && params.size() > 3) {
// // 检查 sensorerrorcode 是否为 6
// Object errorCode = params.get("sensorerrorcode");
// if (errorCode == null || !"6".equals(String.valueOf(errorCode))) {
// log.debug("数据误码不为6已过滤: {} (errorCode: {})", deviceName, errorCode);
// return;
// }
// 判断是否为控制器设备使用配置文件中的ProductKey
boolean isController = controllerProductKey.equals(productKey);
if (params != null && params.size() > 0) {
try {
sensorData = convertToSensorData(productKey, deviceName, params);
if (sensorData != null) {
// 1. 插入到 TDengine
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
log.debug("数据已保存: {}", deviceName);
// 如果是控制器,需要区分处理
if (isController) {
// 判断数据类型(开关数据和传感器数据不会同时出现)
if (hasSwitchData(params)) {
// 开关数据更新MySQL的开关状态和电压电流
handleSwitchData(deviceName, params);
} else if (hasSensorData(params)) {
// 传感器数据写入TDengine
sensorData = convertToSensorData(productKey, deviceName, params);
if (sensorData != null) {
// 插入到 TDengine
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
log.debug("控制器溶解氧数据已保存: {}", deviceName);
// 2. 更新设备表的实时数据
updateDeviceRealTimeData(deviceName, sensorData);
// 更新设备表的实时数据
updateDeviceRealTimeData(deviceName, sensorData);
}
} else {
// 既不是开关数据也不是传感器数据
log.debug("控制器数据类型未知: {}", deviceName);
}
} else {
// 非控制器(水质检测仪),正常处理
if (params.size() > 3) {
sensorData = convertToSensorData(productKey, deviceName, params);
if (sensorData != null) {
// 插入到 TDengine
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
log.debug("数据已保存: {}", deviceName);
// 更新设备表的实时数据
updateDeviceRealTimeData(deviceName, sensorData);
}
} else {
log.debug("数据字段不足3个已过滤: {} (字段数: {})", deviceName, params.size());
}
}
} catch (Exception e) {
log.error("保存数据失败[{}]: {}", deviceName, e.getMessage());
}
} else if (params != null && params.size() < 3) {
log.debug("数据字段不足3个已过滤: {} (字段数: {})", deviceName, params.size());
}
//
// // 检查是否触发报警
@@ -654,6 +687,164 @@ public class DeviceDataHandler {
}
}
/**
* 判断是否包含传感器数据(溶解氧相关)
*
* @param params 参数
* @return true: 包含传感器数据, false: 不包含
*/
private boolean hasSensorData(JSONObject params) {
if (params == null) {
return false;
}
// 判断是否包含溶解氧相关字段
return params.containsKey("dissolvedOxygen")
|| params.containsKey("dissolvedoxygen")
|| params.containsKey("currentTemperature")
|| params.containsKey("Tfluorescence")
|| params.containsKey("Treference")
|| params.containsKey("dosat");
}
/**
* 判断是否包含开关数据Switch1-4等
*
* @param params 参数
* @return true: 包含开关数据, false: 不包含
*/
private boolean hasSwitchData(JSONObject params) {
if (params == null) {
return false;
}
// 判断是否包含Switch1-4或switch1_VoltCur-switch4_VoltCur等字段
for (int i = 1; i <= 4; i++) {
if (params.containsKey("Switch" + i)
|| params.containsKey("switch" + i + "_VoltCur")) {
return true;
}
}
return false;
}
/**
* 处理开关状态数据
* 更新MySQL中的设备开关状态
*
* @param deviceName 设备名称
* @param params 参数
*/
private void handleSwitchData(String deviceName, JSONObject params) {
try {
// 查询设备
Device device = deviceMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<Device>()
.eq(Device::getSerialNum, deviceName)
.select(Device::getId)
);
if (device == null) {
log.debug("设备不存在,无法更新开关状态: {}", deviceName);
return;
}
// 查询该设备的所有开关
List<com.intc.fishery.domain.DeviceSwitch> switches = deviceSwitchMapper.selectList(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<com.intc.fishery.domain.DeviceSwitch>()
.eq(com.intc.fishery.domain.DeviceSwitch::getDeviceId, device.getId())
);
if (switches.isEmpty()) {
return;
}
// 更新每个开关的状态
for (com.intc.fishery.domain.DeviceSwitch sw : switches) {
Integer switchIndex = sw.getIndex();
if (switchIndex == null) {
continue;
}
String switchKey = "Switch" + switchIndex;
if (params.containsKey(switchKey)) {
Integer switchStatus = params.getInt(switchKey);
if (switchStatus != null) {
// 更新开关状态
com.intc.fishery.domain.DeviceSwitch updateSwitch = new com.intc.fishery.domain.DeviceSwitch();
updateSwitch.setId(sw.getId());
updateSwitch.setIsOpen(switchStatus);
deviceSwitchMapper.updateById(updateSwitch);
log.debug("更新开关状态: 设备={}, 开关索引={}, 状态={}",
deviceName, switchIndex, switchStatus);
}
}
// 处理电压电流数据
String voltCurKey = "switch" + switchIndex + "_VoltCur";
if (params.containsKey(voltCurKey)) {
try {
JSONObject voltCurData = params.getJSONObject(voltCurKey);
// 解析A、B、C三相数据找到非0的值
Double voltage = null;
Double current = null;
for (String phase : new String[]{"A", "B", "C"}) {
if (voltCurData.containsKey(phase)) {
String phaseDataStr = voltCurData.getStr(phase);
if (phaseDataStr != null && !phaseDataStr.isEmpty()) {
try {
// 解析JSON字符串例如: {"volt":227,"cur":0.00}
JSONObject phaseData = JSONUtil.parseObj(phaseDataStr);
Double phaseVolt = phaseData.getDouble("volt");
Double phaseCur = phaseData.getDouble("cur");
// 只取非0的值
if (phaseVolt != null && phaseVolt > 0) {
voltage = phaseVolt;
}
if (phaseCur != null && phaseCur > 0) {
current = phaseCur;
}
// 如果已经找到非0的值可以提前退出
if (voltage != null && voltage > 0 && current != null && current > 0) {
break;
}
} catch (Exception e) {
log.warn("解析相{}的电压电流数据失败: {}", phase, e.getMessage());
}
}
}
}
// 如果找到了非0的电压或电流更新到数据库
if ((voltage != null && voltage > 0) || (current != null && current > 0)) {
com.intc.fishery.domain.DeviceSwitch updateVoltCur = new com.intc.fishery.domain.DeviceSwitch();
updateVoltCur.setId(sw.getId());
if (voltage != null && voltage > 0) {
updateVoltCur.setDetectVoltageValue(voltage);
}
if (current != null && current > 0) {
updateVoltCur.setDetectElectricValue(current);
}
deviceSwitchMapper.updateById(updateVoltCur);
log.debug("更新开关电压电流: 设备={}, 开关索引={}, 电压={}V, 电流={}A",
deviceName, switchIndex, voltage, current);
}
} catch (Exception e) {
log.warn("处理开关{}的电压电流数据失败: {}", switchIndex, e.getMessage());
}
}
}
} catch (Exception e) {
log.error("处理开关数据失败[{}]: {}", deviceName, e.getMessage(), e);
}
}
/**
* 转换为 TDengine 传感器数据格式
*
@@ -676,14 +867,22 @@ public class DeviceDataHandler {
sensorData.setDeviceName(deviceName);
// 解析传感器数据(根据实际字段映射)
if (params.containsKey("dissolvedoxygen")) {
sensorData.setDissolvedOxygen(params.getDouble("dissolvedoxygen"));
// 支持小写和驼峰命名两种格式
if (params.containsKey("dissolvedoxygen") || params.containsKey("dissolvedOxygen")) {
Double value = params.containsKey("dissolvedOxygen") ?
params.getDouble("dissolvedOxygen") : params.getDouble("dissolvedoxygen");
sensorData.setDissolvedOxygen(value);
}
if (params.containsKey("temperature")) {
sensorData.setTemperature(params.getDouble("temperature"));
}
if (params.containsKey("saturability")) {
sensorData.setSaturability(params.getDouble("saturability"));
if (params.containsKey("currentTemperature")) {
sensorData.setTemperature(params.getDouble("currentTemperature"));
}
if (params.containsKey("saturability") || params.containsKey("dosat")) {
Double value = params.containsKey("dosat") ?
params.getDouble("dosat") : params.getDouble("saturability");
sensorData.setSaturability(value);
}
if (params.containsKey("ph")) {
sensorData.setPh(params.getDouble("ph"));
@@ -691,11 +890,15 @@ public class DeviceDataHandler {
if (params.containsKey("salinity")) {
sensorData.setSalinity(params.getDouble("salinity"));
}
if (params.containsKey("treference")) {
sensorData.setTreference(params.getDouble("treference"));
if (params.containsKey("treference") || params.containsKey("Treference")) {
Double value = params.containsKey("Treference") ?
params.getDouble("Treference") : params.getDouble("treference");
sensorData.setTreference(value);
}
if (params.containsKey("tfluorescence")) {
sensorData.setTfluorescence(params.getDouble("tfluorescence"));
if (params.containsKey("tfluorescence") || params.containsKey("Tfluorescence")) {
Double value = params.containsKey("Tfluorescence") ?
params.getDouble("Tfluorescence") : params.getDouble("tfluorescence");
sensorData.setTfluorescence(value);
}
if (params.containsKey("phasedifference")) {
sensorData.setPhaseDifference(params.getDouble("phasedifference"));