From 30deb0a280f968ff4a98f41a6271e8c222ba29fa Mon Sep 17 00:00:00 2001 From: songqq Date: Wed, 17 Sep 2025 11:44:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0WebSocket=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=E6=A1=86=E6=9E=B6=E5=8F=8A=E4=BB=BB=E5=8A=A1=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增WebSocket客户端和服务端通信框架,包括会话管理、心跳检测和自动重连机制 添加任务管理器用于处理WebSocket任务创建和执行 实现消息回调处理和错误处理机制 重构销售类型服务并添加缓存支持 移除旧的销售类型服务实现 --- ...rvice.java => WebSocketClientService.java} | 85 ++++-- .../ecep/contract/WebSocketClientSession.java | 56 ++++ .../ecep/contract/WebSocketClientTasker.java | 4 + .../CompanyCustomerFileTypeService.java | 4 + .../CompanyInvoiceInfoService.java | 6 +- .../service/CompanyVendorFileTypeService.java | 4 + .../ecep/contract/service/HolidayService.java | 4 + .../contract/service/SaleTypeService.java | 16 - ...CompanyCustomerFileTypeLocalViewModel.java | 4 + .../CompanyVendorFileTypeLocalViewModel.java | 4 + .../com/ecep/contract/vm/EnumViewModel.java | 4 + .../contract/constant/WebSocketConstant.java | 4 + .../contract/model/SalesOrderInvoice.java | 4 + .../java/com/ecep/contract/EntityService.java | 4 + ...rvice.java => ProjectSaleTypeService.java} | 17 +- ...ndler.java => WebSocketServerHandler.java} | 286 ++++++++++-------- .../service/WebSocketServerTaskManager.java | 130 ++++++++ .../service/WebSocketServerTasker.java | 15 + .../ecep/contract/ui/MessageHolderImpl.java | 4 + 19 files changed, 495 insertions(+), 160 deletions(-) rename client/src/main/java/com/ecep/contract/{WebSocketService.java => WebSocketClientService.java} (75%) create mode 100644 client/src/main/java/com/ecep/contract/WebSocketClientSession.java create mode 100644 client/src/main/java/com/ecep/contract/WebSocketClientTasker.java create mode 100644 client/src/main/java/com/ecep/contract/service/CompanyCustomerFileTypeService.java rename client/src/main/java/com/ecep/contract/{controller/project => service}/CompanyInvoiceInfoService.java (66%) create mode 100644 client/src/main/java/com/ecep/contract/service/CompanyVendorFileTypeService.java create mode 100644 client/src/main/java/com/ecep/contract/service/HolidayService.java delete mode 100644 client/src/main/java/com/ecep/contract/service/SaleTypeService.java create mode 100644 client/src/main/java/com/ecep/contract/vm/CompanyCustomerFileTypeLocalViewModel.java create mode 100644 client/src/main/java/com/ecep/contract/vm/CompanyVendorFileTypeLocalViewModel.java create mode 100644 client/src/main/java/com/ecep/contract/vm/EnumViewModel.java create mode 100644 common/src/main/java/com/ecep/contract/constant/WebSocketConstant.java create mode 100644 common/src/main/java/com/ecep/contract/model/SalesOrderInvoice.java create mode 100644 server/src/main/java/com/ecep/contract/EntityService.java rename server/src/main/java/com/ecep/contract/ds/project/service/{SaleTypeService.java => ProjectSaleTypeService.java} (80%) rename server/src/main/java/com/ecep/contract/handler/{WebSocketHandler.java => WebSocketServerHandler.java} (76%) create mode 100644 server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java create mode 100644 server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java create mode 100644 server/src/main/java/com/ecep/contract/ui/MessageHolderImpl.java diff --git a/client/src/main/java/com/ecep/contract/WebSocketService.java b/client/src/main/java/com/ecep/contract/WebSocketClientService.java similarity index 75% rename from client/src/main/java/com/ecep/contract/WebSocketService.java rename to client/src/main/java/com/ecep/contract/WebSocketClientService.java index f12cb45..6a316ed 100644 --- a/client/src/main/java/com/ecep/contract/WebSocketService.java +++ b/client/src/main/java/com/ecep/contract/WebSocketClientService.java @@ -1,6 +1,8 @@ package com.ecep.contract; +import com.ecep.contract.constant.WebSocketConstant; import com.ecep.contract.msg.SimpleMessage; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import javafx.application.Platform; @@ -24,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * WebSocket消息服务 @@ -49,6 +52,9 @@ public class WebSocketService { private SimpleBooleanProperty online = new SimpleBooleanProperty(false); private SimpleStringProperty message = new SimpleStringProperty(""); + // 存储所有活跃的WebSocket会话 + private final Map sessions = Collections.synchronizedMap(new HashMap<>()); + // 存储所有活跃的WebSocket会话 private final Map> callbacks = Collections.synchronizedMap(new HashMap<>()); @@ -75,29 +81,23 @@ public class WebSocketService { try { JsonNode node = objectMapper.readTree(text); - if (node.has("messageId")) { - String messageId = node.get("messageId").asText(); + if (node.has(WebSocketConstant.MESSAGE_ID_FIELD_NAME)) { + String messageId = node.get(WebSocketConstant.MESSAGE_ID_FIELD_NAME).asText(); CompletableFuture future = callbacks.remove(messageId); if (future != null) { - if (node.has("success")) { - if (!node.get("success").asBoolean()) { - future.completeExceptionally( - new RuntimeException("请求失败:来自服务器的消息=" + node.get("message").asText())); - return; - } - } - // 使用具体类型后,这里不会再出现类型不匹配的错误 - if (node.has("data")) { - future.complete(node.get("data")); - } else { - future.complete(node); - } + onCallbackMessage(future, node); } else { logger.error("未找到对应的回调future: {}", messageId); } - } else if (node.has("errorCode")) { - int errorCode = node.get("errorCode").asInt(); - String errorMsg = node.get("message").asText(); + } else if (node.has(WebSocketConstant.SESSION_ID_FIELD_NAME)) { + String sessionId = node.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText(); + WebSocketClientSession session = sessions.get(sessionId); + if (session != null) { + session.onMessage(node); + } + } else if (node.has(WebSocketConstant.ERROR_CODE_FIELD_NAME)) { + int errorCode = node.get(WebSocketConstant.ERROR_CODE_FIELD_NAME).asInt(); + String errorMsg = node.get(WebSocketConstant.MESSAGE_FIELD_NAME).asText(); // TODO 需要重新登录 logger.error("收到错误消息: 错误码={}, 错误信息={}", errorCode, errorMsg); } @@ -139,6 +139,22 @@ public class WebSocketService { } }; + private void onCallbackMessage(CompletableFuture future, JsonNode node) { + if (node.has(WebSocketConstant.SUCCESS_FIELD_VALUE)) { + if (!node.get(WebSocketConstant.SUCCESS_FIELD_VALUE).asBoolean()) { + future.completeExceptionally( + new RuntimeException("请求失败:来自服务器的消息=" + node.get(WebSocketConstant.MESSAGE_FIELD_NAME).asText())); + return; + } + } + // 使用具体类型后,这里不会再出现类型不匹配的错误 + if (node.has("data")) { + future.complete(node.get("data")); + } else { + future.complete(node); + } + } + public void send(String string) { if (webSocket != null && webSocket.send(string)) { logger.debug("send message success:{}", string); @@ -147,6 +163,10 @@ public class WebSocketService { } } + public void send(Object message) throws JsonProcessingException { + send(objectMapper.writeValueAsString(message)); + } + public CompletableFuture send(SimpleMessage msg) { CompletableFuture future = new CompletableFuture<>(); try { @@ -168,6 +188,14 @@ public class WebSocketService { return future; } + public CompletableFuture invoke(String service, String method, Object... params) { + SimpleMessage msg = new SimpleMessage(); + msg.setService(service); + msg.setMethod(method); + msg.setArguments(params); + return send(msg).orTimeout(getReadTimeout(), TimeUnit.MILLISECONDS); + } + public void initWebSocket() { isActive = true; OkHttpClient httpClient = Desktop.instance.getHttpClient(); @@ -278,4 +306,25 @@ public class WebSocketService { return online; } + public void withSession(Consumer sessionConsumer) { + WebSocketClientSession session = createSession(); + try { + sessionConsumer.accept(session); + } finally { + // closeSession(session); + } + } + + private void closeSession(WebSocketClientSession session) { + if (session != null) { + sessions.remove(session.getSessionId()); + session.close(); + } + } + + private WebSocketClientSession createSession() { + WebSocketClientSession session = new WebSocketClientSession(this); + sessions.put(session.getSessionId(), session); + return session; + } } diff --git a/client/src/main/java/com/ecep/contract/WebSocketClientSession.java b/client/src/main/java/com/ecep/contract/WebSocketClientSession.java new file mode 100644 index 0000000..345e180 --- /dev/null +++ b/client/src/main/java/com/ecep/contract/WebSocketClientSession.java @@ -0,0 +1,56 @@ +package com.ecep.contract; + +import com.ecep.contract.constant.WebSocketConstant; +import com.ecep.contract.task.Tasker; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Getter; +import lombok.Setter; + +import java.util.Map; +import java.util.UUID; + +public class WebSocketSession { + @Getter + private String sessionId = UUID.randomUUID().toString(); + + private WebSocketClientTasker tasker; + + private final WebSocketService webSocketService; + + public WebSocketSession(WebSocketService webSocketService) { + this.webSocketService = webSocketService; + } + + public void close() { + } + + public void submitTask(WebSocketClientTasker tasker, Object... args) throws JsonProcessingException { + Map argments = Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, getSessionId(), + "type", "createTask", + "taskName", tasker.getTaskName(), + "args", args); + webSocketService.send(argments); + } + + public void onMessage(JsonNode node) { + if (node.has("type")) { + String type = node.get("type").asText(); + if (type.equals("message")) { + JsonNode args = node.get("args"); + String message = args.get(1).asText(); + String level = args.get(0).asText(); + if (tasker instanceof Tasker t) { + t.updateMessage(java.util.logging.Level.parse(level), message); + } + } else if (type.equals("title")) { + JsonNode args = node.get("args"); + String message = args.get(0).asText(); + if (tasker instanceof Tasker t) { + t.updateTitle(message); + } + } + } + } +} diff --git a/client/src/main/java/com/ecep/contract/WebSocketClientTasker.java b/client/src/main/java/com/ecep/contract/WebSocketClientTasker.java new file mode 100644 index 0000000..31be58f --- /dev/null +++ b/client/src/main/java/com/ecep/contract/WebSocketClientTasker.java @@ -0,0 +1,4 @@ +package com.ecep.contract; + +public interface WebSocketClientTasker { +} diff --git a/client/src/main/java/com/ecep/contract/service/CompanyCustomerFileTypeService.java b/client/src/main/java/com/ecep/contract/service/CompanyCustomerFileTypeService.java new file mode 100644 index 0000000..35e264d --- /dev/null +++ b/client/src/main/java/com/ecep/contract/service/CompanyCustomerFileTypeService.java @@ -0,0 +1,4 @@ +package com.ecep.contract.service; + +public class CompanyCustomerFileTypeService { +} diff --git a/client/src/main/java/com/ecep/contract/controller/project/CompanyInvoiceInfoService.java b/client/src/main/java/com/ecep/contract/service/CompanyInvoiceInfoService.java similarity index 66% rename from client/src/main/java/com/ecep/contract/controller/project/CompanyInvoiceInfoService.java rename to client/src/main/java/com/ecep/contract/service/CompanyInvoiceInfoService.java index 3db57ab..035c239 100644 --- a/client/src/main/java/com/ecep/contract/controller/project/CompanyInvoiceInfoService.java +++ b/client/src/main/java/com/ecep/contract/service/CompanyInvoiceInfoService.java @@ -4,10 +4,14 @@ import java.util.List; import com.ecep.contract.model.Company; import com.ecep.contract.model.CompanyInvoiceInfo; +import com.ecep.contract.service.QueryService; import com.ecep.contract.service.ViewModelService; import com.ecep.contract.vm.CompanyInvoiceInfoViewModel; +import org.springframework.stereotype.Service; -public class CompanyInvoiceInfoService implements ViewModelService { +@Service + +public class CompanyInvoiceInfoService extends QueryService { public List searchByCompany(Company company, String searchText) { throw new UnsupportedOperationException("未实现"); diff --git a/client/src/main/java/com/ecep/contract/service/CompanyVendorFileTypeService.java b/client/src/main/java/com/ecep/contract/service/CompanyVendorFileTypeService.java new file mode 100644 index 0000000..c02b0cf --- /dev/null +++ b/client/src/main/java/com/ecep/contract/service/CompanyVendorFileTypeService.java @@ -0,0 +1,4 @@ +package com.ecep.contract.service; + +public class CompanyVendorFileTypeService { +} diff --git a/client/src/main/java/com/ecep/contract/service/HolidayService.java b/client/src/main/java/com/ecep/contract/service/HolidayService.java new file mode 100644 index 0000000..b935495 --- /dev/null +++ b/client/src/main/java/com/ecep/contract/service/HolidayService.java @@ -0,0 +1,4 @@ +package com.ecep.contract.service; + +public class HolidayService { +} diff --git a/client/src/main/java/com/ecep/contract/service/SaleTypeService.java b/client/src/main/java/com/ecep/contract/service/SaleTypeService.java deleted file mode 100644 index faa5041..0000000 --- a/client/src/main/java/com/ecep/contract/service/SaleTypeService.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.ecep.contract.service; - -import org.springframework.stereotype.Service; - -import com.ecep.contract.model.ProjectSaleType; -import com.ecep.contract.vm.ProjectSaleTypeViewModel; - -@Service -public class SaleTypeService extends QueryService { - - public ProjectSaleType findByName(String name) { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'findByName'"); - } - -} diff --git a/client/src/main/java/com/ecep/contract/vm/CompanyCustomerFileTypeLocalViewModel.java b/client/src/main/java/com/ecep/contract/vm/CompanyCustomerFileTypeLocalViewModel.java new file mode 100644 index 0000000..f3eb65a --- /dev/null +++ b/client/src/main/java/com/ecep/contract/vm/CompanyCustomerFileTypeLocalViewModel.java @@ -0,0 +1,4 @@ +package com.ecep.contract.vm; + +public class CompanyCustomerFileTypeLocalViewModel { +} diff --git a/client/src/main/java/com/ecep/contract/vm/CompanyVendorFileTypeLocalViewModel.java b/client/src/main/java/com/ecep/contract/vm/CompanyVendorFileTypeLocalViewModel.java new file mode 100644 index 0000000..a1164b1 --- /dev/null +++ b/client/src/main/java/com/ecep/contract/vm/CompanyVendorFileTypeLocalViewModel.java @@ -0,0 +1,4 @@ +package com.ecep.contract.vm; + +public class CompanyVendorFileTypeLocalViewModel { +} diff --git a/client/src/main/java/com/ecep/contract/vm/EnumViewModel.java b/client/src/main/java/com/ecep/contract/vm/EnumViewModel.java new file mode 100644 index 0000000..b42376a --- /dev/null +++ b/client/src/main/java/com/ecep/contract/vm/EnumViewModel.java @@ -0,0 +1,4 @@ +package com.ecep.contract.vm; + +public class EnumViewModel { +} diff --git a/common/src/main/java/com/ecep/contract/constant/WebSocketConstant.java b/common/src/main/java/com/ecep/contract/constant/WebSocketConstant.java new file mode 100644 index 0000000..f5dbf82 --- /dev/null +++ b/common/src/main/java/com/ecep/contract/constant/WebSocketConstant.java @@ -0,0 +1,4 @@ +package com.ecep.contract.constant; + +public class WebSocketConstant { +} diff --git a/common/src/main/java/com/ecep/contract/model/SalesOrderInvoice.java b/common/src/main/java/com/ecep/contract/model/SalesOrderInvoice.java new file mode 100644 index 0000000..1956b94 --- /dev/null +++ b/common/src/main/java/com/ecep/contract/model/SalesOrderInvoice.java @@ -0,0 +1,4 @@ +package com.ecep.contract.model; + +public class SalesOrderInvoice { +} diff --git a/server/src/main/java/com/ecep/contract/EntityService.java b/server/src/main/java/com/ecep/contract/EntityService.java new file mode 100644 index 0000000..be78487 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/EntityService.java @@ -0,0 +1,4 @@ +package com.ecep.contract; + +public class EntityService { +} diff --git a/server/src/main/java/com/ecep/contract/ds/project/service/SaleTypeService.java b/server/src/main/java/com/ecep/contract/ds/project/service/ProjectSaleTypeService.java similarity index 80% rename from server/src/main/java/com/ecep/contract/ds/project/service/SaleTypeService.java rename to server/src/main/java/com/ecep/contract/ds/project/service/ProjectSaleTypeService.java index 78c7088..951a0a6 100644 --- a/server/src/main/java/com/ecep/contract/ds/project/service/SaleTypeService.java +++ b/server/src/main/java/com/ecep/contract/ds/project/service/ProjectSaleTypeService.java @@ -2,6 +2,10 @@ package com.ecep.contract.ds.project.service; import java.util.List; +import com.ecep.contract.QueryService; +import com.ecep.contract.model.CompanyBankAccount; +import com.ecep.contract.util.SpecificationUtils; +import com.fasterxml.jackson.databind.JsonNode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; @@ -21,7 +25,7 @@ import com.ecep.contract.model.ProjectSaleType; @Lazy @Service @CacheConfig(cacheNames = "sale-type") -public class SaleTypeService implements IEntityService { +public class SaleTypeService implements IEntityService, QueryService { @Lazy @Autowired private ProjectSaleTypeRepository saleTypeRepository; @@ -49,6 +53,17 @@ public class SaleTypeService implements IEntityService { return saleTypeRepository.findAll(spec, pageable); } + @Override + public Page findAll(JsonNode paramsNode, Pageable pageable) { + Specification spec = null; + if (paramsNode.has("searchText")) { + spec = getSpecification(paramsNode.get("searchText").asText()); + } + // field + spec = SpecificationUtils.andFieldEqualParam(spec, paramsNode, "active"); + return findAll(spec, pageable); + } + @Override public Specification getSpecification(String searchText) { if (!StringUtils.hasText(searchText)) { diff --git a/server/src/main/java/com/ecep/contract/handler/WebSocketHandler.java b/server/src/main/java/com/ecep/contract/handler/WebSocketServerHandler.java similarity index 76% rename from server/src/main/java/com/ecep/contract/handler/WebSocketHandler.java rename to server/src/main/java/com/ecep/contract/handler/WebSocketServerHandler.java index a9ad17a..a14c918 100644 --- a/server/src/main/java/com/ecep/contract/handler/WebSocketHandler.java +++ b/server/src/main/java/com/ecep/contract/handler/WebSocketServerHandler.java @@ -1,6 +1,29 @@ package com.ecep.contract.handler; +import com.ecep.contract.*; +import com.ecep.contract.constant.WebSocketConstant; +import com.ecep.contract.ds.other.service.EmployeeLoginHistoryService; +import com.ecep.contract.ds.other.service.EmployeeService; +import com.ecep.contract.model.Employee; +import com.ecep.contract.model.EmployeeLoginHistory; +import com.ecep.contract.service.WebSocketServerTaskManager; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.security.authentication.AuthenticationManager; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.TextWebSocketHandler; + import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -14,37 +37,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.Page; -import org.springframework.security.authentication.AuthenticationManager; -import org.springframework.stereotype.Component; -import org.springframework.web.socket.BinaryMessage; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.PingMessage; -import org.springframework.web.socket.PongMessage; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.TextWebSocketHandler; - -import com.ecep.contract.IEntityService; -import com.ecep.contract.PageArgument; -import com.ecep.contract.PageContent; -import com.ecep.contract.QueryService; -import com.ecep.contract.SpringApp; -import com.ecep.contract.ds.other.service.EmployeeLoginHistoryService; -import com.ecep.contract.ds.other.service.EmployeeService; -import com.ecep.contract.model.Employee; -import com.ecep.contract.model.EmployeeLoginHistory; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import lombok.Data; - /** * WebSocket处理器 * 处理与客户端的WebSocket连接、消息传递和断开连接 @@ -64,6 +56,8 @@ public class WebSocketHandler extends TextWebSocketHandler { private EmployeeService employeeService; @Autowired private ScheduledExecutorService scheduledExecutorService; + @Autowired + private WebSocketServerTaskManager taskManager; // 存储所有活跃的WebSocket会话 private final Map activeSessions = Collections.synchronizedMap(new HashMap<>()); @@ -101,21 +95,18 @@ public class WebSocketHandler extends TextWebSocketHandler { sessionInfo.setEmployeeId((Integer) session.getAttributes().get("employeeId")); if (sessionInfo.getEmployeeId() == null) { - logger.error("会话未绑定用户: " + session.getId()); + logger.error("会话未绑定用户: {}", session.getId()); sendError(session, 401, "会话未绑定用户"); session.close(); return; } activeSessions.put(sessionInfo.getEmployeeId(), sessionInfo); - System.out.println(sessionInfo.getLoginHistoryId()); - System.out.println(sessionInfo.getEmployeeId()); - - logger.info("WebSocket连接已建立: " + session.getId()); + logger.info("WebSocket连接已建立: {}", session.getId()); Employee employee = employeeService.findById(sessionInfo.getEmployeeId()); if (employee == null) { - logger.error("未找到用户: #" + sessionInfo.getEmployeeId()); + logger.error("未找到用户: #{}", sessionInfo.getEmployeeId()); return; } @@ -123,30 +114,6 @@ public class WebSocketHandler extends TextWebSocketHandler { sessionInfo.setSchedule(schedule); } - private void sendError(WebSocketSession session, int errorCode, String message) { - if (session == null || !session.isOpen()) { - logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); - return; - } - - try { - ObjectNode objectNode = objectMapper.createObjectNode(); - objectNode.put("errorCode", errorCode); - objectNode.put("success", false); - objectNode.put("message", message); - String errorMessage = objectMapper.writeValueAsString(objectNode); - - // 检查会话状态并尝试发送错误消息 - if (session.isOpen()) { - session.sendMessage(new TextMessage(errorMessage)); - } else { - logger.warn("会话已关闭,无法发送错误消息: {}", message); - } - } catch (Exception e) { - // 捕获所有可能的异常,防止影响主流程 - logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); - } - } /** * 接收文本消息时调用 @@ -154,8 +121,9 @@ public class WebSocketHandler extends TextWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String payload = message.getPayload(); - logger.info("收到来自客户端的消息: " + payload + " (会话ID: " + session.getId() + ")"); - + if (logger.isInfoEnabled()) { + logger.info("收到来自客户端的消息: {} (会话ID: {})", payload, session.getId()); + } // 处理文本格式的ping消息 if ("ping".equals(payload)) { // 回复文本格式的pong消息 @@ -168,7 +136,8 @@ public class WebSocketHandler extends TextWebSocketHandler { return; } - if (handleAsMessageCallback(session, payload)) { + // 尝试将消息作为JSON处理 + if (handleAsJson(session, payload)) { return; } @@ -176,10 +145,10 @@ public class WebSocketHandler extends TextWebSocketHandler { logger.info("处理普通消息: " + payload); } - private boolean handleAsMessageCallback(WebSocketSession session, String payload) { - if (session == null || !session.isOpen()) { + private boolean handleAsJson(WebSocketSession session, String payload) { + if (!session.isOpen()) { logger.warn("尝试在已关闭的WebSocket会话上处理消息回调"); - return false; + return true; } JsonNode jsonNode = null; @@ -189,73 +158,98 @@ public class WebSocketHandler extends TextWebSocketHandler { logger.warn("解析消息回调JSON失败: {}", payload, e); return false; } - - if (!jsonNode.has("messageId")) { - // 没有 messageId 的消息不处理 - return false; - } - String messageId = jsonNode.get("messageId").asText(); - - if (!jsonNode.has("service")) { - sendError(session, messageId, "缺失 service 参数"); + if (jsonNode.has(WebSocketConstant.MESSAGE_ID_FIELD_NAME)) { + // 处理 messageId 的消息 + String messageId = jsonNode.get(WebSocketConstant.MESSAGE_ID_FIELD_NAME).asText(); + try { + handleAsMessageCallback(session, messageId, jsonNode); + } catch (Exception e) { + sendError(session, messageId, e.getMessage()); + logger.warn("处理消息回调失败 (消息ID: {}): {}", messageId, e.getMessage(), e); + } return true; } + if (jsonNode.has(WebSocketConstant.SESSION_ID_FIELD_NAME)) { + taskManager.onMessage(session, jsonNode); + return true; + } + return false; + } - String serviceName = jsonNode.get("service").asText(); + + private void handleAsMessageCallback(WebSocketSession session, String messageId, JsonNode jsonNode) throws Exception { + if (!jsonNode.has(WebSocketConstant.SERVICE_FIELD_NAME)) { + throw new IllegalArgumentException("缺失 service 参数"); + } + + String serviceName = jsonNode.get(WebSocketConstant.SERVICE_FIELD_NAME).asText(); Object service = null; try { service = SpringApp.getBean(serviceName); } catch (Exception e) { - sendError(session, messageId, "未找到服务: " + serviceName); - return true; + throw new IllegalArgumentException("未找到服务: " + serviceName); } - if (!jsonNode.has("method")) { - sendError(session, messageId, "缺失 method 参数"); - return true; + if (!jsonNode.has(WebSocketConstant.METHOD_FIELD_NAME)) { + throw new IllegalArgumentException("缺失 method 参数"); } - String methodName = jsonNode.get("method").asText(); + String methodName = jsonNode.get(WebSocketConstant.METHOD_FIELD_NAME).asText(); + JsonNode argumentsNode = jsonNode.get("arguments"); + Object result = null; + if (methodName.equals("findAll")) { + result = invokerFindAllMethod(service, argumentsNode); + } else if (methodName.equals("findById")) { + result = invokerFindByIdMethod(service, argumentsNode); + } else if (methodName.equals("save")) { + result = invokerSaveMethod(service, argumentsNode); + } else if (methodName.equals("delete")) { + result = invokerDeleteMethod(service, argumentsNode); + } else { + result = invokerOtherMethod(service, methodName, argumentsNode); + } + + String response = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.MESSAGE_ID_FIELD_NAME, messageId, + WebSocketConstant.SUCCESS_FIELD_VALUE, true, + "data", result + )); + session.sendMessage(new TextMessage(response)); + + } + + private Object invokerOtherMethod(Object service, String methodName, JsonNode argumentsNode) + throws NoSuchMethodException, SecurityException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, JsonProcessingException { + int size = argumentsNode.size(); + if (size == 0) { + Method method = service.getClass().getMethod(methodName); + return method.invoke(service); + } + + // 参数值 + JsonNode paramsNode = argumentsNode.get(0); + // 参数类型 + JsonNode typesNode = argumentsNode.get(1); + + Class[] parameterTypes = new Class[typesNode.size()]; + Object[] args = new Object[paramsNode.size()]; + for (int i = 0; i < typesNode.size(); i++) { + String type = typesNode.get(i).asText(); + parameterTypes[i] = Class.forName(type); + args[i] = objectMapper.treeToValue(paramsNode.get(i), parameterTypes[i]); + } + Class targetClass = getTargetClass(service.getClass()); try { - Object result = null; - if (methodName.equals("findAll")) { - result = invokerFindAllMethod(service, jsonNode.get("arguments")); - } else if (methodName.equals("findById")) { - result = invokerFindByIdMethod(service, jsonNode.get("arguments")); - } else if (methodName.equals("save")) { - result = invokerSaveMethod(service, jsonNode.get("arguments")); - } else if (methodName.equals("delete")) { - result = invokerDeleteMethod(service, jsonNode.get("arguments")); - } else { - sendError(session, messageId, "未实现的方法: " + methodName); - return true; - } - - // 再次检查会话状态 - if (!session.isOpen()) { - logger.warn("会话已关闭,无法发送处理结果 (消息ID: {})"); - return true; - } - - ObjectNode objectNode = objectMapper.createObjectNode(); - objectNode.put("messageId", messageId); - objectNode.set("data", objectMapper.valueToTree(result)); - String response = objectMapper.writeValueAsString(objectNode); - session.sendMessage(new TextMessage(response)); - return true; - } catch (Exception e) { - // 防止重复发送消息导致的TEXT_PARTIAL_WRITING异常 - if (!session.isOpen()) { - logger.warn("会话已关闭,无法发送错误消息 (消息ID: {})"); - } else { - sendError(session, messageId, e.getMessage()); - } - logger.error("处理消息回调失败 (消息ID: {})", messageId, e); - return true; + Method method = targetClass.getMethod(methodName, parameterTypes); + return method.invoke(service, args); + } catch (NoSuchMethodException e) { + logger.error("targetClass: {}, Methods:{}", targetClass, targetClass.getMethods()); + throw e; } } + private Object invokerDeleteMethod(Object service, JsonNode argumentsNode) { JsonNode paramsNode = argumentsNode.get(0); if (!paramsNode.has("id")) { @@ -445,6 +439,7 @@ public class WebSocketHandler extends TextWebSocketHandler { } try { + JsonNode typesNode = argumentsNode.get(1); if (paramsNode.isInt()) { Method method = service.getClass().getMethod("findById", Integer.class); return method.invoke(service, paramsNode.asInt()); @@ -469,17 +464,22 @@ public class WebSocketHandler extends TextWebSocketHandler { } private void sendError(WebSocketSession session, String messageId, String message) { + _sendError(session, WebSocketConstant.MESSAGE_ID_FIELD_NAME, messageId, message); + } + + + private void _sendError(WebSocketSession session, String fieldName, String messageId, String message) { if (session == null || !session.isOpen()) { logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); return; } try { - ObjectNode objectNode = objectMapper.createObjectNode(); - objectNode.put("messageId", messageId); - objectNode.put("success", false); - objectNode.put("message", message); - String errorMessage = objectMapper.writeValueAsString(objectNode); + String errorMessage = objectMapper.writeValueAsString(Map.of( + fieldName, messageId, + WebSocketConstant.SUCCESS_FIELD_VALUE, false, + WebSocketConstant.MESSAGE_FIELD_NAME, message + )); // 检查会话状态并尝试发送错误消息 if (session.isOpen()) { @@ -505,6 +505,19 @@ public class WebSocketHandler extends TextWebSocketHandler { @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { logger.info("收到来自客户端的Pong消息: " + message.getPayload() + " (会话ID: " + session.getId() + ")"); + + // 从活跃会话集合中移除会话 + SessionInfo sessionInfo = activeSessions.get((Integer) session.getAttributes().get("employeeId")); + if (sessionInfo == null) { + logger.warn("收到来自客户端的Pong消息,但会话不存在: " + session.getId()); + return; + } + Integer loginHistoryId = sessionInfo.getLoginHistoryId(); + if (loginHistoryId != null) { + EmployeeLoginHistory history = employeeLoginHistoryService.findById(loginHistoryId); + history.setActiveTime(LocalDateTime.now()); + employeeLoginHistoryService.save(history); + } } /** @@ -516,7 +529,7 @@ public class WebSocketHandler extends TextWebSocketHandler { logger.info( "WebSocket连接已关闭: " + session.getId() + ", 状态码: " + status.getCode() + ", 原因: " + status.getReason()); // 从活跃会话集合中移除会话 - SessionInfo sessionInfo = activeSessions.remove(session.getAttributes().get("employeeId")); + SessionInfo sessionInfo = activeSessions.remove((Integer) session.getAttributes().get("employeeId")); if (sessionInfo == null) { return; } @@ -572,4 +585,29 @@ public class WebSocketHandler extends TextWebSocketHandler { public int getActiveSessionCount() { return activeSessions.size(); } + + private void sendError(WebSocketSession session, int errorCode, String message) { + if (session == null || !session.isOpen()) { + logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); + return; + } + + try { + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put(WebSocketConstant.ERROR_CODE_FIELD_NAME, errorCode); + objectNode.put(WebSocketConstant.SUCCESS_FIELD_VALUE, false); + objectNode.put(WebSocketConstant.MESSAGE_FIELD_NAME, message); + String errorMessage = objectMapper.writeValueAsString(objectNode); + + // 检查会话状态并尝试发送错误消息 + if (session.isOpen()) { + session.sendMessage(new TextMessage(errorMessage)); + } else { + logger.warn("会话已关闭,无法发送错误消息: {}", message); + } + } catch (Exception e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } + } } \ No newline at end of file diff --git a/server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java b/server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java new file mode 100644 index 0000000..3a20047 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/service/WebSocketServerTaskManager.java @@ -0,0 +1,130 @@ +package com.ecep.contract.service; + +import com.ecep.contract.Message; +import com.ecep.contract.constant.WebSocketConstant; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + +@Service +public class WebSocketTaskManager implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(WebSocketTaskManager.class); + @Autowired + private ObjectMapper objectMapper; + @Autowired + private ScheduledExecutorService scheduledExecutorService; + private Map taskClzMap = Map.of(); + + @Override + public void afterPropertiesSet() throws Exception { + taskClzMap = Map.of( + "ContractSyncTask", "com.ecep.contract.cloud.u8.ContractSyncTask", + "ContractRepairTask", "com.ecep.contract.ds.contract.tasker.ContractRepairTask" + ); + } + + public void onMessage(WebSocketSession session, JsonNode jsonNode) { + // 处理 sessionId 的消息 + String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText(); + try { + handleAsSessionCallback(session, sessionId, jsonNode); + } catch (Exception e) { + sendError(session, sessionId, e.getMessage()); + logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e); + } + } + + + private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) { + if (!jsonNode.has("type")) { + throw new IllegalArgumentException("缺失 type 参数"); + } + String type = jsonNode.get("type").asText(); + if (type.equals("createTask")) { + createTask(session, sessionId, jsonNode); + } + } + + private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) { + if (!jsonNode.has("taskName")) { + throw new IllegalArgumentException("缺失 taskName 参数"); + } + String taskName = jsonNode.get("taskName").asText(); + + String clzName = taskClzMap.get(taskName); + if (clzName == null) { + throw new IllegalArgumentException("未知的任务类型: " + taskName); + } + Object tasker = null; + try { + Class clz = Class.forName(clzName); + tasker = clz.getDeclaredConstructor().newInstance(); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("未知的任务类型: " + taskName + ", class: " + clzName); + } catch (Exception e) { + throw new IllegalArgumentException("任务类型: " + taskName + ", class: " + clzName + " 实例化失败"); + } + + if (tasker instanceof WebSocketServerTasker t) { + t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title)); + t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg)); + t.init(jsonNode); + scheduledExecutorService.submit(t); + } + } + + private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) { + return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage()); + } + + private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) { + try { + String text = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, + "type", type, + "args", args + )); + session.sendMessage(new TextMessage(text)); + } catch (IOException e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } + return true; + } + + + private void sendError(WebSocketSession session, String sessionId, String message) { + if (session == null || !session.isOpen()) { + logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message); + return; + } + + try { + String errorMessage = objectMapper.writeValueAsString(Map.of( + WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId, + WebSocketConstant.SUCCESS_FIELD_VALUE, false, + WebSocketConstant.MESSAGE_FIELD_NAME, message + )); + + // 检查会话状态并尝试发送错误消息 + if (session.isOpen()) { + session.sendMessage(new TextMessage(errorMessage)); + } else { + logger.warn("会话已关闭,无法发送错误消息: {}", message); + } + } catch (Exception e) { + // 捕获所有可能的异常,防止影响主流程 + logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e); + } + } +} diff --git a/server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java b/server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java new file mode 100644 index 0000000..3b8e064 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/service/WebSocketServerTasker.java @@ -0,0 +1,15 @@ +package com.ecep.contract.service; + +import com.ecep.contract.Message; +import com.fasterxml.jackson.databind.JsonNode; + +import java.util.concurrent.Callable; +import java.util.function.Predicate; + +public interface WebSocketTasker extends Callable { + void setMessageHandler(Predicate messageHandler); + + void setTitleHandler(Predicate titleHandler); + + void init(JsonNode jsonNode); +} diff --git a/server/src/main/java/com/ecep/contract/ui/MessageHolderImpl.java b/server/src/main/java/com/ecep/contract/ui/MessageHolderImpl.java new file mode 100644 index 0000000..d223f83 --- /dev/null +++ b/server/src/main/java/com/ecep/contract/ui/MessageHolderImpl.java @@ -0,0 +1,4 @@ +package com.ecep.contract.ui; + +public class MessageHolderImpl { +}