feat(SMB): 重构SMB文件服务支持多服务器配置和连接池优化

重构SmbFileService以支持多服务器配置,引入连接池和会话池管理机制。主要变更包括:
1. 实现基于主机的多服务器认证配置
2. 新增连接池和会话池管理,提高连接复用率
3. 添加定时清理空闲连接和会话的功能
4. 优化异常处理和重试机制
5. 改进日志记录和资源释放

同时更新相关配置文件和应用属性以支持新功能:
1. 修改application.properties支持多服务器SMB配置
2. 增强SmbConfig类以管理多服务器配置
3. 添加任务映射到tasker_mapper.json
4. 新增客户端和服务端任务规则文档
This commit is contained in:
2025-11-17 12:55:31 +08:00
parent e761990ebf
commit 87290f15b0
14 changed files with 1367 additions and 156 deletions

View File

@@ -0,0 +1,419 @@
# 客户端Tasker实现WebSocketClientTasker接口规范
## 概述
本文档基于 `ContractRepairAllTasker` 实现 `WebSocketClientTasker` 接口的经验总结了客户端Tasker类升级为支持WebSocket通信的最佳实践和规范。
## WebSocketClientTasker接口介绍
`WebSocketClientTasker` 接口定义了通过WebSocket与服务器通信的任务的通用方法包括任务名称、消息更新、进度更新等核心功能。
### 核心方法
1. **getTaskName()** - 获取任务名称用于在WebSocket通信中标识任务
2. **updateMessage(Level, String)** - 更新任务执行过程中的消息
3. **updateTitle(String)** - 更新任务标题
4. **updateProgress(long, long)** - 更新任务进度
5. **cancelTask()** - 取消任务执行(默认实现为空)
6. **callRemoteTask(MessageHolder, Locale, Object...)** - 调用远程WebSocket任务
7. **callRemoteTaskAsync(MessageHolder, Locale, Object...)** - 异步调用远程WebSocket任务
8. **generateTaskId()** - 生成唯一的任务ID
### 典型实现模式概览
通过分析项目中的17个实现类我们发现了以下典型实现模式
1. **标准实现**继承Tasker<Object>并实现WebSocketClientTasker
2. **属性注入**:使用@Setter注解或手动设置属性传递任务参数
3. **Spring Bean获取**通过SpringApp.getBean()获取服务实例
4. **消息更新**:简洁的消息更新方式
5. **参数传递**通过callRemoteTask的可变参数传递任务所需数据
## Tasker实现WebSocketClientTasker最佳实践
### 1. 类定义和继承
```java
/**
* 任务类描述
* 用于通过WebSocket与服务器通信执行具体操作
*/
public class 任务类名 extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(任务类名.class);
// 实现方法
}
```
### 2. 参数传递模式
#### 2.1 使用@Setter注解注入参数
```java
/**
* 更新供应商评价表任务
*/
public class CompanyVendorEvaluationFormUpdateTask extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(CompanyVendorEvaluationFormUpdateTask.class);
@Setter
private VendorVo vendor;
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("更新供应商评价表");
return callRemoteTask(holder, getLocale(), vendor.getId());
}
}
```
#### 2.2 使用@Getter和@Setter注解
```java
/**
* 客户文件重建任务类
*/
public class CustomerRebuildFilesTasker extends Tasker<Object> implements WebSocketClientTasker {
@Getter
@Setter
private CustomerVo companyCustomer;
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("重建客户文件");
return callRemoteTask(holder, getLocale(), companyCustomer.getId());
}
}
```
### 2. 必要方法实现
#### 2.1 getTaskName()
```java
@Override
public String getTaskName() {
return "Task名称"; // 必须与服务器端对应Tasker类名匹配
}
```
**注意事项**
- 任务名称必须与服务器端对应的Tasker注册名tasker_mapper.json中的key保持一致
- 名称应简洁明了,反映任务的核心功能
#### 2.2 updateProgress()
updateProgress 方法重载为public用于外部调用更新进度
```java
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total); // 调用父类方法更新进度
}
```
#### 2.3 updateTitle()
```java
@Override
public void updateTitle(String title) {
super.updateTitle(title); // 使用Tasker的updateTitle方法更新标题
}
```
#### 2.4 execute() 方法重写
**标准实现**
```java
@Override
protected Object execute(MessageHolder holder) throws Exception {
logger.info("开始执行任务描述");
updateTitle("任务标题");
// 调用远程任务,可选传入参数
Object result = callRemoteTask(holder, getLocale(), 可选参数...);
logger.info("任务执行完成");
return result;
}
```
**简洁实现**(适用于简单任务):
```java
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("更新供应商评价表");
return callRemoteTask(holder, getLocale(), vendor.getId());
}
```
**带消息更新的实现**
```java
@Override
protected Object execute(MessageHolder holder) throws Exception {
// 设置任务标题
updateTitle("全量库存同步任务");
// 更新任务消息
updateMessage("开始执行全量库存同步...");
// 调用远程WebSocket任务
return callRemoteTask(holder, getLocale());
}
```
**关键步骤**
1. 记录任务开始日志(可选)
2. 设置任务标题
3. 可选:添加任务开始消息
4. 调用远程任务执行核心逻辑,传入必要参数
5. 记录任务完成日志(可选)
6. 返回执行结果
### 3. 日志记录
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 在类中定义
private static final Logger logger = LoggerFactory.getLogger(任务类名.class);
// 在关键位置使用日志
logger.info("任务开始");
logger.warn("警告信息");
logger.error("错误信息", exception);
```
**日志使用建议**
- 复杂任务建议记录详细日志
- 简单任务可以简化或省略日志记录
- 确保异常情况下有适当的错误日志记录
### 4. 异常处理
`execute`方法中应妥善处理可能的异常并通过MessageHolder通知用户
```java
@Override
protected Object execute(MessageHolder holder) throws Exception {
try {
// 任务执行逻辑
} catch (Exception e) {
logger.error("任务执行失败", e);
holder.addMessage(Level.SEVERE, "任务执行失败: " + e.getMessage());
throw e; // 向上抛出异常,让框架处理
}
}
```
**异常处理策略**
- 对于简单任务,可以依赖框架的异常处理机制
- 对于复杂任务,建议添加自定义的异常处理逻辑
- 确保异常信息对用户友好且具有足够的调试信息
## 完整实现示例
### 示例1简单任务实现
```java
package com.ecep.contract.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.WebSocketClientTasker;
/**
* 合同修复任务类
* 用于通过WebSocket与服务器通信执行合同数据修复操作
*/
public class ContractRepairAllTasker extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(ContractRepairAllTasker.class);
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
}
@Override
public void updateTitle(String title) {
// 使用Tasker的updateTitle方法更新标题
super.updateTitle(title);
}
@Override
public String getTaskName() {
return "ContractRepairAllTask"; // 与服务器端对应Tasker类名匹配
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
logger.info("开始执行合同修复任务");
updateTitle("合同数据修复");
Object result = callRemoteTask(holder, getLocale());
logger.info("合同修复任务执行完成");
return result;
}
}
```
### 示例2带参数的任务实现
```java
package com.ecep.contract.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.WebSocketClientTasker;
import com.ecep.contract.vo.VendorVo;
import lombok.Setter;
/**
* 更新供应商评价表
*/
public class CompanyVendorEvaluationFormUpdateTask extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(CompanyVendorEvaluationFormUpdateTask.class);
@Setter
private VendorVo vendor;
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
}
@Override
public String getTaskName() {
return "CompanyVendorEvaluationFormUpdateTask"; // 与服务器端对应Tasker类名匹配
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("更新供应商评价表");
return callRemoteTask(holder, getLocale(), vendor.getId());
}
}
```
### 示例3使用Spring Bean的任务实现
```java
package com.ecep.contract.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.SpringApp;
import com.ecep.contract.WebSocketClientTasker;
import com.ecep.contract.service.YongYouU8Service;
/**
* 合同同步任务
*/
public class ContractSyncTask extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(ContractSyncTask.class);
private YongYouU8Service yongYouU8Service;
private YongYouU8Service getYongYouU8Service() {
if (yongYouU8Service == null) {
yongYouU8Service = SpringApp.getBean(YongYouU8Service.class);
}
return yongYouU8Service;
}
public String getTaskName() {
return "ContractSyncTask"; // 与服务器端对应Tasker类名匹配
}
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("用友U8系统-同步合同");
return callRemoteTask(holder, getLocale());
}
}
```
## 注意事项和最佳实践
### 1. 命名规范
- 任务类名应采用驼峰命名法,以`Tasker`结尾或描述性名称如`Task`
- getTaskName()返回的名称应与服务器端对应Tasker类名完全匹配
- 类注释应清晰描述任务的功能和用途
### 2. 继承关系
- 必须同时继承Tasker类并实现WebSocketClientTasker接口
- Tasker泛型参数通常为Object
- 确保正确导入所有必要的包
### 3. 参数处理
- 对于需要参数的任务,使用@Setter注解简化属性设置
- 对于需要在多处使用的参数,考虑添加@Getter注解
- 确保参数验证(如果必要)
### 4. Spring Bean获取
- 使用SpringApp.getBean()获取所需的服务实例
- 考虑使用懒加载模式避免不必要的Bean初始化
### 5. 消息和进度更新
- 使用updateTitle()设置有意义的任务标题
- 通过MessageHolder或updateMessage()记录详细的执行消息
- 确保进度更新反映真实的执行进度
### 6. 异常处理
- 在关键操作处添加try-catch块
- 记录异常日志并通知用户
- 适当向上抛出异常以确保框架能正确处理
### 7. 日志级别使用
- INFO: 记录正常的操作流程
- WARNING: 记录可能的问题,但不影响继续执行
- ERROR: 记录严重错误,通常会终止执行
### 8. 远程调用参数
- 确保传入的参数类型与服务器端Tasker期望的一致
- 对于不需要参数的任务,可以不传入额外参数
- 对于需要多个参数的任务,确保参数顺序正确
### 9. 代码风格
- 保持代码简洁明了
- 遵循项目的代码格式化规范
- 添加必要的注释说明核心逻辑
### 10. 实现策略选择
- 简单任务:使用简洁的实现方式,省略不必要的日志
- 复杂任务:添加详细的日志记录和异常处理
- 有特定需求的任务:根据需要重写接口中的其他方法
## 与服务器端交互流程
1. 客户端Tasker通过callRemoteTask()方法提交任务
2. WebSocketClientService负责建立与服务器的连接并发送任务信息
3. 服务器接收到任务后创建对应的Tasker实例并执行
4. 执行过程中的消息、进度等通过WebSocket实时返回给客户端
5. 客户端Tasker通过updateMessage()、updateProgress()等方法更新UI
## 扩展和自定义
如需为特定任务提供自定义功能,可以:
1. 重写cancelTask()方法实现任务取消逻辑
2. 根据需要添加额外的字段和方法
3. 扩展execute()方法实现更复杂的任务流程
## 总结
通过分析项目中的17个WebSocketClientTasker实现类我们总结了客户端Tasker实现的多种模式和最佳实践。这些实现从简单到复杂涵盖了各种使用场景为后续Tasker的编写提供了全面的参考。
客户端Tasker类实现WebSocketClientTasker接口是实现与服务器实时通信的关键步骤。通过遵循本文档中的规范和最佳实践可以确保任务执行的可靠性、进度的实时更新和良好的用户体验。
在实际开发中,应根据任务的复杂度和具体需求,选择合适的实现模式和策略,同时保持代码的一致性和可维护性。

View File

@@ -0,0 +1,452 @@
# WebSocketServerTasker 接口实现规范
## 1. 概述
本文档总结了实现 `WebSocketServerTasker` 接口的标准做法和最佳实践基于对项目中已有实现类的分析。该接口用于服务器端实现支持WebSocket通信的任务处理器实现异步任务的执行和状态监控。
## 2. WebSocketServerTasker 接口介绍
`WebSocketServerTasker` 接口位于 `com.ecep.contract.service.tasker` 包下,继承自 `java.util.concurrent.Callable<Object>`,定义了以下核心方法:
```java
public interface WebSocketServerTasker extends Callable<Object> {
// 初始化任务,处理传入的参数
void init(JsonNode argsNode);
// 设置会话信息(默认实现为空)
default void setSession(SessionInfo session) {}
// 设置消息处理函数
void setMessageHandler(Predicate<Message> messageHandler);
// 设置标题处理函数
void setTitleHandler(Predicate<String> titleHandler);
// 设置属性处理函数
void setPropertyHandler(BiConsumer<String, Object> propertyHandler);
// 设置进度处理函数
void setProgressHandler(BiConsumer<Long, Long> progressHandler);
}
```
## 3. 基础实现模式
### 3.1 推荐实现方式
通过分析项目中的实现类,推荐以下标准实现模式:
1. **继承 Tasker<Object> 并实现 WebSocketServerTasker 接口**
```java
public class YourTasker extends Tasker<Object> implements WebSocketServerTasker {
// 实现代码
}
```
2. **在 Tasker 基类中已实现的方法**
`Tasker` 类已经提供了 `WebSocketServerTasker` 接口中大部分方法的实现,包括:
- `setMessageHandler`
- `setTitleHandler`
- `setPropertyHandler`
- `setProgressHandler`
- `call()` 方法(实现了 `Callable` 接口)
这使得实现类只需要关注特定业务逻辑的实现。
## 3.2 WebSocketServerTasker 注册配置
要使WebSocketServerTasker能够被客户端通过WebSocket调用必须在`tasker_mapper.json`配置文件中进行注册。
### 3.2.1 注册配置文件
配置文件位置:`server/src/main/resources/tasker_mapper.json`
### 3.2.2 注册格式
注册格式为JSON对象包含一个`tasks`字段,该字段是一个键值对映射,其中:
- **键**:任务名称(客户端通过此名称调用任务)
- **值**:任务类的完全限定名
```json
{
"tasks": {
"任务名称": "任务类的完全限定名",
"ContractVerifyTasker": "com.ecep.contract.service.tasker.ContractVerifyTasker"
},
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
}
```
### 3.2.3 注册示例
假设我们创建了一个名为`CustomTasker`的新任务类,其完全限定名为`com.ecep.contract.service.tasker.CustomTasker`,则注册方式如下:
```json
{
"tasks": {
"CustomTasker": "com.ecep.contract.service.tasker.CustomTasker"
// 其他已注册任务...
},
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
}
```
### 3.2.4 注册机制说明
WebSocketServerTaskManager类在启动时会读取`tasker_mapper.json`文件,初始化任务类映射表。当客户端请求执行任务时,系统会:
1. 根据客户端提供的任务名称从映射表中查找对应的任务类
2. 使用反射机制实例化任务对象
3. 设置WebSocket会话和各类处理器
4. 执行任务并通过WebSocket返回结果
## 4. ContractRepairAllTasker 升级经验
通过分析 `ContractRepairAllTasker` 的实现,我们总结了以下升级经验:
### 4.1 升级步骤
1. **继承适当的基础类**:根据业务需求选择继承 `Tasker<Object>` 或其他特定的抽象类(如 `AbstContractRepairTasker`
2. **实现 WebSocketServerTasker 接口**:声明实现该接口,自动继承接口定义的方法
3. **实现 init 方法**:处理任务初始化和参数解析
```java
@Override
public void init(JsonNode argsNode) {
// 解析参数或初始化任务状态
// ContractRepairAllTasker 不需要参数,所以此方法为空实现
}
```
4. **实现或重写 execute 方法**:提供具体的业务逻辑实现
```java
@Override
protected Object execute(MessageHolder holder) throws Exception {
// 调用父类初始化
super.execute(holder);
// 执行具体业务逻辑
repair(holder);
return null; // 通常返回 null 或处理结果
}
```
5. **使用更新方法提供状态反馈**:在执行过程中使用 `updateTitle`、`updateProgress` 等方法提供实时反馈
```java
updateTitle("同步修复所有合同");
updateProgress(counter.incrementAndGet(), total);
```
6. **支持任务取消**:定期检查 `isCancelled()` 方法,支持任务的取消操作
```java
if (isCancelled()) {
break;
}
```
### 4.2 关键经验
1. **消息处理**:通过 `MessageHolder` 处理和记录执行过程中的消息,方便调试和错误追踪
2. **进度更新**:对于批量操作,使用计数器和总数定期更新进度,提供良好的用户体验
3. **异常处理**:在循环处理中捕获异常并记录,确保单个项目的失败不会导致整个任务失败
4. **分页处理**:对于大量数据的处理,使用分页查询避免内存溢出
## 5. 共同模式和最佳实践
通过分析项目中的17个实现类我们总结了以下共同模式和最佳实践
### 5.1 共同模式
1. **继承结构**:所有实现类都继承 `Tasker<Object>` 并实现 `WebSocketServerTasker` 接口
2. **核心方法实现**
- 实现 `init(JsonNode argsNode)` 方法处理参数
- 实现或重写 `execute(MessageHolder holder)` 方法实现业务逻辑
3. **状态更新**:使用 `updateTitle`、`updateMessage`、`updateProgress` 等方法更新任务状态
4. **Bean获取**:使用 `getCachedBean` 方法获取Spring管理的Bean
```java
ContractService contractService = getCachedBean(ContractService.class);
```
### 5.2 最佳实践
1. **参数验证**:在 `init` 方法中验证和转换输入参数
2. **进度反馈**:对于长时间运行的任务,提供合理的进度更新
3. **异常处理**:捕获并处理特定异常,提供有意义的错误信息
4. **取消支持**:实现任务取消机制,定期检查 `isCancelled()` 状态
5. **资源清理**:在任务结束或取消时清理资源
6. **日志记录**:使用 `slf4j` 记录关键操作和异常信息
7. **属性传递**:使用 `updateProperty` 方法传递任务执行结果或状态
```java
updateProperty("passed", passed);
```
## 6. 完整实现示例
### 6.1 简单任务实现示例
```java
package com.ecep.contract.service.tasker;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ui.Tasker;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import lombok.Setter;
public class SimpleTasker extends Tasker<Object> implements WebSocketServerTasker {
@Getter
@Setter
private int entityId;
@Getter
@Setter
private boolean success = false;
@Override
public void init(JsonNode argsNode) {
// 解析参数
this.entityId = argsNode.get(0).asInt();
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
// 更新标题
updateTitle("执行简单任务:" + entityId);
try {
// 更新进度
updateProgress(1, 3);
// 执行业务逻辑
ContractService service = getCachedBean(ContractService.class);
updateProgress(2, 3);
// 处理结果
success = true;
holder.info("任务执行成功");
// 更新最终进度
updateProgress(3, 3);
// 更新结果属性
updateProperty("success", success);
} catch (Exception e) {
holder.error("任务执行失败: " + e.getMessage());
success = false;
updateProperty("success", success);
throw e;
}
return null;
}
}
```
### 6.2 批量处理任务实现示例
```java
package com.ecep.contract.service.tasker;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ds.contract.model.Contract;
import com.ecep.contract.ui.Tasker;
import com.fasterxml.jackson.databind.JsonNode;
public class BatchProcessTasker extends Tasker<Object> implements WebSocketServerTasker {
private ContractService contractService;
private List<Integer> entityIds;
@Override
public void init(JsonNode argsNode) {
// 初始化服务引用
this.contractService = getCachedBean(ContractService.class);
// 解析实体ID列表
entityIds = new ArrayList<>();
for (JsonNode node : argsNode) {
entityIds.add(node.asInt());
}
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("批量处理实体任务");
AtomicInteger counter = new AtomicInteger(0);
long total = entityIds.size();
for (Integer id : entityIds) {
// 检查是否取消
if (isCancelled()) {
holder.info("任务已取消");
break;
}
try {
MessageHolder subHolder = holder.sub("处理 ID: " + id + " > ");
// 执行业务逻辑
Contract entity = contractService.getById(id);
// 处理实体...
subHolder.info("处理成功");
} catch (Exception e) {
holder.error("处理 ID: " + id + " 失败: " + e.getMessage());
// 记录异常但继续处理下一个
}
// 更新进度
updateProgress(counter.incrementAndGet(), total);
}
updateTitle("批量处理完成");
return null;
}
}
```
### 6.3 带有依赖的任务实现示例
```java
package com.ecep.contract.service.tasker;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ds.customer.service.CustomerService;
import com.ecep.contract.ui.Tasker;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import lombok.Setter;
public class DependentTasker extends Tasker<Object> implements WebSocketServerTasker {
@Getter
@Setter
private int contractId;
@Getter
@Setter
private boolean processed = false;
private ContractService contractService;
private CustomerService customerService;
@Override
public void init(JsonNode argsNode) {
// 解析参数
this.contractId = argsNode.get(0).asInt();
// 获取服务依赖
this.contractService = getCachedBean(ContractService.class);
this.customerService = getCachedBean(CustomerService.class);
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("执行带有服务依赖的任务");
updateProgress(1, 5);
// 使用多个服务协同完成任务
try {
// 步骤1: 获取合同信息
var contract = contractService.getById(contractId);
updateProgress(2, 5);
// 步骤2: 获取相关客户信息
var customer = customerService.getById(contract.getCustomerId());
updateProgress(3, 5);
// 步骤3: 执行业务逻辑
// ...
updateProgress(4, 5);
processed = true;
holder.info("任务处理成功");
// 更新结果属性
updateProperty("processed", processed);
} catch (Exception e) {
holder.error("任务处理失败: " + e.getMessage());
throw e;
}
updateProgress(5, 5);
return null;
}
}
```
## 7. 最佳实践总结
1. **继承 Tasker<Object> 并实现 WebSocketServerTasker 接口**:利用已有的基础实现
2. **合理实现 init 方法**:处理参数初始化,避免在 execute 中重复解析
3. **实现清晰的 execute 逻辑**
- 开始时设置任务标题
- 过程中定期更新进度
- 提供详细的消息反馈
- 结束时更新最终状态
4. **使用 Spring Bean**:通过 `getCachedBean` 获取需要的服务依赖
5. **支持任务取消**:在关键循环中检查 `isCancelled()` 状态
6. **异常处理**:捕获异常并记录,提供有意义的错误信息
7. **进度反馈**:对于长时间运行的任务,提供合理的进度更新频率
8. **属性传递**:使用 `updateProperty` 方法传递任务结果给调用方
9. **日志记录**:使用 slf4j 记录关键操作和异常
10. **资源管理**:确保在任务完成或取消时释放相关资源
## 8. 注意事项
1. **不要在构造函数中获取 Spring Bean**:使用 `init` 方法或懒加载方式获取
2. **避免阻塞操作**:长时间阻塞的操作应考虑异步处理
3. **线程安全**:注意多线程环境下的并发访问问题
4. **内存管理**对于处理大量数据的任务注意内存使用避免OOM
5. **超时处理**:考虑设置合理的超时机制
6. **事务管理**:根据需要考虑事务的范围和传播行为
7. **任务注册**创建新的WebSocketServerTasker后必须在`tasker_mapper.json`文件中注册,否则客户端将无法调用
8. **任务名称唯一性**:确保任务名称在`tasker_mapper.json`中唯一,避免名称冲突
9. **类路径正确**:注册时确保使用正确的类完全限定名,否则会导致实例化失败
10. **重启服务**:修改`tasker_mapper.json`后,需要重启服务才能使新的注册生效
通过遵循这些规范和最佳实践,可以确保实现的 `WebSocketServerTasker` 类具有良好的可维护性、可扩展性和用户体验。

View File

@@ -11,10 +11,13 @@ import java.util.logging.Level;
* WebSocket客户端任务接口
* 定义了所有通过WebSocket与服务器通信的任务的通用方法
* 包括任务名称、更新消息、更新标题、更新进度等操作
*
* 所有通过WebSocket与服务器通信的任务类都应实现此接口, 文档参考 .trace/rules/client_task_rules.md
*/
public interface WebSocketClientTasker {
/**
/**s
* 获取任务名称
* 任务名称用于唯一标识任务, 服务器端会根据任务名称来调用对应的任务处理函数
*
* @return 任务名称
*/
@@ -22,14 +25,16 @@ public interface WebSocketClientTasker {
/**
* 更新任务执行过程中的消息
* 客户端可以通过此方法向用户展示任务执行过程中的重要信息或错误提示
*
* @param level 消息级别
* @param message 消息内容
* @param level 消息级别, 用于区分不同类型的消息, 如INFO, WARNING, SEVERE等
* @param message 消息内容, 可以是任意字符串, 用于展示给用户
*/
void updateMessage(Level level, String message);
/**
* 更新任务标题
* 客户端可以通过此方法向用户展示任务的当前执行状态或重要信息
*
* @param title 任务标题
*/
@@ -37,6 +42,7 @@ public interface WebSocketClientTasker {
/**
* 更新任务进度
* 客户端可以通过此方法向用户展示任务的执行进度, 如文件上传进度、数据库操作进度等
*
* @param current 当前进度
* @param total 总进度
@@ -53,6 +59,7 @@ public interface WebSocketClientTasker {
/**
* 调用远程WebSocket任务
* 客户端可以通过此方法向服务器提交任务, 并等待服务器返回任务执行结果
*
* @param holder 消息持有者,用于记录任务执行过程中的消息
* @param locale 语言环境

View File

@@ -1,21 +1,37 @@
package com.ecep.contract.task;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.WebSocketClientTasker;
import com.ecep.contract.service.ContractService;
import com.ecep.contract.vo.ContractGroupVo;
import lombok.extern.slf4j.Slf4j;
import lombok.Setter;
public class ContractFilesRebuildAllTasker extends Tasker<Object>{
@Slf4j
public class ContractFilesRebuildAllTasker extends Tasker<Object> implements WebSocketClientTasker {
@Setter
private ContractService contractService;
@Setter
private ContractGroupVo group;
@Override
public String getTaskName() {
return "ContractFilesRebuildAllTasker";
}
@Override
public void updateProgress(long workDone, long max) {
super.updateProgress(workDone, max);
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'execute'");
updateTitle("重建合同组 " + group.getName() + " 的所有文件");
log.info("开始重建合同组文件: {}", group.getName());
// 调用远程任务
return callRemoteTask(holder, getLocale(), group.getId());
}
}

View File

@@ -1,14 +1,40 @@
package com.ecep.contract.task;
import com.ecep.contract.MessageHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContractRepairAllTasker extends Tasker<Object>{
import com.ecep.contract.MessageHolder;
import com.ecep.contract.WebSocketClientTasker;
/**
* 合同修复任务类
* 用于通过WebSocket与服务器通信执行合同数据修复操作
*/
public class ContractRepairAllTasker extends Tasker<Object> implements WebSocketClientTasker {
private static final Logger logger = LoggerFactory.getLogger(ContractRepairAllTasker.class);
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
}
@Override
public void updateTitle(String title) {
// 使用Tasker的updateTitle方法更新标题
super.updateTitle(title);
}
@Override
public String getTaskName() {
return "ContractRepairAllTask";
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'execute'");
logger.info("开始执行合同修复任务");
updateTitle("合同数据修复");
Object result = callRemoteTask(holder, getLocale());
logger.info("合同修复任务执行完成");
return result;
}
}

View File

@@ -43,7 +43,9 @@ public class ContractRepairTask extends Tasker<Object> implements WebSocketClien
@Override
public void updateProgress(long current, long total) {
super.updateProgress(current, total);
double d = (double) current / total;
super.updateProgress(d, 1);
System.out.println("current = " + d + ", total = " + total);
}
@Override

View File

@@ -209,18 +209,22 @@ public class UITools {
Platform.runLater(() -> {
box.getChildren().add(progressBar);
System.out.println("add progressBar = " + progressBar);
});
// progressBar.disabledProperty().bind(task.runningProperty());
progressBar.visibleProperty().bind(task.runningProperty());
progressBar.progressProperty().bind(task.progressProperty());
task.progressProperty().addListener((observable, oldValue, newValue) -> {
System.out.println("progress = " + newValue);
});
if (task instanceof Tasker<?> tasker) {
// 提交任务
Desktop.instance.getExecutorService().submit(tasker);
}
if (init != null) {
init.accept(msg -> consumer.test(msg));
init.accept(consumer::test);
}
dialog.showAndWait();
if (task.isRunning()) {

View File

@@ -1,30 +1,64 @@
package com.ecep.contract.config;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.hierynomus.smbj.event.SMBEventBus;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.NtlmAuthenticator;
@Configuration()
/**
* SMB配置类支持多服务器配置
*/
@Configuration
@ConfigurationProperties(prefix = "smb")
@Data
public class SmbConfig {
// 多服务器配置key为服务器标识
private Map<String, ServerConfig> servers = new ConcurrentHashMap<>();
private SMBEventBus eventBus = new SMBEventBus();
@Value("${smb.server.username}")
@Getter
private String username;
@Value("${smb.server.password}")
@Getter
private String password;
/**
* 获取指定主机的配置
* 从servers中查找匹配的主机配置
*
* @param host 主机名
* @return 对应的服务器配置如果未找到则返回null
*/
public ServerConfig getServerConfig(String host) {
// 遍历servers查找匹配的主机配置
for (Map.Entry<String, ServerConfig> entry : servers.entrySet()) {
if (entry.getValue().getHost().equals(host)) {
return entry.getValue();
}
}
// 如果没有找到匹配的主机配置返回null
return null;
}
@Bean
public SMBClient smbClient() {
var smbConfig = com.hierynomus.smbj.SmbConfig.builder()
.withMultiProtocolNegotiate(true).withSigningRequired(true)
// .withAuthenticators(new NtlmAuthenticator(username, password))
.build();
return new SMBClient(smbConfig);
return new SMBClient(smbConfig, eventBus);
}
public void subscribe(Object listener) {
eventBus.subscribe(listener);
}
/**
* 服务器配置内部类
*/
@Data
public static class ServerConfig {
private String host;
private String username;
private String password;
}
}

View File

@@ -1,5 +1,6 @@
package com.ecep.contract.ds.contract.tasker;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.data.domain.Page;
@@ -13,13 +14,16 @@ import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ds.contract.model.Contract;
import com.ecep.contract.model.ContractGroup;
import com.ecep.contract.ui.Tasker;
import com.ecep.contract.service.tasker.WebSocketServerTasker;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import lombok.Setter;
/**
* 对所有合同的文件进行重置
*/
public class ContractFilesRebuildAllTasker extends Tasker<Object> {
public class ContractFilesRebuildAllTasker extends Tasker<Object> implements WebSocketServerTasker {
@Setter
private ContractService contractService;
@@ -41,9 +45,9 @@ public class ContractFilesRebuildAllTasker extends Tasker<Object> {
@Override
protected Object execute(MessageHolder holder) {
updateTitle("遍历所有合同,对每个合同的文件进行\"重置\"操作");
Pageable pageRequest = PageRequest.ofSize(200);
AtomicInteger counter = new AtomicInteger(0);
updateTitle("遍历所有合同,对每个可以合同的文件进行“重置”操作");
Specification<Contract> spec = null;
if (group != null) {
@@ -80,6 +84,12 @@ public class ContractFilesRebuildAllTasker extends Tasker<Object> {
return null;
}
@Override
public void init(JsonNode argsNode) {
// 初始化方法可以根据需要从argsNode获取参数
}
private ContractService getContractService() {
if (contractService == null) {
contractService = getBean(ContractService.class);

View File

@@ -21,13 +21,15 @@ import com.ecep.contract.Message;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ds.contract.model.Contract;
import com.ecep.contract.service.tasker.WebSocketServerTasker;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.criteria.Path;
/**
* 对所有合同进行修复
* 对所有合同进行修复的任务类实现WebSocketServerTasker接口以支持WebSocket通信
*/
public class ContractRepairAllTasker extends AbstContractRepairTasker {
public class ContractRepairAllTasker extends AbstContractRepairTasker implements WebSocketServerTasker {
private static final Logger logger = LoggerFactory.getLogger(ContractRepairAllTasker.class);
static class MessageHolderImpl implements MessageHolder {
@@ -40,6 +42,18 @@ public class ContractRepairAllTasker extends AbstContractRepairTasker {
}
}
@Override
public void init(JsonNode argsNode) {
// ContractRepairAllTasker不需要参数argsNode可以为空
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
super.execute(holder);
repair(holder);
return null;
}
@Override
protected void repair(MessageHolder holder) {
updateTitle("同步修复所有合同");

View File

@@ -1,21 +1,5 @@
package com.ecep.contract.service;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ecep.contract.SpringApp;
import com.ecep.contract.config.SmbConfig;
import com.hierynomus.msdtyp.AccessMask;
@@ -26,13 +10,26 @@ import com.hierynomus.mssmb2.SMB2CreateDisposition;
import com.hierynomus.mssmb2.SMB2CreateOptions;
import com.hierynomus.mssmb2.SMB2ShareAccess;
import com.hierynomus.mssmb2.SMBApiException;
import com.hierynomus.protocol.transport.TransportException;
import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.common.SMBRuntimeException;
import com.hierynomus.smbj.common.SmbPath;
import com.hierynomus.smbj.event.SessionLoggedOff;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* SMB文件服务类提供SMB/CIFS协议的文件操作功能
@@ -41,21 +38,59 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class SmbFileService implements DisposableBean {
private final SMBClient client;
private AuthenticationContext authContext;
private final SmbConfig smbConfig;
private final ReentrantLock authLock = new ReentrantLock();
// 连接空闲超时时间3分钟
private static final long CONNECTION_IDLE_TIMEOUT_MS = 3 * 60 * 1000;
// 连接信息内部类,用于存储连接和最后使用时间
// Session信息内部类,用于存储session和最后使用时间
private static class SessionInfo implements AutoCloseable {
private final com.hierynomus.smbj.session.Session session;
private final String username; // 添加用户名字段
private volatile long lastUsedTimestamp;
public SessionInfo(com.hierynomus.smbj.session.Session session, String username) {
this.session = session;
this.username = username;
this.lastUsedTimestamp = System.currentTimeMillis();
}
public String getUsername() {
return username;
}
public com.hierynomus.smbj.session.Session getSession() {
return session;
}
public long getLastUsedTimestamp() {
return lastUsedTimestamp;
}
public void updateLastUsedTimestamp() {
this.lastUsedTimestamp = System.currentTimeMillis();
}
public boolean isIdle(long timeoutMs) {
return (System.currentTimeMillis() - lastUsedTimestamp) > timeoutMs;
}
public void close() throws IOException {
session.close();
}
}
// 连接信息内部类用于存储连接和session连接池
private static class ConnectionInfo {
private final com.hierynomus.smbj.connection.Connection connection;
private volatile long lastUsedTimestamp;
// Session连接池使用List存储
private final List<SessionInfo> sessionPool;
public ConnectionInfo(com.hierynomus.smbj.connection.Connection connection) {
this.connection = connection;
this.lastUsedTimestamp = System.currentTimeMillis();
this.sessionPool = Collections.synchronizedList(new ArrayList<>());
}
public com.hierynomus.smbj.connection.Connection getConnection() {
@@ -73,6 +108,81 @@ public class SmbFileService implements DisposableBean {
public boolean isIdle(long timeoutMs) {
return (System.currentTimeMillis() - lastUsedTimestamp) > timeoutMs;
}
// 清理空闲的session
public int cleanupIdleSessions(long timeoutMs) {
List<SessionInfo> idleSessions = new java.util.ArrayList<>();
// 查找所有空闲session
for (SessionInfo sessionInfo : sessionPool) {
if (sessionInfo != null && sessionInfo.isIdle(timeoutMs)) {
idleSessions.add(sessionInfo);
}
}
// 关闭并移除空闲session
synchronized (sessionPool) {
for (SessionInfo sessionInfo : idleSessions) {
if (sessionInfo != null && sessionPool.contains(sessionInfo)) {
try {
sessionInfo.close();
} catch (IOException e) {
log.error("Error closing idle session for username: {}", sessionInfo.getUsername(), e);
}
sessionPool.remove(sessionInfo);
}
}
}
return idleSessions.size();
}
// 从session池中获取任意一个有效的session
public SessionInfo peekSession() {
if (sessionPool.isEmpty()) {
return null;
}
return sessionPool.removeFirst();
}
// 创建新session并添加到池中
public SessionInfo createSession(AuthenticationContext authContext) throws IOException {
String username = authContext.getUsername();
// 创建新session
com.hierynomus.smbj.session.Session session = connection.authenticate(authContext);
SessionInfo newSession = new SessionInfo(session, username);
return newSession;
}
// 更新session的最后使用时间
public void returnSession(SessionInfo sessionInfo) {
// 重新添加到池中
sessionPool.addLast(sessionInfo);
sessionInfo.updateLastUsedTimestamp();
}
// 关闭所有session
public void closeAllSessions() {
// 创建副本以避免并发修改异常
List<SessionInfo> sessionsCopy = new ArrayList<>();
// 先获取副本并清空池
synchronized (sessionPool) {
sessionsCopy.addAll(sessionPool);
sessionPool.clear();
}
// 关闭所有session
for (SessionInfo sessionInfo : sessionsCopy) {
try {
if (sessionInfo != null) {
sessionInfo.close();
}
} catch (IOException e) {
log.error("Error closing session for username: {}", sessionInfo.getUsername(), e);
}
}
}
}
// 连接池使用ConcurrentHashMap确保线程安全
@@ -91,123 +201,148 @@ public class SmbFileService implements DisposableBean {
this.client = smbClient;
this.smbConfig = SpringApp.getBean(SmbConfig.class);
this.smbConfig.subscribe(this);
// 初始化定时清理任务每30秒运行一次
this.cleanupScheduler = executor;
// 启动定时清理任务延迟1分钟后开始每30秒执行一次
this.cleanupScheduler.scheduleAtFixedRate(this::cleanupIdleConnections, 1, 30, TimeUnit.SECONDS);
}
//
// @net.engio.mbassy.listener.Handler
// private void onSessionLoggedOff(SessionLoggedOff sessionLoggedOffEvent) {
//
// }
/**
* 获取认证上下文,线程安全实现
* 获取认证上下文,根据主机名获取对应的认证信息
*
* @param host 主机名
* @return 认证上下文
* @throws IOException 如果找不到对应的服务器配置
*/
private AuthenticationContext getAuthenticationContext(String host) {
// 双重检查锁定模式,确保线程安全
if (authContext == null) {
authLock.lock();
try {
if (authContext == null) {
log.debug("Creating new AuthenticationContext for host: {}", host);
authContext = new AuthenticationContext(
smbConfig.getUsername(),
smbConfig.getPassword().toCharArray(),
"");
}
} finally {
authLock.unlock();
}
private AuthenticationContext getAuthenticationContext(String host) throws IOException {
log.debug("Creating AuthenticationContext for host: {}", host);
// 获取该主机对应的配置信息
SmbConfig.ServerConfig serverConfig = smbConfig.getServerConfig(host);
// 检查是否找到配置
if (serverConfig == null) {
String errorMsg = String.format("No SMB configuration found for host: %s", host);
log.error(errorMsg);
throw new IOException(errorMsg);
}
return authContext;
// 检查配置是否完整
if (serverConfig.getUsername() == null || serverConfig.getPassword() == null) {
String errorMsg = String.format("Incomplete SMB configuration for host: %s, username or password missing",
host);
log.error(errorMsg);
throw new IOException(errorMsg);
}
return new AuthenticationContext(
serverConfig.getUsername(),
serverConfig.getPassword().toCharArray(),
"");
}
/**
* 执行SMB操作的通用方法简化连接和会话的创建
*
* @param smbPath SMB路径
* @param operation 要执行的操作
* @param <T> 操作返回类型
* @return 操作结果
* @throws IOException 如果操作失败
*/
/**
* 从连接池获取或创建连接
*
*
* @param hostname 主机名
* @return SMB连接
* @throws IOException 如果创建连接失败
*/
private ConnectionInfo getConnectionInfo(String hostname) throws IOException {
// 首先检查连接池是否已有该主机的连接
com.hierynomus.smbj.connection.Connection connection = null;
int maxTrys = 3;
while (maxTrys-- > 0) {
ConnectionInfo connectionInfo = connectionPool.get(hostname);
// 如果连接存在且有效,则更新最后使用时间并返回
if (connectionInfo != null) {
connection = connectionInfo.getConnection();
if (connection != null && connection.isConnected()) {
// 更新连接的最后使用时间
connectionInfo.updateLastUsedTimestamp();
log.debug("Reusing SMB connection for host: {}", hostname);
return connectionInfo;
}
log.debug("Closing invalid SMB connection for host: {}", hostname);
connectionInfo.closeAllSessions();
}
// 如果连接不存在或已关闭,则创建新连接
connectionPoolLock.lock();
try {
// 创建新连接
log.debug("Creating new SMB connection for host: {}", hostname);
connection = client.connect(hostname);
connectionInfo = new ConnectionInfo(connection);
connectionPool.put(hostname, connectionInfo);
return connectionInfo;
} finally {
connectionPoolLock.unlock();
}
}
return null;
}
/**
* 从连接池获取或创建连接
*
* 从连接池获取或创建连接(兼容旧方法签名)
*
* @param hostname 主机名
* @return SMB连接
* @throws IOException 如果创建连接失败
*/
private com.hierynomus.smbj.connection.Connection getConnection(String hostname) throws IOException {
// 首先检查连接池是否已有该主机的连接
ConnectionInfo connectionInfo = connectionPool.get(hostname);
com.hierynomus.smbj.connection.Connection connection = null;
// 如果连接存在且有效,则更新最后使用时间并返回
if (connectionInfo != null) {
connection = connectionInfo.getConnection();
if (connection != null && connection.isConnected()) {
// 更新连接的最后使用时间
connectionInfo.updateLastUsedTimestamp();
log.debug("Reusing SMB connection for host: {}", hostname);
return connection;
}
}
// 如果连接不存在或已关闭,则创建新连接
connectionPoolLock.lock();
try {
// 双重检查锁定模式
connectionInfo = connectionPool.get(hostname);
if (connectionInfo != null) {
connection = connectionInfo.getConnection();
if (connection != null && connection.isConnected()) {
connectionInfo.updateLastUsedTimestamp();
log.debug("Reusing SMB connection for host: {}", hostname);
return connection;
}
// 如果连接已失效,从池中移除
connectionPool.remove(hostname);
}
// 创建新连接
log.debug("Creating new SMB connection for host: {}", hostname);
connection = client.connect(hostname);
connectionInfo = new ConnectionInfo(connection);
connectionPool.put(hostname, connectionInfo);
} finally {
connectionPoolLock.unlock();
}
return connection;
return getConnectionInfo(hostname).getConnection();
}
// Session空闲超时时间2分钟比连接超时时间短
private static final long SESSION_IDLE_TIMEOUT_MS = 2 * 60 * 1000;
/**
* 清理空闲连接的定时任务
* 清理空闲连接和session的定时任务
* 1. 检查并关闭所有超时的session
* 2. 检查并关闭所有超时的连接
*/
private void cleanupIdleConnections() {
log.debug("Running idle connections cleanup task");
log.debug(
"Running idle connections and sessions cleanup task with session timeout: {}ms, connection timeout: {}ms",
SESSION_IDLE_TIMEOUT_MS, CONNECTION_IDLE_TIMEOUT_MS);
// 创建要移除的连接列表避免在迭代时修改Map
List<String> idleHostnames = new java.util.ArrayList<>();
int totalClosedSessions = 0;
// 查找所有空闲连接
// 首先清理每个连接中的空闲session
for (Map.Entry<String, ConnectionInfo> entry : connectionPool.entrySet()) {
String hostname = entry.getKey();
ConnectionInfo connectionInfo = entry.getValue();
// 检查连接是否空闲超时
if (connectionInfo != null && connectionInfo.isIdle(CONNECTION_IDLE_TIMEOUT_MS)) {
idleHostnames.add(hostname);
log.debug("Found idle connection for host: {}, will be closed", hostname);
if (connectionInfo != null) {
// 清理该连接下的空闲session - 检查session是否超时超时则关闭
log.debug("Checking for idle sessions on host: {}", hostname);
int closedSessions = connectionInfo.cleanupIdleSessions(SESSION_IDLE_TIMEOUT_MS);
totalClosedSessions += closedSessions;
if (closedSessions > 0) {
log.debug("Closed {} idle/expired sessions for host: {}", closedSessions, hostname);
}
// 然后检查连接是否空闲超时
if (connectionInfo.isIdle(CONNECTION_IDLE_TIMEOUT_MS)) {
idleHostnames.add(hostname);
log.debug("Found idle connection for host: {}, will be closed", hostname);
}
}
}
@@ -219,12 +354,18 @@ public class SmbFileService implements DisposableBean {
ConnectionInfo connectionInfo = connectionPool.get(hostname);
if (connectionInfo != null) {
try {
// 先关闭所有session
connectionInfo.closeAllSessions();
log.debug("Closed all remaining sessions for host: {}", hostname);
// 再关闭连接
log.debug("Closing idle connection for host: {}", hostname);
connectionInfo.getConnection().close();
} catch (IOException e) {
log.error("Error closing idle connection for host: {}", hostname, e);
} finally {
connectionPool.remove(hostname);
log.debug("Removed connection from pool for host: {}", hostname);
}
connectionPool.remove(hostname);
}
}
} finally {
@@ -232,12 +373,14 @@ public class SmbFileService implements DisposableBean {
}
}
log.debug("Idle connections cleanup completed, closed {} connections", idleHostnames.size());
log.debug(
"Idle connections and sessions cleanup completed successfully. Results: closed {} connections and {} expired sessions",
idleHostnames.size(), totalClosedSessions);
}
/**
* 执行SMB操作的通用方法使用连接池
*
* 执行SMB操作的通用方法使用连接池和session池
*
* @param smbPath SMB路径
* @param operation 要执行的操作
* @param <T> 操作返回类型
@@ -246,26 +389,69 @@ public class SmbFileService implements DisposableBean {
*/
private <T> T executeSmbOperation(SmbPath smbPath, SmbOperation<T> operation) throws IOException {
String hostname = smbPath.getHostname();
com.hierynomus.smbj.connection.Connection connection = null;
ConnectionInfo connectionInfo = null;
SessionInfo sessionInfo = null;
T re = null;
try {
// 从连接池获取连接
connection = getConnection(hostname);
// 尝试执行获取连接执行操作,当发生 TransportException 时 尝试重试最多3次
int maxTrys = 3;
while (maxTrys-- > 0) {
try {
// 获取连接
connectionInfo = getConnectionInfo(hostname);
// 使用获取的连接进行身份验证
var session = connection.authenticate(getAuthenticationContext(hostname));
// 从session池获取session
sessionInfo = connectionInfo.peekSession();
// 如果session不存在
if (sessionInfo == null) {
// 获取认证上下文
AuthenticationContext authContext = getAuthenticationContext(hostname);
// 创建新session并添加到池中
sessionInfo = connectionInfo.createSession(authContext);
log.debug("Created new SMB session for host: {}", hostname);
} else {
log.debug("Reusing SMB session for host: {}", hostname);
}
try (var share = (DiskShare) session.connectShare(smbPath.getShareName())) {
return operation.execute(share, smbPath.getPath());
// 连接共享
try (var share = (DiskShare) sessionInfo.getSession().connectShare(smbPath.getShareName())) {
re = operation.execute(share, smbPath.getPath());
// 操作完成后更新session的最后使用时间将session放回池中
connectionInfo.returnSession(sessionInfo);
log.debug("Returned SMB session to pool for host: {}", hostname);
} catch (SMBRuntimeException e) {
sessionInfo.close();
throw e;
} finally {
}
break;
} catch (TransportException e) {
log.warn("TransportException occurred while trying to connect to host: {}. Retrying...", hostname);
// 延迟1秒
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
continue;
} catch (IOException e) {
// 如果操作失败且连接信息存在,检查连接状态
if (connectionInfo != null) {
com.hierynomus.smbj.connection.Connection connection = connectionInfo.getConnection();
if (connection != null && !connection.isConnected()) {
// 从连接池移除失效的连接并关闭所有session
connectionInfo.closeAllSessions();
connectionPool.remove(hostname);
log.debug("Removed disconnected SMB connection from pool for host: {}", hostname);
}
}
throw e;
}
} catch (IOException e) {
// 如果操作失败且连接存在,检查连接状态
if (connection != null && !connection.isConnected()) {
// 从连接池移除失效的连接
connectionPool.remove(hostname);
}
throw e;
}
return re;
}
/**
@@ -447,10 +633,10 @@ public class SmbFileService implements DisposableBean {
}
/**
* 关闭并清理所有连接资源
* 关闭并清理所有连接和session资源
*/
public void shutdown() {
log.debug("Shutting down SMB connection pool");
log.debug("Shutting down SMB connection pool and sessions");
// 关闭定时清理任务
try {
@@ -463,15 +649,20 @@ public class SmbFileService implements DisposableBean {
Thread.currentThread().interrupt();
}
// 关闭所有连接
// 关闭所有连接和session
connectionPoolLock.lock();
try {
for (Map.Entry<String, ConnectionInfo> entry : connectionPool.entrySet()) {
String hostname = entry.getKey();
ConnectionInfo connectionInfo = entry.getValue();
try {
log.debug("Closing connection to host: {}", entry.getKey());
entry.getValue().getConnection().close();
// 先关闭所有session
connectionInfo.closeAllSessions();
// 再关闭连接
log.debug("Closing connection to host: {}", hostname);
connectionInfo.getConnection().close();
} catch (IOException e) {
log.error("Error closing connection to host: {}", entry.getKey(), e);
log.error("Error closing connection or sessions to host: {}", hostname, e);
}
}
connectionPool.clear();

View File

@@ -8,7 +8,20 @@ import com.ecep.contract.Message;
import com.ecep.contract.handler.SessionInfo;
import com.fasterxml.jackson.databind.JsonNode;
/**
* WebSocket服务器任务接口
* 定义了所有通过WebSocket与客户端通信的任务的通用方法
* 包括任务名称、初始化参数、设置会话、更新消息、更新标题、更新进度等操作
*
* 所有通过WebSocket与客户端通信的任务类都应实现此接口, 文档参考 .trace/rules/server_task_rules.md
* tips检查是否在 tasker_mapper.json 中注册
*/
public interface WebSocketServerTasker extends Callable<Object> {
/**
* 初始化任务参数
*
* @param argsNode 任务参数的JSON节点
*/
void init(JsonNode argsNode);
default void setSession(SessionInfo session) {
@@ -19,10 +32,19 @@ public interface WebSocketServerTasker extends Callable<Object> {
*/
void setMessageHandler(Predicate<Message> messageHandler);
/**
* 设置标题处理函数
*/
void setTitleHandler(Predicate<String> titleHandler);
/**
* 设置属性处理函数
*/
void setPropertyHandler(BiConsumer<String, Object> propertyHandler);
/**
* 设置进度处理函数
*/
void setProgressHandler(BiConsumer<Long, Long> progressHandler);
}

View File

@@ -66,5 +66,17 @@ server.error.whitelabel.enabled=false
# 设置错误处理路径确保404等错误能被全局异常处理器捕获
spring.web.resources.add-mappings=true
smb.server.username=qiqing.song
smb.server.password=huez8310
# 多服务器配置(请根据实际情况配置)
# 格式:smb.servers.[服务器标识].host=主机地址
# smb.servers.[服务器标识].username=用户名
# smb.servers.[服务器标识].password=密码
#
# 当前配置的服务器:
smb.servers.server1.host=10.84.209.8
smb.servers.server1.username=qiqing.song
smb.servers.server1.password=huez8310
#
# 可以添加更多服务器配置:
# smb.servers.server2.host=10.84.209.9
# smb.servers.server2.username=user2
# smb.servers.server2.password=pass2

View File

@@ -16,7 +16,9 @@
"CompanyVendorEvaluationFormUpdateTask": "com.ecep.contract.service.tasker.CompanyVendorEvaluationFormUpdateTask",
"VendorNextSignDateTask": "com.ecep.contract.service.tasker.VendorNextSignDateTask",
"InventorySyncTask": "com.ecep.contract.ds.other.controller.InventorySyncTask",
"InventoryAllSyncTask": "com.ecep.contract.ds.other.controller.InventoryAllSyncTask"
"InventoryAllSyncTask": "com.ecep.contract.ds.other.controller.InventoryAllSyncTask",
"ContractRepairAllTask": "com.ecep.contract.ds.contract.tasker.ContractRepairAllTasker",
"ContractFilesRebuildAllTasker": "com.ecep.contract.ds.contract.tasker.ContractFilesRebuildAllTasker"
},
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
}