feat: 实现员工同步任务的WebSocket支持及合同名称锁定功能

- 为EmployeesSyncTask添加WebSocket客户端和服务端支持,实现实时任务进度反馈
- 新增合同名称锁定功能,防止误修改重要合同名称
- 优化SmbFileService的连接异常处理,提高稳定性
- 重构ContractFilesRebuildTasker的任务执行逻辑,改进错误处理
- 更新tasker_mapper.json注册EmployeesSyncTask
- 添加相关任务文档和验收报告

修复WebSocketClientSession的任务完成状态处理问题
改进UITools中任务执行的线程管理
优化DepartmentService的findByCode方法返回类型
This commit is contained in:
2025-11-20 16:26:34 +08:00
parent 02afa189f8
commit a784438e97
28 changed files with 983 additions and 329 deletions

View File

@@ -6,6 +6,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import com.ecep.contract.vo.DepartmentVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -16,18 +17,33 @@ import com.ecep.contract.ds.other.service.DepartmentService;
import com.ecep.contract.ds.other.service.EmployeeService;
import com.ecep.contract.model.Department;
import com.ecep.contract.model.Employee;
import com.ecep.contract.service.tasker.WebSocketServerTasker;
import com.ecep.contract.ui.Tasker;
import com.ecep.contract.vo.EmployeeVo;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 用友U8系统-同步员工信息
* 支持WebSocket通信的服务器端任务实现
*/
public class EmployeesSyncTask extends Tasker<Object> {
public class EmployeesSyncTask extends Tasker<Object> implements WebSocketServerTasker {
private static final Logger logger = LoggerFactory.getLogger(EmployeesSyncTask.class);
private final AtomicInteger counter = new AtomicInteger(0);
DepartmentService departmentService;
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SyncArgs {
private boolean fullSync = false; // 是否全量同步
private String departmentCode; // 指定部门代码(可选)
}
public EmployeesSyncTask() {
updateTitle("用友U8系统-同步员工信息");
}
@@ -39,18 +55,50 @@ public class EmployeesSyncTask extends Tasker<Object> {
return departmentService;
}
@Override
public void init(JsonNode argsNode) {
// 解析初始化参数,支持空参数调用
if (argsNode != null && argsNode.size() > 0) {
SyncArgs syncArgs = new SyncArgs();
// 检查是否有参数对象
if (argsNode.size() > 0) {
JsonNode firstArg = argsNode.get(0);
if (firstArg.isObject()) {
// 如果是对象参数解析fullSync和departmentCode
if (firstArg.has("fullSync")) {
syncArgs.setFullSync(firstArg.get("fullSync").asBoolean(false));
}
if (firstArg.has("departmentCode")) {
syncArgs.setDepartmentCode(firstArg.get("departmentCode").asText(null));
}
}
}
logger.info("初始化员工同步任务参数: {}", syncArgs);
} else {
logger.info("初始化员工同步任务,使用默认参数");
}
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
YongYouU8Service service = SpringApp.getBean(YongYouU8Service.class);
// 发送任务开始消息
holder.info("开始从U8系统同步员工信息");
holder.debug("读取 U8 系统 Person 数据表...");
List<Map<String, Object>> list = service.queryAllPerson();
int size = list.size();
holder.debug("总共读取 Person 数据 " + size + "");
// 发送总进度信息
updateProgress(10, 1000);
for (Map<String, Object> rs : list) {
if (isCancelled()) {
holder.debug("Cancelled");
holder.debug("任务已取消");
return null;
}
@@ -58,8 +106,13 @@ public class EmployeesSyncTask extends Tasker<Object> {
sync(rs, sub);
// 更新进度
updateProgress(counter.incrementAndGet(), size);
int current = counter.incrementAndGet();
updateProgress(10 + current * 1000 / size, 1000);
}
// 发送任务完成消息
holder.info("员工信息同步任务完成,共处理 " + counter.get() + " 条记录");
return null;
}
@@ -101,7 +154,7 @@ public class EmployeesSyncTask extends Tasker<Object> {
}
if (StringUtils.hasText(departmentCode)) {
Department departmentByCode = getDepartmentService().findByCode(departmentCode);
DepartmentVo departmentByCode = getDepartmentService().findByCode(departmentCode);
if (departmentByCode == null) {
subHolder.warn("部门代码:" + departmentCode + "未匹配到部门");
} else {

View File

@@ -224,9 +224,16 @@ public class ContractCtx extends AbstractYongYouU8Ctx {
if (updateText(contract::getCode, contract::setCode, code, holder, "合同编号")) {
modified = true;
}
if (updateText(contract::getName, contract::setName, name, holder, "合同名称")) {
modified = true;
// 合同名称是否锁定,锁定后不允许修改合同名称
if (contract.isNameLocked()) {
holder.debug("合同名称已锁定,不允许修改合同名称");
} else {
if (updateText(contract::getName, contract::setName, name, holder, "合同名称")) {
modified = true;
}
}
if (StringUtils.hasText(parentCode)) {
if (updateText(contract::getParentCode, contract::setParentCode, parentCode, holder, "父合同号")) {
modified = true;

View File

@@ -3,7 +3,15 @@ package com.ecep.contract.ds;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
/**
* 自定义JPA仓库接口
* 继承JpaRepository和JpaSpecificationExecutor, 提供基本的CRUD操作和查询功能
*
* 所有实体类都应实现此接口, 文档参考 .trace/rules/server_repository_rules.md、.trace/rules/repository_comprehensive_analysis_report.md
*
* @param <T> 实体类型
* @param <ID> 实体ID类型
*/
public interface MyRepository<T, ID> extends JpaRepository<T, ID>, JpaSpecificationExecutor<T> {
}

View File

@@ -57,6 +57,9 @@ public class Contract
@Column(name = "NAME")
private String name;
@Column(name = "NAME_LOCKED")
private boolean nameLocked = false;
/**
* 合同状态U8 系统中的合同状态,目前含义未知
* <table>
@@ -301,20 +304,21 @@ public class Contract
vo.setId(id);
vo.setGuid(getGuid());
vo.setCode(getCode());
vo.setName(name);
vo.setName(getName());
vo.setNameLocked(isNameLocked());
if (getCompany() != null) {
vo.setCompanyId(getCompany().getId());
}
if (group != null) {
if (getGroup() != null) {
vo.setGroupId(group.getId());
}
if (type != null) {
if (getType() != null) {
vo.setTypeId(type.getId());
}
if (kind != null) {
if (getKind() != null) {
vo.setKindId(kind.getId());
}
if (project != null) {
if (getProject() != null) {
vo.setProject(project.getId());
}
vo.setParentCode(getParentCode());
@@ -356,5 +360,4 @@ public class Contract
vo.setVersion(getVersion());
return vo;
}
}

View File

@@ -39,7 +39,6 @@ import com.ecep.contract.ds.project.service.ProjectService;
import com.ecep.contract.ds.company.model.Company;
import com.ecep.contract.ds.customer.model.CompanyCustomer;
import com.ecep.contract.ds.contract.model.Contract;
import com.ecep.contract.model.ContractCatalog;
import com.ecep.contract.ds.project.model.Project;
import com.ecep.contract.ds.vendor.model.Vendor;
import com.ecep.contract.service.VoableService;
@@ -453,6 +452,7 @@ public class ContractService extends EntityService<Contract, ContractVo, Integer
contract.setCode(vo.getCode());
contract.setName(vo.getName());
contract.setNameLocked(vo.isNameLocked());
contract.setGuid(vo.getGuid());
contract.setState(vo.getState());
contract.setPath(vo.getPath());

View File

@@ -22,12 +22,22 @@ public class ContractFilesRebuildTasker extends Tasker<Object> implements WebSoc
private ContractVo contract;
private boolean repaired = false;
public ContractFilesRebuildTasker() {
updateTitle("合同文件重置");
@Override
public void init(JsonNode argsNode) {
log.info("初始化合同文件重建任务,参数: {}", argsNode);
// 从JSON参数中提取合同信息
if (argsNode != null && !argsNode.isEmpty()) {
ContractService contractService = getCachedBean(ContractService.class);
int contractId = argsNode.get(0).asInt();
this.contract = contractService.findById(contractId);
}
}
@Override
protected Object execute(MessageHolder holder) throws Exception {
updateTitle("合同文件重置");
log.info("开始执行合同文件重建任务: {}", contract != null ? contract.getCode() : "未知合同");
try {
@@ -41,23 +51,22 @@ public class ContractFilesRebuildTasker extends Tasker<Object> implements WebSoc
// 执行文件重建逻辑
ContractCtx contractCtx = new ContractCtx();
boolean success = contractCtx.syncContractFiles(contract, holder);
boolean modified = contractCtx.syncContractFiles(contract, holder);
updateProgress(75, 100);
if (success) {
if (modified) {
repaired = true;
updateMessage("合同文件重建成功");
log.info("合同文件重建成功: {}", contract.getCode());
} else {
updateMessage("合同文件重建失败");
log.warn("合同文件重建失败: {}", contract.getCode());
}
updateMessage("合同文件重建成功");
log.info("合同文件重建成功: {}", contract.getCode());
updateProperty("repaired", repaired);
updateProgress(100, 100);
updateMessage("任务完成");
return success;
return modified;
} catch (Exception e) {
log.error("合同文件重建任务执行失败: {}", contract != null ? contract.getCode() : "未知合同", e);
@@ -66,15 +75,4 @@ public class ContractFilesRebuildTasker extends Tasker<Object> implements WebSoc
}
}
@Override
public void init(JsonNode argsNode) {
log.info("初始化合同文件重建任务,参数: {}", argsNode);
// 从JSON参数中提取合同信息
if (argsNode != null && argsNode.size() > 0) {
ContractService contractService = getCachedBean(ContractService.class);
int contractId = argsNode.get(0).asInt();
this.contract = contractService.findById(contractId);
}
}
}

View File

@@ -40,14 +40,14 @@ public class DepartmentService implements IEntityService<Department>, QueryServi
public DepartmentVo findById(Integer id) {
return repository.findById(id).map(Department::toVo).orElse(null);
}
public Department getById(Integer id) {
return repository.findById(id).orElse(null);
}
@Cacheable(key = "'code-'+#p0")
public Department findByCode(String code) {
return repository.findByCode(code).orElse(null);
public DepartmentVo findByCode(String code) {
return repository.findByCode(code).map(Department::toVo).orElse(null);
}
@Override
@@ -106,7 +106,7 @@ public class DepartmentService implements IEntityService<Department>, QueryServi
department.setCode(vo.getCode());
department.setName(vo.getName());
department.setActive(vo.isActive());
// 处理leader字段
if (vo.getLeaderId() == null) {
department.setLeader(null);

View File

@@ -15,7 +15,6 @@ import com.hierynomus.smbj.SMBClient;
import com.hierynomus.smbj.auth.AuthenticationContext;
import com.hierynomus.smbj.common.SMBRuntimeException;
import com.hierynomus.smbj.common.SmbPath;
import com.hierynomus.smbj.event.SessionLoggedOff;
import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import lombok.extern.slf4j.Slf4j;
@@ -25,6 +24,7 @@ import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -271,6 +271,7 @@ public class SmbFileService implements DisposableBean {
// 更新连接的最后使用时间
connectionInfo.updateLastUsedTimestamp();
log.debug("Reusing SMB connection for host: {}", hostname);
return connectionInfo;
}
log.debug("Closing invalid SMB connection for host: {}", hostname);
@@ -289,7 +290,6 @@ public class SmbFileService implements DisposableBean {
} finally {
connectionPoolLock.unlock();
}
}
return null;
}
@@ -305,6 +305,27 @@ public class SmbFileService implements DisposableBean {
return getConnectionInfo(hostname).getConnection();
}
/**
* 判断异常是否为连接相关异常
*
* @param e 异常
* @return 如果是连接相关异常返回true否则返回false
*/
private boolean isConnectionException(Exception e) {
if (e instanceof SMBRuntimeException) {
Throwable cause = e.getCause();
while (cause != null) {
if (cause instanceof TransportException || cause instanceof SocketException) {
return true;
}
cause = cause.getCause();
}
} else if (e instanceof IOException) {
return e instanceof SocketException || e.getMessage().contains("连接");
}
return false;
}
// Session空闲超时时间2分钟比连接超时时间短
private static final long SESSION_IDLE_TIMEOUT_MS = 2 * 60 * 1000;
@@ -416,6 +437,15 @@ public class SmbFileService implements DisposableBean {
log.debug("Created new SMB session for host: {}", hostname);
} catch (SMBRuntimeException ex) {
log.error("Failed to create SMB session for host: {}, maxTrys:{}", hostname, maxTrys, ex);
// 检查是否是连接问题,如果是则从池中移除连接
if (isConnectionException(ex)) {
connectionPoolLock.lock();
try {
connectionPool.remove(hostname);
} finally {
connectionPoolLock.unlock();
}
}
continue;
}
} else {
@@ -435,6 +465,10 @@ public class SmbFileService implements DisposableBean {
} catch (IOException ignored) {
}
log.error("Failed to execute SMB operation for host: {}, maxTrys:{}", hostname, maxTrys, e);
// 检查是否是连接问题,如果是则从池中移除连接
if (isConnectionException(e)) {
connectionPool.remove(hostname);
}
continue;
} finally {
@@ -451,6 +485,11 @@ public class SmbFileService implements DisposableBean {
log.debug("Removed disconnected SMB connection from pool for host: {}", hostname);
}
}
// 如果是连接异常且还有重试次数,继续重试
if (isConnectionException(e) && maxTrys > 0) {
log.debug("Retrying SMB operation due to connection exception, remaining tries: {}", maxTrys);
continue;
}
throw e;
}
}

View File

@@ -144,7 +144,6 @@ public class WebSocketServerTaskManager implements InitializingBean {
}
});
}
}
private boolean sendMessageToSession(SessionInfo session, String sessionId, Message msg) {

View File

@@ -12,14 +12,14 @@ import com.fasterxml.jackson.databind.JsonNode;
* WebSocket服务器任务接口
* 定义了所有通过WebSocket与客户端通信的任务的通用方法
* 包括任务名称、初始化参数、设置会话、更新消息、更新标题、更新进度等操作
*
* <p>
* 所有通过WebSocket与客户端通信的任务类都应实现此接口, 文档参考 .trace/rules/server_task_rules.md
* tips检查是否在 tasker_mapper.json 中注册
*/
public interface WebSocketServerTasker extends Callable<Object> {
/**
* 初始化任务参数
*
*
* @param argsNode 任务参数的JSON节点
*/
void init(JsonNode argsNode);

View File

@@ -43,6 +43,7 @@
<logger name="com.ecep.contract.manager.ds.customer.controller.CompanyCustomerWindowController" level="debug"/>
<logger name="com.ecep.contract.manager.cloud" level="debug"/>
<logger name="com.ecep.contract.manager.cloud.u8.ContractSyncTask" level="debug"/>
<logger name="com.ecep.contract.service.SmbFileService" level="debug"/>
</configuration>

View File

@@ -19,7 +19,8 @@
"InventoryAllSyncTask": "com.ecep.contract.ds.other.controller.InventoryAllSyncTask",
"ContractRepairAllTask": "com.ecep.contract.ds.contract.tasker.ContractRepairAllTasker",
"ContractFilesRebuildAllTasker": "com.ecep.contract.ds.contract.tasker.ContractFilesRebuildAllTasker",
"ContractFilesRebuildTasker": "com.ecep.contract.ds.contract.tasker.ContractFilesRebuildTasker"
"ContractFilesRebuildTasker": "com.ecep.contract.ds.contract.tasker.ContractFilesRebuildTasker",
"EmployeesSyncTask": "com.ecep.contract.cloud.u8.EmployeesSyncTask"
},
"descriptions": "任务注册信息, 客户端的任务可以通过 WebSocket 调用"
}