reactor:【IoT 物联网】合并 messagebus 和 common 包,统一到 core 包
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
package cn.iocoder.yudao.module.iot.core.biz;
|
||||
|
||||
// TODO @芋艿:待实现
|
||||
public interface IotDeviceCommonApi {
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package cn.iocoder.yudao.module.iot.core.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
// TODO @芋艿:需要添加对应的 DTO,以及上下行的链路,网关、网关服务、设备等
|
||||
|
||||
/**
|
||||
* IoT 设备消息标识符枚举
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum IotDeviceMessageIdentifierEnum {
|
||||
|
||||
PROPERTY_GET("get"), // 下行 TODO 芋艿:【讨论】貌似这个“上行”更合理?device 主动拉取配置。和 IotDevicePropertyGetReqDTO 一样的配置
|
||||
PROPERTY_SET("set"), // 下行
|
||||
PROPERTY_REPORT("report"), // 上行
|
||||
|
||||
STATE_ONLINE("online"), // 上行
|
||||
STATE_OFFLINE("offline"), // 上行
|
||||
|
||||
CONFIG_GET("get"), // 上行 TODO 芋艿:【讨论】暂时没有上行的场景
|
||||
CONFIG_SET("set"), // 下行
|
||||
|
||||
SERVICE_INVOKE("${identifier}"), // 下行
|
||||
SERVICE_REPLY_SUFFIX("_reply"), // 芋艿:TODO 芋艿:【讨论】上行 or 下行
|
||||
|
||||
OTA_UPGRADE("upgrade"), // 下行
|
||||
OTA_PULL("pull"), // 上行
|
||||
OTA_PROGRESS("progress"), // 上行
|
||||
OTA_REPORT("report"), // 上行
|
||||
|
||||
REGISTER_REGISTER("register"), // 上行
|
||||
REGISTER_REGISTER_SUB("register_sub"), // 上行
|
||||
REGISTER_UNREGISTER_SUB("unregister_sub"), // 下行
|
||||
|
||||
TOPOLOGY_ADD("topology_add"), // 下行;
|
||||
;
|
||||
|
||||
/**
|
||||
* 标志符
|
||||
*/
|
||||
private final String identifier;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package cn.iocoder.yudao.module.iot.core.enums;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* IoT 设备消息类型枚举
|
||||
*/
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum IotDeviceMessageTypeEnum implements ArrayValuable<String> {
|
||||
|
||||
STATE("state"), // 设备状态
|
||||
PROPERTY("property"), // 设备属性:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
|
||||
EVENT("event"), // 设备事件:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
|
||||
SERVICE("service"), // 设备服务:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
|
||||
CONFIG("config"), // 设备配置:可参考 https://help.aliyun.com/zh/iot/user-guide/remote-configuration-1 远程配置
|
||||
OTA("ota"), // 设备 OTA:可参考 https://help.aliyun.com/zh/iot/user-guide/ota-update OTA 升级
|
||||
REGISTER("register"), // 设备注册:可参考 https://help.aliyun.com/zh/iot/user-guide/register-devices 设备身份注册
|
||||
TOPOLOGY("topology"),; // 设备拓扑:可参考 https://help.aliyun.com/zh/iot/user-guide/manage-topological-relationships 设备拓扑
|
||||
|
||||
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageTypeEnum::getType).toArray(String[]::new);
|
||||
|
||||
/**
|
||||
* 属性
|
||||
*/
|
||||
private final String type;
|
||||
|
||||
@Override
|
||||
public String[] array() {
|
||||
return ARRAYS;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.config;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.local.LocalIotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.RocketMQIotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* IoT 消息总线自动配置
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@EnableConfigurationProperties(IotMessageBusProperties.class)
|
||||
@Slf4j
|
||||
public class IotMessageBusAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public IotDeviceMessageProducer deviceMessageProducer(IotMessageBus messageBus) {
|
||||
return new IotDeviceMessageProducer(messageBus);
|
||||
}
|
||||
|
||||
// ==================== Local 实现 ====================
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true)
|
||||
public static class LocalIotMessageBusConfiguration {
|
||||
|
||||
@Bean
|
||||
public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) {
|
||||
log.info("[localIotMessageBus][创建 Local IoT 消息总线]");
|
||||
return new LocalIotMessageBus(applicationContext);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ==================== RocketMQ 实现 ====================
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq")
|
||||
@ConditionalOnClass(RocketMQTemplate.class)
|
||||
public static class RocketMQIotMessageBusConfiguration {
|
||||
|
||||
@Bean
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public IotMessageBus rocketMQIotMessageBus(RocketMQProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) {
|
||||
log.info("[rocketMQIotMessageBus][创建 RocketMQ IoT 消息总线]");
|
||||
return new RocketMQIotMessageBus(rocketMQProperties, rocketMQTemplate);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* IoT 消息总线配置属性
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@ConfigurationProperties("yudao.iot.message-bus")
|
||||
@Data
|
||||
@Validated
|
||||
public class IotMessageBusProperties {
|
||||
|
||||
/**
|
||||
* 消息总线类型
|
||||
*
|
||||
* 可选值:local、redis、rocketmq、rabbitmq
|
||||
*/
|
||||
@NotNull(message = "IoT 消息总线类型不能为空")
|
||||
private String type = "local";
|
||||
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core;
|
||||
|
||||
/**
|
||||
* IoT 消息总线接口
|
||||
*
|
||||
* 用于在 IoT 系统中发布和订阅消息,支持多种消息中间件实现
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface IotMessageBus {
|
||||
|
||||
/**
|
||||
* 发布消息到消息总线
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param message 消息内容
|
||||
*/
|
||||
void post(String topic, Object message);
|
||||
|
||||
/**
|
||||
* 注册消息订阅者
|
||||
*
|
||||
* @param subscriber 订阅者
|
||||
*/
|
||||
void register(IotMessageBusSubscriber<?> subscriber);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core;
|
||||
|
||||
/**
|
||||
* IoT 消息总线订阅者接口
|
||||
*
|
||||
* 用于处理从消息总线接收到的消息
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface IotMessageBusSubscriber<T> {
|
||||
|
||||
/**
|
||||
* @return 主题
|
||||
*/
|
||||
String getTopic();
|
||||
|
||||
/**
|
||||
* @return 分组
|
||||
*/
|
||||
String getGroup();
|
||||
|
||||
/**
|
||||
* 处理接收到的消息
|
||||
*
|
||||
* @param message 消息内容
|
||||
*/
|
||||
void onMessage(T message);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class LocalIotMessage {
|
||||
|
||||
private String topic;
|
||||
|
||||
private Object message;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.event.EventListener;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 本地的 {@link IotMessageBus} 实现类
|
||||
*
|
||||
* 注意:仅适用于单机场景!!!
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class LocalIotMessageBus implements IotMessageBus {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
/**
|
||||
* 订阅者映射表
|
||||
* Key: topic
|
||||
*/
|
||||
private final Map<String, List<IotMessageBusSubscriber<?>>> subscribers = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void post(String topic, Object message) {
|
||||
applicationContext.publishEvent(new LocalIotMessage(topic, message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(IotMessageBusSubscriber<?> subscriber) {
|
||||
String topic = subscriber.getTopic();
|
||||
List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new ArrayList<>());
|
||||
topicSubscribers.add(subscriber);
|
||||
log.info("[register][topic({}/{}) 注册消费者({})成功]",
|
||||
topic, subscriber.getGroup(), subscriber.getClass().getName());
|
||||
}
|
||||
|
||||
@EventListener
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public void onMessage(LocalIotMessage message) {
|
||||
String topic = message.getTopic();
|
||||
List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.get(topic);
|
||||
if (CollUtil.isEmpty(topicSubscribers)) {
|
||||
return;
|
||||
}
|
||||
for (IotMessageBusSubscriber subscriber : topicSubscribers) {
|
||||
try {
|
||||
subscriber.onMessage(message.getMessage());
|
||||
} catch (Exception ex) {
|
||||
log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
|
||||
subscriber.getTopic(), subscriber.getGroup(), message.getMessage(), subscriber.getClass().getName(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
|
||||
import jakarta.annotation.PreDestroy;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 基于 RocketMQ 的 {@link IotMessageBus} 实现类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class RocketMQIotMessageBus implements IotMessageBus {
|
||||
|
||||
private final RocketMQProperties rocketMQProperties;
|
||||
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 主题对应的消费者映射
|
||||
*/
|
||||
private final List<DefaultMQPushConsumer> topicConsumers = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void post(String topic, Object message) {
|
||||
SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message));
|
||||
log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public void register(IotMessageBusSubscriber<?> subscriber) {
|
||||
Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
|
||||
// 1.1 创建 DefaultMQPushConsumer
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
|
||||
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
|
||||
consumer.setConsumerGroup(subscriber.getGroup());
|
||||
// 1.2 订阅主题
|
||||
consumer.subscribe(subscriber.getTopic(), "*");
|
||||
// 1.3 设置消息监听器
|
||||
consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
|
||||
for (MessageExt messageExt : messages) {
|
||||
try {
|
||||
byte[] body = messageExt.getBody();
|
||||
subscriber.onMessage(JsonUtils.parseObject(body, type));
|
||||
} catch (Exception ex) {
|
||||
log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
|
||||
subscriber.getTopic(), subscriber.getGroup(), messageExt, subscriber.getClass().getName(), ex);
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
});
|
||||
// 1.4 启动消费者
|
||||
consumer.start();
|
||||
|
||||
// 2. 保存消费者引用
|
||||
topicConsumers.add(consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 销毁时关闭所有消费者
|
||||
*/
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
for (DefaultMQPushConsumer consumer : topicConsumers) {
|
||||
try {
|
||||
consumer.shutdown();
|
||||
log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup());
|
||||
} catch (Exception e) {
|
||||
log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package cn.iocoder.yudao.module.iot.core.mq.message;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
// TODO @芋艿:参考阿里云的物模型,优化 IoT 上下行消息的设计,尽量保持一致(渐进式,不要一口气)!
|
||||
|
||||
/**
|
||||
* IoT 设备消息
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class IotDeviceMessage {
|
||||
|
||||
/**
|
||||
* 请求编号
|
||||
*/
|
||||
private String requestId;
|
||||
|
||||
/**
|
||||
* 设备信息
|
||||
*/
|
||||
private String productKey;
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
/**
|
||||
* 设备标识
|
||||
*/
|
||||
private String deviceKey;
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*
|
||||
* 枚举 {@link IotDeviceMessageTypeEnum}
|
||||
*/
|
||||
private String type;
|
||||
/**
|
||||
* 标识符
|
||||
*
|
||||
* 枚举 {@link IotDeviceMessageIdentifierEnum}
|
||||
*/
|
||||
private String identifier;
|
||||
|
||||
/**
|
||||
* 请求参数
|
||||
*
|
||||
* 例如说:属性上报的 properties、事件上报的 params
|
||||
*/
|
||||
private Object data;
|
||||
/**
|
||||
* 响应码
|
||||
*
|
||||
* 目前只有 server 下行消息给 device 设备时,才会有响应码
|
||||
*/
|
||||
private Integer code;
|
||||
|
||||
/**
|
||||
* 上报时间
|
||||
*/
|
||||
private LocalDateTime reportTime;
|
||||
|
||||
/**
|
||||
* 租户编号
|
||||
*/
|
||||
private Long tenantId;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package cn.iocoder.yudao.module.iot.core.mq.producer;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* IoT 设备消息生产者
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
public class IotDeviceMessageProducer {
|
||||
|
||||
/**
|
||||
* 【消息总线】应用的设备消息 Topic,由 iot-gateway 发给 iot-biz 进行消费
|
||||
*/
|
||||
private static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
|
||||
|
||||
/**
|
||||
* 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 “server”(protocol) 进行消费
|
||||
*
|
||||
* 其中,%s 就是该“server”(protocol) 的标识
|
||||
*/
|
||||
private static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s";
|
||||
|
||||
private final IotMessageBus messageBus;
|
||||
|
||||
/**
|
||||
* 发送设备消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
*/
|
||||
public void sendDeviceMessage(IotDeviceMessage message) {
|
||||
messageBus.post(MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送网关设备消息
|
||||
*
|
||||
* @param server 网关的 server 标识
|
||||
* @param message 设备消息
|
||||
*/
|
||||
public void sendGatewayDeviceMessage(String server, Object message) {
|
||||
messageBus.post(String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, server), message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
|
||||
@@ -0,0 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class TestMessage {
|
||||
|
||||
private String nickname;
|
||||
|
||||
private Integer age;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* {@link LocalIotMessageBus} 集成测试
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@SpringBootTest(classes = LocalIotMessageBusIntegrationTest.class)
|
||||
@Import(IotMessageBusAutoConfiguration.class)
|
||||
@TestPropertySource(properties = {
|
||||
"yudao.iot.message-bus.type=local"
|
||||
})
|
||||
@Slf4j
|
||||
public class LocalIotMessageBusIntegrationTest {
|
||||
|
||||
@Resource
|
||||
private IotMessageBus messageBus;
|
||||
|
||||
/**
|
||||
* 1 topic 2 subscriber
|
||||
*/
|
||||
@Test
|
||||
public void testSendMessageWithTwoSubscribers() throws InterruptedException {
|
||||
// 准备
|
||||
String topic = "test-topic";
|
||||
String testMessage = "Hello IoT Message Bus!";
|
||||
// 用于等待消息处理完成
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
// 用于记录接收到的消息
|
||||
AtomicInteger subscriber1Count = new AtomicInteger(0);
|
||||
AtomicInteger subscriber2Count = new AtomicInteger(0);
|
||||
|
||||
// 创建第一个订阅者
|
||||
IotMessageBusSubscriber<String> subscriber1 = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "group1";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber1Count.incrementAndGet();
|
||||
assertEquals(testMessage, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 创建第二个订阅者
|
||||
IotMessageBusSubscriber<String> subscriber2 = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "group2";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber2Count.incrementAndGet();
|
||||
assertEquals(testMessage, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 注册订阅者
|
||||
messageBus.register(subscriber1);
|
||||
messageBus.register(subscriber2);
|
||||
|
||||
// 发送消息
|
||||
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
|
||||
messageBus.post(topic, testMessage);
|
||||
// 等待消息处理完成(最多等待 10 秒)
|
||||
boolean completed = latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// 验证结果
|
||||
assertTrue(completed, "消息处理超时");
|
||||
assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
|
||||
assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
|
||||
log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者 2 收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 2 topic 2 subscriber
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleTopics() throws InterruptedException {
|
||||
// 准备
|
||||
String topic1 = "device-status";
|
||||
String topic2 = "device-data";
|
||||
String message1 = "设备在线";
|
||||
String message2 = "温度:25°C";
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
|
||||
// 创建订阅者 1 - 只订阅设备状态
|
||||
IotMessageBusSubscriber<String> statusSubscriber = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "status-group";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
assertEquals(message1, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 创建订阅者 2 - 只订阅设备数据
|
||||
IotMessageBusSubscriber<String> dataSubscriber = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "data-group";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
assertEquals(message2, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 注册订阅者到不同主题
|
||||
messageBus.register(statusSubscriber);
|
||||
messageBus.register(dataSubscriber);
|
||||
|
||||
// 发送消息到不同主题
|
||||
messageBus.post(topic1, message1);
|
||||
messageBus.post(topic2, message2);
|
||||
// 等待消息处理完成
|
||||
boolean completed = latch.await(10, TimeUnit.SECONDS);
|
||||
assertTrue(completed, "消息处理超时");
|
||||
log.info("[测试] 多主题测试完成");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,268 @@
|
||||
package cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.TestMessage;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* {@link RocketMQIotMessageBus} 集成测试
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@SpringBootTest(classes = RocketMQIotMessageBusTest.class)
|
||||
@Import({RocketMQAutoConfiguration.class, IotMessageBusAutoConfiguration.class})
|
||||
@TestPropertySource(properties = {
|
||||
"yudao.iot.message-bus.type=rocketmq",
|
||||
"rocketmq.name-server=127.0.0.1:9876",
|
||||
"rocketmq.producer.group=test-rocketmq-group",
|
||||
"rocketmq.producer.send-message-timeout=10000"
|
||||
})
|
||||
@Slf4j
|
||||
public class RocketMQIotMessageBusTest {
|
||||
|
||||
@Resource
|
||||
private IotMessageBus messageBus;
|
||||
|
||||
/**
|
||||
* 1 topic 1 subscriber(string)
|
||||
*/
|
||||
@Test
|
||||
public void testSendMessageWithOneSubscriber() throws InterruptedException {
|
||||
// 准备
|
||||
String topic = "test-topic-" + IdUtil.simpleUUID();
|
||||
// String topic = "test-topic-pojo";
|
||||
String testMessage = "Hello IoT Message Bus!";
|
||||
// 用于等待消息处理完成
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
// 用于记录接收到的消息
|
||||
AtomicInteger subscriberCount = new AtomicInteger(0);
|
||||
AtomicReference<String> subscriberMessageRef = new AtomicReference<>();
|
||||
|
||||
// 发送消息(需要提前发,保证 RocketMQ 路由的创建)
|
||||
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
|
||||
messageBus.post(topic, testMessage);
|
||||
|
||||
// 创建订阅者
|
||||
IotMessageBusSubscriber<String> subscriber1 = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
|
||||
// return "test-topic-consumer-01";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriberCount.incrementAndGet();
|
||||
subscriberMessageRef.set(message);
|
||||
assertEquals(testMessage, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 注册订阅者
|
||||
messageBus.register(subscriber1);
|
||||
|
||||
// 等待消息处理完成(最多等待 5 秒)
|
||||
boolean completed = latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// 验证结果
|
||||
assertTrue(completed, "消息处理超时");
|
||||
assertEquals(1, subscriberCount.get(), "订阅者应该收到 1 条消息");
|
||||
log.info("[测试] 测试完成 - 订阅者收到{}条消息", subscriberCount.get());
|
||||
assertEquals(testMessage, subscriberMessageRef.get(), "接收到的消息内容不匹配");
|
||||
}
|
||||
|
||||
/**
|
||||
* 1 topic 2 subscriber(pojo)
|
||||
*/
|
||||
@Test
|
||||
public void testSendMessageWithTwoSubscribers() throws InterruptedException {
|
||||
// 准备
|
||||
String topic = "test-topic-" + IdUtil.simpleUUID();
|
||||
// String topic = "test-topic-pojo";
|
||||
TestMessage testMessage = new TestMessage().setNickname("yunai").setAge(18);
|
||||
// 用于等待消息处理完成
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
// 用于记录接收到的消息
|
||||
AtomicInteger subscriber1Count = new AtomicInteger(0);
|
||||
AtomicReference<TestMessage> subscriber1MessageRef = new AtomicReference<>();
|
||||
AtomicInteger subscriber2Count = new AtomicInteger(0);
|
||||
AtomicReference<TestMessage> subscriber2MessageRef = new AtomicReference<>();
|
||||
|
||||
// 发送消息(需要提前发,保证 RocketMQ 路由的创建)
|
||||
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
|
||||
messageBus.post(topic, testMessage);
|
||||
|
||||
// 创建第一个订阅者
|
||||
IotMessageBusSubscriber<TestMessage> subscriber1 = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
|
||||
// return "test-topic-consumer-01";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(TestMessage message) {
|
||||
log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber1Count.incrementAndGet();
|
||||
subscriber1MessageRef.set(message);
|
||||
assertEquals(testMessage, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 创建第二个订阅者
|
||||
IotMessageBusSubscriber<TestMessage> subscriber2 = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
|
||||
// return "test-topic-consumer-02";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(TestMessage message) {
|
||||
log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber2Count.incrementAndGet();
|
||||
subscriber2MessageRef.set(message);
|
||||
assertEquals(testMessage, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 注册订阅者
|
||||
messageBus.register(subscriber1);
|
||||
messageBus.register(subscriber2);
|
||||
|
||||
// 等待消息处理完成(最多等待 5 秒)
|
||||
boolean completed = latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// 验证结果
|
||||
assertTrue(completed, "消息处理超时");
|
||||
assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
|
||||
assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
|
||||
log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
|
||||
assertEquals(testMessage, subscriber1MessageRef.get(), "接收到的消息内容不匹配");
|
||||
assertEquals(testMessage, subscriber2MessageRef.get(), "接收到的消息内容不匹配");
|
||||
}
|
||||
|
||||
/**
|
||||
* 2 topic 2 subscriber
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleTopics() throws InterruptedException {
|
||||
// 准备
|
||||
String topic1 = "device-status-" + IdUtil.simpleUUID();
|
||||
String topic2 = "device-data-" + IdUtil.simpleUUID();
|
||||
String message1 = "设备在线";
|
||||
String message2 = "温度:25°C";
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
AtomicInteger subscriber1Count = new AtomicInteger(0);
|
||||
AtomicReference<String> subscriber1MessageRef = new AtomicReference<>();
|
||||
AtomicInteger subscriber2Count = new AtomicInteger(0);
|
||||
AtomicReference<String> subscriber2MessageRef = new AtomicReference<>();
|
||||
|
||||
|
||||
// 发送消息到不同主题(需要提前发,保证 RocketMQ 路由的创建)
|
||||
log.info("[测试] 发送消息 - Topic1: {}, Message1: {}", topic1, message1);
|
||||
messageBus.post(topic1, message1);
|
||||
log.info("[测试] 发送消息 - Topic2: {}, Message2: {}", topic2, message2);
|
||||
messageBus.post(topic2, message2);
|
||||
|
||||
// 创建订阅者 1 - 只订阅设备状态
|
||||
IotMessageBusSubscriber<String> statusSubscriber = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "status-group-" + IdUtil.simpleUUID();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber1Count.incrementAndGet();
|
||||
subscriber1MessageRef.set(message);
|
||||
assertEquals(message1, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 创建订阅者 2 - 只订阅设备数据
|
||||
IotMessageBusSubscriber<String> dataSubscriber = new IotMessageBusSubscriber<>() {
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getGroup() {
|
||||
return "data-group-" + IdUtil.simpleUUID();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
|
||||
subscriber2Count.incrementAndGet();
|
||||
subscriber2MessageRef.set(message);
|
||||
assertEquals(message2, message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
};
|
||||
// 注册订阅者到不同主题
|
||||
messageBus.register(statusSubscriber);
|
||||
messageBus.register(dataSubscriber);
|
||||
|
||||
// 等待消息处理完成
|
||||
boolean completed = latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
// 验证结果
|
||||
assertTrue(completed, "消息处理超时");
|
||||
assertEquals(1, subscriber1Count.get(), "状态订阅者应该收到 1 条消息");
|
||||
assertEquals(message1, subscriber1MessageRef.get(), "状态订阅者接收到的消息内容不匹配");
|
||||
assertEquals(1, subscriber2Count.get(), "数据订阅者应该收到 1 条消息");
|
||||
assertEquals(message2, subscriber2MessageRef.get(), "数据订阅者接收到的消息内容不匹配");
|
||||
log.info("[测试] 多主题测试完成 - 状态订阅者收到{}条消息,数据订阅者收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user