重构合同文件管理逻辑,增加错误处理和日志记录 新增ContractBalance实体、Repository和VO类 完善Voable接口文档和实现规范 更新项目架构文档和数据库设计 修复SmbFileService的连接问题 移动合同相关TabSkin类到contract包 添加合同文件重建任务的WebSocket支持
460 lines
14 KiB
Markdown
460 lines
14 KiB
Markdown
# WebSocketServerTasker 接口实现规范
|
||
|
||
## 1. 概述
|
||
|
||
本文档总结了实现 `WebSocketServerTasker` 接口的标准做法和最佳实践,基于对项目中已有实现类的分析。该接口用于服务器端实现支持WebSocket通信的任务处理器,实现异步任务的执行和状态监控。
|
||
|
||
## 2. WebSocketServerTasker 接口介绍
|
||
|
||
`WebSocketServerTasker` 接口位于 `com.ecep.contract.service.tasker` 包下,继承自 `java.util.concurrent.Callable<Object>`,定义了以下核心方法:
|
||
|
||
```java
|
||
public interface WebSocketServerTasker extends Callable<Object> {
|
||
// 初始化任务,处理传入的参数
|
||
void init(JsonNode argsNode);
|
||
|
||
// 设置会话信息(默认实现为空)
|
||
default void setSession(SessionInfo session) {}
|
||
|
||
// 设置消息处理函数
|
||
void setMessageHandler(Predicate<Message> messageHandler);
|
||
|
||
// 设置标题处理函数
|
||
void setTitleHandler(Predicate<String> titleHandler);
|
||
|
||
// 设置属性处理函数
|
||
void setPropertyHandler(BiConsumer<String, Object> propertyHandler);
|
||
|
||
// 设置进度处理函数
|
||
void setProgressHandler(BiConsumer<Long, Long> progressHandler);
|
||
}
|
||
```
|
||
|
||
## 3. 基础实现模式
|
||
|
||
### 3.1 推荐实现方式
|
||
|
||
通过分析项目中的实现类,推荐以下标准实现模式:
|
||
|
||
1. **继承 Tasker<Object> 并实现 WebSocketServerTasker 接口**
|
||
|
||
```java
|
||
public class YourTasker extends Tasker<Object> implements WebSocketServerTasker {
|
||
// 实现代码
|
||
}
|
||
```
|
||
|
||
2. **在 Tasker 基类中已实现的方法**
|
||
|
||
`Tasker` 类已经提供了 `WebSocketServerTasker` 接口中大部分方法的实现,包括:
|
||
- `setMessageHandler`
|
||
- `setTitleHandler`
|
||
- `setPropertyHandler`
|
||
- `setProgressHandler`
|
||
- `call()` 方法(实现了 `Callable` 接口)
|
||
|
||
这使得实现类只需要关注特定业务逻辑的实现。
|
||
|
||
## 3.2 WebSocketServerTasker 注册配置
|
||
|
||
要使WebSocketServerTasker能够被客户端通过WebSocket调用,必须在`tasker_mapper.json`配置文件中进行注册。
|
||
|
||
### 3.2.1 注册配置文件
|
||
|
||
配置文件位置:`server/src/main/resources/tasker_mapper.json`
|
||
|
||
### 3.2.2 注册格式
|
||
|
||
注册格式为JSON对象,包含一个`tasks`字段,该字段是一个键值对映射,其中:
|
||
- **键**:任务名称(客户端通过此名称调用任务)
|
||
- **值**:任务类的完全限定名
|
||
|
||
```json
|
||
{
|
||
"tasks": {
|
||
"任务名称": "任务类的完全限定名",
|
||
"ContractVerifyTasker": "com.ecep.contract.service.tasker.ContractVerifyTasker"
|
||
},
|
||
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
|
||
}
|
||
```
|
||
|
||
### 3.2.3 注册示例
|
||
|
||
假设我们创建了一个名为`CustomTasker`的新任务类,其完全限定名为`com.ecep.contract.service.tasker.CustomTasker`,则注册方式如下:
|
||
|
||
```json
|
||
{
|
||
"tasks": {
|
||
"CustomTasker": "com.ecep.contract.service.tasker.CustomTasker"
|
||
// 其他已注册任务...
|
||
},
|
||
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
|
||
}
|
||
```
|
||
|
||
### 3.2.4 注册机制说明
|
||
|
||
WebSocketServerTaskManager类在启动时会读取`tasker_mapper.json`文件,初始化任务类映射表。当客户端请求执行任务时,系统会:
|
||
|
||
1. 根据客户端提供的任务名称从映射表中查找对应的任务类
|
||
2. 使用反射机制实例化任务对象
|
||
3. 设置WebSocket会话和各类处理器
|
||
4. 执行任务并通过WebSocket返回结果
|
||
|
||
## 4. ContractRepairAllTasker 升级经验
|
||
|
||
通过分析 `ContractRepairAllTasker` 的实现,我们总结了以下升级经验:
|
||
|
||
### 4.1 升级步骤
|
||
|
||
1. **继承适当的基础类**:根据业务需求选择继承 `Tasker<Object>` 或其他特定的抽象类(如 `AbstContractRepairTasker`)
|
||
|
||
2. **实现 WebSocketServerTasker 接口**:声明实现该接口,自动继承接口定义的方法
|
||
|
||
3. **实现 init 方法**:处理任务初始化和参数解析
|
||
|
||
```java
|
||
@Override
|
||
public void init(JsonNode argsNode) {
|
||
// 解析参数或初始化任务状态
|
||
// 如果 Client 没有传递参数,就不做处理
|
||
// do nothing
|
||
|
||
// 如果有参数,正确做法:检查参数有效性并安全解析
|
||
if (argsNode != null && argsNode.size() > 0) {
|
||
ContractService contractService = getCachedBean(ContractService.class);
|
||
int contractId = argsNode.get(0).asInt();
|
||
this.contract = contractService.findById(contractId);
|
||
}
|
||
}
|
||
```
|
||
|
||
4. **实现或重写 execute 方法**:提供具体的业务逻辑实现
|
||
|
||
```java
|
||
@Override
|
||
protected Object execute(MessageHolder holder) throws Exception {
|
||
// 调用父类初始化
|
||
super.execute(holder);
|
||
// 执行具体业务逻辑
|
||
repair(holder);
|
||
return null; // 通常返回 null 或处理结果
|
||
}
|
||
```
|
||
|
||
5. **使用更新方法提供状态反馈**:在执行过程中使用 `updateTitle`、`updateProgress` 等方法提供实时反馈
|
||
|
||
```java
|
||
updateTitle("同步修复所有合同");
|
||
updateProgress(counter.incrementAndGet(), total);
|
||
```
|
||
|
||
6. **支持任务取消**:定期检查 `isCancelled()` 方法,支持任务的取消操作
|
||
|
||
```java
|
||
if (isCancelled()) {
|
||
break;
|
||
}
|
||
```
|
||
|
||
### 4.2 关键经验
|
||
|
||
1. **消息处理**:通过 `MessageHolder` 处理和记录执行过程中的消息,方便调试和错误追踪
|
||
|
||
2. **进度更新**:对于批量操作,使用计数器和总数定期更新进度,提供良好的用户体验
|
||
|
||
3. **异常处理**:在循环处理中捕获异常并记录,确保单个项目的失败不会导致整个任务失败
|
||
|
||
4. **分页处理**:对于大量数据的处理,使用分页查询避免内存溢出
|
||
|
||
## 5. 共同模式和最佳实践
|
||
|
||
通过分析项目中的17个实现类,我们总结了以下共同模式和最佳实践:
|
||
|
||
### 5.1 共同模式
|
||
|
||
1. **继承结构**:所有实现类都继承 `Tasker<Object>` 并实现 `WebSocketServerTasker` 接口
|
||
|
||
2. **核心方法实现**:
|
||
- 实现 `init(JsonNode argsNode)` 方法处理参数
|
||
- 实现或重写 `execute(MessageHolder holder)` 方法实现业务逻辑
|
||
|
||
3. **状态更新**:使用 `updateTitle`、`updateMessage`、`updateProgress` 等方法更新任务状态
|
||
|
||
4. **Bean获取**:使用 `getCachedBean` 方法获取Spring管理的Bean
|
||
|
||
```java
|
||
ContractService contractService = getCachedBean(ContractService.class);
|
||
```
|
||
|
||
### 5.2 最佳实践
|
||
|
||
1. **参数验证**:在 `init` 方法中验证和转换输入参数
|
||
|
||
2. **进度反馈**:对于长时间运行的任务,提供合理的进度更新
|
||
|
||
3. **异常处理**:捕获并处理特定异常,提供有意义的错误信息
|
||
|
||
4. **取消支持**:实现任务取消机制,定期检查 `isCancelled()` 状态
|
||
|
||
5. **资源清理**:在任务结束或取消时清理资源
|
||
|
||
6. **日志记录**:使用 `slf4j` 记录关键操作和异常信息
|
||
|
||
7. **属性传递**:使用 `updateProperty` 方法传递任务执行结果或状态
|
||
|
||
```java
|
||
updateProperty("passed", passed);
|
||
```
|
||
|
||
## 6. 完整实现示例
|
||
|
||
### 6.1 简单任务实现示例
|
||
|
||
```java
|
||
package com.ecep.contract.service.tasker;
|
||
|
||
import com.ecep.contract.MessageHolder;
|
||
import com.ecep.contract.ds.contract.service.ContractService;
|
||
import com.ecep.contract.ui.Tasker;
|
||
import com.fasterxml.jackson.databind.JsonNode;
|
||
|
||
import lombok.Getter;
|
||
import lombok.Setter;
|
||
|
||
public class SimpleTasker extends Tasker<Object> implements WebSocketServerTasker {
|
||
@Getter
|
||
@Setter
|
||
private int entityId;
|
||
@Getter
|
||
@Setter
|
||
private boolean success = false;
|
||
|
||
@Override
|
||
public void init(JsonNode argsNode) {
|
||
// 解析参数
|
||
this.entityId = argsNode.get(0).asInt();
|
||
}
|
||
|
||
@Override
|
||
protected Object execute(MessageHolder holder) throws Exception {
|
||
// 更新标题
|
||
updateTitle("执行简单任务:" + entityId);
|
||
|
||
try {
|
||
// 更新进度
|
||
updateProgress(1, 3);
|
||
|
||
// 执行业务逻辑
|
||
ContractService service = getCachedBean(ContractService.class);
|
||
updateProgress(2, 3);
|
||
|
||
// 处理结果
|
||
success = true;
|
||
holder.info("任务执行成功");
|
||
|
||
// 更新最终进度
|
||
updateProgress(3, 3);
|
||
|
||
// 更新结果属性
|
||
updateProperty("success", success);
|
||
} catch (Exception e) {
|
||
holder.error("任务执行失败: " + e.getMessage());
|
||
success = false;
|
||
updateProperty("success", success);
|
||
throw e;
|
||
}
|
||
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.2 批量处理任务实现示例
|
||
|
||
```java
|
||
package com.ecep.contract.service.tasker;
|
||
|
||
import java.util.List;
|
||
import java.util.concurrent.atomic.AtomicInteger;
|
||
|
||
import com.ecep.contract.MessageHolder;
|
||
import com.ecep.contract.ds.contract.service.ContractService;
|
||
import com.ecep.contract.ds.contract.model.Contract;
|
||
import com.ecep.contract.ui.Tasker;
|
||
import com.fasterxml.jackson.databind.JsonNode;
|
||
|
||
public class BatchProcessTasker extends Tasker<Object> implements WebSocketServerTasker {
|
||
private ContractService contractService;
|
||
private List<Integer> entityIds;
|
||
|
||
@Override
|
||
public void init(JsonNode argsNode) {
|
||
// 初始化服务引用
|
||
this.contractService = getCachedBean(ContractService.class);
|
||
|
||
// 解析实体ID列表
|
||
entityIds = new ArrayList<>();
|
||
for (JsonNode node : argsNode) {
|
||
entityIds.add(node.asInt());
|
||
}
|
||
}
|
||
|
||
@Override
|
||
protected Object execute(MessageHolder holder) throws Exception {
|
||
updateTitle("批量处理实体任务");
|
||
|
||
AtomicInteger counter = new AtomicInteger(0);
|
||
long total = entityIds.size();
|
||
|
||
for (Integer id : entityIds) {
|
||
// 检查是否取消
|
||
if (isCancelled()) {
|
||
holder.info("任务已取消");
|
||
break;
|
||
}
|
||
|
||
try {
|
||
MessageHolder subHolder = holder.sub("处理 ID: " + id + " > ");
|
||
|
||
// 执行业务逻辑
|
||
Contract entity = contractService.getById(id);
|
||
// 处理实体...
|
||
|
||
subHolder.info("处理成功");
|
||
} catch (Exception e) {
|
||
holder.error("处理 ID: " + id + " 失败: " + e.getMessage());
|
||
// 记录异常但继续处理下一个
|
||
}
|
||
|
||
// 更新进度
|
||
updateProgress(counter.incrementAndGet(), total);
|
||
}
|
||
|
||
updateTitle("批量处理完成");
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.3 带有依赖的任务实现示例
|
||
|
||
```java
|
||
package com.ecep.contract.service.tasker;
|
||
|
||
import com.ecep.contract.MessageHolder;
|
||
import com.ecep.contract.ds.contract.service.ContractService;
|
||
import com.ecep.contract.ds.customer.service.CustomerService;
|
||
import com.ecep.contract.ui.Tasker;
|
||
import com.fasterxml.jackson.databind.JsonNode;
|
||
|
||
import lombok.Getter;
|
||
import lombok.Setter;
|
||
|
||
public class DependentTasker extends Tasker<Object> implements WebSocketServerTasker {
|
||
@Getter
|
||
@Setter
|
||
private int contractId;
|
||
@Getter
|
||
@Setter
|
||
private boolean processed = false;
|
||
|
||
private ContractService contractService;
|
||
private CustomerService customerService;
|
||
|
||
@Override
|
||
public void init(JsonNode argsNode) {
|
||
// 解析参数
|
||
this.contractId = argsNode.get(0).asInt();
|
||
|
||
// 获取服务依赖
|
||
this.contractService = getCachedBean(ContractService.class);
|
||
this.customerService = getCachedBean(CustomerService.class);
|
||
}
|
||
|
||
@Override
|
||
protected Object execute(MessageHolder holder) throws Exception {
|
||
updateTitle("执行带有服务依赖的任务");
|
||
|
||
updateProgress(1, 5);
|
||
|
||
// 使用多个服务协同完成任务
|
||
try {
|
||
// 步骤1: 获取合同信息
|
||
var contract = contractService.getById(contractId);
|
||
updateProgress(2, 5);
|
||
|
||
// 步骤2: 获取相关客户信息
|
||
var customer = customerService.getById(contract.getCustomerId());
|
||
updateProgress(3, 5);
|
||
|
||
// 步骤3: 执行业务逻辑
|
||
// ...
|
||
updateProgress(4, 5);
|
||
|
||
processed = true;
|
||
holder.info("任务处理成功");
|
||
|
||
// 更新结果属性
|
||
updateProperty("processed", processed);
|
||
} catch (Exception e) {
|
||
holder.error("任务处理失败: " + e.getMessage());
|
||
throw e;
|
||
}
|
||
|
||
updateProgress(5, 5);
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
## 7. 最佳实践总结
|
||
|
||
1. **继承 Tasker<Object> 并实现 WebSocketServerTasker 接口**:利用已有的基础实现
|
||
|
||
2. **合理实现 init 方法**:处理参数初始化,避免在 execute 中重复解析
|
||
|
||
3. **实现清晰的 execute 逻辑**:
|
||
- 开始时设置任务标题
|
||
- 过程中定期更新进度
|
||
- 提供详细的消息反馈
|
||
- 结束时更新最终状态
|
||
|
||
4. **使用 Spring Bean**:通过 `getCachedBean` 获取需要的服务依赖
|
||
|
||
5. **支持任务取消**:在关键循环中检查 `isCancelled()` 状态
|
||
|
||
6. **异常处理**:捕获异常并记录,提供有意义的错误信息
|
||
|
||
7. **进度反馈**:对于长时间运行的任务,提供合理的进度更新频率
|
||
|
||
8. **属性传递**:使用 `updateProperty` 方法传递任务结果给调用方
|
||
|
||
9. **日志记录**:使用 slf4j 记录关键操作和异常
|
||
|
||
10. **资源管理**:确保在任务完成或取消时释放相关资源
|
||
|
||
## 8. 注意事项
|
||
|
||
1. **不要在构造函数中获取 Spring Bean**:使用 `init` 方法或懒加载方式获取
|
||
|
||
2. **避免阻塞操作**:长时间阻塞的操作应考虑异步处理
|
||
|
||
3. **线程安全**:注意多线程环境下的并发访问问题
|
||
|
||
4. **内存管理**:对于处理大量数据的任务,注意内存使用,避免OOM
|
||
|
||
5. **超时处理**:考虑设置合理的超时机制
|
||
|
||
6. **事务管理**:根据需要考虑事务的范围和传播行为
|
||
|
||
7. **任务注册**:创建新的WebSocketServerTasker后,必须在`tasker_mapper.json`文件中注册,否则客户端将无法调用
|
||
|
||
8. **任务名称唯一性**:确保任务名称在`tasker_mapper.json`中唯一,避免名称冲突
|
||
|
||
9. **类路径正确**:注册时确保使用正确的类完全限定名,否则会导致实例化失败
|
||
|
||
10. **重启服务**:修改`tasker_mapper.json`后,需要重启服务才能使新的注册生效
|
||
|
||
通过遵循这些规范和最佳实践,可以确保实现的 `WebSocketServerTasker` 类具有良好的可维护性、可扩展性和用户体验。 |