diff --git a/client/src/main/java/com/ecep/contract/WebSocketClientSession.java b/client/src/main/java/com/ecep/contract/WebSocketClientSession.java index 984b4b8..5cee132 100644 --- a/client/src/main/java/com/ecep/contract/WebSocketClientSession.java +++ b/client/src/main/java/com/ecep/contract/WebSocketClientSession.java @@ -1,20 +1,31 @@ package com.ecep.contract; -import com.ecep.contract.constant.WebSocketConstant; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import lombok.Getter; -import org.springframework.beans.BeanUtils; - import java.beans.PropertyDescriptor; import java.util.Locale; import java.util.Map; import java.util.UUID; import java.util.logging.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; + +import com.ecep.contract.constant.WebSocketConstant; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; + +import lombok.Getter; + +/** + * + */ public class WebSocketClientSession { + private static final Logger logger = LoggerFactory.getLogger(WebSocketClientSession.class); + /** + * 会话ID由客户端创建,服务器不保存会话只回传会话ID + */ @Getter - private String sessionId = UUID.randomUUID().toString(); + private final String sessionId = UUID.randomUUID().toString(); private WebSocketClientTasker tasker; @@ -90,11 +101,17 @@ public class WebSocketClientSession { Object object = webSocketService.getObjectMapper().convertValue(value, descriptor.getPropertyType()); System.out.println("object = " + object); System.out.println("descriptor.getWriteMethod() = " + descriptor.getWriteMethod()); - System.out.println("descriptor.getWriteMethod().getParameterTypes() = " - + descriptor.getWriteMethod().getParameterTypes()); - descriptor.getWriteMethod().invoke(tasker, object); + + if (descriptor.getWriteMethod() == null) { + tasker.updateMessage(java.util.logging.Level.SEVERE, "属性 " + name + " 不可写"); + } else { + System.out.println("descriptor.getWriteMethod().getParameterTypes() = " + + descriptor.getWriteMethod().getParameterTypes()); + descriptor.getWriteMethod().invoke(tasker, object); + } } catch (Exception e) { tasker.updateMessage(java.util.logging.Level.SEVERE, "属性设置失败: " + name + " = " + value); + logger.error("set {} = {}", name, value, e); } } diff --git a/client/src/main/java/com/ecep/contract/task/CompanyCustomerRebuildFilesTasker.java b/client/src/main/java/com/ecep/contract/task/CompanyCustomerRebuildFilesTasker.java index 4132222..a8b7bcc 100644 --- a/client/src/main/java/com/ecep/contract/task/CompanyCustomerRebuildFilesTasker.java +++ b/client/src/main/java/com/ecep/contract/task/CompanyCustomerRebuildFilesTasker.java @@ -15,20 +15,21 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker implements @Getter @Setter private CompanyCustomerVo companyCustomer; - + @Getter + @Setter protected boolean filesUpdated = false; - + @Override public String getTaskName() { return "CompanyCustomerRebuildFilesTasker"; } - + @Override public void updateProgress(long current, long total) { super.updateProgress(current, total); } - + @Override protected Object execute(MessageHolder holder) throws Exception { updateTitle("重建客户文件"); diff --git a/docs/task/client_server_tasker_communication_rules.md b/docs/task/client_server_tasker_communication_rules.md index 131c055..2e1d841 100644 --- a/docs/task/client_server_tasker_communication_rules.md +++ b/docs/task/client_server_tasker_communication_rules.md @@ -12,6 +12,27 @@ - **服务器端重量级**:负责实际业务逻辑处理和数据操作 - **WebSocket通信**:使用WebSocket实现客户端与服务器端的任务通信和进度同步 +### 1.1 核心通信组件 + +#### WebSocketClientService + +`WebSocketClientService`类是客户端WebSocket通信的核心服务组件,负责建立、维护与服务器的WebSocket连接,并提供消息发送和接收的功能。主要职责包括: + +- **连接管理**:初始化WebSocket连接、处理连接关闭和重连逻辑 +- **心跳维护**:定期发送心跳消息保持连接活跃 +- **会话管理**:创建和管理WebSocket会话(`WebSocketClientSession`) +- **消息路由**:接收并路由服务器消息到对应的会话处理器 +- **服务调用**:提供`invoke`方法调用服务器端服务 + +#### WebSocketClientSession + +`WebSocketClientSession`类代表一个特定的WebSocket会话,每个Tasker任务执行时都会创建一个对应的会话实例。主要职责包括: + +- **会话标识**:维护唯一的会话ID +- **任务提交**:将Tasker任务提交到服务器端执行 +- **消息处理**:处理服务器返回的各类消息(包括进度更新、属性更新、状态变更等) +- **属性同步**:根据服务器消息更新Tasker的属性值 + ## 2. 类命名与结构规范 ### 2.1 命名规则 @@ -77,7 +98,7 @@ public void afterPropertiesSet() throws Exception { ### 3.2 核心方法实现 - **getTaskName()**:返回任务名称,通常使用类名 - **updateProgress()**:继承或重写进度更新方法 -- **execute()**:调用`callRemoteTask()`方法将任务发送到服务器端,传递必要参数 +- **execute()**:调用`callRemoteTask()`方法将任务发送到服务器端,传递必要参数, 参数类型只允许基本类和Vo类对象 ### 3.3 示例实现 ```java @@ -117,6 +138,9 @@ public class CompanyCustomerEvaluationFormUpdateTask extends Tasker impl - 实现`execute(MessageHolder holder)`方法包含实际业务逻辑 - 使用`holder.info()/error()`等方法记录任务执行状态 - 调用`updateProgress()`方法更新任务进度 +- 调用`updateTitle()`方法更新任务标题 +- 调用`updateProperty()`方法更新任务属性 +- `holder`、`updateProgress()`、`updateTitle()`和`updateProperty()` 等方法通过 webSocket 把数据传输到 client 端的 tasker,client 端的 tasker 收到数据后会调用对应的方法更新任务属性 ### 4.4 示例实现 ```java @@ -196,12 +220,292 @@ public void onUpdateEvaluationFormAction(ActionEvent event) { - 客户端通过任务对话框展示错误信息 - 服务器端在关键操作点进行异常捕获和处理 -## 8. 数据一致性保障 +## 9. 数据一致性保障 - 任务完成后客户端通常调用数据刷新方法(如`loadTableDataSet()`)确保UI显示最新数据 - 服务器端负责业务数据的持久化操作 -## 9. 最佳实践建议 +## 10. WebSocketServerTaskManager功能与作用 + +`WebSocketServerTaskManager`是服务器端管理WebSocket任务的核心组件,负责处理客户端发起的任务请求,创建并执行对应的任务实例,并维护任务执行过程中的双向通信。 + +### 10.1 核心职责 + +- **任务注册管理**:从`tasker_mapper.json`文件加载并管理任务名称与对应实现类的映射关系 +- **消息处理分发**:处理客户端发送的任务创建请求 +- **任务实例化**:根据任务名称动态创建服务器端Tasker实例 +- **处理器设置**:为Tasker设置各类回调处理器,实现服务器端到客户端的消息推送 +- **异步任务执行**:使用虚拟线程异步执行任务 +- **消息通信**:维护任务执行过程中的双向通信,包括进度更新、状态通知等 +- **异常处理**:处理任务执行过程中的异常情况并通知客户端 + +### 10.2 任务注册机制 + +`WebSocketServerTaskManager`在初始化阶段(通过实现`InitializingBean`接口的`afterPropertiesSet()`方法)从`tasker_mapper.json`文件中加载任务注册信息: + +```java +@Override +public void afterPropertiesSet() throws Exception { + // 从tasker_mapper.json文件中加载任务注册信息 + try { + Resource resource = resourceLoader.getResource("classpath:tasker_mapper.json"); + try (InputStream inputStream = resource.getInputStream()) { + JsonNode rootNode = objectMapper.readTree(inputStream); + JsonNode taskersNode = rootNode.get("taskers"); + if (taskersNode != null && taskersNode.isObject()) { + Map taskMap = new java.util.HashMap<>(); + taskersNode.fields().forEachRemaining(entry -> { + taskMap.put(entry.getKey(), entry.getValue().asText()); + }); + taskClzMap = taskMap; + } + } + } catch (Exception e) { + logger.error("Failed to load tasker_mapper.json", e); + // 使用默认值作为fallback + taskClzMap = Map.of(); + } +} +``` + +#### 10.2 消息处理流程 + +`WebSocketServerTaskManager`的消息处理流程如下: + +1. **接收消息**:通过`onMessage`方法接收来自WebSocketSession的消息 +2. **解析会话ID**:从消息中提取会话标识 +3. **分发处理**:调用`handleAsSessionCallback`根据消息类型进行分发 +4. **异常处理**:捕获处理过程中的异常并发送错误消息 + +```java +public void onMessage(WebSocketSession session, JsonNode jsonNode) { + // 处理 sessionId 的消息 + String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText(); + try { + handleAsSessionCallback(session, sessionId, jsonNode); + } catch (Exception e) { + sendError(session, sessionId, e.getMessage()); + logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e); + } +} + +private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) { + if (!jsonNode.has("type")) { + throw new IllegalArgumentException("缺失 type 参数"); + } + String type = jsonNode.get("type").asText(); + if (type.equals("createTask")) { + createTask(session, sessionId, jsonNode); + } +} + +private void sendError(WebSocketSession session, String sessionId, String message) { + if (session == null || !session.isOpen()) { + logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); + return; + } + + try { + String errorMessage = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, + WebSocketConstant.SUCCESS_FIELD_VALUE, false, + WebSocketConstant.MESSAGE_FIELD_NAME, message)); + + // 检查会话状态并尝试发送错误消息 + if (session.isOpen()) { + session.sendMessage(new TextMessage(errorMessage)); + } else { + logger.warn("会话已关闭,无法发送错误消息: {}", message); + } + } catch (Exception e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } +} +``` + + +#### 10.3 任务创建与初始化 + +`createTask()`方法负责创建和初始化任务实例: + +1. **参数提取**:从消息中提取任务名称和参数 +2. **类型查找**:根据任务名称查找对应的类名 +3. **实例创建**:通过反射创建任务实例 +4. **国际化设置**:设置任务的Locale信息 +5. **处理器绑定**:为任务绑定各类消息处理器 +6. **参数初始化**:调用任务的`init()`方法进行参数初始化 +7. **异步执行**:使用虚拟线程启动任务执行 + +```java +private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) { + if (!jsonNode.has(WebSocketConstant.ARGUMENTS_FIELD_NAME)) { + throw new IllegalArgumentException("缺失 " + WebSocketConstant.ARGUMENTS_FIELD_NAME + " 参数"); + } + JsonNode argsNode = jsonNode.get(WebSocketConstant.ARGUMENTS_FIELD_NAME); + + String taskName = argsNode.get(0).asText(); + String clzName = taskClzMap.get(taskName); + if (clzName == null) { + throw new IllegalArgumentException("未知的任务类型: " + taskName); + } + Object tasker = null; + try { + Class clz = Class.forName(clzName); + tasker = clz.getDeclaredConstructor().newInstance(); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("未知的任务类型: " + taskName + ", class: " + clzName); + } catch (Exception e) { + throw new IllegalArgumentException("任务类型: " + taskName + ", class: " + clzName + " 实例化失败"); + } + + if (tasker instanceof Tasker t) { + String locale = argsNode.get(1).asText(); + t.setLocale(Locale.forLanguageTag(locale)); + } + + if (tasker instanceof WebSocketServerTasker t) { + t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title)); + t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg)); + t.setPropertyHandler((name, value) -> sendToSession(session, sessionId, "property", name, value)); + t.setProgressHandler((current, total) -> sendToSession(session, sessionId, "progress", current, total)); + t.init(argsNode.get(2)); + } + + if (tasker instanceof Callable callable) { + Thread.ofVirtual().start(() -> { + try { + sendToSession(session, sessionId, "start"); + callable.call(); + sendToSession(session, sessionId, "done"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} +``` + +#### 10.4 消息通信机制 + +`WebSocketServerTaskManager`提供了多种消息发送方法,实现服务器端到客户端的通信: + +- **`sendToSession()`**:发送各类消息(title、property、progress、start、done等) +- **`sendMessageToSession()`**:专门发送带级别的消息 +- **`sendError()`**:发送错误消息给客户端 + +```java +private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) { + try { + String text = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, + "type", type, + WebSocketConstant.ARGUMENTS_FIELD_NAME, args)); + session.sendMessage(new TextMessage(text)); + } catch (IOException e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } + return true; +} + +private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) { + return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage()); +} + +private void sendError(WebSocketSession session, String sessionId, String message) { + if (session == null || !session.isOpen()) { + logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); + return; + } + + try { + String errorMessage = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, + WebSocketConstant.SUCCESS_FIELD_VALUE, false, + WebSocketConstant.MESSAGE_FIELD_NAME, message)); + + // 检查会话状态并尝试发送错误消息 + if (session.isOpen()) { + session.sendMessage(new TextMessage(errorMessage)); + } else { + logger.warn("会话已关闭,无法发送错误消息: {}", message); + } + } catch (Exception e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } +} + +#### 10.5 与WebSocketServerHandler的关系 + +`WebSocketServerTaskManager`与`WebSocketServerHandler`协同工作,共同处理客户端的WebSocket请求: + +- `WebSocketServerHandler`作为WebSocket消息的入口点,接收所有客户端消息 +- 对于与任务相关的消息(如创建任务),`WebSocketServerHandler`将消息转发给`WebSocketServerTaskManager`处理 +- `WebSocketServerTaskManager`负责具体的任务创建、初始化和执行逻辑 +- 两者结合实现了完整的客户端-服务器端Tasker通信机制 + +## 11. WebSocketClientService工作机制 + +### 11.1 连接管理流程 + +`WebSocketClientService`的连接管理流程包括以下几个核心步骤: + +1. **初始化连接**:通过`initWebSocket()`方法创建与服务器的WebSocket连接 +2. **连接状态监控**:使用`online`和`message`属性实时反映连接状态 +3. **自动重连机制**:当连接断开时,通过`scheduleReconnect()`方法安排自动重连 +4. **心跳维护**:通过`startHeartbeat()`和`heartbeat()`方法定期发送ping消息 + +### 11.2 消息处理机制 + +`WebSocketClientService`使用内置的`WebSocketListener`处理各类WebSocket事件: + +- `onOpen`:连接建立时触发,启动心跳并更新连接状态 +- `onMessage`:接收服务器消息时触发,解析并路由消息到对应处理器 +- `onClosing`/`onClosed`:连接关闭时触发,停止心跳并准备重连 +- `onFailure`:连接失败时触发,记录错误并尝试重连 + +### 11.3 消息路由策略 + +`WebSocketClientService`采用以下策略路由接收到的消息: + +1. **回调消息**:如果消息包含`messageId`,则路由到对应的`CompletableFuture`回调 +2. **会话消息**:如果消息包含`sessionId`,则路由到对应的`WebSocketClientSession`处理 +3. **错误消息**:如果消息包含错误码,则按错误类型进行处理(如未授权时自动重新登录) + +## 12. WebSocketClientSession工作机制 + +### 12.1 会话生命周期管理 + +每个`WebSocketClientSession`实例的生命周期包括: + +1. **创建**:通过`WebSocketClientService.createSession()`方法创建 +2. **任务提交**:通过`submitTask()`方法提交Tasker任务到服务器 +3. **消息处理**:处理服务器返回的各类消息 +4. **关闭**:任务完成后通过`close()`方法关闭会话 + +### 12.2 消息类型处理 + +`WebSocketClientSession`能够处理以下几种主要的消息类型: + +- **message**:普通文本消息,通过`handleAsMessage()`处理 +- **title**:任务标题更新,通过`handleAsTitle()`处理 +- **property**:任务属性更新,通过`handleAsProperty()`处理 +- **progress**:任务进度更新,通过`handleAsProgress()`处理 +- **start**:任务开始通知,通过`handleAsStart()`处理 +- **done**:任务完成通知,通过`handleAsDone()`处理 + +### 12.3 属性同步机制 + +`WebSocketClientSession`的`handleAsProperty()`方法实现了服务器到客户端的属性同步机制: + +1. 接收属性名和属性值 +2. 通过Java反射获取Tasker类的属性描述符 +3. 将接收到的属性值转换为正确的Java类型 +4. 调用属性的setter方法更新Tasker实例的属性值 + +## 13. 最佳实践建议 1. **任务拆分**:复杂任务应拆分为多个小任务,便于进度跟踪和错误定位 2. **状态反馈**:在关键节点提供清晰的状态信息,增强用户体验 @@ -211,5 +515,7 @@ public void onUpdateEvaluationFormAction(ActionEvent event) { 6. **配置管理**:使用`tasker_mapper.json`文件统一管理任务注册信息,避免在代码中硬编码任务映射 7. **异常处理**:确保实现配置文件加载失败的fallback机制,保证系统稳定性 8. **版本控制**:对任务配置文件进行版本控制,便于追踪变更历史 +9. **会话管理**:确保任务完成后正确关闭WebSocket会话,避免资源泄露 +10. **属性同步**:合理使用属性同步机制,确保客户端显示与服务器状态一致 通过遵循以上规则和模式,可以确保Contract-Manager项目中客户端与服务器端Tasker通信的一致性、可靠性和可维护性。 \ No newline at end of file diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java b/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java index ecba303..f2d319b 100644 --- a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java +++ b/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java @@ -24,8 +24,8 @@ import com.ecep.contract.CustomerFileType; import com.ecep.contract.MessageHolder; import com.ecep.contract.SpringApp; import com.ecep.contract.cloud.tyc.CloudTycService; -import com.ecep.contract.ds.customer.service.CompanyCustomerFileService; import com.ecep.contract.ds.customer.service.CompanyCustomerEvaluationFormFileService; +import com.ecep.contract.ds.customer.service.CompanyCustomerFileService; import com.ecep.contract.ds.customer.service.CompanyCustomerService; import com.ecep.contract.model.CloudTyc; import com.ecep.contract.model.Company; @@ -35,8 +35,6 @@ import com.ecep.contract.model.CompanyCustomerFile; import com.ecep.contract.service.WebSocketServerTasker; import com.ecep.contract.ui.Tasker; import com.ecep.contract.util.CompanyUtils; -import com.ecep.contract.vo.CompanyCustomerEvaluationFormFileVo; -import com.ecep.contract.vo.CompanyCustomerFileVo; import com.fasterxml.jackson.databind.JsonNode; public class CompanyCustomerEvaluationFormUpdateTask extends Tasker implements WebSocketServerTasker { diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java b/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java index 910621e..6f96d8c 100644 --- a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java +++ b/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java @@ -30,6 +30,10 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker implements customer = getCompanyCustomerService().findById(customerId); } + void setFilesUpdated(boolean filesUpdated) { + updateProperty("filesUpdated", filesUpdated); + } + @Override protected Object execute(MessageHolder holder) throws Exception { updateTitle("重建客户文件"); @@ -59,10 +63,10 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker implements if (result) { holder.info("客户文件重建成功"); - updateProperty("filesUpdated", true); + setFilesUpdated(true); } else { holder.info("客户文件重建完成,但没有更新任何文件"); - updateProperty("filesUpdated", false); + setFilesUpdated(false); } updateProgress(100, 100); diff --git a/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java b/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java index 875d2e8..ee153d9 100644 --- a/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java +++ b/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java @@ -52,7 +52,7 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker im @Override protected Void execute(MessageHolder holder) throws Exception { - importFromContracts(cost, holder); + importFromContracts(cost, holder); return null; } diff --git a/server/src/main/resources/tasker_mapper.json b/server/src/main/resources/tasker_mapper.json index d76eb0c..f25e3bd 100644 --- a/server/src/main/resources/tasker_mapper.json +++ b/server/src/main/resources/tasker_mapper.json @@ -7,5 +7,6 @@ "CompanyCustomerEvaluationFormUpdateTask": "com.ecep.contract.ds.customer.tasker.CompanyCustomerEvaluationFormUpdateTask", "CompanyCustomerNextSignDateTask": "com.ecep.contract.ds.customer.tasker.CompanyCustomerNextSignDateTask", "CompanyCustomerRebuildFilesTasker": "com.ecep.contract.ds.customer.tasker.CompanyCustomerRebuildFilesTasker" - } + }, + "descriptions": "任务注册信息" } \ No newline at end of file