reactor:【IoT 物联网】优化设备缓存的加载逻辑

This commit is contained in:
YunaiV
2025-06-11 20:35:09 +08:00
parent 66b42367cb
commit 33fed79820
28 changed files with 486 additions and 684 deletions

View File

@@ -13,4 +13,7 @@ public interface ErrorCodeConstants {
ErrorCode DEVICE_AUTH_FAIL = new ErrorCode(1_051_001_000, "设备鉴权失败"); // 对应阿里云 20000
ErrorCode DEVICE_TOKEN_EXPIRED = new ErrorCode(1_051_001_002, "token 失效。需重新调用 auth 进行鉴权获取token"); // 对应阿里云 20001
// ========== 设备信息 1-050-002-000 ============
ErrorCode DEVICE_NOT_EXISTS = new ErrorCode(1_051_002_001, "设备({}/{}) 不存在");
}

View File

@@ -36,14 +36,10 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
public final void handle(RoutingContext context) {
try {
// 1. 前置处理
CommonResult<Object> result = beforeHandle(context);
if (result != null) {
writeResponse(context, result);
return;
}
beforeHandle(context);
// 2. 执行逻辑
result = handle0(context);
CommonResult<Object> result = handle0(context);
writeResponse(context, result);
} catch (ServiceException e) {
writeResponse(context, CommonResult.error(e.getCode(), e.getMessage()));
@@ -55,11 +51,11 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
protected abstract CommonResult<Object> handle0(RoutingContext context);
private CommonResult<Object> beforeHandle(RoutingContext context) {
private void beforeHandle(RoutingContext context) {
// 如果不需要认证,则不走前置处理
String path = context.request().path();
if (ObjUtil.equal(path, IotHttpAuthHandler.PATH)) {
return null;
return;
}
// 解析参数
@@ -84,7 +80,6 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
|| ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) {
throw exception(FORBIDDEN);
}
return null;
}
@SuppressWarnings("deprecation")

View File

@@ -9,11 +9,10 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
@@ -35,19 +34,16 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
private final IotHttpUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceClientService;
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService deviceMessageService;
public IotHttpAuthHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceClientService = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
@@ -69,9 +65,9 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
}
// 2.1 执行认证
CommonResult<Boolean> result = deviceClientService.authDevice(new IotDeviceAuthReqDTO()
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password));
result.checkError();;
result.checkError();
if (!BooleanUtil.isTrue(result.getData())) {
throw exception(DEVICE_AUTH_FAIL);
}
@@ -82,9 +78,9 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
Assert.notBlank(token, "生成 token 不能为空位");
// 3. 执行上线
IotDeviceMessage message = deviceMessageService.buildDeviceMessageOfStateOnline(
IotDeviceMessage message = IotDeviceMessage.buildStateOnline();
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
deviceMessageProducer.sendDeviceMessage(message);
// 构建响应数据
return success(MapUtil.of("token", token));

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@@ -61,7 +61,7 @@ public class IotDeviceTokenServiceImpl implements IotDeviceTokenService {
JSONObject payload = jwt.getPayloads();
// 检查过期时间
Long exp = payload.getLong("exp");
if (exp == null || exp > System.currentTimeMillis() / 1000) {
if (exp == null || exp < System.currentTimeMillis() / 1000) {
throw exception(DEVICE_TOKEN_EXPIRED);
}
String productKey = payload.getStr("productKey");

View File

@@ -1,75 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* IoT 设备缓存 Service 接口
*
* @author 芋道源码
*/
public interface IotDeviceCacheService {
/**
* 设备信息
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class DeviceInfo {
/**
* 设备编号
*/
private Long deviceId;
/**
* 产品标识
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备密钥
*/
private String deviceKey;
/**
* 租户编号
*/
private Long tenantId;
}
/**
* 根据 productKey 和 deviceName 获取设备信息
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 设备信息,如果不存在返回 null
*/
DeviceInfo getDeviceInfo(String productKey, String deviceName);
/**
* 根据 deviceId 获取设备信息
*
* @param deviceId 设备编号
* @return 设备信息,如果不存在返回 null
*/
DeviceInfo getDeviceInfoById(Long deviceId);
/**
* 清除设备缓存
*
* @param deviceId 设备编号
*/
void evictDeviceCache(Long deviceId);
/**
* 清除设备缓存
*
* @param productKey 产品标识
* @param deviceName 设备名称
*/
void evictDeviceCache(String productKey, String deviceName);
}

View File

@@ -1,241 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* IoT 设备缓存 Service 实现类
* <p>
* 使用本地缓存 + 远程 API 的方式获取设备信息,提高性能并避免敏感信息传输
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceCacheServiceImpl implements IotDeviceCacheService {
/**
* 设备信息本地缓存
* Key: deviceId
* Value: DeviceInfo
*/
private final ConcurrentHashMap<Long, DeviceInfo> deviceIdCache = new ConcurrentHashMap<>();
/**
* 设备名称到设备ID的映射缓存
* Key: productKey:deviceName
* Value: deviceId
*/
private final ConcurrentHashMap<String, Long> deviceNameCache = new ConcurrentHashMap<>();
/**
* 锁对象,防止并发请求同一设备信息
*/
private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
@Override
public DeviceInfo getDeviceInfo(String productKey, String deviceName) {
if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
log.warn("[getDeviceInfo][参数为空][productKey: {}][deviceName: {}]", productKey, deviceName);
return null;
}
String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
// 1. 先从缓存获取 deviceId
Long deviceId = deviceNameCache.get(cacheKey);
if (deviceId != null) {
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
if (deviceInfo != null) {
log.debug("[getDeviceInfo][缓存命中][productKey: {}][deviceName: {}][deviceId: {}]",
productKey, deviceName, deviceId);
return deviceInfo;
}
}
// 2. 缓存未命中,从远程 API 获取
return loadDeviceInfoFromApi(productKey, deviceName, cacheKey);
}
@Override
public DeviceInfo getDeviceInfoById(Long deviceId) {
if (deviceId == null) {
log.warn("[getDeviceInfoById][deviceId 为空]");
return null;
}
// 1. 先从缓存获取
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
if (deviceInfo != null) {
log.debug("[getDeviceInfoById][缓存命中][deviceId: {}]", deviceId);
return deviceInfo;
}
// 2. 缓存未命中,从远程 API 获取
return loadDeviceInfoByIdFromApi(deviceId);
}
@Override
public void evictDeviceCache(Long deviceId) {
if (deviceId == null) {
return;
}
DeviceInfo deviceInfo = deviceIdCache.remove(deviceId);
if (deviceInfo != null) {
String cacheKey = buildDeviceNameCacheKey(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
deviceNameCache.remove(cacheKey);
log.info("[evictDeviceCache][清除设备缓存][deviceId: {}]", deviceId);
}
}
@Override
public void evictDeviceCache(String productKey, String deviceName) {
if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
return;
}
String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
Long deviceId = deviceNameCache.remove(cacheKey);
if (deviceId != null) {
deviceIdCache.remove(deviceId);
log.info("[evictDeviceCache][清除设备缓存][productKey: {}][deviceName: {}]", productKey, deviceName);
}
}
/**
* 从远程 API 加载设备信息
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @param cacheKey 缓存键
* @return 设备信息
*/
private DeviceInfo loadDeviceInfoFromApi(String productKey, String deviceName, String cacheKey) {
// 使用锁防止并发请求同一设备信息
ReentrantLock lock = lockMap.computeIfAbsent(cacheKey, k -> new ReentrantLock());
lock.lock();
try {
// 双重检查,防止重复请求
Long deviceId = deviceNameCache.get(cacheKey);
if (deviceId != null) {
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
if (deviceInfo != null) {
return deviceInfo;
}
}
log.info("[loadDeviceInfoFromApi][从远程API获取设备信息][productKey: {}][deviceName: {}]",
productKey, deviceName);
try {
// 调用远程 API 获取设备信息
IotDeviceCommonApi deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
IotDeviceInfoReqDTO reqDTO = new IotDeviceInfoReqDTO();
reqDTO.setProductKey(productKey);
reqDTO.setDeviceName(deviceName);
CommonResult<IotDeviceInfoRespDTO> result = deviceCommonApi.getDeviceInfo(reqDTO);
if (result == null || !result.isSuccess() || result.getData() == null) {
log.warn("[loadDeviceInfoFromApi][远程API调用失败][productKey: {}][deviceName: {}][result: {}]",
productKey, deviceName, result);
return null;
}
IotDeviceInfoRespDTO respDTO = result.getData();
DeviceInfo deviceInfo = new DeviceInfo(
respDTO.getDeviceId(),
respDTO.getProductKey(),
respDTO.getDeviceName(),
respDTO.getDeviceKey(),
respDTO.getTenantId());
// 缓存设备信息
cacheDeviceInfo(deviceInfo, cacheKey);
log.info("[loadDeviceInfoFromApi][设备信息获取成功并已缓存][deviceInfo: {}]", deviceInfo);
return deviceInfo;
} catch (Exception e) {
log.error("[loadDeviceInfoFromApi][远程API调用异常][productKey: {}][deviceName: {}]",
productKey, deviceName, e);
return null;
}
} finally {
lock.unlock();
// 清理锁对象,避免内存泄漏
if (lockMap.size() > 1000) { // 简单的清理策略
lockMap.entrySet().removeIf(entry -> !entry.getValue().hasQueuedThreads());
}
}
}
/**
* 从远程 API 根据 deviceId 加载设备信息
*
* @param deviceId 设备编号
* @return 设备信息
*/
private DeviceInfo loadDeviceInfoByIdFromApi(Long deviceId) {
String lockKey = "deviceId:" + deviceId;
ReentrantLock lock = lockMap.computeIfAbsent(lockKey, k -> new ReentrantLock());
lock.lock();
try {
// 双重检查
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
if (deviceInfo != null) {
return deviceInfo;
}
log.info("[loadDeviceInfoByIdFromApi][从远程API获取设备信息][deviceId: {}]", deviceId);
try {
// TODO: 这里需要添加根据 deviceId 获取设备信息的 API
// 暂时返回 null等待 API 完善
log.warn("[loadDeviceInfoByIdFromApi][根据deviceId获取设备信息的API尚未实现][deviceId: {}]", deviceId);
return null;
} catch (Exception e) {
log.error("[loadDeviceInfoByIdFromApi][远程API调用异常][deviceId: {}]", deviceId, e);
return null;
}
} finally {
lock.unlock();
}
}
/**
* 缓存设备信息
*
* @param deviceInfo 设备信息
* @param cacheKey 缓存键
*/
private void cacheDeviceInfo(DeviceInfo deviceInfo, String cacheKey) {
if (deviceInfo != null && deviceInfo.getDeviceId() != null) {
deviceIdCache.put(deviceInfo.getDeviceId(), deviceInfo);
deviceNameCache.put(cacheKey, deviceInfo.getDeviceId());
}
}
/**
* 构建设备名称缓存键
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 缓存键
*/
private String buildDeviceNameCacheKey(String productKey, String deviceName) {
return productKey + ":" + deviceName;
}
}

View File

@@ -1,107 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.bean.BeanUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.LinkedHashMap;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* Iot 设备信息 Service 实现类:调用远程的 device http 接口,进行设备认证、设备获取等
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceClientServiceImpl implements IotDeviceCommonApi {
@Resource
private IotGatewayProperties gatewayProperties;
private RestTemplate restTemplate;
@PostConstruct
public void init() {
IotGatewayProperties.RpcProperties rpc = gatewayProperties.getRpc();
restTemplate = new RestTemplateBuilder()
.rootUri(rpc.getUrl() + "/rpc-api/iot/device")
.readTimeout(rpc.getReadTimeout())
.connectTimeout(rpc.getConnectTimeout())
.build();
}
@Override
public CommonResult<Boolean> authDevice(IotDeviceAuthReqDTO authReqDTO) {
return doPost("/auth", authReqDTO);
}
@Override
public CommonResult<IotDeviceInfoRespDTO> getDeviceInfo(IotDeviceInfoReqDTO infoReqDTO) {
return doPostForDeviceInfo("/info", infoReqDTO);
}
@SuppressWarnings("unchecked")
private <T> CommonResult<Boolean> doPost(String url, T requestBody) {
try {
CommonResult<Boolean> result = restTemplate.postForObject(url, requestBody,
(Class<CommonResult<Boolean>>) (Class<?>) CommonResult.class);
log.info("[doPost][url({}) requestBody({}) result({})]", url, requestBody, result);
Assert.notNull(result, "请求结果不能为空");
return result;
} catch (Exception e) {
log.error("[doPost][url({}) requestBody({}) 发生异常]", url, requestBody, e);
return CommonResult.error(INTERNAL_SERVER_ERROR);
}
}
@SuppressWarnings("unchecked")
private <T> CommonResult<IotDeviceInfoRespDTO> doPostForDeviceInfo(String url, T requestBody) {
try {
// 使用 ParameterizedTypeReference 来处理泛型类型
ParameterizedTypeReference<CommonResult<LinkedHashMap<String, Object>>> typeRef = new ParameterizedTypeReference<CommonResult<LinkedHashMap<String, Object>>>() {
};
HttpEntity<T> requestEntity = new HttpEntity<>(requestBody);
ResponseEntity<CommonResult<LinkedHashMap<String, Object>>> response = restTemplate.exchange(url,
HttpMethod.POST, requestEntity, typeRef);
CommonResult<LinkedHashMap<String, Object>> rawResult = response.getBody();
log.info("[doPostForDeviceInfo][url({}) requestBody({}) rawResult({})]", url, requestBody, rawResult);
Assert.notNull(rawResult, "请求结果不能为空");
// 手动转换数据类型
CommonResult<IotDeviceInfoRespDTO> result = new CommonResult<>();
result.setCode(rawResult.getCode());
result.setMsg(rawResult.getMsg());
if (rawResult.isSuccess() && rawResult.getData() != null) {
// 将 LinkedHashMap 转换为 IotDeviceInfoRespDTO
IotDeviceInfoRespDTO deviceInfo = BeanUtil.toBean(rawResult.getData(), IotDeviceInfoRespDTO.class);
result.setData(deviceInfo);
}
return result;
} catch (Exception e) {
log.error("[doPostForDeviceInfo][url({}) requestBody({}) 发生异常]", url, requestBody, e);
return CommonResult.error(INTERNAL_SERVER_ERROR);
}
}
}

View File

@@ -0,0 +1,29 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
/**
* IoT 设备信息 Service 接口
*
* @author 芋道源码
*/
public interface IotDeviceService {
/**
* 根据 productKey 和 deviceName 获取设备信息
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 设备信息
*/
IotDeviceRespDTO getDeviceFromCache(String productKey, String deviceName);
/**
* 根据 id 获取设备信息
*
* @param id 设备编号
* @return 设备信息
*/
IotDeviceRespDTO getDeviceFromCache(Long id);
}

View File

@@ -0,0 +1,81 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.core.KeyValue;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.Duration;
import static cn.iocoder.yudao.framework.common.util.cache.CacheUtils.buildAsyncReloadingCache;
/**
* IoT 设备信息 Service 实现类
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceServiceImpl implements IotDeviceService {
private static final Duration CACHE_EXPIRE = Duration.ofMinutes(1);
/**
* 通过 id 查询设备的缓存
*/
private final LoadingCache<Long, IotDeviceRespDTO> deviceCaches = buildAsyncReloadingCache(
CACHE_EXPIRE,
new CacheLoader<>() {
@Override
public IotDeviceRespDTO load(Long id) {
CommonResult<IotDeviceRespDTO> result = deviceApi.getDevice(new IotDeviceGetReqDTO().setId(id));
IotDeviceRespDTO device = result.getCheckedData();
Assert.notNull(device, "设备({}) 不能为空", id);
// 相互缓存
deviceCaches2.put(new KeyValue<>(device.getProductKey(), device.getDeviceName()), device);
return device;
}
});
/**
* 通过 productKey + deviceName 查询设备的缓存
*/
private final LoadingCache<KeyValue<String, String>, IotDeviceRespDTO> deviceCaches2 = buildAsyncReloadingCache(
CACHE_EXPIRE,
new CacheLoader<>() {
@Override
public IotDeviceRespDTO load(KeyValue<String, String> kv) {
CommonResult<IotDeviceRespDTO> result = deviceApi.getDevice(new IotDeviceGetReqDTO()
.setProductKey(kv.getKey()).setDeviceName(kv.getValue()));
IotDeviceRespDTO device = result.getCheckedData();
Assert.notNull(device, "设备({}/{}) 不能为空", kv.getKey(), kv.getValue());
// 相互缓存
deviceCaches.put(device.getId(), device);
return device;
}
});
@Resource
private IotDeviceCommonApi deviceApi;
@Override
public IotDeviceRespDTO getDeviceFromCache(String productKey, String deviceName) {
return deviceCaches2.getUnchecked(new KeyValue<>(productKey, deviceName));
}
@Override
public IotDeviceRespDTO getDeviceFromCache(Long id) {
return deviceCaches.getUnchecked(id);
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.service.message;
package cn.iocoder.yudao.module.iot.gateway.service.device.message;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -26,30 +26,20 @@ public interface IotDeviceMessageService {
* @param bytes 消息内容
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serverId 设备连接的 serverId
* @return 解码后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName, String serverId);
String productKey, String deviceName);
/**
* 构建设备上线消息
* 发送消息
*
* @param message 消息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serverId 设备连接的 serverId
* @return 消息
* @param serverId 设备连接的 serverId
*/
IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId);
/**
* 构建设备下线消息
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serverId 设备连接的 serverId
* @return 消息
*/
IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId);
void sendDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName, String serverId);
}

