review:【iot 物联网】场景联动的缓存等
This commit is contained in:
@@ -33,7 +33,7 @@ public class IotTcpDataRuleAction extends
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected IotTcpClient initProducer(IotDataSinkTcpConfig config) throws Exception {
|
protected IotTcpClient initProducer(IotDataSinkTcpConfig config) throws Exception {
|
||||||
// 1. 参数校验
|
// 1.1 参数校验
|
||||||
if (config.getHost() == null || config.getHost().trim().isEmpty()) {
|
if (config.getHost() == null || config.getHost().trim().isEmpty()) {
|
||||||
throw new IllegalArgumentException("TCP 服务器地址不能为空");
|
throw new IllegalArgumentException("TCP 服务器地址不能为空");
|
||||||
}
|
}
|
||||||
@@ -41,7 +41,7 @@ public class IotTcpDataRuleAction extends
|
|||||||
throw new IllegalArgumentException("TCP 服务器端口无效");
|
throw new IllegalArgumentException("TCP 服务器端口无效");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 创建 TCP 客户端
|
// 2.1 创建 TCP 客户端
|
||||||
IotTcpClient tcpClient = new IotTcpClient(
|
IotTcpClient tcpClient = new IotTcpClient(
|
||||||
config.getHost(),
|
config.getHost(),
|
||||||
config.getPort(),
|
config.getPort(),
|
||||||
@@ -51,13 +51,10 @@ public class IotTcpDataRuleAction extends
|
|||||||
config.getSslCertPath(),
|
config.getSslCertPath(),
|
||||||
config.getDataFormat()
|
config.getDataFormat()
|
||||||
);
|
);
|
||||||
|
// 2.2 连接服务器
|
||||||
// 3. 连接服务器
|
|
||||||
tcpClient.connect();
|
tcpClient.connect();
|
||||||
|
|
||||||
log.info("[initProducer][TCP 客户端创建并连接成功,服务器: {}:{},SSL: {},数据格式: {}]",
|
log.info("[initProducer][TCP 客户端创建并连接成功,服务器: {}:{},SSL: {},数据格式: {}]",
|
||||||
config.getHost(), config.getPort(), config.getSsl(), config.getDataFormat());
|
config.getHost(), config.getPort(), config.getSsl(), config.getDataFormat());
|
||||||
|
|
||||||
return tcpClient;
|
return tcpClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,22 +68,19 @@ public class IotTcpDataRuleAction extends
|
|||||||
@Override
|
@Override
|
||||||
protected void execute(IotDeviceMessage message, IotDataSinkTcpConfig config) throws Exception {
|
protected void execute(IotDeviceMessage message, IotDataSinkTcpConfig config) throws Exception {
|
||||||
try {
|
try {
|
||||||
// 1. 获取或创建 TCP 客户端
|
// 1.1 获取或创建 TCP 客户端
|
||||||
IotTcpClient tcpClient = getProducer(config);
|
IotTcpClient tcpClient = getProducer(config);
|
||||||
|
// 1.2 检查连接状态,如果断开则重新连接
|
||||||
// 2. 检查连接状态,如果断开则重新连接
|
|
||||||
if (!tcpClient.isConnected()) {
|
if (!tcpClient.isConnected()) {
|
||||||
log.warn("[execute][TCP 连接已断开,尝试重新连接,服务器: {}:{}]", config.getHost(), config.getPort());
|
log.warn("[execute][TCP 连接已断开,尝试重新连接,服务器: {}:{}]", config.getHost(), config.getPort());
|
||||||
tcpClient.connect();
|
tcpClient.connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 发送消息并等待结果
|
// 2.1 发送消息并等待结果
|
||||||
tcpClient.sendMessage(message);
|
tcpClient.sendMessage(message);
|
||||||
|
// 2.2 记录发送成功日志
|
||||||
// 4. 记录发送成功日志
|
|
||||||
log.info("[execute][message({}) config({}) 发送成功,TCP 服务器: {}:{}]",
|
log.info("[execute][message({}) config({}) 发送成功,TCP 服务器: {}:{}]",
|
||||||
message, config, config.getHost(), config.getPort());
|
message, config, config.getHost(), config.getPort());
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[execute][message({}) config({}) 发送失败,TCP 服务器: {}:{}]",
|
log.error("[execute][message({}) config({}) 发送失败,TCP 服务器: {}:{}]",
|
||||||
message, config, config.getHost(), config.getPort(), e);
|
message, config, config.getHost(), config.getPort(), e);
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ public class IotTcpClient {
|
|||||||
private BufferedReader reader;
|
private BufferedReader reader;
|
||||||
private final AtomicBoolean connected = new AtomicBoolean(false);
|
private final AtomicBoolean connected = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// TODO @puhui999:default 值,IotDataSinkTcpConfig.java 枚举起来哈;
|
||||||
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
|
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
|
||||||
Boolean ssl, String sslCertPath, String dataFormat) {
|
Boolean ssl, String sslCertPath, String dataFormat) {
|
||||||
this.host = host;
|
this.host = host;
|
||||||
@@ -76,9 +77,9 @@ public class IotTcpClient {
|
|||||||
outputStream = socket.getOutputStream();
|
outputStream = socket.getOutputStream();
|
||||||
reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
|
reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
// 更新状态
|
||||||
connected.set(true);
|
connected.set(true);
|
||||||
log.info("[connect][TCP 客户端连接成功,服务器地址: {}:{}]", host, port);
|
log.info("[connect][TCP 客户端连接成功,服务器地址: {}:{}]", host, port);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
close();
|
close();
|
||||||
log.error("[connect][TCP 客户端连接失败,服务器地址: {}:{}]", host, port, e);
|
log.error("[connect][TCP 客户端连接失败,服务器地址: {}:{}]", host, port, e);
|
||||||
@@ -98,6 +99,7 @@ public class IotTcpClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// TODO @puhui999:枚举值
|
||||||
String messageData;
|
String messageData;
|
||||||
if ("JSON".equalsIgnoreCase(dataFormat)) {
|
if ("JSON".equalsIgnoreCase(dataFormat)) {
|
||||||
// JSON 格式
|
// JSON 格式
|
||||||
@@ -111,10 +113,8 @@ public class IotTcpClient {
|
|||||||
outputStream.write(messageData.getBytes(StandardCharsets.UTF_8));
|
outputStream.write(messageData.getBytes(StandardCharsets.UTF_8));
|
||||||
outputStream.write('\n'); // 添加换行符作为消息分隔符
|
outputStream.write('\n'); // 添加换行符作为消息分隔符
|
||||||
outputStream.flush();
|
outputStream.flush();
|
||||||
|
|
||||||
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
|
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
|
||||||
message.getDeviceId(), messageData.length());
|
message.getDeviceId(), messageData.length());
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
|
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -153,9 +153,9 @@ public class IotTcpClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 更新状态
|
||||||
connected.set(false);
|
connected.set(false);
|
||||||
log.info("[close][TCP 客户端连接已关闭,服务器地址: {}:{}]", host, port);
|
log.info("[close][TCP 客户端连接已关闭,服务器地址: {}:{}]", host, port);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[close][关闭 TCP 客户端连接异常]", e);
|
log.error("[close][关闭 TCP 客户端连接异常]", e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,13 +25,13 @@ class IotTcpDataRuleActionTest {
|
|||||||
private IotTcpClient mockTcpClient;
|
private IotTcpClient mockTcpClient;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
public void setUp() {
|
||||||
MockitoAnnotations.openMocks(this);
|
MockitoAnnotations.openMocks(this);
|
||||||
tcpDataRuleAction = new IotTcpDataRuleAction();
|
tcpDataRuleAction = new IotTcpDataRuleAction();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testGetType() {
|
public void testGetType() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
Integer expectedType = 2; // 数据接收类型枚举中 TCP 类型的值
|
Integer expectedType = 2; // 数据接收类型枚举中 TCP 类型的值
|
||||||
|
|
||||||
@@ -42,8 +42,9 @@ class IotTcpDataRuleActionTest {
|
|||||||
assertEquals(expectedType, actualType);
|
assertEquals(expectedType, actualType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @puhui999:_ 后面是小写哈,单测的命名规则。
|
||||||
@Test
|
@Test
|
||||||
void testInitProducer_Success() throws Exception {
|
public void testInitProducer_Success() throws Exception {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
||||||
config.setHost("localhost");
|
config.setHost("localhost");
|
||||||
@@ -59,7 +60,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testInitProducer_InvalidHost() {
|
public void testInitProducer_InvalidHost() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
||||||
config.setHost("");
|
config.setHost("");
|
||||||
@@ -77,7 +78,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testInitProducer_InvalidPort() {
|
public void testInitProducer_InvalidPort() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
||||||
config.setHost("localhost");
|
config.setHost("localhost");
|
||||||
@@ -92,7 +93,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testCloseProducer() throws Exception {
|
public void testCloseProducer() throws Exception {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotTcpClient client = mock(IotTcpClient.class);
|
IotTcpClient client = mock(IotTcpClient.class);
|
||||||
|
|
||||||
@@ -104,7 +105,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testExecute_WithValidConfig() {
|
public void testExecute_WithValidConfig() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.report",
|
IotDeviceMessage message = IotDeviceMessage.requestOf("thing.property.report",
|
||||||
"{\"temperature\": 25.5, \"humidity\": 60}");
|
"{\"temperature\": 25.5, \"humidity\": 60}");
|
||||||
@@ -124,7 +125,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testConfig_DefaultValues() {
|
public void testConfig_DefaultValues() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
IotDataSinkTcpConfig config = new IotDataSinkTcpConfig();
|
||||||
|
|
||||||
@@ -140,7 +141,7 @@ class IotTcpDataRuleActionTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testMessageSerialization() {
|
public void testMessageSerialization() {
|
||||||
// 准备参数
|
// 准备参数
|
||||||
IotDeviceMessage message = IotDeviceMessage.builder()
|
IotDeviceMessage message = IotDeviceMessage.builder()
|
||||||
.deviceId(123L)
|
.deviceId(123L)
|
||||||
|
|||||||
Reference in New Issue
Block a user