From df6188db402bd39962e7c0323484c0619cc00925 Mon Sep 17 00:00:00 2001 From: songqq Date: Sun, 28 Sep 2025 18:18:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9EWebSocket=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E5=99=A8=E5=8F=8A=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现WebSocketServerTaskManager用于管理WebSocket任务,并添加多个任务类: - CompanyContext和CloudRkContext接口定义 - WebSocketServerTasker接口及多个具体任务实现类 - ContractVerifyTasker合同验证任务 - ContractRepairTasker合同修复任务 - CompanyCustomerRebuildFilesTasker客户文件重建任务 - CompanyVerifyTasker企业验证任务 - CustomerFileMoveTasker客户文件移动任务 - CompanyCompositeUpdateTasker企业综合更新任务 - ProjectCostImportItemsFromContractsTasker项目成本导入任务 - 其他相关辅助任务类 这些任务类通过WebSocket与前端交互,实现各种业务功能 --- .../contract/cloud/rk/ctx/CloudRkContext.java | 4 + .../contract/ds/company/CompanyContext.java | 4 + .../tasker/CompanyCompositeUpdateTasker.java | 46 +++++----- ...mpanyCustomerEvaluationFormUpdateTask.java | 0 .../CompanyCustomerNextSignDateTask.java | 0 .../CompanyCustomerRebuildFilesTasker.java | 0 .../tasker/CompanyVerifyTasker.java | 7 -- .../tasker/ContractRepairTasker.java} | 0 .../tasker/ContractVerifyTasker.java | 0 .../tasker/CustomerFileMoveTasker.java | 0 ...ectCostImportItemsFromContractsTasker.java | 87 +++++++++++-------- .../WebSocketServerTaskManager.java | 65 +++++++------- .../{ => tasker}/WebSocketServerTasker.java | 15 ++-- 13 files changed, 124 insertions(+), 104 deletions(-) create mode 100644 server/src/main/java/com/ecep/contract/cloud/rk/ctx/CloudRkContext.java create mode 100644 server/src/main/java/com/ecep/contract/ds/company/CompanyContext.java rename server/src/main/java/com/ecep/contract/{ds/company => service}/tasker/CompanyCompositeUpdateTasker.java (85%) rename server/src/main/java/com/ecep/contract/{ds/customer => service}/tasker/CompanyCustomerEvaluationFormUpdateTask.java (100%) rename server/src/main/java/com/ecep/contract/{ds/customer => service}/tasker/CompanyCustomerNextSignDateTask.java (100%) rename server/src/main/java/com/ecep/contract/{ds/customer => service}/tasker/CompanyCustomerRebuildFilesTasker.java (100%) rename server/src/main/java/com/ecep/contract/{ds/company => service}/tasker/CompanyVerifyTasker.java (94%) rename server/src/main/java/com/ecep/contract/{ds/contract/tasker/ContractRepairTask.java => service/tasker/ContractRepairTasker.java} (100%) rename server/src/main/java/com/ecep/contract/{ds/contract => service}/tasker/ContractVerifyTasker.java (100%) rename server/src/main/java/com/ecep/contract/{ds/customer => service}/tasker/CustomerFileMoveTasker.java (100%) rename server/src/main/java/com/ecep/contract/{ds/project => service/tasker}/ProjectCostImportItemsFromContractsTasker.java (97%) rename server/src/main/java/com/ecep/contract/service/{ => tasker}/WebSocketServerTaskManager.java (73%) rename server/src/main/java/com/ecep/contract/service/{ => tasker}/WebSocketServerTasker.java (86%) diff --git a/server/src/main/java/com/ecep/contract/cloud/rk/ctx/CloudRkContext.java b/server/src/main/java/com/ecep/contract/cloud/rk/ctx/CloudRkContext.java new file mode 100644 index 0000000..adc3204 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/cloud/rk/ctx/CloudRkContext.java @@ -0,0 +1,4 @@ +package com.ecep.contract.cloud.rk.ctx; + +public interface CloudRkContext { +} diff --git a/server/src/main/java/com/ecep/contract/ds/company/CompanyContext.java b/server/src/main/java/com/ecep/contract/ds/company/CompanyContext.java new file mode 100644 index 0000000..f148148 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/ds/company/CompanyContext.java @@ -0,0 +1,4 @@ +package com.ecep.contract.ds.company; + +public interface CompanyContext { +} diff --git a/server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyCompositeUpdateTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/CompanyCompositeUpdateTasker.java similarity index 85% rename from server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyCompositeUpdateTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/CompanyCompositeUpdateTasker.java index db4b537..d3c0c68 100644 --- a/server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyCompositeUpdateTasker.java +++ b/server/src/main/java/com/ecep/contract/service/tasker/CompanyCompositeUpdateTasker.java @@ -1,13 +1,5 @@ package com.ecep.contract.ds.company.tasker; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; - import com.ecep.contract.MessageHolder; import com.ecep.contract.cloud.rk.CloudRkService; import com.ecep.contract.cloud.rk.ctx.CloudRkCtx; @@ -23,9 +15,16 @@ import com.ecep.contract.model.CloudYu; import com.ecep.contract.model.Company; import com.ecep.contract.service.WebSocketServerTasker; import com.ecep.contract.ui.Tasker; +import com.ecep.contract.util.MyStringUtils; import com.fasterxml.jackson.databind.JsonNode; - import lombok.Setter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.util.StringUtils; + +import java.time.LocalDate; +import java.time.LocalDateTime; /** * 合并更新 @@ -34,6 +33,7 @@ public class CompanyCompositeUpdateTasker extends Tasker implements WebS private static final Logger logger = LoggerFactory.getLogger(CompanyCompositeUpdateTasker.class); CloudRkCtx cloudRkCtx = new CloudRkCtx(); + // CloudYuCtx cloudYuCtx = new CloudYuCtx(); @Setter private CloudRkService cloudRkService; @@ -52,25 +52,22 @@ public class CompanyCompositeUpdateTasker extends Tasker implements WebS @Override protected Object execute(MessageHolder holder) throws Exception { - - holder.debug("1. 从 " + CloudServiceConstant.RK_NAME + " 更新..."); updateProgress(0.1, 1); syncFromCloudRk(holder); - holder.debug("2. 从 " + CloudServiceConstant.U8_NAME + " 更新..."); updateProgress(0.3, 1); syncFromYongYouU8(holder); - holder.debug("3. 从 " + CloudServiceConstant.TYC_NAME + " 更新..."); + updateProgress(0.5, 1); syncFromCloudTyc(holder); - updateProgress(0.9, 1); + updateProgress(0.7, 1); return null; } private void syncFromCloudRk(MessageHolder holder) { holder.debug("1. 从 " + CloudServiceConstant.RK_NAME + " 更新..."); try { - cloudRkService = getBean(CloudRkService.class); + cloudRkService = getCachedBean(CloudRkService.class); } catch (BeansException e) { - holder.warn("未启用 " + CloudServiceConstant.RK_NAME + " 服务"); + holder.warn("服务未启用"); return; } CloudRk cloudRk = cloudRkService.getOrCreateCloudRk(company); @@ -80,22 +77,29 @@ public class CompanyCompositeUpdateTasker extends Tasker implements WebS } try { - cloudRkCtx.setCloudRkService(cloudRkService); if (cloudRkCtx.syncCompany(company, cloudRk, holder)) { } } catch (Exception e) { - cloudRk.setDescription(e.getMessage()); + String message = e.getMessage(); + if (message.length() > 50) { + message = message.substring(0, 50); + } + cloudRk.setDescription(message); } finally { cloudRk.setLatestUpdate(LocalDateTime.now()); - cloudRkService.save(cloudRk); + try { + cloudRkService.save(cloudRk); + } catch (Exception e) { + holder.error("保存 CloudRk 错误: " + cloudRk.getDescription()); + } } } private void syncFromYongYouU8(MessageHolder holder) { holder.debug("2. 从 " + CloudServiceConstant.U8_NAME + " 更新..."); try { - yongYouU8Service = getBean(YongYouU8Service.class); + yongYouU8Service = getCachedBean(YongYouU8Service.class); } catch (BeansException e) { holder.warn("未启用 " + CloudServiceConstant.U8_NAME + " 服务"); return; @@ -150,7 +154,7 @@ public class CompanyCompositeUpdateTasker extends Tasker implements WebS private void syncFromCloudTyc(MessageHolder holder) { holder.debug("3. 从 " + CloudServiceConstant.TYC_NAME + " 更新..."); try { - cloudTycService = getBean(CloudTycService.class); + cloudTycService = getCachedBean(CloudTycService.class); } catch (BeansException e) { holder.warn("未启用 " + CloudServiceConstant.TYC_NAME + " 服务"); return; diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java b/server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerEvaluationFormUpdateTask.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerEvaluationFormUpdateTask.java rename to server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerEvaluationFormUpdateTask.java diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerNextSignDateTask.java b/server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerNextSignDateTask.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerNextSignDateTask.java rename to server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerNextSignDateTask.java diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerRebuildFilesTasker.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/customer/tasker/CompanyCustomerRebuildFilesTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/CompanyCustomerRebuildFilesTasker.java diff --git a/server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyVerifyTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/CompanyVerifyTasker.java similarity index 94% rename from server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyVerifyTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/CompanyVerifyTasker.java index b5311c1..1b83d48 100644 --- a/server/src/main/java/com/ecep/contract/ds/company/tasker/CompanyVerifyTasker.java +++ b/server/src/main/java/com/ecep/contract/service/tasker/CompanyVerifyTasker.java @@ -29,19 +29,12 @@ public class CompanyVerifyTasker extends Tasker implements WebSocketServ ContractVerifyComm comm = new ContractVerifyComm(); - public CompanyService getCompanyService() { - return getCachedBean(CompanyService.class); - } - public ContractService getContractService() { return getCachedBean(ContractService.class); } /** * 核验公司名下的所有合同 - * - * @param company 公司 - * @param holder 输出 */ @Override public void init(JsonNode argsNode) { diff --git a/server/src/main/java/com/ecep/contract/ds/contract/tasker/ContractRepairTask.java b/server/src/main/java/com/ecep/contract/service/tasker/ContractRepairTasker.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/contract/tasker/ContractRepairTask.java rename to server/src/main/java/com/ecep/contract/service/tasker/ContractRepairTasker.java diff --git a/server/src/main/java/com/ecep/contract/ds/contract/tasker/ContractVerifyTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/ContractVerifyTasker.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/contract/tasker/ContractVerifyTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/ContractVerifyTasker.java diff --git a/server/src/main/java/com/ecep/contract/ds/customer/tasker/CustomerFileMoveTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/CustomerFileMoveTasker.java similarity index 100% rename from server/src/main/java/com/ecep/contract/ds/customer/tasker/CustomerFileMoveTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/CustomerFileMoveTasker.java diff --git a/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/ProjectCostImportItemsFromContractsTasker.java similarity index 97% rename from server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/ProjectCostImportItemsFromContractsTasker.java index ee153d9..b940312 100644 --- a/server/src/main/java/com/ecep/contract/ds/project/ProjectCostImportItemsFromContractsTasker.java +++ b/server/src/main/java/com/ecep/contract/service/tasker/ProjectCostImportItemsFromContractsTasker.java @@ -1,7 +1,5 @@ package com.ecep.contract.ds.project; -import static com.ecep.contract.SpringApp.getBean; - import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -10,12 +8,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -import java.util.logging.Level; import java.util.stream.Collectors; -import com.ecep.contract.service.WebSocketServerTasker; -import com.ecep.contract.ui.MessageHolderImpl; -import com.fasterxml.jackson.databind.JsonNode; import org.hibernate.Hibernate; import com.ecep.contract.MessageHolder; @@ -24,6 +18,7 @@ import com.ecep.contract.ds.contract.service.ContractService; import com.ecep.contract.ds.other.service.InventoryService; import com.ecep.contract.ds.project.service.ProjectCostItemService; import com.ecep.contract.ds.project.service.ProjectCostService; +import com.ecep.contract.handler.SessionInfo; import com.ecep.contract.model.Contract; import com.ecep.contract.model.ContractItem; import com.ecep.contract.model.Employee; @@ -31,9 +26,11 @@ import com.ecep.contract.model.Inventory; import com.ecep.contract.model.Project; import com.ecep.contract.model.ProjectCost; import com.ecep.contract.model.ProjectCostItem; +import com.ecep.contract.service.WebSocketServerTasker; import com.ecep.contract.ui.Tasker; import com.ecep.contract.util.NumberUtils; import com.ecep.contract.util.TaxRateUtils; +import com.fasterxml.jackson.databind.JsonNode; import lombok.Setter; @@ -45,15 +42,24 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker im AtomicBoolean verified = new AtomicBoolean(true); @Override - public void init(JsonNode argsNode) { - int contractId = argsNode.get(0).asInt(); - cost = getCachedBean(ProjectCostService.class).findById(contractId); + public void setSession(SessionInfo session) { + currentUser = () -> { + return getEmployeeService().findById(session.getEmployeeId()); + }; } @Override - protected Void execute(MessageHolder holder) throws Exception { - importFromContracts(cost, holder); - return null; + public Employee getCurrentUser() { + if (currentUser == null) { + return null; + } + return currentUser.get(); + } + + @Override + public void init(JsonNode argsNode) { + int contractId = argsNode.get(0).asInt(); + cost = getCachedBean(ProjectCostService.class).findById(contractId); } public void importFromContracts(ProjectCost projectCost, MessageHolder holder) { @@ -134,6 +140,20 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker im - projectCost.getTaxAndSurchargesFee()) * projectCost.getTaxAndSurcharges() / 100); } + public ProjectCostItemService getItemService() { + return getCachedBean(ProjectCostItemService.class); + } + + public ProjectCostItem save(ProjectCostItem entity) { + return getItemService().save(entity); + } + + @Override + protected Void execute(MessageHolder holder) throws Exception { + importFromContracts(cost, holder); + return null; + } + String toKey(ContractItem item) { return item.getTitle() + "_" + item.getSpecification(); } @@ -142,8 +162,24 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker im return item.getTitle() + "_" + item.getSpecification(); } + ProjectCostService getCostService() { + return getCachedBean(ProjectCostService.class); + } + + ContractService getContractService() { + return getCachedBean(ContractService.class); + } + + ContractItemService getContractItemService() { + return getCachedBean(ContractItemService.class); + } + + InventoryService getInventoryService() { + return getCachedBean(InventoryService.class); + } + private void compositeOut(Map> map, List results, - MessageHolder holder) { + MessageHolder holder) { // 根据存货匹配,可对多个相同的存货进行合并 for (Map.Entry> entry : map.entrySet()) { Inventory inventory = Hibernate.isInitialized(entry.getKey()) ? entry.getKey() @@ -336,29 +372,4 @@ public class ProjectCostImportItemsFromContractsTasker extends Tasker im save(projectCostItem); } - ProjectCostService getCostService() { - return getCachedBean(ProjectCostService.class); - } - - public ProjectCostItemService getItemService() { - return getCachedBean(ProjectCostItemService.class); - } - - ContractService getContractService() { - return getCachedBean(ContractService.class); - } - - ContractItemService getContractItemService() { - return getCachedBean(ContractItemService.class); - } - - InventoryService getInventoryService() { - return getCachedBean(InventoryService.class); - } - - public ProjectCostItem save(ProjectCostItem entity) { - return getItemService().save(entity); - } - - } diff --git a/server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java b/server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTaskManager.java similarity index 73% rename from server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java rename to server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTaskManager.java index d7d690f..d23408d 100644 --- a/server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java +++ b/server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTaskManager.java @@ -2,6 +2,7 @@ package com.ecep.contract.service; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.concurrent.Callable; @@ -14,11 +15,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.stereotype.Service; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; import com.ecep.contract.Message; import com.ecep.contract.constant.WebSocketConstant; +import com.ecep.contract.handler.SessionInfo; import com.ecep.contract.model.Voable; import com.ecep.contract.ui.Tasker; import com.fasterxml.jackson.databind.JsonNode; @@ -58,18 +58,26 @@ public class WebSocketServerTaskManager implements InitializingBean { } } - public void onMessage(WebSocketSession session, JsonNode jsonNode) { + public void onMessage(SessionInfo session, JsonNode jsonNode) { // 处理 sessionId 的消息 String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText(); try { handleAsSessionCallback(session, sessionId, jsonNode); } catch (Exception e) { - sendError(session, sessionId, e.getMessage()); + sendError(session, sessionId, WebSocketConstant.ERROR_CODE_INTERNAL_SERVER_ERROR, e.getMessage()); logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e); } } - private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) { + private void sendError(SessionInfo session, String sessionId, int errorCode, String message) { + try { + session.sendError(WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, errorCode, message); + } catch (IOException e) { + logger.warn("发送错误消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e); + } + } + + private void handleAsSessionCallback(SessionInfo session, String sessionId, JsonNode jsonNode) { if (!jsonNode.has("type")) { throw new IllegalArgumentException("缺失 type 参数"); } @@ -79,7 +87,7 @@ public class WebSocketServerTaskManager implements InitializingBean { } } - private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) { + private void createTask(SessionInfo session, String sessionId, JsonNode jsonNode) { if (!jsonNode.has(WebSocketConstant.ARGUMENTS_FIELD_NAME)) { throw new IllegalArgumentException("缺失 " + WebSocketConstant.ARGUMENTS_FIELD_NAME + " 参数"); } @@ -106,6 +114,7 @@ public class WebSocketServerTaskManager implements InitializingBean { } if (tasker instanceof WebSocketServerTasker t) { + t.setSession(session); t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title)); t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg)); t.setPropertyHandler((name, value) -> { @@ -125,7 +134,6 @@ public class WebSocketServerTaskManager implements InitializingBean { callable.call(); sendToSession(session, sessionId, "done"); } catch (Exception e) { - throw new RuntimeException(e); } }); @@ -133,45 +141,36 @@ public class WebSocketServerTaskManager implements InitializingBean { } - private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) { + private boolean sendMessageToSession(SessionInfo session, String sessionId, Message msg) { return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage()); } - private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) { + private boolean sendToSession(SessionInfo session, String sessionId, String type, Object... args) { try { - String text = objectMapper.writeValueAsString(Map.of( + session.send(Map.of( WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, "type", type, WebSocketConstant.ARGUMENTS_FIELD_NAME, args)); - session.sendMessage(new TextMessage(text)); + return true; } catch (IOException e) { - // 捕获所有可能的异常,防止影响主流程 - logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + logger.warn("发送消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e); + return false; } - return true; } - private void sendError(WebSocketSession session, String sessionId, String message) { - if (session == null || !session.isOpen()) { - logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); - return; + private void send(SessionInfo session, String sessionId, Object data) { + Map map = new HashMap<>(); + if (data instanceof Voable) { + map.put("data", ((Voable) data).toVo()); + } else { + map.put("data", data); } - + map.put(WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId); + map.put(WebSocketConstant.SUCCESS_FIELD_NAME, true); try { - String errorMessage = objectMapper.writeValueAsString(Map.of( - WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, - WebSocketConstant.SUCCESS_FIELD_VALUE, false, - WebSocketConstant.MESSAGE_FIELD_NAME, message)); - - // 检查会话状态并尝试发送错误消息 - if (session.isOpen()) { - session.sendMessage(new TextMessage(errorMessage)); - } else { - logger.warn("会话已关闭,无法发送错误消息: {}", message); - } - } catch (Exception e) { - // 捕获所有可能的异常,防止影响主流程 - logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + session.send(map); + } catch (IOException e) { + logger.warn("发送消息失败 (消息ID: {}): {}", sessionId, e.getMessage(), e); } } } diff --git a/server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java b/server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTasker.java similarity index 86% rename from server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java rename to server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTasker.java index c3752ec..b8716db 100644 --- a/server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java +++ b/server/src/main/java/com/ecep/contract/service/tasker/WebSocketServerTasker.java @@ -1,13 +1,19 @@ package com.ecep.contract.service; -import com.ecep.contract.Message; -import com.fasterxml.jackson.databind.JsonNode; - import java.util.concurrent.Callable; import java.util.function.BiConsumer; import java.util.function.Predicate; +import com.ecep.contract.Message; +import com.ecep.contract.handler.SessionInfo; +import com.fasterxml.jackson.databind.JsonNode; + public interface WebSocketServerTasker extends Callable { + void init(JsonNode argsNode); + + default void setSession(SessionInfo session) { + } + /** * 设置消息处理函数 */ @@ -17,7 +23,6 @@ public interface WebSocketServerTasker extends Callable { void setPropertyHandler(BiConsumer propertyHandler); - void init(JsonNode argsNode); - void setProgressHandler(BiConsumer progressHandler); + }