docs(task): 更新任务通信规则文档并添加任务注册描述

添加任务注册信息的描述字段到tasker_mapper.json
完善WebSocket通信机制文档,补充核心组件说明
修正属性同步机制中的空指针问题
优化代码格式和注释
This commit is contained in:
2025-09-25 09:56:27 +08:00
parent 2057c3ca67
commit ad42a49858
7 changed files with 351 additions and 24 deletions

View File

@@ -1,20 +1,31 @@
package com.ecep.contract;
import com.ecep.contract.constant.WebSocketConstant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import org.springframework.beans.BeanUtils;
import java.beans.PropertyDescriptor;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.ecep.contract.constant.WebSocketConstant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
/**
*
*/
public class WebSocketClientSession {
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientSession.class);
/**
* 会话ID由客户端创建服务器不保存会话只回传会话ID
*/
@Getter
private String sessionId = UUID.randomUUID().toString();
private final String sessionId = UUID.randomUUID().toString();
private WebSocketClientTasker tasker;
@@ -90,11 +101,17 @@ public class WebSocketClientSession {
Object object = webSocketService.getObjectMapper().convertValue(value, descriptor.getPropertyType());
System.out.println("object = " + object);
System.out.println("descriptor.getWriteMethod() = " + descriptor.getWriteMethod());
System.out.println("descriptor.getWriteMethod().getParameterTypes() = "
+ descriptor.getWriteMethod().getParameterTypes());
descriptor.getWriteMethod().invoke(tasker, object);
if (descriptor.getWriteMethod() == null) {
tasker.updateMessage(java.util.logging.Level.SEVERE, "属性 " + name + " 不可写");
} else {
System.out.println("descriptor.getWriteMethod().getParameterTypes() = "
+ descriptor.getWriteMethod().getParameterTypes());
descriptor.getWriteMethod().invoke(tasker, object);
}
} catch (Exception e) {
tasker.updateMessage(java.util.logging.Level.SEVERE, "属性设置失败: " + name + " = " + value);
logger.error("set {} = {}", name, value, e);
}
}

View File

@@ -15,20 +15,21 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker<Object> implements
@Getter
@Setter
private CompanyCustomerVo companyCustomer;
@Getter
@Setter
protected boolean filesUpdated = false;
@Override
public String getTaskName() {
return "CompanyCustomerRebuildFilesTasker";
}
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("重建客户文件");

View File

