refactor: 重构WebSocket服务及相关实体类

重构WebSocket服务名称从WebSocketService改为WebSocketClientService,并实现Serializable接口
添加WebSocket常量定义和消息处理实现
优化实体类equals和hashCode方法
修复控制器路径和日志配置
添加查询服务和任务接口方法
This commit is contained in:
2025-09-17 11:45:50 +08:00
parent 30deb0a280
commit c42ff7501d
152 changed files with 1933 additions and 999 deletions

View File

@@ -2,14 +2,15 @@ package com.ecep.contract.service;
import com.ecep.contract.PageArgument;
import com.ecep.contract.PageContent;
import com.ecep.contract.WebSocketService;
import com.ecep.contract.WebSocketClientService;
import com.ecep.contract.model.IdentityEntity;
import com.ecep.contract.msg.SimpleMessage;
import com.ecep.contract.vm.IdentityViewModel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@@ -19,13 +20,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class QueryService<T extends IdentityEntity, TV extends IdentityViewModel<T>>
implements ViewModelService<T, TV> {
// 添加日志记录器
private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
@Autowired
protected WebSocketService webSocketService;
protected WebSocketClientService webSocketService;
@Autowired
protected ObjectMapper objectMapper;
@@ -61,56 +64,60 @@ public class QueryService<T extends IdentityEntity, TV extends IdentityViewModel
@Override
public T save(T entity) {
SimpleMessage msg = new SimpleMessage();
msg.setService(getBeanName());
msg.setMethod("save");
msg.setArguments(entity);
try {
Object response = webSocketService.send(msg).get(webSocketService.getReadTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
objectMapper.updateValue(entity, response);
}
return async("save", entity, entity.getClass().getName()).handle((response, ex) -> {
if (response != null) {
try {
objectMapper.updateValue(entity, response);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
}
}
return entity;
}).get();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("保存实体失败", e);
}
return entity;
}
@Override
public void delete(T entity) {
SimpleMessage msg = new SimpleMessage();
msg.setService(getBeanName());
msg.setMethod("delete");
msg.setArguments(entity);
try {
JsonNode response = webSocketService.send(msg).get(webSocketService.getReadTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
objectMapper.updateValue(entity, response);
}
async("delete", entity, entity.getClass().getName()).handle((response, ex) -> {
if (response != null) {
return updateValue(entity, response);
}
return null;
}).get();
} catch (Exception e) {
e.printStackTrace();
logger.error("删除实体失败 #{}", entity.getId(), e);
throw new RuntimeException("删除实体失败", e);
}
}
public CompletableFuture<T> asyncFindById(Integer id) {
SimpleMessage msg = new SimpleMessage();
msg.setService(getBeanName());
msg.setMethod("findById");
msg.setArguments(id);
return webSocketService.send(msg).orTimeout(webSocketService.getReadTimeout(), TimeUnit.MILLISECONDS).handle((response, ex) -> {
public T updateValue(T entity, JsonNode node) {
try {
objectMapper.updateValue(entity, node);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
}
return entity;
}
public CompletableFuture<JsonNode> async(String method, Object... params) {
return webSocketService.invoke(getBeanName(), method, params).handle((response, ex) -> {
if (ex != null) {
return null;
}
if (response == null) {
return null;
throw new RuntimeException("远程方法+" + method + "+调用失败", ex);
}
return response;
});
}
public CompletableFuture<T> asyncFindById(Integer id) {
return async("findById", id, Integer.class).handle((response, ex) -> {
T newEntity = createNewEntity();
try {
objectMapper.updateValue(newEntity, response);
} catch (JsonMappingException e) {
throw new RuntimeException(response.toString(), e);
}
return newEntity;
return updateValue(newEntity, response);
});
}
@@ -119,45 +126,77 @@ public class QueryService<T extends IdentityEntity, TV extends IdentityViewModel
try {
return asyncFindById(id).get();
} catch (Exception e) {
e.printStackTrace();
logger.error("查询实体失败 #{}", id, e);
throw new RuntimeException("查询实体失败", e);
}
return null;
}
public List<T> findAll() {
return findAll(null, Pageable.unpaged()).getContent();
}
/**
* 异步查询所有实体数据返回CompletableFuture对象
*
* @param params 查询参数映射表,可以包含过滤条件等信息
* @param pageable 分页参数,包含页码、每页条数、排序规则等
* @return 包含分页结果的CompletableFuture对象
*/
public CompletableFuture<Page<T>> asyncFindAll(Map<String, Object> params, Pageable pageable) {
SimpleMessage msg = new SimpleMessage();
msg.setService(getBeanName());
msg.setMethod("findAll");
msg.setArguments(params, PageArgument.of(pageable));
return webSocketService.send(msg).orTimeout(webSocketService.getReadTimeout(), TimeUnit.MILLISECONDS).handle((response, ex) -> {
if (ex != null) {
return null;
}
if (response == null) {
return null;
}
// 调用async方法发送WebSocket请求获取异步响应结果
return async("findAll", params, PageArgument.of(pageable)).handle((response, ex) -> {
try {
// 将响应结果转换为PageContent对象使用当前类的createNewEntity方法创建实体实例
PageContent<T> pageContent = of(response, objectMapper, this::createNewEntity);
// 将PageContent转换为Spring Data的Page对象并返回
return pageContent.toPage();
} catch (Exception e) {
// 处理转换过程中的异常包装为RuntimeException并附带响应内容
throw new RuntimeException(response.toString(), e);
}
});
}
/**
* 同步查询所有实体数据实现了ViewModelService接口的findAll方法
*
* @param params 查询参数映射表,可以包含过滤条件等信息
* @param pageable 分页参数,包含页码、每页条数、排序规则等
* @return 包含查询结果的Page对象
* @throws RuntimeException 当查询失败时抛出异常
*/
@Override
public Page<T> findAll(Map<String, Object> params, Pageable pageable) {
try {
// 调用异步方法并阻塞等待结果返回
return asyncFindAll(params, pageable).get();
} catch (Exception e) {
e.printStackTrace();
// 处理异步查询过程中的任何异常,转换为运行时异常抛出
throw new RuntimeException("查询所有实体失败", e);
}
}
/**
* 异步执行计数操作返回CompletableFuture<Long>类型结果
*
* @param params 包含计数所需参数的Map集合
* @return 返回一个CompletableFuture<Long>对象,用于获取异步操作的结果
*/
public CompletableFuture<Long> asyncCount(Map<String, Object> params) {
// 调用async方法执行名为"count"的异步操作传入参数params
// 使用handle方法处理异步操作的结果或异常
return async("count", params).handle((response, ex) -> {
// 将响应结果转换为Long类型并返回
return response.asLong();
});
}
public long count(Map<String, Object> params) {
try {
return asyncCount(params).get();
} catch (Exception e) {
throw new RuntimeException("计数失败", e);
}
return null;
}
public List<T> search(String searchText) {
@@ -199,4 +238,4 @@ public class QueryService<T extends IdentityEntity, TV extends IdentityViewModel
return pageContent;
}
}
}