feat: 新增WebSocket任务管理器及相关任务实现

实现WebSocketServerTaskManager用于管理WebSocket任务,并添加多个任务类:
- CompanyContext和CloudRkContext接口定义
- WebSocketServerTasker接口及多个具体任务实现类
- ContractVerifyTasker合同验证任务
- ContractRepairTasker合同修复任务
- CompanyCustomerRebuildFilesTasker客户文件重建任务
- CompanyVerifyTasker企业验证任务
- CustomerFileMoveTasker客户文件移动任务
- CompanyCompositeUpdateTasker企业综合更新任务
- ProjectCostImportItemsFromContractsTasker项目成本导入任务
- 其他相关辅助任务类

这些任务类通过WebSocket与前端交互,实现各种业务功能
This commit is contained in:
2025-09-28 18:18:32 +08:00
parent 510952d72e
commit df6188db40
13 changed files with 124 additions and 104 deletions

View File

@@ -0,0 +1,4 @@
package com.ecep.contract.cloud.rk.ctx;
public interface CloudRkContext {
}

View File

@@ -0,0 +1,4 @@
package com.ecep.contract.ds.company;
public interface CompanyContext {
}

View File

@@ -1,13 +1,5 @@
package com.ecep.contract.ds.company.tasker;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import com.ecep.contract.MessageHolder;
import com.ecep.contract.cloud.rk.CloudRkService;
import com.ecep.contract.cloud.rk.ctx.CloudRkCtx;
@@ -23,9 +15,16 @@ import com.ecep.contract.model.CloudYu;
import com.ecep.contract.model.Company;
import com.ecep.contract.service.WebSocketServerTasker;
import com.ecep.contract.ui.Tasker;
import com.ecep.contract.util.MyStringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.util.StringUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 合并更新
@@ -34,6 +33,7 @@ public class CompanyCompositeUpdateTasker extends Tasker<Object> implements WebS
private static final Logger logger = LoggerFactory.getLogger(CompanyCompositeUpdateTasker.class);
CloudRkCtx cloudRkCtx = new CloudRkCtx();
// CloudYuCtx cloudYuCtx = new CloudYuCtx();
@Setter
private CloudRkService cloudRkService;
@@ -52,25 +52,22 @@ public class CompanyCompositeUpdateTasker extends Tasker<Object> implements WebS
@Override
protected Object execute(MessageHolder holder) throws Exception {
holder.debug("1. 从 " + CloudServiceConstant.RK_NAME + " 更新...");
updateProgress(0.1, 1);
syncFromCloudRk(holder);
holder.debug("2. 从 " + CloudServiceConstant.U8_NAME + " 更新...");
updateProgress(0.3, 1);
syncFromYongYouU8(holder);
holder.debug("3. 从 " + CloudServiceConstant.TYC_NAME + " 更新...");
updateProgress(0.5, 1);
syncFromCloudTyc(holder);
updateProgress(0.9, 1);
updateProgress(0.7, 1);
return null;
}
private void syncFromCloudRk(MessageHolder holder) {
holder.debug("1. 从 " + CloudServiceConstant.RK_NAME + " 更新...");
try {
cloudRkService = getBean(CloudRkService.class);
cloudRkService = getCachedBean(CloudRkService.class);
} catch (BeansException e) {
holder.warn("未启用 " + CloudServiceConstant.RK_NAME + " 服务");
holder.warn("服务未启用");
return;
}
CloudRk cloudRk = cloudRkService.getOrCreateCloudRk(company);
@@ -80,22 +77,29 @@ public class CompanyCompositeUpdateTasker extends Tasker<Object> implements WebS
}
try {
cloudRkCtx.setCloudRkService(cloudRkService);
if (cloudRkCtx.syncCompany(company, cloudRk, holder)) {
}
} catch (Exception e) {
cloudRk.setDescription(e.getMessage());
String message = e.getMessage();
if (message.length() > 50) {
message = message.substring(0, 50);
}
cloudRk.setDescription(message);
} finally {
cloudRk.setLatestUpdate(LocalDateTime.now());
cloudRkService.save(cloudRk);
try {
cloudRkService.save(cloudRk);
} catch (Exception e) {
holder.error("保存 CloudRk 错误: " + cloudRk.getDescription());
}
}
}
private void syncFromYongYouU8(MessageHolder holder) {
holder.debug("2. 从 " + CloudServiceConstant.U8_NAME + " 更新...");
try {
yongYouU8Service = getBean(YongYouU8Service.class);
yongYouU8Service = getCachedBean(YongYouU8Service.class);
} catch (BeansException e) {
holder.warn("未启用 " + CloudServiceConstant.U8_NAME + " 服务");
return;
@@ -150,7 +154,7 @@ public class CompanyCompositeUpdateTasker extends Tasker<Object> implements WebS
private void syncFromCloudTyc(MessageHolder holder) {
holder.debug("3. 从 " + CloudServiceConstant.TYC_NAME + " 更新...");
try {
cloudTycService = getBean(CloudTycService.class);
cloudTycService = getCachedBean(CloudTycService.class);
} catch (BeansException e) {
holder.warn("未启用 " + CloudServiceConstant.TYC_NAME + " 服务");
return;

View File

@@ -29,19 +29,12 @@ public class CompanyVerifyTasker extends Tasker<Object> implements WebSocketServ
ContractVerifyComm comm = new ContractVerifyComm();
public CompanyService getCompanyService() {
return getCachedBean(CompanyService.class);
}
public ContractService getContractService() {
return getCachedBean(ContractService.class);
}
/**
* 核验公司名下的所有合同
*
* @param company 公司
* @param holder 输出
*/
@Override
public void init(JsonNode argsNode) {

View File

@@ -1,7 +1,5 @@
package com.ecep.contract.ds.project;
import static com.ecep.contract.SpringApp.getBean;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
@@ -10,12 +8,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collectors;
import com.ecep.contract.service.WebSocketServerTasker;
import com.ecep.contract.ui.MessageHolderImpl;
import com.fasterxml.jackson.databind.JsonNode;
import org.hibernate.Hibernate;
import com.ecep.contract.MessageHolder;
@@ -24,6 +18,7 @@ import com.ecep.contract.ds.contract.service.ContractService;
import com.ecep.contract.ds.other.service.InventoryService;
import com.ecep.contract.ds.project.service.ProjectCostItemService;
import com.ecep.contract.ds.project.service.ProjectCostService;
import com.ecep.contract.handler.SessionInfo;
import com.ecep.contract.model.Contract;
import com.ecep.contract.model.ContractItem;
import com.ecep.contract.model.Employee;
@@ -31,9 +26,11 @@ import com.ecep.contract.model.Inventory;
import com.ecep.contract.model.Project;
import com.ecep.contract.model.ProjectCost;
import com.ecep.contract.model.ProjectCostItem;
import com.ecep.contract.service.WebSocketServerTasker;
import com.ecep.contract.ui.Tasker;
import com.ecep.contract.util.NumberUtils;
import com.ecep.contract.util.TaxRateUtils;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Setter;
@@ -45,15 +42,24 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker<Object> im
AtomicBoolean verified = new AtomicBoolean(true);
@Override
public void init(JsonNode argsNode) {
int contractId = argsNode.get(0).asInt();
cost = getCachedBean(ProjectCostService.class).findById(contractId);
public void setSession(SessionInfo session) {
currentUser = () -> {
return getEmployeeService().findById(session.getEmployeeId());
};
}
@Override
protected Void execute(MessageHolder holder) throws Exception {
importFromContracts(cost, holder);
return null;
public Employee getCurrentUser() {
if (currentUser == null) {
return null;
}
return currentUser.get();
}
@Override
public void init(JsonNode argsNode) {
int contractId = argsNode.get(0).asInt();
cost = getCachedBean(ProjectCostService.class).findById(contractId);
}
public void importFromContracts(ProjectCost projectCost, MessageHolder holder) {
@@ -134,6 +140,20 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker<Object> im
- projectCost.getTaxAndSurchargesFee()) * projectCost.getTaxAndSurcharges() / 100);
}
public ProjectCostItemService getItemService() {
return getCachedBean(ProjectCostItemService.class);
}
public ProjectCostItem save(ProjectCostItem entity) {
return getItemService().save(entity);
}
@Override
protected Void execute(MessageHolder holder) throws Exception {
importFromContracts(cost, holder);
return null;
}
String toKey(ContractItem item) {
return item.getTitle() + "_" + item.getSpecification();
}
@@ -142,8 +162,24 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker<Object> im
return item.getTitle() + "_" + item.getSpecification();
}
ProjectCostService getCostService() {
return getCachedBean(ProjectCostService.class);
}
ContractService getContractService() {
return getCachedBean(ContractService.class);
}
ContractItemService getContractItemService() {
return getCachedBean(ContractItemService.class);
}
InventoryService getInventoryService() {
return getCachedBean(InventoryService.class);
}
private void compositeOut(Map<Inventory, List<ContractItem>> map, List<ProjectCostItem> results,
MessageHolder holder) {
MessageHolder holder) {
// 根据存货匹配可对多个相同的存货进行合并
for (Map.Entry<Inventory, List<ContractItem>> entry : map.entrySet()) {
Inventory inventory = Hibernate.isInitialized(entry.getKey()) ? entry.getKey()
@@ -336,29 +372,4 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker<Object> im
save(projectCostItem);
}
ProjectCostService getCostService() {
return getCachedBean(ProjectCostService.class);
}
public ProjectCostItemService getItemService() {
return getCachedBean(ProjectCostItemService.class);
}
ContractService getContractService() {
return getCachedBean(ContractService.class);
}
ContractItemService getContractItemService() {
return getCachedBean(ContractItemService.class);
}
InventoryService getInventoryService() {
return getCachedBean(InventoryService.class);
}
public ProjectCostItem save(ProjectCostItem entity) {
return getItemService().save(entity);
}
}

View File

@@ -2,6 +2,7 @@ package com.ecep.contract.service;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -14,11 +15,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import com.ecep.contract.Message;
import com.ecep.contract.constant.WebSocketConstant;
import com.ecep.contract.handler.SessionInfo;
import com.ecep.contract.model.Voable;
import com.ecep.contract.ui.Tasker;
import com.fasterxml.jackson.databind.JsonNode;
@@ -58,18 +58,26 @@ public class WebSocketServerTaskManager implements InitializingBean {
}
}
public void onMessage(WebSocketSession session, JsonNode jsonNode) {
public void onMessage(SessionInfo session, JsonNode jsonNode) {
// 处理 sessionId 的消息
String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
try {
handleAsSessionCallback(session, sessionId, jsonNode);
} catch (Exception e) {
sendError(session, sessionId, e.getMessage());
sendError(session, sessionId, WebSocketConstant.ERROR_CODE_INTERNAL_SERVER_ERROR, e.getMessage());
logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e);
}
}
private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) {
private void sendError(SessionInfo session, String sessionId, int errorCode, String message) {
try {
session.sendError(WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, errorCode, message);
} catch (IOException e) {
logger.warn("发送错误消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e);
}
}
private void handleAsSessionCallback(SessionInfo session, String sessionId, JsonNode jsonNode) {
if (!jsonNode.has("type")) {
throw new IllegalArgumentException("缺失 type 参数");
}
@@ -79,7 +87,7 @@ public class WebSocketServerTaskManager implements InitializingBean {
}
}
private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) {
private void createTask(SessionInfo session, String sessionId, JsonNode jsonNode) {
if (!jsonNode.has(WebSocketConstant.ARGUMENTS_FIELD_NAME)) {
throw new IllegalArgumentException("缺失 " + WebSocketConstant.ARGUMENTS_FIELD_NAME + " 参数");
}
@@ -106,6 +114,7 @@ public class WebSocketServerTaskManager implements InitializingBean {
}
if (tasker instanceof WebSocketServerTasker t) {
t.setSession(session);
t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title));
t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg));
t.setPropertyHandler((name, value) -> {
@@ -125,7 +134,6 @@ public class WebSocketServerTaskManager implements InitializingBean {
callable.call();
sendToSession(session, sessionId, "done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
@@ -133,45 +141,36 @@ public class WebSocketServerTaskManager implements InitializingBean {
}
private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) {
private boolean sendMessageToSession(SessionInfo session, String sessionId, Message msg) {
return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage());
}
private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) {
private boolean sendToSession(SessionInfo session, String sessionId, String type, Object... args) {
try {
String text = objectMapper.writeValueAsString(Map.of(
session.send(Map.of(
WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
"type", type,
WebSocketConstant.ARGUMENTS_FIELD_NAME, args));
session.sendMessage(new TextMessage(text));
return true;
} catch (IOException e) {
// 捕获所有可能的异常防止影响主流程
logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
logger.warn("发送消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e);
return false;
}
return true;
}
private void sendError(WebSocketSession session, String sessionId, String message) {
if (session == null || !session.isOpen()) {
logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message);
return;
private void send(SessionInfo session, String sessionId, Object data) {
Map<String, Object> map = new HashMap<>();
if (data instanceof Voable<?>) {
map.put("data", ((Voable<?>) data).toVo());
} else {
map.put("data", data);
}
map.put(WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId);
map.put(WebSocketConstant.SUCCESS_FIELD_NAME, true);
try {
String errorMessage = objectMapper.writeValueAsString(Map.of(
WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
WebSocketConstant.SUCCESS_FIELD_VALUE, false,
WebSocketConstant.MESSAGE_FIELD_NAME, message));
// 检查会话状态并尝试发送错误消息
if (session.isOpen()) {
session.sendMessage(new TextMessage(errorMessage));
} else {
logger.warn("会话已关闭,无法发送错误消息: {}", message);
}
} catch (Exception e) {
// 捕获所有可能的异常防止影响主流程
logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
session.send(map);
} catch (IOException e) {
logger.warn("发送消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e);
}
}
}

View File

@@ -1,13 +1,19 @@
package com.ecep.contract.service;
import com.ecep.contract.Message;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import com.ecep.contract.Message;
import com.ecep.contract.handler.SessionInfo;
import com.fasterxml.jackson.databind.JsonNode;
public interface WebSocketServerTasker extends Callable<Object> {
void init(JsonNode argsNode);
default void setSession(SessionInfo session) {
}
/**
* 设置消息处理函数
*/
@@ -17,7 +23,6 @@ public interface WebSocketServerTasker extends Callable<Object> {
void setPropertyHandler(BiConsumer<String, Object> propertyHandler);
void init(JsonNode argsNode);
void setProgressHandler(BiConsumer<Long, Long> progressHandler);
}