@@ -12,6 +12,27 @@
- **服务器端重量级**:负责实际业务逻辑处理和数据操作
- **WebSocket通信**使用WebSocket实现客户端与服务器端的任务通信和进度同步
### 1.1 核心通信组件
#### WebSocketClientService
`WebSocketClientService`类是客户端WebSocket通信的核心服务组件负责建立、维护与服务器的WebSocket连接并提供消息发送和接收的功能。主要职责包括
- **连接管理**初始化WebSocket连接、处理连接关闭和重连逻辑
- **心跳维护**:定期发送心跳消息保持连接活跃
- **会话管理**创建和管理WebSocket会话`WebSocketClientSession`
- **消息路由**:接收并路由服务器消息到对应的会话处理器
- **服务调用**:提供`invoke`方法调用服务器端服务
#### WebSocketClientSession
`WebSocketClientSession`类代表一个特定的WebSocket会话每个Tasker任务执行时都会创建一个对应的会话实例。主要职责包括
- **会话标识**维护唯一的会话ID
- **任务提交**将Tasker任务提交到服务器端执行
- **消息处理**:处理服务器返回的各类消息(包括进度更新、属性更新、状态变更等)
- **属性同步**根据服务器消息更新Tasker的属性值
## 2. 类命名与结构规范
### 2.1 命名规则
@@ -77,7 +98,7 @@ public void afterPropertiesSet() throws Exception {
### 3.2 核心方法实现
- **getTaskName()**返回任务名称通常使用类名
- **updateProgress()**继承或重写进度更新方法
- **execute()**调用`callRemoteTask()`方法将任务发送到服务器端传递必要参数
- **execute()**调用`callRemoteTask()`方法将任务发送到服务器端传递必要参数, 参数类型只允许基本类和Vo类对象
### 3.3 示例实现
```java
@@ -117,6 +138,9 @@ public class CompanyCustomerEvaluationFormUpdateTask extends Tasker<Object> impl
- 实现`execute(MessageHolder holder)`方法包含实际业务逻辑
- 使用`holder.info()/error()`等方法记录任务执行状态
- 调用`updateProgress()`方法更新任务进度
- 调用`updateTitle()`方法更新任务标题
- 调用`updateProperty()`方法更新任务属性
- `holder``updateProgress()``updateTitle()``updateProperty()` 等方法通过 webSocket 把数据传输到 client 端的 taskerclient 端的 tasker 收到数据后会调用对应的方法更新任务属性
### 4.4 示例实现
```java
@@ -196,12 +220,292 @@ public void onUpdateEvaluationFormAction(ActionEvent event) {
- 客户端通过任务对话框展示错误信息
- 服务器端在关键操作点进行异常捕获和处理
## 8. 数据一致性保障
## 9. 数据一致性保障
- 任务完成后客户端通常调用数据刷新方法(如`loadTableDataSet()`确保UI显示最新数据
- 服务器端负责业务数据的持久化操作
## 9. 最佳实践建议
## 10. WebSocketServerTaskManager功能与作用
`WebSocketServerTaskManager`是服务器端管理WebSocket任务的核心组件负责处理客户端发起的任务请求创建并执行对应的任务实例并维护任务执行过程中的双向通信。
### 10.1 核心职责
- **任务注册管理**:从`tasker_mapper.json`文件加载并管理任务名称与对应实现类的映射关系
- **消息处理分发**:处理客户端发送的任务创建请求
- **任务实例化**根据任务名称动态创建服务器端Tasker实例
- **处理器设置**为Tasker设置各类回调处理器实现服务器端到客户端的消息推送
- **异步任务执行**:使用虚拟线程异步执行任务
- **消息通信**:维护任务执行过程中的双向通信,包括进度更新、状态通知等
- **异常处理**:处理任务执行过程中的异常情况并通知客户端
### 10.2 任务注册机制
`WebSocketServerTaskManager`在初始化阶段(通过实现`InitializingBean`接口的`afterPropertiesSet()`方法)从`tasker_mapper.json`文件中加载任务注册信息:
```java
@Override
public void afterPropertiesSet() throws Exception {
// 从tasker_mapper.json文件中加载任务注册信息
try {
Resource resource = resourceLoader.getResource("classpath:tasker_mapper.json");
try (InputStream inputStream = resource.getInputStream()) {
JsonNode rootNode = objectMapper.readTree(inputStream);
JsonNode taskersNode = rootNode.get("taskers");
if (taskersNode != null && taskersNode.isObject()) {
Map<String, String> taskMap = new java.util.HashMap<>();
taskersNode.fields().forEachRemaining(entry -> {
taskMap.put(entry.getKey(), entry.getValue().asText());
});
taskClzMap = taskMap;
}
}
} catch (Exception e) {
logger.error("Failed to load tasker_mapper.json", e);
// 使用默认值作为fallback
taskClzMap = Map.of();
}
}
```
#### 10.2 消息处理流程
`WebSocketServerTaskManager`的消息处理流程如下:
1. **接收消息**:通过`onMessage`方法接收来自WebSocketSession的消息
2. **解析会话ID**:从消息中提取会话标识
3. **分发处理**:调用`handleAsSessionCallback`根据消息类型进行分发
4. **异常处理**:捕获处理过程中的异常并发送错误消息
```java
public void onMessage(WebSocketSession session, JsonNode jsonNode) {
// 处理 sessionId 的消息
String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
try {
handleAsSessionCallback(session, sessionId, jsonNode);
} catch (Exception e) {
sendError(session, sessionId, e.getMessage());
logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e);
}
}
private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) {
if (!jsonNode.has("type")) {
throw new IllegalArgumentException("缺失 type 参数");
}
String type = jsonNode.get("type").asText();
if (type.equals("createTask")) {
createTask(session, sessionId, jsonNode);
}
}
private void sendError(WebSocketSession session, String sessionId, String message) {
if (session == null || !session.isOpen()) {
logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message);
return;
}
try {
String errorMessage = objectMapper.writeValueAsString(Map.of(
WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
WebSocketConstant.SUCCESS_FIELD_VALUE, false,
WebSocketConstant.MESSAGE_FIELD_NAME, message));
// 检查会话状态并尝试发送错误消息
if (session.isOpen()) {
session.sendMessage(new TextMessage(errorMessage));
} else {
logger.warn("会话已关闭,无法发送错误消息: {}", message);
}
} catch (Exception e) {
// 捕获所有可能的异常,防止影响主流程
logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
}
}
```
#### 10.3 任务创建与初始化
`createTask()`方法负责创建和初始化任务实例:
1. **参数提取**:从消息中提取任务名称和参数
2. **类型查找**:根据任务名称查找对应的类名
3. **实例创建**:通过反射创建任务实例
4. **国际化设置**设置任务的Locale信息
5. **处理器绑定**:为任务绑定各类消息处理器
6. **参数初始化**:调用任务的`init()`方法进行参数初始化
7. **异步执行**:使用虚拟线程启动任务执行
```java
private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) {
if (!jsonNode.has(WebSocketConstant.ARGUMENTS_FIELD_NAME)) {
throw new IllegalArgumentException("缺失 " + WebSocketConstant.ARGUMENTS_FIELD_NAME + " 参数");
}
JsonNode argsNode = jsonNode.get(WebSocketConstant.ARGUMENTS_FIELD_NAME);
String taskName = argsNode.get(0).asText();
String clzName = taskClzMap.get(taskName);
if (clzName == null) {
throw new IllegalArgumentException("未知的任务类型: " + taskName);
}
Object tasker = null;
try {
Class<?> clz = Class.forName(clzName);
tasker = clz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("未知的任务类型: " + taskName + ", class: " + clzName);
} catch (Exception e) {
throw new IllegalArgumentException("任务类型: " + taskName + ", class: " + clzName + " 实例化失败");
}
if (tasker instanceof Tasker<?> t) {
String locale = argsNode.get(1).asText();
t.setLocale(Locale.forLanguageTag(locale));
}
if (tasker instanceof WebSocketServerTasker t) {
t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title));
t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg));
t.setPropertyHandler((name, value) -> sendToSession(session, sessionId, "property", name, value));
t.setProgressHandler((current, total) -> sendToSession(session, sessionId, "progress", current, total));
t.init(argsNode.get(2));
}
if (tasker instanceof Callable<?> callable) {
Thread.ofVirtual().start(() -> {
try {
sendToSession(session, sessionId, "start");
callable.call();
sendToSession(session, sessionId, "done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
```
#### 10.4 消息通信机制
`WebSocketServerTaskManager`提供了多种消息发送方法,实现服务器端到客户端的通信:
- **`sendToSession()`**发送各类消息title、property、progress、start、done等
- **`sendMessageToSession()`**:专门发送带级别的消息
- **`sendError()`**:发送错误消息给客户端
```java
private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) {
try {
String text = objectMapper.writeValueAsString(Map.of(
WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
"type", type,
WebSocketConstant.ARGUMENTS_FIELD_NAME, args));
session.sendMessage(new TextMessage(text));
} catch (IOException e) {
// 捕获所有可能的异常,防止影响主流程
logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
}
return true;
}
private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) {
return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage());
}
private void sendError(WebSocketSession session, String sessionId, String message) {
if (session == null || !session.isOpen()) {
logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message);
return;
}
try {
String errorMessage = objectMapper.writeValueAsString(Map.of(
WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
WebSocketConstant.SUCCESS_FIELD_VALUE, false,
WebSocketConstant.MESSAGE_FIELD_NAME, message));
// 检查会话状态并尝试发送错误消息
if (session.isOpen()) {
session.sendMessage(new TextMessage(errorMessage));
} else {
logger.warn("会话已关闭,无法发送错误消息: {}", message);
}
} catch (Exception e) {
// 捕获所有可能的异常,防止影响主流程
logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
}
}
#### 10.5 与WebSocketServerHandler的关系
`WebSocketServerTaskManager`与`WebSocketServerHandler`协同工作共同处理客户端的WebSocket请求
- `WebSocketServerHandler`作为WebSocket消息的入口点接收所有客户端消息
- 对于与任务相关的消息(如创建任务),`WebSocketServerHandler`将消息转发给`WebSocketServerTaskManager`处理
- `WebSocketServerTaskManager`负责具体的任务创建、初始化和执行逻辑
- 两者结合实现了完整的客户端-服务器端Tasker通信机制
## 11. WebSocketClientService工作机制
### 11.1 连接管理流程
`WebSocketClientService`的连接管理流程包括以下几个核心步骤:
1. **初始化连接**:通过`initWebSocket()`方法创建与服务器的WebSocket连接
2. **连接状态监控**:使用`online`和`message`属性实时反映连接状态
3. **自动重连机制**:当连接断开时,通过`scheduleReconnect()`方法安排自动重连
4. **心跳维护**:通过`startHeartbeat()`和`heartbeat()`方法定期发送ping消息
### 11.2 消息处理机制
`WebSocketClientService`使用内置的`WebSocketListener`处理各类WebSocket事件
- `onOpen`:连接建立时触发,启动心跳并更新连接状态
- `onMessage`:接收服务器消息时触发,解析并路由消息到对应处理器
- `onClosing`/`onClosed`:连接关闭时触发,停止心跳并准备重连
- `onFailure`:连接失败时触发,记录错误并尝试重连
### 11.3 消息路由策略
`WebSocketClientService`采用以下策略路由接收到的消息:
1. **回调消息**:如果消息包含`messageId`,则路由到对应的`CompletableFuture`回调
2. **会话消息**:如果消息包含`sessionId`,则路由到对应的`WebSocketClientSession`处理
3. **错误消息**:如果消息包含错误码,则按错误类型进行处理(如未授权时自动重新登录)
## 12. WebSocketClientSession工作机制
### 12.1 会话生命周期管理
每个`WebSocketClientSession`实例的生命周期包括:
1. **创建**:通过`WebSocketClientService.createSession()`方法创建
2. **任务提交**:通过`submitTask()`方法提交Tasker任务到服务器
3. **消息处理**:处理服务器返回的各类消息
4. **关闭**:任务完成后通过`close()`方法关闭会话
### 12.2 消息类型处理
`WebSocketClientSession`能够处理以下几种主要的消息类型:
- **message**:普通文本消息,通过`handleAsMessage()`处理
- **title**:任务标题更新,通过`handleAsTitle()`处理
- **property**:任务属性更新,通过`handleAsProperty()`处理
- **progress**:任务进度更新,通过`handleAsProgress()`处理
- **start**:任务开始通知,通过`handleAsStart()`处理
- **done**:任务完成通知,通过`handleAsDone()`处理
### 12.3 属性同步机制
`WebSocketClientSession`的`handleAsProperty()`方法实现了服务器到客户端的属性同步机制:
1. 接收属性名和属性值
2. 通过Java反射获取Tasker类的属性描述符
3. 将接收到的属性值转换为正确的Java类型
4. 调用属性的setter方法更新Tasker实例的属性值
## 13. 最佳实践建议
1. **任务拆分**:复杂任务应拆分为多个小任务,便于进度跟踪和错误定位
2. **状态反馈**:在关键节点提供清晰的状态信息,增强用户体验
@@ -211,5 +515,7 @@ public void onUpdateEvaluationFormAction(ActionEvent event) {
6. **配置管理**:使用`tasker_mapper.json`文件统一管理任务注册信息,避免在代码中硬编码任务映射
7. **异常处理**确保实现配置文件加载失败的fallback机制保证系统稳定性
8. **版本控制**:对任务配置文件进行版本控制,便于追踪变更历史
9. **会话管理**确保任务完成后正确关闭WebSocket会话避免资源泄露
10. **属性同步**:合理使用属性同步机制,确保客户端显示与服务器状态一致
通过遵循以上规则和模式可以确保Contract-Manager项目中客户端与服务器端Tasker通信的一致性、可靠性和可维护性。

View File

@@ -24,8 +24,8 @@ import com.ecep.contract.CustomerFileType;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.SpringApp;
import com.ecep.contract.cloud.tyc.CloudTycService;
import com.ecep.contract.ds.customer.service.CompanyCustomerFileService;
import com.ecep.contract.ds.customer.service.CompanyCustomerEvaluationFormFileService;
import com.ecep.contract.ds.customer.service.CompanyCustomerFileService;
import com.ecep.contract.ds.customer.service.CompanyCustomerService;
import com.ecep.contract.model.CloudTyc;
import com.ecep.contract.model.Company;
@@ -35,8 +35,6 @@ import com.ecep.contract.model.CompanyCustomerFile;
import com.ecep.contract.service.WebSocketServerTasker;
import com.ecep.contract.ui.Tasker;
import com.ecep.contract.util.CompanyUtils;
import com.ecep.contract.vo.CompanyCustomerEvaluationFormFileVo;
import com.ecep.contract.vo.CompanyCustomerFileVo;
import com.fasterxml.jackson.databind.JsonNode;
public class CompanyCustomerEvaluationFormUpdateTask extends Tasker<Object> implements WebSocketServerTasker {

View File

@@ -30,6 +30,10 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker<Object> implements
customer = getCompanyCustomerService().findById(customerId);
}
void setFilesUpdated(boolean filesUpdated) {
updateProperty("filesUpdated", filesUpdated);
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("重建客户文件");
@@ -59,10 +63,10 @@ public class CompanyCustomerRebuildFilesTasker extends Tasker<Object> implements
if (result) {
holder.info("客户文件重建成功");
updateProperty("filesUpdated", true);
setFilesUpdated(true);
} else {
holder.info("客户文件重建完成,但没有更新任何文件");
updateProperty("filesUpdated", false);
setFilesUpdated(false);
}
updateProgress(100, 100);

View File

@@ -52,7 +52,7 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker<Object> im
@Override
protected Void execute(MessageHolder holder) throws Exception {
importFromContracts(cost, holder);
importFromContracts(cost, holder);
return null;
}

View File

@@ -7,5 +7,6 @@
"CompanyCustomerEvaluationFormUpdateTask": "com.ecep.contract.ds.customer.tasker.CompanyCustomerEvaluationFormUpdateTask",
"CompanyCustomerNextSignDateTask": "com.ecep.contract.ds.customer.tasker.CompanyCustomerNextSignDateTask",
"CompanyCustomerRebuildFilesTasker": "com.ecep.contract.ds.customer.tasker.CompanyCustomerRebuildFilesTasker"
}
},
"descriptions": "任务注册信息"
}