diff --git a/intc-admin/pom.xml b/intc-admin/pom.xml
index 54c3ee2..3f5f7ac 100644
--- a/intc-admin/pom.xml
+++ b/intc-admin/pom.xml
@@ -108,6 +108,12 @@
com.intc
intc-workflow
+
+
+ com.intc
+ intc-iot
+ ${revision}
+
de.codecentric
diff --git a/intc-admin/src/main/resources/alarm-config-example.yml b/intc-admin/src/main/resources/alarm-config-example.yml
new file mode 100644
index 0000000..534b3c8
--- /dev/null
+++ b/intc-admin/src/main/resources/alarm-config-example.yml
@@ -0,0 +1,32 @@
+# ==================== 告警配置示例 ====================
+# 将以下配置添加到 application.yml 或 application-dev.yml 中
+
+# 告警阈值配置
+alarm:
+ threshold:
+ # 溶解氧阈值(mg/L)
+ dissolved-oxygen:
+ min: 4.0 # 最低阈值
+ max: 15.0 # 最高阈值
+
+ # 温度阈值(℃)
+ temperature:
+ min: 10.0 # 最低阈值
+ max: 35.0 # 最高阈值
+
+ # pH值阈值
+ ph:
+ min: 6.5 # 最低阈值
+ max: 8.5 # 最高阈值
+
+ # 盐度阈值
+ salinity:
+ max: 35.0 # 最高阈值
+
+ # 电池电量阈值(%)
+ battery:
+ min: 20.0 # 最低阈值
+
+ # 告警通知配置
+ notification:
+ interval: 30 # 告警通知间隔时间(分钟),同一设备在此时间内不会重复发送通知
diff --git a/intc-admin/src/main/resources/application-dev.yml b/intc-admin/src/main/resources/application-dev.yml
index fdc08b6..fceea89 100644
--- a/intc-admin/src/main/resources/application-dev.yml
+++ b/intc-admin/src/main/resources/application-dev.yml
@@ -13,7 +13,7 @@ spring.boot.admin.client:
--- # snail-job 配置
snail-job:
- enabled: true
+ enabled: false
# 需要在 SnailJob 后台组管理创建对应名称的组,然后创建任务的时候选择对应的组,才能正确分派任务
group: "ruoyi_group"
# SnailJob 接入验证令牌 详见 script/sql/ry_job.sql `sj_group_config` 表
@@ -35,7 +35,7 @@ spring:
# 动态数据源文档 https://www.kancloud.cn/tracy5546/dynamic-datasource/content
dynamic:
# 性能分析插件(有性能损耗 不建议生产环境使用)
- p6spy: true
+ p6spy: false
# 设置默认的数据源或者数据源组,默认值即为 master
primary: master
# 严格模式 匹配不到数据源则报错
@@ -204,6 +204,44 @@ sms:
code-template-id: 'SMS_465720430' # TODO: 请填写您的阿里云短信模板CODE
+--- # 阿里云生活物联网平台(飞燕平台)配置
+aliyun:
+ living-iot:
+ # 阿里云 AccessKey ID(必填)
+ access-key-id: LTAI5tRnPowmTLjH181nSbsR
+ # 阿里云 AccessKey Secret(必填)
+ access-key-secret: Vh2LoAM1t3XuMUVy2wTWSACJ97kOUW
+ # 地域节点(必填)
+ region-id: cn-shanghai
+ # 飞燕平台项目ID(Project ID)
+ project-id: a123nMibvh0q4UnU
+ # App Key(用于 API 调用)
+ app-key: 334224397
+ # App Secret
+ app-secret: 70de3018ec39423e9ca1e1b6a6a84ad6
+ # AMQP 服务端订阅配置(使用数据同步的 AppKey + AppSecret)
+ amqp:
+ # 是否启用
+ enabled: true
+ # 数据同步 AppKey(与上面的 app-key 不同!)
+ data-sync-app-key: 334224409
+ # 数据同步 AppSecret(请在阿里云控制台查看)
+ data-sync-app-secret: 17fdd58f9a4c4c90be236897b1f8e8f7
+ # AMQP 接入点地址(使用生活物联网平台官方地址)
+ endpoint: amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671
+ # 消费组 ID(按照官方文档,应该与数据同步 AppKey 相同)
+ consumer-group-id: 334224409
+ # 客户端 ID(建议使用机器 UUID、MAC 地址等唯一标识)
+ client-id: fishery-backend-001
+ # 连接超时时间(毫秒)
+ connection-timeout: 80000
+ # 是否自动重连
+ auto-reconnect: true
+ # 最大重连次数
+ max-reconnect-attempts: 10
+ # 重连延迟(毫秒)
+ reconnect-delay: 30000
+
--- # 三方授权
justauth:
# 前端外网访问地址
diff --git a/intc-admin/src/main/resources/application.yml b/intc-admin/src/main/resources/application.yml
index aed01f2..9f29768 100644
--- a/intc-admin/src/main/resources/application.yml
+++ b/intc-admin/src/main/resources/application.yml
@@ -35,7 +35,7 @@ captcha:
# 日志配置
logging:
level:
- org.dromara: @logging.level@
+ org.dromara: info
org.springframework: warn
org.mybatis.spring.mapper: error
org.apache.fury: warn
@@ -69,7 +69,7 @@ spring:
# 国际化资源文件路径
basename: i18n/messages
profiles:
- active: @profiles.active@
+ active: dev
# 文件上传
servlet:
multipart:
@@ -118,6 +118,9 @@ security:
- /*/api-docs
- /*/api-docs/**
- /warm-flow-ui/config
+ - /iot/test
+ - /iot/device/**
+ - /iot/amqp/**
# 多租户配置
tenant:
diff --git a/intc-common/intc-common-mybatis/src/main/resources/spy.properties b/intc-common/intc-common-mybatis/src/main/resources/spy.properties
index f3ed7d8..dc6e699 100644
--- a/intc-common/intc-common-mybatis/src/main/resources/spy.properties
+++ b/intc-common/intc-common-mybatis/src/main/resources/spy.properties
@@ -17,4 +17,4 @@ databaseDialectTimestampFormat=yyyy-MM-dd HH:mm:ss
# 是否过滤 Log
filter=true
# 过滤 Log 时所排除的 sql 关键字,以逗号分隔
-exclude=
+exclude=information_schema.ins_tables
diff --git a/intc-modules/intc-iot/pom.xml b/intc-modules/intc-iot/pom.xml
index 5f33d62..b37a80a 100644
--- a/intc-modules/intc-iot/pom.xml
+++ b/intc-modules/intc-iot/pom.xml
@@ -31,11 +31,40 @@
7.46.0
-
+
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.5
+ true
+
+
+
+
+ org.apache.qpid
+ qpid-jms-client
+ 0.53.0
+
+
+
+
+ commons-codec
+ commons-codec
+ 1.15
+
+
+
+
+ com.aliyun.mns
+ aliyun-sdk-mns
+ 1.1.9.2
+
+
+
+
+ com.aliyun
+ aliyun-java-sdk-dyvmsapi
+ 1.1.0
@@ -81,11 +110,30 @@
intc-common-web
+
+ com.intc
+ intc-common-excel
+
+
com.intc
intc-common-tenant
+
+
+ com.intc
+ intc-tdengine
+ 5.5.0
+ compile
+
+
+ com.intc
+ intc-fishery
+ 5.5.0
+ compile
+
+
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotConfiguration.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotConfiguration.java
index a8af4d3..f40b6ef 100644
--- a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotConfiguration.java
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotConfiguration.java
@@ -6,6 +6,7 @@ import com.aliyuncs.profile.DefaultProfile;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -17,6 +18,7 @@ import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@RequiredArgsConstructor
+@EnableConfigurationProperties(AliyunIotProperties.class)
@ConditionalOnProperty(prefix = "aliyun.living-iot", name = "access-key-id")
public class AliyunIotConfiguration {
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotProperties.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotProperties.java
index 475a573..a1b4888 100644
--- a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotProperties.java
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AliyunIotProperties.java
@@ -2,7 +2,6 @@ package com.intc.iot.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
/**
* 阿里云生活物联网平台(飞燕平台)配置属性
@@ -10,7 +9,6 @@ import org.springframework.stereotype.Component;
* @author intc
*/
@Data
-@Component
@ConfigurationProperties(prefix = "aliyun.living-iot")
public class AliyunIotProperties {
@@ -50,10 +48,15 @@ public class AliyunIotProperties {
private String categoryKey;
/**
- * MQTT 配置
+ * MQTT 配置(可选,用于设备直连)
*/
private MqttConfig mqtt = new MqttConfig();
+ /**
+ * AMQP 服务端订阅配置(推荐)
+ */
+ private AmqpConfig amqp = new AmqpConfig();
+
@Data
public static class MqttConfig {
/**
@@ -97,4 +100,62 @@ public class AliyunIotProperties {
private Boolean cleanSession = true;
}
+ /**
+ * AMQP 服务端订阅配置(使用数据同步的 AppKey + AppSecret)
+ */
+ @Data
+ public static class AmqpConfig {
+ /**
+ * 是否启用 AMQP 订阅
+ */
+ private Boolean enabled = false;
+
+ /**
+ * 数据同步 AppKey(与外层的 appKey 不同!)
+ */
+ private String dataSyncAppKey;
+
+ /**
+ * 数据同步 AppSecret
+ */
+ private String dataSyncAppSecret;
+
+ /**
+ * AMQP 接入点地址(使用 amqps 协议)
+ * 中国内地:amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671
+ * 新加坡:amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671
+ */
+ private String endpoint;
+
+ /**
+ * 消费组 ID
+ */
+ private String consumerGroupId;
+
+ /**
+ * 客户端 ID(建议使用机器 UUID、MAC 地址等唯一标识)
+ */
+ private String clientId;
+
+ /**
+ * 连接超时时间(毫秒)
+ */
+ private Integer connectionTimeout = 80000;
+
+ /**
+ * 自动重连
+ */
+ private Boolean autoReconnect = true;
+
+ /**
+ * 最大重连次数
+ */
+ private Integer maxReconnectAttempts = 10;
+
+ /**
+ * 重连延迟(毫秒)
+ */
+ private Integer reconnectDelay = 30000;
+ }
+
}
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AmqpConfiguration.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AmqpConfiguration.java
new file mode 100644
index 0000000..32cde01
--- /dev/null
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/AmqpConfiguration.java
@@ -0,0 +1,254 @@
+package com.intc.iot.config;
+
+import com.intc.iot.service.AmqpMessageHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.apache.commons.codec.binary.Hex;
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * 阿里云 IoT AMQP 服务端订阅配置
+ *
+ * 使用说明:
+ * 1. 在阿里云IoT控制台开启设备数据同步
+ * 2. 配置 AMQP 连接参数(endpoint、consumerGroupId 等)
+ * 3. 启用配置:aliyun.living-iot.amqp.enabled=true
+ *
+ * 注意:
+ * - 阿里云生活物联网平台使用 AMQP 1.0 协议
+ * - 使用 AppKey + AppSecret 认证(不是 AccessKey)
+ * - 连接地址:amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671
+ *
+ * @author intc
+ */
+@Slf4j
+@Configuration
+@RequiredArgsConstructor
+@EnableConfigurationProperties(AliyunIotProperties.class)
+@ConditionalOnProperty(prefix = "aliyun.living-iot.amqp", name = "enabled", havingValue = "true")
+public class AmqpConfiguration {
+
+ private final AliyunIotProperties iotProperties;
+ private final AmqpMessageHandler messageHandler;
+
+ /**
+ * 创建 AMQP 1.0 连接(使用 JMS)
+ */
+ @Bean
+ public Connection amqpConnection() throws Exception {
+ AliyunIotProperties.AmqpConfig amqp = iotProperties.getAmqp();
+ // 使用数据同步的 AppKey/AppSecret 进行 AMQP 认证
+ String dataSyncAppKey = amqp.getDataSyncAppKey();
+ String dataSyncAppSecret = amqp.getDataSyncAppSecret();
+
+ log.info("========== AMQP 连接配置 ==========");
+ log.info("Endpoint: {}", amqp.getEndpoint());
+ log.info("数据同步 AppKey: {}", dataSyncAppKey);
+ log.info("数据同步 AppSecret: {}...", dataSyncAppSecret != null && dataSyncAppSecret.length() > 4
+ ? dataSyncAppSecret.substring(0, 4) + "****" : "NULL");
+ log.info("ConsumerGroupId: {}", amqp.getConsumerGroupId());
+ log.info("ClientId: {}", amqp.getClientId());
+ log.info("ConnectionTimeout: {}ms", amqp.getConnectionTimeout());
+ log.info("====================================");
+
+ try {
+ // 生成随机数(按照官方示例)
+ long random = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+
+ // 构建用户名(完全按照官方示例)
+ String userName = amqp.getClientId() + "|authMode=appkey"
+ + ",signMethod=SHA256"
+ + ",random=" + random
+ + ",appKey=" + dataSyncAppKey
+ + ",groupId=" + amqp.getConsumerGroupId() + "|";
+
+ // 计算密码(完全按照官方示例)
+ String signContent = "random=" + random;
+ String password = doSign(signContent, dataSyncAppSecret, "HmacSHA256");
+
+ log.info("用户名: {}", userName);
+ log.info("签名内容: {}", signContent);
+ log.info("密码(完整): {}", password);
+ log.info("密码长度: {}", password.length());
+ log.info("ConsumerGroupId(用于groupId): {}", amqp.getConsumerGroupId());
+
+ // 构建连接 URL(带故障转移和 SSL 配置)
+ String connectionUrl = String.format(
+ "failover:(%s?amqp.idleTimeout=%d&transport.verifyHost=false)?failover.maxReconnectAttempts=%d&failover.reconnectDelay=%d",
+ amqp.getEndpoint(),
+ amqp.getConnectionTimeout(),
+ amqp.getMaxReconnectAttempts(),
+ amqp.getReconnectDelay()
+ );
+
+ log.info("连接 URL: {}", connectionUrl);
+
+ // 使用 JNDI 初始化连接
+ Hashtable hashtable = new Hashtable<>();
+ hashtable.put("connectionfactory.SBCF", connectionUrl);
+ hashtable.put("queue.QUEUE", "default");
+ hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+
+ Context context = new InitialContext(hashtable);
+ ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
+ Destination queue = (Destination) context.lookup("QUEUE");
+
+ log.info("正在连接到 AMQP 服务器...");
+ log.info("请耐必等待,连接可能需要几秒...");
+
+ // 创建连接
+ Connection connection = cf.createConnection(userName, password);
+
+ // 添加连接监听器
+ if (connection instanceof JmsConnection) {
+ ((JmsConnection) connection).addConnectionListener(new JmsConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ log.info("✅ AMQP 连接建立成功: {}", remoteURI);
+ }
+
+ @Override
+ public void onConnectionFailure(Throwable error) {
+ log.error("❌ AMQP 连接失败回调");
+ log.error("失败原因: {}", error.getMessage());
+ log.error("异常类型: {}", error.getClass().getName());
+ if (error.getCause() != null) {
+ log.error("根本原因: {}", error.getCause().getMessage());
+ log.error("根本原因类型: {}", error.getCause().getClass().getName());
+ }
+ // 打印完整堆栈
+ log.error("完整异常信息:", error);
+ }
+
+ @Override
+ public void onConnectionInterrupted(URI remoteURI) {
+ log.warn("⚠️ AMQP 连接中断: {}", remoteURI);
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ log.info("✅ AMQP 连接恢复: {}", remoteURI);
+ }
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
+
+ @Override
+ public void onSessionClosed(Session session, Throwable cause) {}
+
+ @Override
+ public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
+
+ @Override
+ public void onProducerClosed(MessageProducer producer, Throwable cause) {}
+ });
+ }
+
+ // 启动连接
+ connection.start();
+ log.info("✅ AMQP 连接创建成功!");
+
+ // 启动消息消费
+ startConsuming(connection, queue);
+
+ return connection;
+ } catch (Exception e) {
+ log.error("❌ AMQP 连接失败", e);
+ log.error("===== 详细错误信息 =====");
+ log.error("异常类型: {}", e.getClass().getName());
+ log.error("异常信息: {}", e.getMessage());
+
+ // 打印完整异常链
+ Throwable cause = e.getCause();
+ int level = 1;
+ while (cause != null) {
+ log.error("原因 {} - 类型: {}", level, cause.getClass().getName());
+ log.error("原因 {} - 信息: {}", level, cause.getMessage());
+ cause = cause.getCause();
+ level++;
+ }
+
+ log.error("===== 配置信息 =====");
+ log.error(" - Endpoint: {}", amqp.getEndpoint());
+ log.error(" - 数据同步 AppKey: {}", dataSyncAppKey);
+ log.error(" - 数据同步 AppSecret: {}...", dataSyncAppSecret != null && dataSyncAppSecret.length() > 4 ? dataSyncAppSecret.substring(0, 4) + "****" : "NULL");
+ log.error(" - ConsumerGroupId: {}", amqp.getConsumerGroupId());
+ log.error(" - ClientId: {}", amqp.getClientId());
+
+ log.error("===== 可能原因 =====");
+ log.error("1. 网络无法连接到服务器: {}", amqp.getEndpoint());
+ log.error("2. 数据同步 AppKey 或 AppSecret 错误");
+ log.error("3. 消费组 ID 不存在或未授权");
+ log.error("4. 未开启设备数据同步");
+ log.error("5. 防火墙阻止了端口 5671");
+ log.error("6. 签名算法或格式错误");
+
+ throw new RuntimeException("创建 AMQP 连接失败: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 启动消息消费
+ */
+ private void startConsuming(Connection connection, Destination queue) {
+ new Thread(() -> {
+ try {
+ // 创建会话(自动 ACK)
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // 创建消费者
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ log.info("开始消费 AMQP 消息...");
+
+ // 设置消息监听器
+ consumer.setMessageListener(message -> {
+ try {
+ // 阿里云 IoT 发送的是 BytesMessage
+ byte[] body = message.getBody(byte[].class);
+ String content = new String(body, "UTF-8");
+
+ // 处理消息(异步)
+ messageHandler.handleMessage(content);
+
+ } catch (Exception e) {
+ log.error("处理 AMQP 消息失败", e);
+ }
+ });
+
+ } catch (Exception e) {
+ log.error("启动 AMQP 消费失败", e);
+ }
+ }, "AMQP-Consumer").start();
+ }
+
+ /**
+ * 计算签名(完全按照官方示例)
+ */
+ private String doSign(String toSignString, String secret, String signMethod) throws Exception {
+ SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
+ Mac mac = Mac.getInstance(signMethod);
+ mac.init(signingKey);
+ byte[] rawHmac = mac.doFinal(toSignString.getBytes());
+
+ // 使用 Hex 编码(按照官方示例)
+ return Hex.encodeHexString(rawHmac);
+ }
+
+}
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/MqttConfiguration.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/MqttConfiguration.java
index e768c07..81ada4a 100644
--- a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/MqttConfiguration.java
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/MqttConfiguration.java
@@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -17,6 +18,7 @@ import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@RequiredArgsConstructor
+@ConditionalOnClass(MqttClient.class)
@ConditionalOnProperty(prefix = "aliyun.living-iot.mqtt", name = "broker-url")
public class MqttConfiguration {
@@ -24,26 +26,40 @@ public class MqttConfiguration {
/**
* 创建 MQTT 客户端
+ * 添加容错机制,连接失败时不影响应用启动
*/
@Bean
- public MqttClient mqttClient() throws Exception {
+ public MqttClient mqttClient() {
AliyunIotProperties.MqttConfig mqtt = iotProperties.getMqtt();
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient client = new MqttClient(mqtt.getBrokerUrl(), mqtt.getClientId(), persistence);
+ try {
+ MemoryPersistence persistence = new MemoryPersistence();
+ MqttClient client = new MqttClient(mqtt.getBrokerUrl(), mqtt.getClientId(), persistence);
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(mqtt.getUsername());
- options.setPassword(mqtt.getPassword().toCharArray());
- options.setConnectionTimeout(mqtt.getConnectionTimeout());
- options.setKeepAliveInterval(mqtt.getKeepAliveInterval());
- options.setAutomaticReconnect(mqtt.getAutoReconnect());
- options.setCleanSession(mqtt.getCleanSession());
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setUserName(mqtt.getUsername());
+ options.setPassword(mqtt.getPassword().toCharArray());
+ options.setConnectionTimeout(mqtt.getConnectionTimeout());
+ options.setKeepAliveInterval(mqtt.getKeepAliveInterval());
+ options.setAutomaticReconnect(mqtt.getAutoReconnect());
+ options.setCleanSession(mqtt.getCleanSession());
- client.connect(options);
- log.info("MQTT 客户端连接成功,Broker: {}", mqtt.getBrokerUrl());
-
- return client;
+ client.connect(options);
+ log.info("MQTT 客户端连接成功,Broker: {}", mqtt.getBrokerUrl());
+
+ return client;
+ } catch (Exception e) {
+ log.error("MQTT 客户端连接失败,将返回未连接的客户端: {}", e.getMessage());
+
+ try {
+ // 返回未连接的客户端,由 MqttServiceImpl 中的自动重连机制处理
+ MemoryPersistence persistence = new MemoryPersistence();
+ return new MqttClient(mqtt.getBrokerUrl(), mqtt.getClientId(), persistence);
+ } catch (Exception ex) {
+ log.error("MQTT 客户端创建失败,请检查配置: {}", ex.getMessage());
+ throw new RuntimeException("创建 MQTT 客户端失败", ex);
+ }
+ }
}
}
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/config/VmsMnsAutoStarter.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/VmsMnsAutoStarter.java
new file mode 100644
index 0000000..79a532e
--- /dev/null
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/config/VmsMnsAutoStarter.java
@@ -0,0 +1,41 @@
+package com.intc.iot.config;
+
+import com.intc.iot.service.VmsMnsConsumerService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.stereotype.Component;
+
+import jakarta.annotation.PreDestroy;
+
+/**
+ * VMS MNS 消费服务自动启动器
+ * 在 Spring 容器启动后自动启动 MNS 消费
+ *
+ * @author intc-iot
+ */
+@Component
+@RequiredArgsConstructor
+@ConditionalOnBean(VmsMnsConsumerService.class)
+@Slf4j
+public class VmsMnsAutoStarter implements ApplicationRunner {
+
+ private final VmsMnsConsumerService vmsMnsConsumerService;
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ log.info("自动启动 VMS MNS 回执消费服务");
+ vmsMnsConsumerService.start();
+ }
+
+ /**
+ * 容器销毁时停止服务
+ */
+ @PreDestroy
+ public void onDestroy() {
+ log.info("容器销毁,停止 VMS MNS 回执消费服务");
+ vmsMnsConsumerService.stop();
+ }
+}
diff --git a/intc-modules/intc-iot/src/main/java/com/intc/iot/controller/IotController.java b/intc-modules/intc-iot/src/main/java/com/intc/iot/controller/IotController.java
index ef79aac..a328ebb 100644
--- a/intc-modules/intc-iot/src/main/java/com/intc/iot/controller/IotController.java
+++ b/intc-modules/intc-iot/src/main/java/com/intc/iot/controller/IotController.java
@@ -1,10 +1,20 @@
package com.intc.iot.controller;
import com.intc.common.core.domain.R;
+import com.intc.common.mybatis.core.page.PageQuery;
+import com.intc.common.mybatis.core.page.TableDataInfo;
import com.intc.common.web.core.BaseController;
+import com.intc.iot.domain.bo.DeviceRealtimeDataBo;
+import com.intc.iot.domain.vo.DeviceRealtimeDataVo;
import com.intc.iot.service.DeviceDataService;
+import com.intc.iot.service.DeviceRealtimeDataService;
+import com.intc.iot.service.DeviceStatusService;
import com.intc.iot.service.IotDeviceService;
import com.intc.iot.service.MqttService;
+import com.intc.iot.service.VmsMnsConsumerService;
+import com.intc.iot.service.VmsNoticeService;
+import com.intc.iot.service.WarnCallNoticeService;
+import com.intc.iot.utils.AliyunAmqpSignUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -36,22 +46,115 @@ public class IotController extends BaseController {
@Autowired(required = false)
private DeviceDataService deviceDataService;
+ @Autowired(required = false)
+ private DeviceRealtimeDataService deviceRealtimeDataService;
+
+ @Autowired(required = false)
+ private DeviceStatusService deviceStatusService;
+
+ @Autowired(required = false)
+ private VmsNoticeService vmsNoticeService;
+
+ @Autowired(required = false)
+ private VmsMnsConsumerService vmsMnsConsumerService;
+
+ @Autowired(required = false)
+ private WarnCallNoticeService warnCallNoticeService;
+
+ @Autowired(required = false)
+ private javax.jms.Connection amqpConnection;
+
@Operation(summary = "测试接口")
@GetMapping("/test")
public R test() {
return R.ok("飞燕平台模块测试成功!");
}
+ @Operation(summary = "查询 AMQP 连接状态")
+ @GetMapping("/amqp/status")
+ public R