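Refactor the file-type Services to support internationalized queries; add a findOneByLang helper to unify the query logic; implement StringConverter for display in UI controls; optimize cache configuration and query performance; add UnitStringConverter and CustomerCatalogStringConverter; improve documentation and test cases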
362 lines
13 KiB
Java
package com.ecep.contract;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.ecep.contract.constant.WebSocketConstant;
import com.ecep.contract.controller.OkHttpLoginController;
import com.ecep.contract.msg.SimpleMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import javafx.application.Platform;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import lombok.Getter;
import lombok.Setter;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

/**
 * WebSocket message service.
 * Sends WebSocket messages to the server.
 */
@Service
public class WebSocketClientService {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketClientService.class);
    @Getter
    @Setter
    private WebSocket webSocket;
    @Getter
    @Autowired
    private ObjectMapper objectMapper;
    private static final int RECONNECT_DELAY_MS = 5000; // reconnect delay (milliseconds)
    private static final int HEARTBEAT_INTERVAL_MS = 30000; // heartbeat interval (milliseconds)
    @Getter
    @Setter
    private long readTimeout = 30000;
    private String webSocketUrl = "ws://localhost:8080/ws";
    // Whether the connection should be kept alive; volatile because it is read from several threads
    private volatile boolean isActive = false;
    private ScheduledFuture<?> heartbeatTask; // heartbeat task
    private ScheduledFuture<?> reconnectFuture; // pending reconnect task
    private SimpleBooleanProperty online = new SimpleBooleanProperty(false);
    private SimpleStringProperty message = new SimpleStringProperty("");

    // All active WebSocket sessions, keyed by session ID
    private final Map<String, WebSocketClientSession> sessions = Collections.synchronizedMap(new HashMap<>());

    // Futures awaiting a server reply, keyed by message ID
    private final Map<String, CompletableFuture<JsonNode>> callbacks = Collections.synchronizedMap(new HashMap<>());

    WebSocketListener listener = new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            Platform.runLater(() -> {
                online.setValue(true);
                message.setValue("Connected");
            });
            startHeartbeat(); // start the heartbeat
        }

        @Override
        public void onMessage(WebSocket webSocket, String text) {
            // Handle an incoming text message
            logger.debug("Received WebSocket message: {}", text);
            // Data received from the server can be handled here as needed
            if ("pong".equals(text)) {
                // A pong reply means the connection is healthy
                logger.debug("Received heartbeat response");
                return;
            }

            try {
                JsonNode node = objectMapper.readTree(text);
                // Reply to a pending request: complete the matching future
                if (node.has(WebSocketConstant.MESSAGE_ID_FIELD_NAME)) {
                    String messageId = node.get(WebSocketConstant.MESSAGE_ID_FIELD_NAME).asText();
                    CompletableFuture<JsonNode> future = callbacks.remove(messageId);
                    if (future != null) {
                        onCallbackMessage(future, node);
                    } else {
                        logger.error("No callback future found for message ID: {}", messageId);
                    }
                    return;
                }

                // Message addressed to a session: dispatch it to that session
                if (node.has(WebSocketConstant.SESSION_ID_FIELD_NAME)) {
                    String sessionId = node.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
                    WebSocketClientSession session = sessions.get(sessionId);
                    if (session != null) {
                        try {
                            session.onMessage(node);
                        } catch (Exception e) {
                            session.updateMessage(java.util.logging.Level.SEVERE, "Session error: " + e.getMessage());
                        }
                    }
                    return;
                }

                if (node.has(WebSocketConstant.ERROR_CODE_FIELD_NAME)) {
                    int errorCode = node.get(WebSocketConstant.ERROR_CODE_FIELD_NAME).asInt();
                    // path() returns an empty string instead of throwing when the message field is absent
                    String errorMsg = node.path(WebSocketConstant.MESSAGE_FIELD_NAME).asText();
                    logger.error("Received error message: code={}, message={}", errorCode, errorMsg);
                    if (errorCode == WebSocketConstant.ERROR_CODE_UNAUTHORIZED) {
                        // Unauthorized: log in again
                        OkHttpLoginController controller = new OkHttpLoginController();
                        controller.tryLogin();
                        // TODO: bring the window to the front
                    }
                    return;
                }
            } catch (Exception e) {
                logger.error("Failed to handle WebSocket message: {}", e.getMessage(), e);
            }
        }

        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            // Handle an incoming binary message
            logger.debug("Received binary WebSocket message, length: {}", bytes.size());
        }

        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
            logger.debug("WebSocket connection is closing: code={}, reason={}", code, reason);
            stopHeartbeat(); // stop the heartbeat
        }

        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            logger.debug("WebSocket connection closed: code={}, reason={}", code, reason);
            stopHeartbeat(); // stop the heartbeat
            // Reconnect if the connection is still meant to be active
            scheduleReconnect();
        }

        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            logger.error("WebSocket connection failed: {}, response={}", t.getMessage(), response, t);
            Platform.runLater(() -> {
                online.setValue(false);
                message.set("Connection failed: " + t.getMessage());
            });
            stopHeartbeat(); // stop the heartbeat
            // Reconnect if the connection is still meant to be active
            scheduleReconnect();
        }
    };

    private void onCallbackMessage(CompletableFuture<JsonNode> future, JsonNode node) {
        if (node.has(WebSocketConstant.SUCCESS_FIELD_VALUE)) {
            if (!node.get(WebSocketConstant.SUCCESS_FIELD_VALUE).asBoolean()) {
                // path() avoids a NullPointerException when the message field is absent,
                // which would otherwise leave the future uncompleted until it times out
                future.completeExceptionally(
                        new RuntimeException(
                                "Request failed: message from server="
                                        + node.path(WebSocketConstant.MESSAGE_FIELD_NAME).asText()));
                return;
            }
        }
        // Unwrap the "data" field when present; otherwise hand back the whole node
        if (node.has("data")) {
            future.complete(node.get("data"));
        } else {
            future.complete(node);
        }
    }

    public void send(String string) {
        if (webSocket == null) {
            logger.warn("Failed to send message: WebSocket is not initialized");
        } else if (webSocket.send(string)) {
            logger.debug("send message success:{}", string);
        } else {
            // send() returns false when the message could not be enqueued
            logger.warn("Failed to send message: the WebSocket did not accept it");
        }
    }

    public void send(Object message) throws JsonProcessingException {
        send(objectMapper.writeValueAsString(message));
    }

    public CompletableFuture<JsonNode> send(SimpleMessage msg) {
        CompletableFuture<JsonNode> future = new CompletableFuture<>();
        try {
            if (webSocket == null) {
                throw new IllegalStateException("WebSocket is not initialized");
            }
            if (!online.get()) {
                throw new IllegalStateException("WebSocket is not online");
            }
            String json = objectMapper.writeValueAsString(msg);
            // Register the future before sending so a fast reply cannot be missed
            callbacks.put(msg.getMessageId(), future);
            if (webSocket.send(json)) {
                logger.debug("send message success:{}", json);
            } else {
                // The message was never enqueued, so no reply can arrive: drop the pending callback
                callbacks.remove(msg.getMessageId());
                if (isActive) {
                    future.completeExceptionally(new RuntimeException("Failed to send WebSocket message"));
                } else {
                    future.completeExceptionally(new RuntimeException("WebSocket is not active"));
                }
            }
        } catch (Exception e) {
            logger.error("Failed to send WebSocket message: {}", e.getMessage());
            future.completeExceptionally(e);
        }
        return future;
    }

    public CompletableFuture<JsonNode> invoke(String service, String method, Object... params) {
        SimpleMessage msg = new SimpleMessage();
        msg.setService(service);
        msg.setMethod(method);
        msg.setArguments(params);
        return send(msg).orTimeout(getReadTimeout(), TimeUnit.MILLISECONDS);
    }

    public void initWebSocket() {
        isActive = true;
        OkHttpClient httpClient = Desktop.instance.getHttpClient();

        try {
            // Build the WebSocket request, including authentication information
            Request request = new Request.Builder()
                    .url(webSocketUrl)
                    .build();
            webSocket = httpClient.newWebSocket(request, listener);
        } catch (Exception e) {
            logger.error("Failed to establish WebSocket connection: {}", e.getMessage(), e);
            Platform.runLater(() -> {
                online.setValue(false);
                message.set("Connection failed: " + e.getMessage());
            });
            // Reconnect if the connection is still meant to be active
            scheduleReconnect();
        }
    }

    /**
     * Starts the heartbeat task, which periodically sends a ping message to keep the connection alive.
     */
    private void startHeartbeat() {
        // Stop any heartbeat task that may already be running
        stopHeartbeat();
        ScheduledExecutorService executorService = Desktop.instance.getExecutorService();
        // Note: the first ping fires after RECONNECT_DELAY_MS, then every HEARTBEAT_INTERVAL_MS
        heartbeatTask = executorService.scheduleAtFixedRate(this::heartbeat, RECONNECT_DELAY_MS, HEARTBEAT_INTERVAL_MS,
                TimeUnit.MILLISECONDS);
    }

    void heartbeat() {
        if (!isActive) {
            return;
        }

        try {
            if (webSocket != null) {
                logger.debug("Sending heartbeat ping");
                webSocket.send("ping");
            }
        } catch (Exception e) {
            logger.error("Failed to send heartbeat: {}", e.getMessage());
        }
    }

    /**
     * Stops the heartbeat task.
     */
    private void stopHeartbeat() {
        if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
            heartbeatTask.cancel(true);
            heartbeatTask = null;
        }
    }

    /**
     * Schedules a reconnect attempt.
     */
    private void scheduleReconnect() {
        if (!isActive) {
            return; // the connection was closed on purpose, so do not reconnect
        }

        // Cancel any reconnect task that is still pending
        if (reconnectFuture != null && !reconnectFuture.isDone()) {
            reconnectFuture.cancel(true);
        }
        // Create a new reconnect task
        logger.info("Scheduling a WebSocket reconnect attempt in {} ms", RECONNECT_DELAY_MS);

        reconnectFuture = Desktop.instance.getExecutorService().schedule(() -> {
            if (isActive) {
                logger.info("Attempting to reconnect the WebSocket");
                Platform.runLater(() -> {
                    online.setValue(false);
                    message.set("Reconnecting WebSocket...");
                });
                initWebSocket();
            }
        }, RECONNECT_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the WebSocket connection.
     */
    public void closeWebSocket() {
        isActive = false;
        stopHeartbeat();

        if (reconnectFuture != null && !reconnectFuture.isDone()) {
            reconnectFuture.cancel(false);
            reconnectFuture = null;
        }
        if (webSocket != null) {
            // 1000 is the WebSocket status code for a normal closure
            webSocket.close(1000, "Connection closed by client");
            webSocket = null;
        }
    }

    public StringProperty getMessageProperty() {
        return message;
    }

    public BooleanProperty getOnlineProperty() {
        return online;
    }

    public void withSession(Consumer<WebSocketClientSession> sessionConsumer) {
        WebSocketClientSession session = createSession();
        try {
            sessionConsumer.accept(session);
        } finally {
            // closeSession(session);
        }
    }

    public void closeSession(WebSocketClientSession session) {
        if (session != null) {
            sessions.remove(session.getSessionId());
            // session.close();
        }
    }

    public WebSocketClientSession createSession() {
        WebSocketClientSession session = new WebSocketClientSession(this);
        sessions.put(session.getSessionId(), session);
        return session;
    }
}
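
The `callbacks` map above pairs each outgoing message ID with a `CompletableFuture`, and `invoke` bounds the wait with `orTimeout`. The original code does not show entries being removed when a request times out, so a map like this could grow if replies never arrive. The following is a minimal, dependency-free sketch of one way to clean up on every outcome. `PendingCalls`, `register`, `complete`, and `size` are hypothetical names that do not appear in the original file, and a plain `String` payload stands in for `JsonNode`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of WebSocketClientService): tracks requests awaiting a reply.
class PendingCalls {
    private final Map<String, CompletableFuture<String>> callbacks = new ConcurrentHashMap<>();

    // Registers a pending request. The returned stage completes only after the
    // map entry has been removed, whether by reply, failure, or timeout.
    CompletableFuture<String> register(String messageId, long timeoutMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        callbacks.put(messageId, future);
        return future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete((value, error) -> callbacks.remove(messageId, future));
    }

    // Called when a reply arrives; returns false if nothing was waiting for this ID.
    boolean complete(String messageId, String payload) {
        CompletableFuture<String> future = callbacks.remove(messageId);
        return future != null && future.complete(payload);
    }

    // Number of requests still awaiting a reply.
    int size() {
        return callbacks.size();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> reply = calls.register("msg-1", 1_000);
        calls.complete("msg-1", "{\"data\":42}");
        System.out.println(reply.join() + ", pending=" + calls.size());
    }
}
```

Returning the `whenComplete` stage rather than the raw future is a deliberate choice in this sketch: a caller that observes the result (or the timeout) can rely on the entry already being gone. Whether that fits the real service depends on how callers use the future returned by `send`, which the original file does not show.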