Files
contract-manager/docs/task/client_server_tasker_communication_rules.md
songqq 45f7b611c5 feat: 实现VoableService接口并重构相关服务
refactor: 优化WebSocket通信和任务处理逻辑

fix: 修复客户和供应商路径选择功能

docs: 更新任务通信规则文档

build: 更新项目版本至0.0.86-SNAPSHOT

style: 清理无用导入和日志输出

test: 添加CustomerFileMoveTasker测试类

chore: 更新tasker_mapper.json注册信息
2025-09-25 18:57:17 +08:00

22 KiB
Raw Blame History

客户端 Tasker 至 服务器端 Tasker 通信规则与逻辑

本文档总结了 Contract-Manager 项目中客户端 Tasker 与服务器端 Tasker 之间的通信规则、调用逻辑和实现模式,基于对以下文件的分析:

1. 架构设计原则

项目采用了清晰的客户端-服务器分离架构,任务处理遵循以下原则:

  • 客户端轻量级:负责任务发起、参数传递和结果展示
  • 服务器端重量级:负责实际业务逻辑处理和数据操作
  • WebSocket通信使用WebSocket实现客户端与服务器端的任务通信和进度同步

1.1 核心通信组件

WebSocketClientService

WebSocketClientService类是客户端WebSocket通信的核心服务组件负责建立、维护与服务器的WebSocket连接并提供消息发送和接收的功能。主要职责包括

  • 连接管理初始化WebSocket连接、处理连接关闭和重连逻辑
  • 心跳维护:定期发送心跳消息保持连接活跃
  • 会话管理创建和管理WebSocket会话WebSocketClientSession
  • 消息路由:接收并路由服务器消息到对应的会话处理器
  • 服务调用:提供invoke方法调用服务器端服务

WebSocketClientSession

WebSocketClientSession类代表一个特定的WebSocket会话每个Tasker任务执行时都会创建一个对应的会话实例。主要职责包括

  • 会话标识维护唯一的会话ID
  • 任务提交将Tasker任务提交到服务器端执行
  • 消息处理:处理服务器返回的各类消息(包括进度更新、属性更新、状态变更等)
  • 属性同步根据服务器消息更新Tasker的属性值

2. 类命名与结构规范

