feat: 实现WebSocket通信框架及任务管理功能

新增WebSocket客户端和服务端通信框架,包括会话管理、心跳检测和自动重连机制
添加任务管理器用于处理WebSocket任务创建和执行
实现消息回调处理和错误处理机制
重构销售类型服务并添加缓存支持
移除旧的销售类型服务实现
This commit is contained in:
2025-09-17 11:44:39 +08:00
parent ada539bebf
commit 30deb0a280
19 changed files with 495 additions and 160 deletions

View File

@@ -0,0 +1,330 @@
package com.ecep.contract;
import com.ecep.contract.constant.WebSocketConstant;
import com.ecep.contract.msg.SimpleMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import javafx.application.Platform;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import lombok.Getter;
import lombok.Setter;
import okhttp3.*;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* WebSocket消息服务
* 提供向服务器端发送WebSocket消息的功能
*/
@Service
public class WebSocketService {
private static final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
@Getter
@Setter
private WebSocket webSocket;
@Autowired
private ObjectMapper objectMapper;
private static final int RECONNECT_DELAY_MS = 5000; // 重连延迟时间(毫秒)
private static final int HEARTBEAT_INTERVAL_MS = 30000; // 心跳间隔时间(毫秒)
@Getter
@Setter
private long readTimeout = 30000;
private String webSocketUrl = "ws://localhost:8080/ws";
private boolean isActive = false; // 标记连接是否活跃
private ScheduledFuture<?> heartbeatTask; // 心跳任务
private ScheduledFuture<?> reconnectFuture; // 修改类型为CompletableFuture<Void>
private SimpleBooleanProperty online = new SimpleBooleanProperty(false);
private SimpleStringProperty message = new SimpleStringProperty("");
// 存储所有活跃的WebSocket会话
private final Map<String, WebSocketClientSession> sessions = Collections.synchronizedMap(new HashMap<>());
// 存储所有活跃的WebSocket会话
private final Map<String, CompletableFuture<JsonNode>> callbacks = Collections.synchronizedMap(new HashMap<>());
WebSocketListener listener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
Platform.runLater(() -> {
online.setValue(true);
message.setValue("已连接");
});
startHeartbeat(); // 启动心跳
}
@Override
public void onMessage(WebSocket webSocket, String text) {
// 处理收到的文本消息
logger.debug("收到WebSocket消息: {}", text);
// 这里可以根据需要处理从服务器接收的数据
if ("pong".equals(text)) {
// 收到pong响应说明连接正常
logger.debug("收到心跳响应");
return;
}
try {
JsonNode node = objectMapper.readTree(text);
if (node.has(WebSocketConstant.MESSAGE_ID_FIELD_NAME)) {
String messageId = node.get(WebSocketConstant.MESSAGE_ID_FIELD_NAME).asText();
CompletableFuture<JsonNode> future = callbacks.remove(messageId);
if (future != null) {
onCallbackMessage(future, node);
} else {
logger.error("未找到对应的回调future: {}", messageId);
}
} else if (node.has(WebSocketConstant.SESSION_ID_FIELD_NAME)) {
String sessionId = node.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
WebSocketClientSession session = sessions.get(sessionId);
if (session != null) {
session.onMessage(node);
}
} else if (node.has(WebSocketConstant.ERROR_CODE_FIELD_NAME)) {
int errorCode = node.get(WebSocketConstant.ERROR_CODE_FIELD_NAME).asInt();
String errorMsg = node.get(WebSocketConstant.MESSAGE_FIELD_NAME).asText();
// TODO 需要重新登录
logger.error("收到错误消息: 错误码={}, 错误信息={}", errorCode, errorMsg);
}
} catch (Exception e) {
logger.error("处理WebSocket消息失败: {}", e.getMessage(), e);
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
// 处理收到的二进制消息
logger.debug("收到二进制WebSocket消息长度: " + bytes.size());
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
logger.debug("WebSocket连接正在关闭: 代码=" + code + ", 原因=" + reason);
stopHeartbeat(); // 停止心跳
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
logger.debug("WebSocket连接已关闭: 代码=" + code + ", 原因=" + reason);
stopHeartbeat(); // 停止心跳
// 处理重连逻辑
scheduleReconnect();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.error("WebSocket连接失败: " + t.getMessage());
Platform.runLater(() -> {
online.setValue(false);
message.set("连接失败: " + t.getMessage());
});
stopHeartbeat(); // 停止心跳
// 处理重连逻辑
scheduleReconnect();
}
};
private void onCallbackMessage(CompletableFuture<JsonNode> future, JsonNode node) {
if (node.has(WebSocketConstant.SUCCESS_FIELD_VALUE)) {
if (!node.get(WebSocketConstant.SUCCESS_FIELD_VALUE).asBoolean()) {
future.completeExceptionally(
new RuntimeException("请求失败:来自服务器的消息=" + node.get(WebSocketConstant.MESSAGE_FIELD_NAME).asText()));
return;
}
}
// 使用具体类型后,这里不会再出现类型不匹配的错误
if (node.has("data")) {
future.complete(node.get("data"));
} else {
future.complete(node);
}
}
public void send(String string) {
if (webSocket != null && webSocket.send(string)) {
logger.debug("send message success:{}", string);
} else if (webSocket == null) {
logger.warn("Failed to send message: WebSocket is not initialized");
}
}
public void send(Object message) throws JsonProcessingException {
send(objectMapper.writeValueAsString(message));
}
public CompletableFuture<JsonNode> send(SimpleMessage msg) {
CompletableFuture<JsonNode> future = new CompletableFuture<>();
try {
if (webSocket == null) {
throw new IllegalStateException("WebSocket is not initialized");
}
String json = objectMapper.writeValueAsString(msg);
callbacks.put(msg.getMessageId(), future);
if (webSocket.send(json)) {
logger.debug("send message success:{}", json);
} else {
future.completeExceptionally(new RuntimeException("Failed to send WebSocket message"));
}
} catch (Exception e) {
logger.error("Failed to send WebSocket message: {}", e.getMessage());
future.completeExceptionally(e);
}
return future;
}
public CompletableFuture<JsonNode> invoke(String service, String method, Object... params) {
SimpleMessage msg = new SimpleMessage();
msg.setService(service);
msg.setMethod(method);
msg.setArguments(params);
return send(msg).orTimeout(getReadTimeout(), TimeUnit.MILLISECONDS);
}
public void initWebSocket() {
isActive = true;
OkHttpClient httpClient = Desktop.instance.getHttpClient();
try {
// 构建WebSocket请求包含认证信息
Request request = new Request.Builder()
.url(webSocketUrl)
.build();
webSocket = httpClient.newWebSocket(request, listener);
} catch (Exception e) {
logger.error("建立WebSocket连接失败: " + e.getMessage());
Platform.runLater(() -> {
online.setValue(false);
message.set("连接失败: " + e.getMessage());
});
// 处理重连逻辑
scheduleReconnect();
}
}
/**
* 启动心跳任务定期发送ping消息保持连接
*/
private void startHeartbeat() {
// 先停止可能存在的心跳任务
stopHeartbeat();
ScheduledExecutorService executorService = Desktop.instance.getExecutorService();
heartbeatTask = executorService.scheduleAtFixedRate(this::heartbeat, RECONNECT_DELAY_MS, HEARTBEAT_INTERVAL_MS,
TimeUnit.MILLISECONDS);
}
void heartbeat() {
if (!isActive) {
return;
}
try {
if (webSocket != null) {
logger.debug("发送心跳 ping");
webSocket.send("ping");
}
} catch (Exception e) {
logger.error("发送心跳失败: {}", e.getMessage());
}
}
/**
* 停止心跳任务
*/
private void stopHeartbeat() {
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(true);
heartbeatTask = null;
}
}
/**
* 安排重连任务
*/
private void scheduleReconnect() {
if (!isActive) {
return; // 如果连接已被主动关闭,则不再重连
}
// 取消之前可能存在的重连任务
if (reconnectFuture != null && !reconnectFuture.isDone()) {
reconnectFuture.cancel(true);
}
// 创建新的重连任务s
logger.info("计划在 {} 毫秒后尝试重连WebSocket", RECONNECT_DELAY_MS);
reconnectFuture = Desktop.instance.getExecutorService().schedule(() -> {
if (isActive) {
logger.info("尝试重新连接WebSocket");
Platform.runLater(() -> {
online.setValue(false);
message.set("正在重新连接WebSocket...");
});
initWebSocket();
}
}, RECONNECT_DELAY_MS, TimeUnit.MILLISECONDS);
}
/**
* 关闭WebSocket连接
*/
public void closeWebSocket() {
isActive = false;
stopHeartbeat();
if (reconnectFuture != null && !reconnectFuture.isDone()) {
reconnectFuture.cancel(false);
reconnectFuture = null;
}
if (webSocket != null) {
webSocket.close(1000, "主动关闭连接");
webSocket = null;
}
}
public StringProperty getMessageProperty() {
return message;
}
public BooleanProperty getOnlineProperty() {
return online;
}
public void withSession(Consumer<WebSocketClientSession> sessionConsumer) {
WebSocketClientSession session = createSession();
try {
sessionConsumer.accept(session);
} finally {
// closeSession(session);
}
}
private void closeSession(WebSocketClientSession session) {
if (session != null) {
sessions.remove(session.getSessionId());
session.close();
}
}
private WebSocketClientSession createSession() {
WebSocketClientSession session = new WebSocketClientSession(this);
sessions.put(session.getSessionId(), session);
return session;
}
}