fix: 数据接入,控制器需要判断开关数据和溶解氧数据。
This commit is contained in:
@@ -12,16 +12,19 @@ import com.intc.iot.mapper.IotDeviceMapper;
|
||||
import com.intc.iot.service.VmsNoticeService;
|
||||
import com.intc.fishery.domain.Device;
|
||||
import com.intc.fishery.mapper.DeviceMapper;
|
||||
import com.intc.fishery.mapper.DeviceSwitchMapper;
|
||||
import com.intc.tdengine.domain.DeviceSensorData;
|
||||
import com.intc.tdengine.service.IDeviceSensorDataService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@@ -50,16 +53,17 @@ public class DeviceDataHandler {
|
||||
*/
|
||||
private final AquAlarmHistoryMapper alarmHistoryMapper;
|
||||
|
||||
/**
|
||||
* IoT设备 Mapper
|
||||
*/
|
||||
private final IotDeviceMapper iotDeviceMapper;
|
||||
|
||||
/**
|
||||
* 设备 Mapper
|
||||
*/
|
||||
private final DeviceMapper deviceMapper;
|
||||
|
||||
/**
|
||||
* 设备开关 Mapper
|
||||
*/
|
||||
private final DeviceSwitchMapper deviceSwitchMapper;
|
||||
|
||||
/**
|
||||
* VMS 语音通知服务
|
||||
*/
|
||||
@@ -67,6 +71,12 @@ public class DeviceDataHandler {
|
||||
|
||||
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
/**
|
||||
* 控制器产品编码(从配置文件读取)
|
||||
*/
|
||||
@Value("${device-type.control-integrated:a1Xj9dagTIx}")
|
||||
private String controllerProductKey;
|
||||
|
||||
/**
|
||||
* 呼叫状态:未呼叫
|
||||
*/
|
||||
@@ -132,29 +142,52 @@ public class DeviceDataHandler {
|
||||
|
||||
// 保存到 TDengine 时序数据库
|
||||
DeviceSensorData sensorData = null;
|
||||
if (params != null && params.size() > 3) {
|
||||
// // 检查 sensorerrorcode 是否为 6
|
||||
// Object errorCode = params.get("sensorerrorcode");
|
||||
// if (errorCode == null || !"6".equals(String.valueOf(errorCode))) {
|
||||
// log.debug("数据误码不为6,已过滤: {} (errorCode: {})", deviceName, errorCode);
|
||||
// return;
|
||||
// }
|
||||
|
||||
// 判断是否为控制器设备(使用配置文件中的ProductKey)
|
||||
boolean isController = controllerProductKey.equals(productKey);
|
||||
|
||||
if (params != null && params.size() > 0) {
|
||||
try {
|
||||
sensorData = convertToSensorData(productKey, deviceName, params);
|
||||
if (sensorData != null) {
|
||||
// 1. 插入到 TDengine
|
||||
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
|
||||
log.debug("数据已保存: {}", deviceName);
|
||||
// 如果是控制器,需要区分处理
|
||||
if (isController) {
|
||||
// 判断数据类型(开关数据和传感器数据不会同时出现)
|
||||
if (hasSwitchData(params)) {
|
||||
// 开关数据:更新MySQL的开关状态和电压电流
|
||||
handleSwitchData(deviceName, params);
|
||||
} else if (hasSensorData(params)) {
|
||||
// 传感器数据:写入TDengine
|
||||
sensorData = convertToSensorData(productKey, deviceName, params);
|
||||
if (sensorData != null) {
|
||||
// 插入到 TDengine
|
||||
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
|
||||
log.debug("控制器溶解氧数据已保存: {}", deviceName);
|
||||
|
||||
// 2. 更新设备表的实时数据
|
||||
updateDeviceRealTimeData(deviceName, sensorData);
|
||||
// 更新设备表的实时数据
|
||||
updateDeviceRealTimeData(deviceName, sensorData);
|
||||
}
|
||||
} else {
|
||||
// 既不是开关数据也不是传感器数据
|
||||
log.debug("控制器数据类型未知: {}", deviceName);
|
||||
}
|
||||
} else {
|
||||
// 非控制器(水质检测仪),正常处理
|
||||
if (params.size() > 3) {
|
||||
sensorData = convertToSensorData(productKey, deviceName, params);
|
||||
if (sensorData != null) {
|
||||
// 插入到 TDengine
|
||||
deviceSensorDataService.batchInsertDeviceSensorData(java.util.Collections.singletonList(sensorData));
|
||||
log.debug("数据已保存: {}", deviceName);
|
||||
|
||||
// 更新设备表的实时数据
|
||||
updateDeviceRealTimeData(deviceName, sensorData);
|
||||
}
|
||||
} else {
|
||||
log.debug("数据字段不足3个,已过滤: {} (字段数: {})", deviceName, params.size());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("保存数据失败[{}]: {}", deviceName, e.getMessage());
|
||||
}
|
||||
} else if (params != null && params.size() < 3) {
|
||||
log.debug("数据字段不足3个,已过滤: {} (字段数: {})", deviceName, params.size());
|
||||
}
|
||||
//
|
||||
// // 检查是否触发报警
|
||||
@@ -654,6 +687,164 @@ public class DeviceDataHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否包含传感器数据(溶解氧相关)
|
||||
*
|
||||
* @param params 参数
|
||||
* @return true: 包含传感器数据, false: 不包含
|
||||
*/
|
||||
private boolean hasSensorData(JSONObject params) {
|
||||
if (params == null) {
|
||||
return false;
|
||||
}
|
||||
// 判断是否包含溶解氧相关字段
|
||||
return params.containsKey("dissolvedOxygen")
|
||||
|| params.containsKey("dissolvedoxygen")
|
||||
|| params.containsKey("currentTemperature")
|
||||
|| params.containsKey("Tfluorescence")
|
||||
|| params.containsKey("Treference")
|
||||
|| params.containsKey("dosat");
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否包含开关数据(Switch1-4等)
|
||||
*
|
||||
* @param params 参数
|
||||
* @return true: 包含开关数据, false: 不包含
|
||||
*/
|
||||
private boolean hasSwitchData(JSONObject params) {
|
||||
if (params == null) {
|
||||
return false;
|
||||
}
|
||||
// 判断是否包含Switch1-4或switch1_VoltCur-switch4_VoltCur等字段
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
if (params.containsKey("Switch" + i)
|
||||
|| params.containsKey("switch" + i + "_VoltCur")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理开关状态数据
|
||||
* 更新MySQL中的设备开关状态
|
||||
*
|
||||
* @param deviceName 设备名称
|
||||
* @param params 参数
|
||||
*/
|
||||
private void handleSwitchData(String deviceName, JSONObject params) {
|
||||
try {
|
||||
// 查询设备
|
||||
Device device = deviceMapper.selectOne(
|
||||
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<Device>()
|
||||
.eq(Device::getSerialNum, deviceName)
|
||||
.select(Device::getId)
|
||||
);
|
||||
|
||||
if (device == null) {
|
||||
log.debug("设备不存在,无法更新开关状态: {}", deviceName);
|
||||
return;
|
||||
}
|
||||
|
||||
// 查询该设备的所有开关
|
||||
List<com.intc.fishery.domain.DeviceSwitch> switches = deviceSwitchMapper.selectList(
|
||||
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<com.intc.fishery.domain.DeviceSwitch>()
|
||||
.eq(com.intc.fishery.domain.DeviceSwitch::getDeviceId, device.getId())
|
||||
);
|
||||
|
||||
if (switches.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新每个开关的状态
|
||||
for (com.intc.fishery.domain.DeviceSwitch sw : switches) {
|
||||
Integer switchIndex = sw.getIndex();
|
||||
if (switchIndex == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String switchKey = "Switch" + switchIndex;
|
||||
if (params.containsKey(switchKey)) {
|
||||
Integer switchStatus = params.getInt(switchKey);
|
||||
if (switchStatus != null) {
|
||||
// 更新开关状态
|
||||
com.intc.fishery.domain.DeviceSwitch updateSwitch = new com.intc.fishery.domain.DeviceSwitch();
|
||||
updateSwitch.setId(sw.getId());
|
||||
updateSwitch.setIsOpen(switchStatus);
|
||||
deviceSwitchMapper.updateById(updateSwitch);
|
||||
|
||||
log.debug("更新开关状态: 设备={}, 开关索引={}, 状态={}",
|
||||
deviceName, switchIndex, switchStatus);
|
||||
}
|
||||
}
|
||||
|
||||
// 处理电压电流数据
|
||||
String voltCurKey = "switch" + switchIndex + "_VoltCur";
|
||||
if (params.containsKey(voltCurKey)) {
|
||||
try {
|
||||
JSONObject voltCurData = params.getJSONObject(voltCurKey);
|
||||
|
||||
// 解析A、B、C三相数据,找到非0的值
|
||||
Double voltage = null;
|
||||
Double current = null;
|
||||
|
||||
for (String phase : new String[]{"A", "B", "C"}) {
|
||||
if (voltCurData.containsKey(phase)) {
|
||||
String phaseDataStr = voltCurData.getStr(phase);
|
||||
if (phaseDataStr != null && !phaseDataStr.isEmpty()) {
|
||||
try {
|
||||
// 解析JSON字符串,例如: {"volt":227,"cur":0.00}
|
||||
JSONObject phaseData = JSONUtil.parseObj(phaseDataStr);
|
||||
Double phaseVolt = phaseData.getDouble("volt");
|
||||
Double phaseCur = phaseData.getDouble("cur");
|
||||
|
||||
// 只取非0的值
|
||||
if (phaseVolt != null && phaseVolt > 0) {
|
||||
voltage = phaseVolt;
|
||||
}
|
||||
if (phaseCur != null && phaseCur > 0) {
|
||||
current = phaseCur;
|
||||
}
|
||||
|
||||
// 如果已经找到非0的值,可以提前退出
|
||||
if (voltage != null && voltage > 0 && current != null && current > 0) {
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("解析相{}的电压电流数据失败: {}", phase, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果找到了非0的电压或电流,更新到数据库
|
||||
if ((voltage != null && voltage > 0) || (current != null && current > 0)) {
|
||||
com.intc.fishery.domain.DeviceSwitch updateVoltCur = new com.intc.fishery.domain.DeviceSwitch();
|
||||
updateVoltCur.setId(sw.getId());
|
||||
|
||||
if (voltage != null && voltage > 0) {
|
||||
updateVoltCur.setDetectVoltageValue(voltage);
|
||||
}
|
||||
if (current != null && current > 0) {
|
||||
updateVoltCur.setDetectElectricValue(current);
|
||||
}
|
||||
|
||||
deviceSwitchMapper.updateById(updateVoltCur);
|
||||
log.debug("更新开关电压电流: 设备={}, 开关索引={}, 电压={}V, 电流={}A",
|
||||
deviceName, switchIndex, voltage, current);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("处理开关{}的电压电流数据失败: {}", switchIndex, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理开关数据失败[{}]: {}", deviceName, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为 TDengine 传感器数据格式
|
||||
*
|
||||
@@ -676,14 +867,22 @@ public class DeviceDataHandler {
|
||||
sensorData.setDeviceName(deviceName);
|
||||
|
||||
// 解析传感器数据(根据实际字段映射)
|
||||
if (params.containsKey("dissolvedoxygen")) {
|
||||
sensorData.setDissolvedOxygen(params.getDouble("dissolvedoxygen"));
|
||||
// 支持小写和驼峰命名两种格式
|
||||
if (params.containsKey("dissolvedoxygen") || params.containsKey("dissolvedOxygen")) {
|
||||
Double value = params.containsKey("dissolvedOxygen") ?
|
||||
params.getDouble("dissolvedOxygen") : params.getDouble("dissolvedoxygen");
|
||||
sensorData.setDissolvedOxygen(value);
|
||||
}
|
||||
if (params.containsKey("temperature")) {
|
||||
sensorData.setTemperature(params.getDouble("temperature"));
|
||||
}
|
||||
if (params.containsKey("saturability")) {
|
||||
sensorData.setSaturability(params.getDouble("saturability"));
|
||||
if (params.containsKey("currentTemperature")) {
|
||||
sensorData.setTemperature(params.getDouble("currentTemperature"));
|
||||
}
|
||||
if (params.containsKey("saturability") || params.containsKey("dosat")) {
|
||||
Double value = params.containsKey("dosat") ?
|
||||
params.getDouble("dosat") : params.getDouble("saturability");
|
||||
sensorData.setSaturability(value);
|
||||
}
|
||||
if (params.containsKey("ph")) {
|
||||
sensorData.setPh(params.getDouble("ph"));
|
||||
@@ -691,11 +890,15 @@ public class DeviceDataHandler {
|
||||
if (params.containsKey("salinity")) {
|
||||
sensorData.setSalinity(params.getDouble("salinity"));
|
||||
}
|
||||
if (params.containsKey("treference")) {
|
||||
sensorData.setTreference(params.getDouble("treference"));
|
||||
if (params.containsKey("treference") || params.containsKey("Treference")) {
|
||||
Double value = params.containsKey("Treference") ?
|
||||
params.getDouble("Treference") : params.getDouble("treference");
|
||||
sensorData.setTreference(value);
|
||||
}
|
||||
if (params.containsKey("tfluorescence")) {
|
||||
sensorData.setTfluorescence(params.getDouble("tfluorescence"));
|
||||
if (params.containsKey("tfluorescence") || params.containsKey("Tfluorescence")) {
|
||||
Double value = params.containsKey("Tfluorescence") ?
|
||||
params.getDouble("Tfluorescence") : params.getDouble("tfluorescence");
|
||||
sensorData.setTfluorescence(value);
|
||||
}
|
||||
if (params.containsKey("phasedifference")) {
|
||||
sensorData.setPhaseDifference(params.getDouble("phasedifference"));
|
||||
|
||||
Reference in New Issue
Block a user