2.1 命名规则

  • 客户端与服务器端的对应Tasker类必须使用相同的类名(如示例中的CompanyCustomerEvaluationFormUpdateTask
  • 客户端Tasker位于client模块的控制器包下
  • 服务器端Tasker位于server模块的tasker包下

2.2 任务名称注册规则

  • 所有服务器端Tasker类必须通过配置文件tasker_mapper.json进行注册,并由WebSocketServerTaskManager类在afterPropertiesSet方法中加载
  • 注册格式为"TaskClassName": "fully.qualified.ClassName"
  • 注册的Task名称将用于客户端和服务器端之间的任务识别
  • 配置文件tasker_mapper.json应位于src/main/resources目录下,格式如下:
{
    "taskers": {
        "TaskClassName": "fully.qualified.ClassName",
        // 更多任务映射...
    }
}

任务名称注册实现示例

@Override
public void afterPropertiesSet() throws Exception {
    // 从tasker_mapper.json文件中加载任务注册信息
    try {
        Resource resource = resourceLoader.getResource("classpath:tasker_mapper.json");
        try (InputStream inputStream = resource.getInputStream()) {
            JsonNode rootNode = objectMapper.readTree(inputStream);
            JsonNode taskersNode = rootNode.get("taskers");
            if (taskersNode != null && taskersNode.isObject()) {
                Map<String, String> taskMap = new java.util.HashMap<>();
                taskersNode.fields().forEachRemaining(entry -> {
                    taskMap.put(entry.getKey(), entry.getValue().asText());
                });
                taskClzMap = taskMap;
            }
        }
    } catch (Exception e) {
        logger.error("Failed to load tasker_mapper.json", e);
        // 使用默认值作为fallback
        taskClzMap = Map.of();
    }
}

2.3 接口实现区分

2.4 继承关系

3. 客户端Tasker实现规则

客户端Tasker是任务的发起方需要遵循以下实现规则

3.1 核心属性

  • 通常包含一个可设置的业务对象(如示例中的@Setter private CompanyCustomerVo customer;

3.2 核心方法实现

  • getTaskName():返回任务名称,通常使用类名
  • updateProgress():继承或重写进度更新方法
  • execute():调用callRemoteTask()方法将任务发送到服务器端,传递必要参数, 参数类型只允许基本类和Vo类对象
  • updateProgress()继承或重写进度更新方法public用于接收服务器端发送的进度更新消息

3.3 示例实现

public class CompanyCustomerEvaluationFormUpdateTask extends Tasker<Object> implements WebSocketClientTasker {
    private static final Logger logger = LoggerFactory.getLogger(CompanyCustomerEvaluationFormUpdateTask.class);

    @Setter
    private CompanyCustomerVo customer;  // 业务对象

    @Override
    public String getTaskName() {
        return getClass().getSimpleName();
    }

    @Override
    protected Object execute(MessageHolder holder) throws Exception {
        updateTitle("客户评估表更新任务");  // 设置任务标题
        // 调用远程任务传递locale和业务对象ID
        return callRemoteTask(holder, getLocale(), customer.getId());
    }
}

4. 服务器端Tasker实现规则

服务器端Tasker是任务的实际执行者需要遵循以下实现规则

4.1 参数接收

  • 实现init(JsonNode argsNode)方法接收客户端传递的参数
  • 从参数中提取业务对象ID并加载完整业务对象

4.2 服务获取

  • 通过getCachedBean(Service.class)方法获取所需的服务实例
  • 可以提供辅助方法封装服务获取逻辑

4.3 任务执行

  • 实现execute(MessageHolder holder)方法包含实际业务逻辑
  • 使用holder.info()/error()等方法记录任务执行状态
  • 调用updateProgress()方法更新任务进度
  • 调用updateTitle()方法更新任务标题
  • 调用updateProperty()方法更新任务属性
  • holderupdateProgress()updateTitle()updateProperty() 等方法通过 webSocket 把数据传输到 client 端的 taskerclient 端的 tasker 收到数据后会调用对应的方法更新任务属性

4.4 示例实现

public class CompanyCustomerEvaluationFormUpdateTask extends Tasker<Object> implements WebSocketServerTasker {
    private CompanyCustomer customer;  // 业务对象

    @Override
    public void init(JsonNode argsNode) {
        // 从参数中提取业务对象ID并加载
        int customerId = argsNode.get(0).asInt();
        customer = getCachedBean(CompanyCustomerService.class).findById(customerId);
    }

    // 辅助方法获取服务
    CompanyCustomerFileService getCompanyCustomerFileService() {
        return getCachedBean(CompanyCustomerFileService.class);
    }

    @Override
    protected Object execute(MessageHolder holder) throws Exception {
        // 执行实际业务逻辑
        updateEvaluationForm(holder);
        return null;
    }

    // 具体业务逻辑实现
    public void updateEvaluationForm(MessageHolder holder) {
        // 业务逻辑代码...
        updateProgress(1, 10);  // 更新进度
        // 更多业务逻辑...
    }
}

5. 客户端Tasker调用流程

在UI控制器中调用客户端Tasker的标准流程如下

5.1 实例化与参数设置

  1. 创建客户端Tasker实例
  2. 设置必要的业务对象参数

5.2 任务执行与监控

  1. 使用UITools.showTaskDialogAndWait()显示任务对话框
  2. 调用initializeTask()初始化任务执行环境
  3. 通过消费者函数处理任务消息

5.3 任务完成处理

  1. 任务完成后执行必要的数据刷新操作

5.4 示例调用

public void onUpdateEvaluationFormAction(ActionEvent event) {
    // 1. 创建Tasker并设置参数
    CompanyCustomerEvaluationFormUpdateTask task = new CompanyCustomerEvaluationFormUpdateTask();
    task.setCustomer(getCompanyCustomerService().findById(viewModel.getId().get()));
    
    // 2. 显示任务对话框并执行任务
    UITools.showTaskDialogAndWait("更新评价表", task, consumer -> {
        initializeTask(task, "更新评价表", msg -> consumer.accept(Message.info(msg)));
    });
    
    // 3. 任务完成后刷新数据
    loadTableDataSet();
}

6. 任务进度管理

  • 服务器端使用updateProgress(current, total)方法更新任务进度
  • 进度值通常以0-1000或类似小范围数值表示完成百分比
  • 客户端通过WebSocket接收进度更新并显示

7. 异常处理机制

  • 服务器端使用MessageHolder.error()方法记录错误信息
  • 客户端通过任务对话框展示错误信息
  • 服务器端在关键操作点进行异常捕获和处理

9. 数据一致性保障

  • 任务完成后客户端通常调用数据刷新方法(如loadTableDataSet()确保UI显示最新数据
  • 服务器端负责业务数据的持久化操作

10. WebSocketServerTaskManager功能与作用

WebSocketServerTaskManager是服务器端管理WebSocket任务的核心组件负责处理客户端发起的任务请求创建并执行对应的任务实例并维护任务执行过程中的双向通信。

10.1 核心职责

  • 任务注册管理:从tasker_mapper.json文件加载并管理任务名称与对应实现类的映射关系
  • 消息处理分发:处理客户端发送的任务创建请求
  • 任务实例化根据任务名称动态创建服务器端Tasker实例
  • 处理器设置为Tasker设置各类回调处理器实现服务器端到客户端的消息推送
  • 异步任务执行:使用虚拟线程异步执行任务
  • 消息通信:维护任务执行过程中的双向通信,包括进度更新、状态通知等
  • 异常处理:处理任务执行过程中的异常情况并通知客户端

10.2 任务注册机制

WebSocketServerTaskManager在初始化阶段(通过实现InitializingBean接口的afterPropertiesSet()方法)从tasker_mapper.json文件中加载任务注册信息:

@Override
public void afterPropertiesSet() throws Exception {
    // 从tasker_mapper.json文件中加载任务注册信息
    try {
        Resource resource = resourceLoader.getResource("classpath:tasker_mapper.json");
        try (InputStream inputStream = resource.getInputStream()) {
            JsonNode rootNode = objectMapper.readTree(inputStream);
            JsonNode taskersNode = rootNode.get("taskers");
            if (taskersNode != null && taskersNode.isObject()) {
                Map<String, String> taskMap = new java.util.HashMap<>();
                taskersNode.fields().forEachRemaining(entry -> {
                    taskMap.put(entry.getKey(), entry.getValue().asText());
                });
                taskClzMap = taskMap;
            }
        }
    } catch (Exception e) {
        logger.error("Failed to load tasker_mapper.json", e);
        // 使用默认值作为fallback
        taskClzMap = Map.of();
    }
}

10.2 消息处理流程

WebSocketServerTaskManager的消息处理流程如下:

  1. 接收消息:通过onMessage方法接收来自WebSocketSession的消息
  2. 解析会话ID:从消息中提取会话标识
  3. 分发处理:调用handleAsSessionCallback根据消息类型进行分发
  4. 异常处理:捕获处理过程中的异常并发送错误消息
public void onMessage(WebSocketSession session, JsonNode jsonNode) {
    // 处理 sessionId 的消息
    String sessionId = jsonNode.get(WebSocketConstant.SESSION_ID_FIELD_NAME).asText();
    try {
        handleAsSessionCallback(session, sessionId, jsonNode);
    } catch (Exception e) {
        sendError(session, sessionId, e.getMessage());
        logger.warn("处理会话回调失败 (会话ID: {}): {}", sessionId, e.getMessage(), e);
    }
}

private void handleAsSessionCallback(WebSocketSession session, String sessionId, JsonNode jsonNode) {
    if (!jsonNode.has("type")) {
        throw new IllegalArgumentException("缺失 type 参数");
    }
    String type = jsonNode.get("type").asText();
    if (type.equals("createTask")) {
        createTask(session, sessionId, jsonNode);
    }
}

private void sendError(WebSocketSession session, String sessionId, String message) {
    if (session == null || !session.isOpen()) {
        logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message);
        return;
    }

    try {
        String errorMessage = objectMapper.writeValueAsString(Map.of(
                WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
                WebSocketConstant.SUCCESS_FIELD_VALUE, false,
                WebSocketConstant.MESSAGE_FIELD_NAME, message));

        // 检查会话状态并尝试发送错误消息
        if (session.isOpen()) {
            session.sendMessage(new TextMessage(errorMessage));
        } else {
            logger.warn("会话已关闭,无法发送错误消息: {}", message);
        }
    } catch (Exception e) {
        // 捕获所有可能的异常,防止影响主流程
        logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
    }
}

10.3 任务创建与初始化

createTask()方法负责创建和初始化任务实例:

  1. 参数提取:从消息中提取任务名称和参数
  2. 类型查找:根据任务名称查找对应的类名
  3. 实例创建:通过反射创建任务实例
  4. 国际化设置设置任务的Locale信息
  5. 处理器绑定:为任务绑定各类消息处理器
  6. 参数初始化:调用任务的init()方法进行参数初始化
  7. 异步执行:使用虚拟线程启动任务执行
private void createTask(WebSocketSession session, String sessionId, JsonNode jsonNode) {
    if (!jsonNode.has(WebSocketConstant.ARGUMENTS_FIELD_NAME)) {
        throw new IllegalArgumentException("缺失 " + WebSocketConstant.ARGUMENTS_FIELD_NAME + " 参数");
    }
    JsonNode argsNode = jsonNode.get(WebSocketConstant.ARGUMENTS_FIELD_NAME);

    String taskName = argsNode.get(0).asText();
    String clzName = taskClzMap.get(taskName);
    if (clzName == null) {
        throw new IllegalArgumentException("未知的任务类型: " + taskName);
    }
    Object tasker = null;
    try {
        Class<?> clz = Class.forName(clzName);
        tasker = clz.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("未知的任务类型: " + taskName + ", class: " + clzName);
    } catch (Exception e) {
        throw new IllegalArgumentException("任务类型: " + taskName + ", class: " + clzName + " 实例化失败");
    }

    if (tasker instanceof Tasker<?> t) {
        String locale = argsNode.get(1).asText();
        t.setLocale(Locale.forLanguageTag(locale));
    }

    if (tasker instanceof WebSocketServerTasker t) {
        t.setTitleHandler(title -> sendToSession(session, sessionId, "title", title));
        t.setMessageHandler(msg -> sendMessageToSession(session, sessionId, msg));
        t.setPropertyHandler((name, value) -> sendToSession(session, sessionId, "property", name, value));
        t.setProgressHandler((current, total) -> sendToSession(session, sessionId, "progress", current, total));
        t.init(argsNode.get(2));
    }

    if (tasker instanceof Callable<?> callable) {
        Thread.ofVirtual().start(() -> {
            try {
                sendToSession(session, sessionId, "start");
                callable.call();
                sendToSession(session, sessionId, "done");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

10.4 消息通信机制

WebSocketServerTaskManager提供了多种消息发送方法,实现服务器端到客户端的通信:

  • sendToSession()发送各类消息title、property、progress、start、done等
  • sendMessageToSession():专门发送带级别的消息
  • sendError():发送错误消息给客户端
private boolean sendToSession(WebSocketSession session, String sessionId, String type, Object... args) {
    try {
        String text = objectMapper.writeValueAsString(Map.of(
                WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
                "type", type,
                WebSocketConstant.ARGUMENTS_FIELD_NAME, args));
        session.sendMessage(new TextMessage(text));
    } catch (IOException e) {
        // 捕获所有可能的异常,防止影响主流程
        logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
    }
    return true;
}

private boolean sendMessageToSession(WebSocketSession session, String sessionId, Message msg) {
    return sendToSession(session, sessionId, "message", msg.getLevel().getName(), msg.getMessage());
}

private void sendError(WebSocketSession session, String sessionId, String message) {
    if (session == null || !session.isOpen()) {
        logger.warn("尝试向已关闭的WebSocket会话发送错误消息: {}", message);
        return;
    }

    try {
        String errorMessage = objectMapper.writeValueAsString(Map.of(
                WebSocketConstant.SESSION_ID_FIELD_NAME, sessionId,
                WebSocketConstant.SUCCESS_FIELD_VALUE, false,
                WebSocketConstant.MESSAGE_FIELD_NAME, message));

        // 检查会话状态并尝试发送错误消息
        if (session.isOpen()) {
            session.sendMessage(new TextMessage(errorMessage));
        } else {
            logger.warn("会话已关闭,无法发送错误消息: {}", message);
        }
    } catch (Exception e) {
        // 捕获所有可能的异常,防止影响主流程
        logger.error("发送错误消息失败 (会话ID: {})", session.getId(), e);
    }
}

#### 10.5 与WebSocketServerHandler的关系

`WebSocketServerTaskManager``WebSocketServerHandler`协同工作共同处理客户端的WebSocket请求

- `WebSocketServerHandler`作为WebSocket消息的入口点接收所有客户端消息
- 对于与任务相关的消息如创建任务`WebSocketServerHandler`将消息转发给`WebSocketServerTaskManager`处理
- `WebSocketServerTaskManager`负责具体的任务创建初始化和执行逻辑
- 两者结合实现了完整的客户端-服务器端Tasker通信机制

## 11. WebSocketClientService工作机制

### 11.1 连接管理流程

`WebSocketClientService`的连接管理流程包括以下几个核心步骤

1. **初始化连接**通过`initWebSocket()`方法创建与服务器的WebSocket连接
2. **连接状态监控**使用`online``message`属性实时反映连接状态
3. **自动重连机制**当连接断开时通过`scheduleReconnect()`方法安排自动重连
4. **心跳维护**通过`startHeartbeat()``heartbeat()`方法定期发送ping消息

### 11.2 消息处理机制

`WebSocketClientService`使用内置的`WebSocketListener`处理各类WebSocket事件

- `onOpen`连接建立时触发启动心跳并更新连接状态
- `onMessage`接收服务器消息时触发解析并路由消息到对应处理器
- `onClosing`/`onClosed`连接关闭时触发停止心跳并准备重连
- `onFailure`连接失败时触发记录错误并尝试重连

### 11.3 消息路由策略

`WebSocketClientService`采用以下策略路由接收到的消息

1. **回调消息**如果消息包含`messageId`则路由到对应的`CompletableFuture`回调
2. **会话消息**如果消息包含`sessionId`则路由到对应的`WebSocketClientSession`处理
3. **错误消息**如果消息包含错误码则按错误类型进行处理如未授权时自动重新登录

## 12. WebSocketClientSession工作机制

### 12.1 会话生命周期管理

每个`WebSocketClientSession`实例的生命周期包括

1. **创建**通过`WebSocketClientService.createSession()`方法创建
2. **任务提交**通过`submitTask()`方法提交Tasker任务到服务器
3. **消息处理**处理服务器返回的各类消息
4. **关闭**任务完成后通过`close()`方法关闭会话

### 12.2 消息类型处理

`WebSocketClientSession`能够处理以下几种主要的消息类型

- **message**普通文本消息通过`handleAsMessage()`处理
- **title**任务标题更新通过`handleAsTitle()`处理
- **property**任务属性更新通过`handleAsProperty()`处理
- **progress**任务进度更新通过`handleAsProgress()`处理
- **start**任务开始通知通过`handleAsStart()`处理
- **done**任务完成通知通过`handleAsDone()`处理

### 12.3 属性同步机制

`WebSocketClientSession``handleAsProperty()`方法实现了服务器到客户端的属性同步机制

1. 接收属性名和属性值
2. 通过Java反射获取Tasker类的属性描述符
3. 将接收到的属性值转换为正确的Java类型
4. 调用属性的setter方法更新Tasker实例的属性值

## 13. 最佳实践建议

1. **任务拆分**复杂任务应拆分为多个小任务便于进度跟踪和错误定位
2. **状态反馈**在关键节点提供清晰的状态信息增强用户体验
3. **资源释放**确保文件流等资源正确关闭避免资源泄露
4. **事务控制**对于涉及多步数据操作的任务考虑使用事务确保数据一致性
5. **错误重试**针对网络波动等临时性问题考虑实现任务重试机制
6. **配置管理**使用`tasker_mapper.json`文件统一管理任务注册信息避免在代码中硬编码任务映射
7. **异常处理**确保实现配置文件加载失败的fallback机制保证系统稳定性
8. **版本控制**对任务配置文件进行版本控制便于追踪变更历史
9. **会话管理**确保任务完成后正确关闭WebSocket会话避免资源泄露
10. **属性同步**合理使用属性同步机制确保客户端显示与服务器状态一致

通过遵循以上规则和模式可以确保Contract-Manager项目中客户端与服务器端Tasker通信的一致性可靠性和可维护性