View File

@@ -0,0 +1,115 @@
package cn.iocoder.yudao.module.iot.gateway.service.device.message;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_NOT_EXISTS;
/**
* IoT 设备消息 Service 实现类
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/**
* 编解码器
*/
private final Map<String, IotAlinkDeviceMessageCodec> codes;
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
public IotDeviceMessageServiceImpl(List<IotAlinkDeviceMessageCodec> codes) {
this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type);
}
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
// 1.1 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
if (device == null) {
throw exception(DEVICE_NOT_EXISTS, productKey, deviceName);
}
// 1.2 获取编解码器
IotAlinkDeviceMessageCodec codec = codes.get(device.getCodecType());
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType()));
}
// 2. 编码消息
return codec.encode(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName) {
// 1.1 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
if (device == null) {
throw exception(DEVICE_NOT_EXISTS, productKey, deviceName);
}
// 1.2 获取编解码器
IotAlinkDeviceMessageCodec codec = codes.get(device.getCodecType());
if (codec == null) {
throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType()));
}
// 2. 解码消息
return codec.decode(bytes);
}
@Override
public void sendDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName, String serverId) {
// 1. 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName);
if (device == null) {
throw exception(DEVICE_NOT_EXISTS, productKey, deviceName);
}
// 2. 发送消息
appendDeviceMessage(message, device, serverId);
deviceMessageProducer.sendDeviceMessage(message);
}
/**
* 补充消息的后端字段
*
* @param message 消息
* @param device 设备信息
* @param serverId 设备连接的 serverId
* @return 消息
*/
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
IotDeviceRespDTO device, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(device.getId()).setTenantId(device.getTenantId()).setServerId(serverId);
// 特殊:如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId());
}
return message;
}
}

