refactor(client): 重构服务类继承关系并统一使用QueryService

重构所有服务类,使其继承自QueryService接口,统一数据查询逻辑。同时为服务类添加@Service注解,确保Spring容器管理。更新相关FXML文件的控制器路径,从manager.ds调整为controller目录结构。调整pom.xml版本号至0.0.84-SNAPSHOT。新增MessageNotitfication和SimpleMessage消息类,提供基础消息结构支持。
This commit is contained in:
2025-09-11 00:06:22 +08:00
parent 23e1f98ae5
commit 375de610ef
163 changed files with 2085 additions and 578 deletions

View File

@@ -6,12 +6,12 @@
<parent>
<groupId>com.ecep.contract</groupId>
<artifactId>Contract-Manager</artifactId>
<version>0.0.80-SNAPSHOT</version>
<version>0.0.84-SNAPSHOT</version>
</parent>
<groupId>com.ecep.contract</groupId>
<artifactId>server</artifactId>
<version>0.0.80-SNAPSHOT</version>
<version>0.0.84-SNAPSHOT</version>
<properties>
<maven.compiler.source>${java.version}</maven.compiler.source>
@@ -22,7 +22,7 @@
<dependency>
<groupId>com.ecep.contract</groupId>
<artifactId>common</artifactId>
<version>0.0.80-SNAPSHOT</version>
<version>0.0.84-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -0,0 +1,10 @@
package com.ecep.contract;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import com.fasterxml.jackson.databind.JsonNode;
public interface QueryService<T> {
Page<T> findAll(JsonNode paramsNode, Pageable pageable);
}

View File

@@ -3,10 +3,13 @@ package com.ecep.contract;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.ConfigurableBootstrapContext;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringApplicationHook;
@@ -19,6 +22,7 @@ import org.springframework.boot.context.metrics.buffering.StartupTimeline;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.ConfigurableEnvironment;
@@ -35,8 +39,8 @@ import com.ecep.contract.ds.DsRepositoriesConfig;
import com.ecep.contract.util.TaskMonitorCenter;
@SpringBootApplication(exclude = {
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration.class,
RedisReactiveAutoConfiguration.class, RedisRepositoriesAutoConfiguration.class
org.springframework.boot.autoconfigure.mail.MailSenderAutoConfiguration.class,
RedisReactiveAutoConfiguration.class, RedisRepositoriesAutoConfiguration.class
})
@EnableScheduling
@EnableAsync
@@ -59,8 +63,16 @@ public class SpringApp {
return context.getBean(requiredType);
}
public static Object getBean(String name) throws BeansException {
return context.getBean(name);
}
public static ConfigurableListableBeanFactory getBeanFactory() {
return context.getBeanFactory();
}
public static void launch(Properties properties, MessageHolder holder) {
//
holder.debug("应用程序环境准备中...");
SpringApplication.withHook(new Hook(holder), () -> {
@@ -92,15 +104,17 @@ public class SpringApp {
}
});
// startup.start("");
// startup.start("");
context = application.run();
logger.debug("SpringApp.launch application.run().");
// Duration between = Duration.between(startup.getBufferedTimeline().getStartTime(), Instant.now());
// Duration between =
// Duration.between(startup.getBufferedTimeline().getStartTime(),
// Instant.now());
// holder.info("应用程序环境加载完成... " + between);
});
// CompletableFuture.runAsync(() -> {
// // 在这里调用 startup 性能分析
// analyzeStartupPerformance(startup);
// // 在这里调用 startup 性能分析
// analyzeStartupPerformance(startup);
// }, Desktop.instance.getExecutorService());
}
@@ -221,14 +235,13 @@ public class SpringApp {
}
}
public static TaskMonitorCenter getTaskMonitorCenter() {
return getBean(TaskMonitorCenter.class);
}
// @Bean
// public ScheduledExecutorService scheduledExecutorService() {
// return Desktop.instance.getExecutorService();
// }
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(3);
}
}

View File

@@ -0,0 +1,17 @@
package com.ecep.contract.api.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ecep.contract.SpringApp;
@RestController
@RequestMapping("/api/bean")
public class BeanApiController {
@GetMapping("/getBeanDefinitionNames")
public String[] getBeanDefinitionNames() {
return SpringApp.getBeanFactory().getBeanDefinitionNames();
}
}

