package com.ecep.contract;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.ecep.contract.constant.WebSocketConstant;
import com.ecep.contract.controller.OkHttpLoginController;
import com.ecep.contract.msg.SimpleMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import javafx.application.Platform;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import lombok.Getter;
import lombok.Setter;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

/**
 * WebSocket messaging service.
 *
 * <p>Opens a WebSocket to the server, keeps it alive with a periodic {@code "ping"} text frame,
 * schedules a reconnect after the connection closes or fails, and routes incoming JSON messages
 * either to a pending request future (matched by message id) or to a registered
 * {@link WebSocketClientSession} (matched by session id).
 *
 * <p>Listener callbacks, the scheduled heartbeat/reconnect tasks and public callers may run on
 * different threads, so the {@code webSocket} reference and the {@code isActive} flag are volatile
 * and every method reads {@code webSocket} once into a local before using it.
 */
@Service
public class WebSocketClientService {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketClientService.class);

    /** Delay before a reconnect attempt, in milliseconds. */
    private static final int RECONNECT_DELAY_MS = 5000;
    /** Interval between heartbeat pings, in milliseconds. */
    private static final int HEARTBEAT_INTERVAL_MS = 30000;

    @Getter
    @Setter
    private volatile WebSocket webSocket;

    @Getter
    @Autowired
    private ObjectMapper objectMapper;

    /** Timeout applied by {@link #invoke(String, String, Object...)}, in milliseconds. */
    @Getter
    @Setter
    private long readTimeout = 30000;

    private String webSocketUrl = "ws://localhost:8080/ws";

    /** True between {@link #initWebSocket()} and {@link #closeWebSocket()}; gates heartbeat and reconnect. */
    private volatile boolean isActive = false;

    /** Handle of the periodic heartbeat task, or null when none is scheduled. */
    private ScheduledFuture<?> heartbeatTask;
    /** Handle of the pending reconnect task, or null when none is scheduled. */
    private ScheduledFuture<?> reconnectFuture;

    private final SimpleBooleanProperty online = new SimpleBooleanProperty(false);
    private final SimpleStringProperty message = new SimpleStringProperty("");

    /** Sessions created through {@link #createSession()}, keyed by session id. */
    private final Map<String, WebSocketClientSession> sessions = Collections.synchronizedMap(new HashMap<>());
    /** Futures of requests still waiting for a response, keyed by message id. */
    private final Map<String, CompletableFuture<JsonNode>> callbacks = Collections.synchronizedMap(new HashMap<>());

    WebSocketListener listener = new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            Platform.runLater(() -> {
                online.setValue(true);
                message.setValue("已连接");
            });
            startHeartbeat();
        }

        @Override
        public void onMessage(WebSocket webSocket, String text) {
            logger.debug("收到WebSocket消息: {}", text);
            if ("pong".equals(text)) {
                // Heartbeat reply: the connection is healthy, nothing else to do.
                logger.debug("收到心跳响应");
                return;
            }
            try {
                JsonNode node = objectMapper.readTree(text);

                // 1) Response to a request sent through send(SimpleMessage).
                if (node.has(WebSocketConstant.MESSAGE_ID_FIELD_NAME)) {
                    String messageId = node.get(WebSocketConstant.MESSAGE_ID_FIELD_NAME).asText();
                    CompletableFuture<JsonNode> future = callbacks.remove(messageId);
                    if (future != null) {
                        onCallbackMessage(future, node);
                    } else {
                        logger.error("未找到对应的回调future: {}", messageId);
                    }
                    return;
                }

                // 2) Message addressed to a registered session.
                if (node.has(WebSocketConstant.SESSION_ID_FIELD_NAME)) {
                    String sessionId = node.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
                    WebSocketClientSession session = sessions.get(sessionId);
                    if (session != null) {
                        try {
                            session.onMessage(node);
                        } catch (Exception e) {
                            session.updateMessage(java.util.logging.Level.SEVERE, "会话异常: " + e.getMessage());
                        }
                    }
                    return;
                }

                // 3) Server-side error notification.
                if (node.has(WebSocketConstant.ERROR_CODE_FIELD_NAME)) {
                    int errorCode = node.get(WebSocketConstant.ERROR_CODE_FIELD_NAME).asInt();
                    // path() tolerates a missing message field, so the unauthorized handling still runs.
                    String errorMsg = node.path(WebSocketConstant.MESSAGE_FIELD_NAME).asText();
                    logger.error("收到错误消息: 错误码={}, 错误信息={}", errorCode, errorMsg);
                    if (errorCode == WebSocketConstant.ERROR_CODE_UNAUTHORIZED) {
                        // Unauthorized: trigger a new login.
                        OkHttpLoginController controller = new OkHttpLoginController();
                        controller.tryLogin();
                        // TODO: bring the login window to the front.
                    }
                    return;
                }
            } catch (Exception e) {
                logger.error("处理WebSocket消息失败: {}", e.getMessage(), e);
            }
        }

        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            // Binary frames are only logged.
            logger.debug("收到二进制WebSocket消息,长度: {}", bytes.size());
        }

        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
            logger.debug("WebSocket连接正在关闭: 代码={}, 原因={}", code, reason);
            stopHeartbeat();
            // Reply to the peer's close frame so the handshake completes and onClosed is delivered.
            // A fixed normal-closure code is used because the received code may be one that
            // close() rejects (for example 1005, "no status received").
            webSocket.close(1000, null);
        }

        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            logger.debug("WebSocket连接已关闭: 代码={}, 原因={}", code, reason);
            Platform.runLater(() -> {
                online.setValue(false);
                message.setValue("连接已关闭");
            });
            stopHeartbeat();
            scheduleReconnect();
        }

        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            logger.error("WebSocket连接失败: {}, response={}", t.getMessage(), response, t);
            Platform.runLater(() -> {
                online.setValue(false);
                message.set("连接失败: " + t.getMessage());
            });
            stopHeartbeat();
            scheduleReconnect();
        }
    };

    /**
     * Completes a pending request future from the server's response.
     *
     * <p>If the response carries the success field and it is false, the future fails with a
     * {@link RuntimeException} containing the server message. Otherwise it completes with the
     * {@code "data"} node when present, or with the whole response node.
     *
     * @param future the future registered for the response's message id
     * @param node   the parsed response
     */
    private void onCallbackMessage(CompletableFuture<JsonNode> future, JsonNode node) {
        if (node.has(WebSocketConstant.SUCCESS_FIELD_VALUE)
                && !node.get(WebSocketConstant.SUCCESS_FIELD_VALUE).asBoolean()) {
            // path() instead of get(): a failure response without a message field must still
            // complete the future rather than throw and leave the caller waiting.
            String serverMessage = node.path(WebSocketConstant.MESSAGE_FIELD_NAME).asText();
            future.completeExceptionally(new RuntimeException("请求失败:来自服务器的消息=" + serverMessage));
            return;
        }
        if (node.has("data")) {
            future.complete(node.get("data"));
        } else {
            future.complete(node);
        }
    }

    /**
     * Sends a raw text frame. Failures are logged, not thrown.
     *
     * @param string the text to send
     */
    public void send(String string) {
        WebSocket socket = webSocket;
        if (socket == null) {
            logger.warn("Failed to send message: WebSocket is not initialized");
            return;
        }
        if (socket.send(string)) {
            logger.debug("send message success:{}", string);
        } else {
            logger.warn("Failed to send message: WebSocket refused to enqueue it");
        }
    }

    /**
     * Serializes {@code message} to JSON and sends it as a text frame.
     *
     * @param message the object to serialize
     * @throws JsonProcessingException if serialization fails
     */
    public void send(Object message) throws JsonProcessingException {
        send(objectMapper.writeValueAsString(message));
    }

    /**
     * Sends a request and returns a future for its response.
     *
     * <p>The future is registered under the message id and removed again as soon as it completes
     * for any reason (response, send failure, timeout, cancellation), so abandoned requests do not
     * accumulate in the callback map.
     *
     * @param msg the request to send
     * @return a future completed with the response payload, or completed exceptionally when the
     *         socket is missing/offline, serialization fails, or the send is rejected
     */
    public CompletableFuture<JsonNode> send(SimpleMessage msg) {
        CompletableFuture<JsonNode> future = new CompletableFuture<>();
        try {
            WebSocket socket = webSocket;
            if (socket == null) {
                throw new IllegalStateException("WebSocket is not initialized");
            }
            if (!online.get()) {
                throw new IllegalStateException("WebSocket is not online");
            }
            String json = objectMapper.writeValueAsString(msg);
            String messageId = msg.getMessageId();
            callbacks.put(messageId, future);
            // Two-argument remove: only drop the entry if it is still this future.
            future.whenComplete((result, error) -> callbacks.remove(messageId, future));
            if (socket.send(json)) {
                logger.debug("send message success:{}", json);
            } else if (isActive) {
                future.completeExceptionally(new RuntimeException("Failed to send WebSocket message"));
            } else {
                future.completeExceptionally(new RuntimeException("WebSocket is not active"));
            }
        } catch (Exception e) {
            logger.error("Failed to send WebSocket message: {}", e.getMessage(), e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Invokes a remote service method and waits at most {@link #getReadTimeout()} milliseconds.
     *
     * @param service the remote service name
     * @param method  the remote method name
     * @param params  the call arguments
     * @return a future for the response; fails with a timeout if no response arrives in time
     */
    public CompletableFuture<JsonNode> invoke(String service, String method, Object... params) {
        SimpleMessage msg = new SimpleMessage();
        msg.setService(service);
        msg.setMethod(method);
        msg.setArguments(params);
        return send(msg).orTimeout(getReadTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * Marks the service active and opens the WebSocket. If creating the connection throws, the
     * failure is published to the UI properties and a reconnect is scheduled.
     */
    public void initWebSocket() {
        isActive = true;
        OkHttpClient httpClient = Desktop.instance.getHttpClient();
        try {
            Request request = new Request.Builder()
                    .url(webSocketUrl)
                    .build();
            webSocket = httpClient.newWebSocket(request, listener);
        } catch (Exception e) {
            logger.error("建立WebSocket连接失败: {}", e.getMessage(), e);
            Platform.runLater(() -> {
                online.setValue(false);
                message.set("连接失败: " + e.getMessage());
            });
            scheduleReconnect();
        }
    }

    /**
     * Starts the heartbeat task, which periodically sends a ping to keep the connection alive.
     * Any previously scheduled heartbeat is cancelled first.
     */
    private void startHeartbeat() {
        stopHeartbeat();
        ScheduledExecutorService executorService = Desktop.instance.getExecutorService();
        // NOTE(review): the initial delay reuses RECONNECT_DELAY_MS - confirm this is intended.
        heartbeatTask = executorService.scheduleAtFixedRate(this::heartbeat, RECONNECT_DELAY_MS,
                HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /** Sends one {@code "ping"} text frame if the service is active and a socket exists. */
    void heartbeat() {
        if (!isActive) {
            return;
        }
        try {
            WebSocket socket = webSocket;
            if (socket != null) {
                logger.debug("发送心跳 ping");
                socket.send("ping");
            }
        } catch (Exception e) {
            logger.error("发送心跳失败: {}", e.getMessage(), e);
        }
    }

    /** Cancels the heartbeat task if one is scheduled. */
    private void stopHeartbeat() {
        if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
            heartbeatTask.cancel(true);
            heartbeatTask = null;
        }
    }

    /**
     * Schedules a reconnect attempt after {@link #RECONNECT_DELAY_MS} milliseconds, replacing any
     * pending attempt. Does nothing when the connection was closed on purpose.
     */
    private void scheduleReconnect() {
        if (!isActive) {
            // Closed deliberately through closeWebSocket(): do not reconnect.
            return;
        }
        if (reconnectFuture != null && !reconnectFuture.isDone()) {
            reconnectFuture.cancel(true);
        }
        logger.info("计划在 {} 毫秒后尝试重连WebSocket", RECONNECT_DELAY_MS);
        reconnectFuture = Desktop.instance.getExecutorService().schedule(() -> {
            if (isActive) {
                logger.info("尝试重新连接WebSocket");
                Platform.runLater(() -> {
                    online.setValue(false);
                    message.set("正在重新连接WebSocket...");
                });
                initWebSocket();
            }
        }, RECONNECT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the WebSocket on purpose: deactivates the service, stops the heartbeat, cancels any
     * pending reconnect and sends a normal-closure frame.
     */
    public void closeWebSocket() {
        isActive = false;
        stopHeartbeat();
        if (reconnectFuture != null && !reconnectFuture.isDone()) {
            reconnectFuture.cancel(false);
            reconnectFuture = null;
        }
        WebSocket socket = webSocket;
        if (socket != null) {
            socket.close(1000, "主动关闭连接");
            webSocket = null;
        }
    }

    /** @return the observable status message */
    public StringProperty getMessageProperty() {
        return message;
    }

    /** @return the observable online flag */
    public BooleanProperty getOnlineProperty() {
        return online;
    }

    /**
     * Creates a session and hands it to {@code sessionConsumer}.
     *
     * <p>NOTE(review): the session is not closed when the consumer returns; it stays registered
     * until {@link #closeSession(WebSocketClientSession)} is called. Presumably sessions outlive
     * the consumer call - confirm against callers.
     *
     * @param sessionConsumer receives the newly created session
     */
    public void withSession(Consumer<WebSocketClientSession> sessionConsumer) {
        WebSocketClientSession session = createSession();
        sessionConsumer.accept(session);
    }

    /**
     * Unregisters a session so it no longer receives messages.
     *
     * @param session the session to remove; null is ignored
     */
    public void closeSession(WebSocketClientSession session) {
        if (session != null) {
            sessions.remove(session.getSessionId());
        }
    }

    /**
     * Creates a new session bound to this service and registers it under its session id.
     *
     * @return the registered session
     */
    public WebSocketClientSession createSession() {
        WebSocketClientSession session = new WebSocketClientSession(this);
        sessions.put(session.getSessionId(), session);
        return session;
    }
}