View File

@@ -0,0 +1,74 @@
package cn.iocoder.yudao.module.iot.gateway.service.device.remote;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* Iot 设备信息 Service 实现类:调用远程的 device http 接口,进行设备认证、设备获取等
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceApiImpl implements IotDeviceCommonApi {
@Resource
private IotGatewayProperties gatewayProperties;
private RestTemplate restTemplate;
@PostConstruct
public void init() {
IotGatewayProperties.RpcProperties rpc = gatewayProperties.getRpc();
restTemplate = new RestTemplateBuilder()
.rootUri(rpc.getUrl() + "/rpc-api/iot/device")
.readTimeout(rpc.getReadTimeout())
.connectTimeout(rpc.getConnectTimeout())
.build();
}
@Override
public CommonResult<Boolean> authDevice(IotDeviceAuthReqDTO authReqDTO) {
return doPost("/auth", authReqDTO, new ParameterizedTypeReference<>() { });
}
@Override
public CommonResult<IotDeviceRespDTO> getDevice(IotDeviceGetReqDTO getReqDTO) {
return doPost("/get", getReqDTO, new ParameterizedTypeReference<>() { });
}
private <T, R> CommonResult<R> doPost(String url, T body,
ParameterizedTypeReference<CommonResult<R>> responseType) {
try {
// 请求
HttpEntity<T> requestEntity = new HttpEntity<>(body);
ResponseEntity<CommonResult<R>> response = restTemplate.exchange(
url, HttpMethod.POST, requestEntity, responseType);
// 响应
CommonResult<R> result = response.getBody();
Assert.notNull(result, "请求结果不能为空");
return result;
} catch (Exception e) {
log.error("[doPost][url({}) body({}) 发生异常]", url, body, e);
return CommonResult.error(INTERNAL_SERVER_ERROR);
}
}
}