View File

@@ -1,15 +1,15 @@
package com.ecep.contract.api.controller;
import static com.ecep.contract.SpringApp.getBean;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.data.domain.Sort;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
@@ -23,13 +23,15 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ecep.contract.ds.other.service.EmployeeAuthBindService;
import com.ecep.contract.ds.other.service.EmployeeLoginHistoryService;
import com.ecep.contract.ds.other.service.EmployeeService;
import com.ecep.contract.model.Employee;
import com.ecep.contract.model.EmployeeAuthBind;
import com.ecep.contract.model.EmployeeLoginHistory;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpSession;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RestController
@RequestMapping("/api")
@@ -100,6 +102,8 @@ public class LoginApiController {
// 设置认证结果到SecurityContext
SecurityContextHolder.getContext().setAuthentication(authentication);
session.setAttribute("ip", matched.getIp());
session.setAttribute("mac", matched.getMac());
} else {
// 创建认证令牌
@@ -125,6 +129,23 @@ public class LoginApiController {
result.put("username", employee.getAccount());
result.put("roles", currentUser.getAuthorities());
result.put("message", "登录成功");
// 登录成功后,发送系统通知
EmployeeLoginHistoryService employeeLoginHistoryService = getBean(EmployeeLoginHistoryService.class);
EmployeeLoginHistory employeeLoginHistory = new EmployeeLoginHistory();
employeeLoginHistory.setEmployee(employee);
String userId = (String) session.getAttribute("ip");
if (userId == null) {
userId = request.getRemoteAddr();
}
employeeLoginHistory.setIp(userId);
employeeLoginHistory.setMac((String) session.getAttribute("mac"));
employeeLoginHistory.setLoginTime(LocalDateTime.now());
EmployeeLoginHistory saved = employeeLoginHistoryService.save(employeeLoginHistory);
if (saved != null) {
session.setAttribute("loginHistoryId", saved.getId());
}
session.setAttribute("employeeId", employee.getId());
} else {
// 用户不存在
result.put("success", false);

View File

@@ -3,13 +3,21 @@ package com.ecep.contract.cloud.u8;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import com.ecep.contract.MessageHolder;
@@ -20,12 +28,55 @@ import com.ecep.contract.ds.contract.tasker.AbstContractRepairTasker;
/**
* 合同同步任务
*/
public class ContractSyncTask extends AbstContractRepairTasker {
@Component
public class ContractSyncTask extends AbstContractRepairTasker implements InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(ContractSyncTask.class);
private YongYouU8Repository repository;
private ScheduledExecutorService executorService;
private ScheduledFuture<?> scheduleAtFixedRate;
public ContractSyncTask() {
@Autowired
public ContractSyncTask(ScheduledExecutorService executorService) {
this.executorService = executorService;
updateTitle("用友U8系统-同步合同");
System.out.println("合同同步任务启动");
}
@Override
public void afterPropertiesSet() throws Exception {
scheduleAtFixedRate = executorService.scheduleAtFixedRate(() -> {
try {
ContractSyncTask.this.call();
} catch (Exception e) {
logger.error("合同同步任务异常", e);
}
}, 1, 5, TimeUnit.MINUTES);
;
}
public void destroy() throws Exception {
if (scheduleAtFixedRate != null) {
scheduleAtFixedRate.cancel(true);
}
}
@Override
protected void updateMessage(Level level, String message) {
if (level == Level.SEVERE) {
logger.error(message);
}
if (level == Level.WARNING) {
logger.warn(message);
}
if (level == Level.INFO) {
logger.info(message);
}
if (level == Level.FINE) {
logger.debug(message);
}
if (level == Level.FINER) {
logger.trace(message);
}
}
@Override

View File

@@ -8,7 +8,6 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@@ -17,6 +16,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import com.ecep.contract.IEntityService;
import com.ecep.contract.QueryService;
import com.ecep.contract.cloud.CloudInfo;
import com.ecep.contract.cloud.CloudInfoRepository;
import com.ecep.contract.cloud.u8.ctx.AbstractYongYouU8Ctx;
@@ -26,13 +26,14 @@ import com.ecep.contract.ds.other.service.EmployeeService;
import com.ecep.contract.ds.vendor.service.CompanyVendorService;
import com.ecep.contract.model.CloudYu;
import com.ecep.contract.model.Company;
import com.fasterxml.jackson.databind.JsonNode;
@Lazy
@Service
@ConditionalOnProperty(name = "cloud.u8.enabled", havingValue = "true")
public class YongYouU8Service implements IEntityService<CloudYu> {
// @ConditionalOnProperty(name = "cloud.u8.enabled", havingValue = "true")
public class YongYouU8Service implements IEntityService<CloudYu>, QueryService<CloudYu> {
private static final Logger logger = LoggerFactory.getLogger(YongYouU8Service.class);
public static final String KEY_HOST_IP = "u8.db.server.ip";
public static final String KEY_DATABASE = "u8.db.database";
public static final String KEY_USER_NAME = "u8.db.server.name";
@@ -146,6 +147,12 @@ public class YongYouU8Service implements IEntityService<CloudYu> {
return cloudYuRepository.findAll(spec, pageable);
}
@Override
public Page<CloudYu> findAll(JsonNode paramsNode, Pageable pageable) {
Specification<CloudYu> spec = null;
return findAll(spec, pageable);
}
@Override
public Specification<CloudYu> getSpecification(String searchText) {
if (!StringUtils.hasText(searchText)) {

View File

@@ -51,8 +51,8 @@ public class SecurityConfig {
http
.authorizeHttpRequests(authorize -> authorize
.requestMatchers("/login.html", "/css/**", "/js/**", "/images/**", "/webjars/**", "/login",
"/error", "/api/login")
.permitAll() // 允许静态资源、登录页面、错误页面和JSON登录API访问
"/error", "/api/login", "/ws/**")
.permitAll() // 允许静态资源、登录页面、错误页面和JSON登录API访问以及WebSocket连接
.anyRequest().authenticated() // 其他所有请求需要认证
)
.csrf(AbstractHttpConfigurer::disable) // 禁用CSRF保护适合开发环境

View File

@@ -0,0 +1,37 @@
package com.ecep.contract.config;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import com.ecep.contract.handler.WebSocketHandler;
/**
* WebSocket配置类
* 启用WebSocket支持并注册WebSocket处理器
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册WebSocket处理器指定路径为/ws允许跨域请求
registry.addHandler(webSocketHandler, "/ws")
.addInterceptors(
new HttpSessionHandshakeInterceptor(List.of("JSESSIONID", "loginHistoryId", "employeeId")))
.setAllowedOrigins("*");
}
}

View File

@@ -0,0 +1,82 @@
package com.ecep.contract.controller;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ecep.contract.service.WebSocketService;
/**
* WebSocket测试控制器
* 提供API端点用于测试WebSocket消息发送功能
*/
@RestController
@RequestMapping("/api/websocket")
public class WebSocketController {
private final WebSocketService webSocketService;
@Autowired
public WebSocketController(WebSocketService webSocketService) {
this.webSocketService = webSocketService;
}
/**
* 发送广播消息
*
* @param payload 消息负载包含message字段
* @return 响应实体
*/
@PostMapping("/broadcast")
public ResponseEntity<?> broadcastMessage(@RequestBody Map<String, String> payload) {
String message = payload.get("message");
if (message == null || message.isEmpty()) {
return ResponseEntity.badRequest().body("消息内容不能为空");
}
webSocketService.broadcastMessage(message);
return ResponseEntity.ok(Map.of(
"success", true,
"message", "消息已广播",
"activeSessions", webSocketService.getActiveSessionCount()));
}
/**
* 发送系统通知
*
* @param payload 通知负载包含notification字段
* @return 响应实体
*/
@PostMapping("/notification")
public ResponseEntity<?> sendNotification(@RequestBody Map<String, String> payload) {
String notification = payload.get("notification");
if (notification == null || notification.isEmpty()) {
return ResponseEntity.badRequest().body("通知内容不能为空");
}
webSocketService.sendSystemNotification(notification);
return ResponseEntity.ok(Map.of(
"success", true,
"message", "通知已发送",
"activeSessions", webSocketService.getActiveSessionCount()));
}
/**
* 获取WebSocket连接状态
*
* @return 响应实体,包含当前活跃连接数
*/
@GetMapping("/status")
public ResponseEntity<?> getStatus() {
return ResponseEntity.ok(Map.of(
"success", true,
"activeSessions", webSocketService.getActiveSessionCount(),
"message", "WebSocket服务运行正常"));
}
}

View File

@@ -30,6 +30,7 @@ import org.springframework.util.StringUtils;
import com.ecep.contract.IEntityService;
import com.ecep.contract.MyDateTimeUtils;
import com.ecep.contract.QueryService;
import com.ecep.contract.cloud.rk.CloudRkService;
import com.ecep.contract.cloud.tyc.CloudTycService;
import com.ecep.contract.cloud.u8.YongYouU8Service;
@@ -47,6 +48,7 @@ import com.ecep.contract.model.CompanyOldName;
import com.ecep.contract.model.CompanyVendor;
import com.ecep.contract.util.MyStringUtils;
import com.ecep.contract.util.SpecificationUtils;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
@@ -60,7 +62,7 @@ import jakarta.transaction.Transactional;
@Lazy
@Service
@CacheConfig(cacheNames = "company")
public class CompanyService implements IEntityService<Company> {
public class CompanyService implements IEntityService<Company>, QueryService<Company> {
private static final Logger logger = LoggerFactory.getLogger(CompanyService.class);
private static final String COMPANY_BASE_PATH = "company.base.path";
@@ -122,6 +124,12 @@ public class CompanyService implements IEntityService<Company> {
return companyRepository.findAll(spec, pageable);
}
@Override
public Page<Company> findAll(JsonNode paramsNode, Pageable pageable) {
Specification<Company> spec = null;
return findAll(spec, pageable);
}
/**
* 查找
* 重复的删除

View File

@@ -1,20 +1,12 @@
package com.ecep.contract.ds.other.repository;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;
import com.ecep.contract.ds.MyRepository;
import com.ecep.contract.model.EmployeeLoginHistory;
@Lazy
@Repository
public interface EmployeeLoginHistoryRepository extends
// JDBC interfaces
CrudRepository<EmployeeLoginHistory, Integer>,
PagingAndSortingRepository<EmployeeLoginHistory, Integer>,
// JPA interfaces
JpaRepository<EmployeeLoginHistory, Integer>, JpaSpecificationExecutor<EmployeeLoginHistory> {
public interface EmployeeLoginHistoryRepository extends MyRepository<EmployeeLoginHistory, Integer> {
}

View File

@@ -3,8 +3,6 @@ package com.ecep.contract.ds.other.repository;
import java.util.Optional;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import com.ecep.contract.ds.MyRepository;
@@ -20,9 +18,4 @@ public interface EmployeeRepository extends MyRepository<Employee, Integer> {
Optional<Employee> findByAlias(String alias);
Optional<Employee> findByCode(String personCode);
@Modifying
@Query(value = "update EMPLOYEE_LOGIN_HISTORY e set e.LATEST_ACTIVE = now() where e.id = ?1", nativeQuery = true)
void updateActive(int sessionId);
}

View File

@@ -141,11 +141,6 @@ public class EmployeeService implements IEntityService<Employee> {
return employeeRepository.findAll(spec, Pageable.ofSize(10)).getContent();
}
@Transactional
public void updateActive(int sessionId) {
employeeRepository.updateActive(sessionId);
}
@Transactional
public List<EmployeeRole> getRolesByEmployeeId(int employeeId) {
Optional<Employee> optional = employeeRepository.findById(employeeId);

View File

@@ -0,0 +1,9 @@
package com.ecep.contract.handler;
import lombok.Data;
@Data
public class MessageNotitfication {
private String type;
private String content;
}

View File

@@ -0,0 +1,370 @@
package com.ecep.contract.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import com.ecep.contract.IEntityService;
import com.ecep.contract.QueryService;
import com.ecep.contract.SpringApp;
import com.ecep.contract.ds.other.service.EmployeeLoginHistoryService;
import com.ecep.contract.ds.other.service.EmployeeService;
import com.ecep.contract.model.Employee;
import com.ecep.contract.model.EmployeeLoginHistory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
/**
* WebSocket处理器
* 处理与客户端的WebSocket连接、消息传递和断开连接
*/
@Component
public class WebSocketHandler extends TextWebSocketHandler {
private final ObjectMapper objectMapper;
private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
@Autowired
private EmployeeLoginHistoryService employeeLoginHistoryService;
@Autowired
private EmployeeService employeeService;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
// 存储所有活跃的WebSocket会话
private final Map<Integer, SessionInfo> activeSessions = Collections.synchronizedMap(new HashMap<>());
@Data
static class SessionInfo {
private Integer employeeId;
private Integer loginHistoryId;
private WebSocketSession session;
private ScheduledFuture<?> schedule;
void click() {
try {
session.sendMessage(new PingMessage(ByteBuffer.wrap("ping".getBytes())));
} catch (IOException e) {
logger.error("发送ping消息失败 (会话ID: " + session.getId() + "): " + e.getMessage(), e);
}
}
}
WebSocketHandler(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
/**
* 连接建立时调用
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 添加会话到活跃会话集合
SessionInfo sessionInfo = new SessionInfo();
sessionInfo.setSession(session);
sessionInfo.setLoginHistoryId((Integer) session.getAttributes().get("loginHistoryId"));
sessionInfo.setEmployeeId((Integer) session.getAttributes().get("employeeId"));
if (sessionInfo.getEmployeeId() == null) {
logger.error("会话未绑定用户: " + session.getId());
session.close();
return;
}
activeSessions.put(sessionInfo.getEmployeeId(), sessionInfo);
System.out.println(sessionInfo.getLoginHistoryId());
System.out.println(sessionInfo.getEmployeeId());
logger.info("WebSocket连接已建立: " + session.getId());
Employee employee = employeeService.findById(sessionInfo.getEmployeeId());
if (employee == null) {
logger.error("未找到用户: #" + sessionInfo.getEmployeeId());
return;
}
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(sessionInfo::click, 30, TimeUnit.SECONDS);
sessionInfo.setSchedule(schedule);
}
/**
* 接收文本消息时调用
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
logger.info("收到来自客户端的消息: " + payload + " (会话ID: " + session.getId() + ")");
// 处理文本格式的ping消息
if ("ping".equals(payload)) {
// 回复文本格式的pong消息
try {
session.sendMessage(new TextMessage("pong"));
logger.info("发送pong响应到会话: " + session.getId());
} catch (IOException e) {
logger.error("发送pong响应失败 (会话ID: " + session.getId() + "): " + e.getMessage(), e);
}
return;
}
if (handleAsMessageCallback(session, payload)) {
return;
}
// 处理其他类型的文本消息
logger.info("处理普通消息: " + payload);
}
private boolean handleAsMessageCallback(WebSocketSession session, String payload)
throws JsonProcessingException, IOException {
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(payload);
} catch (Exception e) {
logger.warn(payload, e);
return false;
}
jsonNode = objectMapper.readTree(payload);
if (!jsonNode.has("messageId")) {
// 没有 messageId 的消息不处理
return false;
}
String messageId = jsonNode.get("messageId").asText();
if (!jsonNode.has("service")) {
sendError(session, messageId, "缺失 service 参数");
return true;
}
String serviceName = jsonNode.get("service").asText();
Object service = null;
try {
service = SpringApp.getBean(serviceName);
} catch (Exception e) {
sendError(session, messageId, "未找到服务: " + serviceName);
return true;
}
if (!jsonNode.has("method")) {
sendError(session, messageId, "缺失 method 参数");
return true;
}
String methodName = jsonNode.get("method").asText();
try {
Object result = null;
if (methodName.equals("findAll")) {
result = invokerFindAllMethod(service, jsonNode.get("arguments"));
} else if (methodName.equals("findById")) {
result = invokerFindByIdMethod(service, jsonNode.get("arguments"));
} else if (methodName.equals("save")) {
result = invokerSaveMethod(service, jsonNode.get("arguments"));
} else if (methodName.equals("delete")) {
result = invokerDeleteMethod(service, jsonNode.get("arguments"));
} else {
sendError(session, messageId, "未实现的方法: " + methodName);
return true;
}
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put("messageId", messageId);
objectNode.set("data", objectMapper.valueToTree(result));
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(objectNode)));
} catch (BeansException | IOException | NoSuchElementException e) {
sendError(session, messageId, e.getMessage());
}
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put("messageId", messageId);
objectNode.put("success", false);
objectNode.put("message", "未找到服务: " + serviceName);
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(objectNode)));
return true;
}
private Object invokerDeleteMethod(Object service, JsonNode paramsNode) {
JsonNode param = paramsNode.get(0);
IEntityService<Object> entityService = (IEntityService<Object>) service;
Object entity = entityService.findById(param.get("id").asInt());
if (entity == null) {
throw new NoSuchElementException("未找到实体: " + param.get("id").asInt());
}
entityService.delete(entity);
return entity;
}
private Object invokerSaveMethod(Object service, JsonNode paramsNode) throws JsonMappingException {
JsonNode param = paramsNode.get(0);
IEntityService<Object> entityService = (IEntityService<Object>) service;
Object entity = entityService.findById(param.get("id").asInt());
objectMapper.updateValue(entity, param);
return entityService.save(entity);
}
private Object invokerFindByIdMethod(Object service, JsonNode argumentsNode) {
JsonNode paramsNode = argumentsNode.get(0);
Integer id = paramsNode.get(0).asInt();
IEntityService<?> entityService = (IEntityService<?>) service;
return entityService.findById(id);
}
private Object invokerFindAllMethod(Object service, JsonNode argumentsNode) {
JsonNode paramsNode = argumentsNode.get(0);
JsonNode pageableNode = argumentsNode.get(1);
// 从pageableNode中解析分页参数
int pageNumber = 0;
int pageSize = 10;
Sort sort = Sort.unsorted();
if (pageableNode.has("pageNumber")) {
pageNumber = pageableNode.get("pageNumber").asInt();
}
if (pageableNode.has("pageSize")) {
pageSize = pageableNode.get("pageSize").asInt();
}
// 处理排序信息
if (pageableNode.has("sort")) {
JsonNode sortNode = pageableNode.get("sort");
// 检查是否有排序字段
if (sortNode.has("sorted") && sortNode.get("sorted").asBoolean() && sortNode.has("orders")) {
JsonNode ordersNode = sortNode.get("orders");
if (ordersNode.isArray() && ordersNode.size() > 0) {
List<Sort.Order> orders = new ArrayList<>();
for (JsonNode orderNode : ordersNode) {
if (orderNode.has("property") && orderNode.has("direction")) {
String property = orderNode.get("property").asText();
String direction = orderNode.get("direction").asText();
orders.add(new Sort.Order(Sort.Direction.valueOf(direction.toUpperCase()),
property));
}
}
if (!orders.isEmpty()) {
sort = Sort.by(orders);
}
}
}
}
Pageable pageable = PageRequest.of(pageNumber, pageSize, sort);
QueryService<?> entityService = (QueryService<?>) service;
return entityService.findAll(paramsNode, pageable);
}
private void sendError(WebSocketSession session, String messageId, String string)
throws JsonProcessingException, IOException {
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put("messageId", messageId);
objectNode.put("success", false);
objectNode.put("message", messageId);
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(objectNode)));
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
logger.info("收到来自客户端的二进制消息: " + message.getPayload() + " (会话ID: " + session.getId() + ")");
// 处理二进制消息
// 例如,将字节数组转换为字符串
String binaryData = new String(message.getPayload().array());
logger.info("二进制消息内容: " + binaryData);
}
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
logger.info("收到来自客户端的Pong消息: " + message.getPayload() + " (会话ID: " + session.getId() + ")");
}
/**
* 连接关闭时调用
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
logger.info(
"WebSocket连接已关闭: " + session.getId() + ", 状态码: " + status.getCode() + ", 原因: " + status.getReason());
// 从活跃会话集合中移除会话
SessionInfo sessionInfo = activeSessions.remove(session.getAttributes().get("employeeId"));
if (sessionInfo == null) {
return;
}
ScheduledFuture<?> schedule = sessionInfo.getSchedule();
if (schedule != null) {
schedule.cancel(true);
sessionInfo.setSchedule(null);
}
Integer loginHistoryId = sessionInfo.getLoginHistoryId();
if (loginHistoryId != null) {
EmployeeLoginHistory history = employeeLoginHistoryService.findById(loginHistoryId);
history.setActiveTime(LocalDateTime.now());
employeeLoginHistoryService.save(history);
}
}
/**
* 处理传输错误
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.error("WebSocket传输错误 (会话ID: " + session.getId() + "): " + exception.getMessage(), exception);
// 连接出错时关闭会话
if (session.isOpen()) {
session.close(CloseStatus.SERVER_ERROR);
}
}
/**
* 向所有连接的客户端广播消息
*/
public void broadcast(TextMessage message) {
synchronized (activeSessions) {
for (SessionInfo sessionInfo : activeSessions.values()) {
WebSocketSession session = sessionInfo.getSession();
if (session.isOpen()) {
try {
session.sendMessage(message);
logger.info("广播消息到会话: " + session.getId());
} catch (IOException e) {
logger.error("广播消息失败 (会话ID: " + session.getId() + "): " + e.getMessage(), e);
}
}
}
}
}
/**
* 获取当前活跃会话数
*/
public int getActiveSessionCount() {
return activeSessions.size();
}
}

View File

@@ -0,0 +1,125 @@
package com.ecep.contract.service;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import com.ecep.contract.handler.MessageNotitfication;
import com.ecep.contract.handler.WebSocketHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* WebSocket消息服务
* 提供向客户端发送WebSocket消息的功能
*/
@Service
public class WebSocketService {
private static final Logger logger = Logger.getLogger(WebSocketService.class.getName());
private final WebSocketHandler webSocketHandler;
@Autowired
private ObjectMapper objectMapper;
public WebSocketService(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}
/**
* 向所有连接的客户端广播消息
*
* @param message 要广播的消息内容
*/
public void broadcastMessage(String message) {
try {
TextMessage textMessage = new TextMessage(message);
webSocketHandler.broadcast(textMessage);
logger.info("广播消息: " + message);
} catch (Exception e) {
logger.log(Level.SEVERE, "广播消息失败: " + e.getMessage(), e);
}
}
/**
* 向特定会话发送消息
*
* @param session WebSocket会话
* @param message 要发送的消息内容
*/
public void sendMessage(WebSocketSession session, String message) {
try {
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(message));
logger.info("向会话发送消息: " + session.getId() + ", 消息: " + message);
}
} catch (IOException e) {
logger.log(Level.SEVERE, "向会话发送消息失败: " + session.getId() + ", 原因: " + e.getMessage(), e);
}
}
/**
* 获取当前活跃的WebSocket会话数量
*
* @return 活跃会话数
*/
public int getActiveSessionCount() {
return webSocketHandler.getActiveSessionCount();
}
/**
* 发送系统通知
*
* @param notification 通知内容
*/
public void sendSystemNotification(String notification) {
MessageNotitfication messageNotitfication = new MessageNotitfication();
messageNotitfication.setType("notification");
messageNotitfication.setContent(notification);
try {
String jsonMessage = objectMapper.writeValueAsString(messageNotitfication);
broadcastMessage(jsonMessage);
} catch (Exception e) {
logger.log(Level.SEVERE, "发送系统通知失败: " + e.getMessage(), e);
}
}
/**
* 发送业务数据更新通知
*
* @param dataType 数据类型
* @param operation 操作类型create, update, delete
* @param id 数据ID
*/
public void sendDataUpdate(String dataType, String operation, String id) {
String message = String.format(
"{\"type\":\"data_update\",\"dataType\":\"%s\",\"operation\":\"%s\",\"id\":\"%s\"}",
escapeJson(dataType),
escapeJson(operation),
escapeJson(id));
broadcastMessage(message);
}
/**
* 转义JSON字符串中的特殊字符
*
* @param text 要转义的文本
* @return 转义后的文本
*/
private String escapeJson(String text) {
if (text == null) {
return "";
}
return text
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\b", "\\b")
.replace("\f", "\\f")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
}