View File

@@ -1,139 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.service.message;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
* IoT 设备消息 Service 实现类
*
* @author 芋道源码
*/
@Service
@Slf4j
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/**
* 编解码器
*/
private final Map<String, IotAlinkDeviceMessageCodec> codes;
@Resource
private IotDeviceCacheService deviceCacheService;
public IotDeviceMessageServiceImpl(List<IotAlinkDeviceMessageCodec> codes) {
this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type);
}
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
// 获取设备信息以确定编解码类型
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
if (deviceInfo == null) {
log.warn("[encodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
productKey, deviceName);
return null;
}
String codecType = "alink"; // 默认使用 alink 编解码器
IotAlinkDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
log.error("[encodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
return null;
}
return codec.encode(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName, String serverId) {
// 获取设备信息
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
if (deviceInfo == null) {
log.warn("[decodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
productKey, deviceName);
return null;
}
String codecType = "alink"; // 默认使用 alink 编解码器
IotAlinkDeviceMessageCodec codec = codes.get(codecType);
if (codec == null) {
log.error("[decodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
return null;
}
IotDeviceMessage message = codec.decode(bytes);
if (message == null) {
log.warn("[decodeDeviceMessage][消息解码失败][productKey: {}][deviceName: {}]",
productKey, deviceName);
return null;
}
// 补充后端字段
return appendDeviceMessage(message, deviceInfo, serverId);
}
@Override
public IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId) {
// 获取设备信息
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
if (deviceInfo == null) {
log.warn("[buildDeviceMessageOfStateOnline][设备信息不存在][productKey: {}][deviceName: {}]",
productKey, deviceName);
return null;
}
IotDeviceMessage message = IotDeviceMessage.requestOf(null,
IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(), null);
return appendDeviceMessage(message, deviceInfo, serverId);
}
@Override
public IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId) {
// 获取设备信息
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
if (deviceInfo == null) {
log.warn("[buildDeviceMessageOfStateOffline][设备信息不存在][productKey: {}][deviceName: {}]",
productKey, deviceName);
return null;
}
IotDeviceMessage message = IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod(),
null);
return appendDeviceMessage(message, deviceInfo, serverId);
}
/**
* 补充消息的后端字段
*
* @param message 消息
* @param device 设备信息
* @param serverId 设备连接的 serverId
* @return 消息
*/
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
IotDeviceCacheService.DeviceInfo device, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(device.getDeviceId()).setTenantId(device.getTenantId()).setServerId(serverId);
// 特殊:如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId());
}
return message;
}
}