# 异步任务监控实现方案 ## 现状分析 通过分析项目代码,我们发现当前系统的异步任务管理存在以下特点和不足: 1. **任务调度机制**:系统使用JavaFX的`Task`和Spring的`ScheduledExecutorService`进行异步任务调度 2. **任务基类**:`Tasker`类作为所有任务的基类,实现了基本的任务生命周期管理 3. **进度展示**:`UITools.showTaskDialogAndWait()`方法提供了简单的任务进度对话框 4. **异步编程**:项目广泛使用`CompletableFuture`进行异步操作 5. **主要不足**: - 缺乏统一的任务监控中心 - 任务执行状态缺乏集中管理 - 大部分任务没有超时处理机制 - 任务执行历史无法追溯 - 缺少任务失败后的重试策略 ## 实现方案 ### 1. 任务监控核心组件 #### 1.1 任务监控中心 ```java package com.ecep.contract.manager.ui.task; import com.ecep.contract.manager.ui.Tasker; import javafx.collections.FXCollections; import javafx.collections.ObservableList; import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * 任务监控中心 - 统一管理所有异步任务 */ @Component public class TaskMonitorCenter { private static final Logger logger = LoggerFactory.getLogger(TaskMonitorCenter.class); // 正在运行的任务列表 @Getter private final ObservableList> activeTasks = FXCollections.observableArrayList(); // 任务历史记录 @Getter private final ObservableList taskHistory = FXCollections.observableArrayList(); // 任务ID映射 private final Map> taskMap = new ConcurrentHashMap<>(); // 任务计数器 private final AtomicInteger taskCounter = new AtomicInteger(0); /** * 注册并启动任务 */ public MonitoredTask registerTask(Tasker task) { return registerTask(task, null); } /** * 注册并启动任务,支持设置超时时间 */ public MonitoredTask registerTask(Tasker task, Long timeoutSeconds) { String taskId = generateTaskId(); MonitoredTask monitoredTask = new MonitoredTask<>(task, taskId); // 设置超时处理 if (timeoutSeconds != null && timeoutSeconds > 0) { monitoredTask.setTimeout(timeoutSeconds); } // 添加任务状态变更监听器 monitoredTask.setOnSucceeded(event -> { logger.info("任务[{}]执行成功: {}", taskId, task.getTitle()); recordHistory(monitoredTask, TaskStatus.SUCCEEDED); removeActiveTask(monitoredTask); }); monitoredTask.setOnFailed(event -> { logger.error("任务[{}]执行失败: {}", taskId, task.getTitle(), task.getException()); recordHistory(monitoredTask, TaskStatus.FAILED); removeActiveTask(monitoredTask); }); monitoredTask.setOnCancelled(event -> { logger.info("任务[{}]被取消: {}", taskId, task.getTitle()); recordHistory(monitoredTask, TaskStatus.CANCELLED); removeActiveTask(monitoredTask); }); // 保存任务引用并启动 taskMap.put(taskId, monitoredTask); activeTasks.add(monitoredTask); monitoredTask.start(); logger.info("任务[{}]已注册并启动: {}", taskId, task.getTitle()); return monitoredTask; } /** * 取消任务 */ public boolean cancelTask(String taskId) { MonitoredTask task = taskMap.get(taskId); if (task != null && task.isRunning()) { task.cancel(); return true; } return false; } /** * 取消所有任务 */ public void cancelAllTasks() { for (MonitoredTask task : new ArrayList<>(activeTasks)) { task.cancel(); } } /** * 从活动列表中移除任务 */ private void removeActiveTask(MonitoredTask task) { Platform.runLater(() -> activeTasks.remove(task)); taskMap.remove(task.getTaskId()); } /** * 记录任务历史 */ private void recordHistory(MonitoredTask task, TaskStatus status) { TaskHistory history = new TaskHistory( task.getTaskId(), task.getTitle(), status, task.getStartTime(), System.currentTimeMillis() ); if (task.getException() != null) { history.setErrorMessage(task.getException().getMessage()); } Platform.runLater(() -> { taskHistory.add(0, history); // 只保留最近100条历史记录 if (taskHistory.size() > 100) { taskHistory.remove(taskHistory.size() - 1); } }); } /** * 生成任务ID */ private String generateTaskId() { return "task-" + UUID.randomUUID().toString().substring(0, 8) + "-" + taskCounter.incrementAndGet(); } } ``` #### 1.2 监控任务包装类 ```java package com.ecep.contract.manager.ui.task; import com.ecep.contract.manager.ui.Tasker; import javafx.concurrent.Task; import lombok.Getter; import lombok.Setter; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * 被监控的任务包装类 */ public class MonitoredTask extends Task { @Getter private final Tasker delegate; @Getter private final String taskId; @Getter private final long startTime; @Setter private Long timeoutSeconds; private ScheduledExecutorService timeoutExecutor; private final AtomicBoolean timeoutHandled = new AtomicBoolean(false); public MonitoredTask(Tasker delegate, String taskId) { this.delegate = delegate; this.taskId = taskId; this.startTime = System.currentTimeMillis(); // 绑定属性 titleProperty().bind(delegate.titleProperty()); messageProperty().bind(delegate.messageProperty()); progressProperty().bind(delegate.progressProperty()); } @Override protected T call() throws Exception { // 设置超时监控 if (timeoutSeconds != null) { setupTimeoutMonitor(); } try { return delegate.call(); } finally { // 取消超时监控 if (timeoutExecutor != null) { timeoutExecutor.shutdownNow(); } } } /** * 设置超时监控 */ private void setupTimeoutMonitor() { timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread thread = new Thread(r, "task-timeout-monitor-" + taskId); thread.setDaemon(true); return thread; }); timeoutExecutor.schedule(() -> { if (isRunning() && timeoutHandled.compareAndSet(false, true)) { updateMessage("任务执行超时,正在取消..."); cancel(); } }, timeoutSeconds, TimeUnit.SECONDS); } @Override public boolean cancel() { boolean cancelled = super.cancel(); if (cancelled) { delegate.cancel(); } return cancelled; } /** * 启动任务 */ public void start() { Thread thread = new Thread(this, "task-" + taskId); thread.setDaemon(true); thread.start(); } /** * 获取任务执行耗时 */ public long getExecutionTime() { return System.currentTimeMillis() - startTime; } } ``` #### 1.3 任务状态和历史记录类 ```java package com.ecep.contract.manager.ui.task; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 任务状态枚举 */ public enum TaskStatus { SCHEDULED("已调度"), RUNNING("运行中"), SUCCEEDED("成功"), FAILED("失败"), CANCELLED("已取消"), TIMED_OUT("超时"); private final String description; TaskStatus(String description) { this.description = description; } public String getDescription() { return description; } } /** * 任务历史记录 */ @Data @NoArgsConstructor @AllArgsConstructor public class TaskHistory { private String taskId; private String title; private TaskStatus status; private long startTime; private long endTime; private String errorMessage; /** * 获取任务执行耗时(毫秒) */ public long getExecutionTime() { return endTime - startTime; } /** * 获取格式化的执行耗时 */ public String getFormattedExecutionTime() { long ms = getExecutionTime(); if (ms < 1000) { return ms + "ms"; } else if (ms < 60000) { return String.format("%.2fs", ms / 1000.0); } else { return String.format("%dm %.2fs", ms / 60000, (ms % 60000) / 1000.0); } } } ``` ### 2. 任务监控界面 ```java package com.ecep.contract.manager.ui.task; import com.ecep.contract.manager.SpringApp; import com.ecep.contract.manager.ui.Tasker; import javafx.application.Platform; import javafx.beans.property.SimpleStringProperty; import javafx.collections.ListChangeListener; import javafx.fxml.FXML; import javafx.fxml.FXMLLoader; import javafx.scene.Parent; import javafx.scene.Scene; import javafx.scene.control.*; import javafx.scene.control.cell.PropertyValueFactory; import javafx.stage.Modality; import javafx.stage.Stage; import lombok.SneakyThrows; import org.springframework.stereotype.Component; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; /** * 任务监控界面控制器 */ @Component public class TaskMonitorViewController { @FXML private TableView> activeTasksTable; @FXML private TableColumn, String> activeTaskIdColumn; @FXML private TableColumn, String> activeTaskTitleColumn; @FXML private TableColumn, ProgressBar> activeTaskProgressColumn; @FXML private TableColumn, String> activeTaskStatusColumn; @FXML private TableView historyTasksTable; @FXML private TableColumn historyTaskIdColumn; @FXML private TableColumn historyTaskTitleColumn; @FXML private TableColumn historyTaskStatusColumn; @FXML private TableColumn historyTaskStartTimeColumn; @FXML private TableColumn historyTaskExecutionTimeColumn; @FXML private Button cancelTaskButton; @FXML private Button clearHistoryButton; private final TaskMonitorCenter taskMonitorCenter; private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public TaskMonitorViewController() { this.taskMonitorCenter = SpringApp.getBean(TaskMonitorCenter.class); } @FXML public void initialize() { // 初始化活动任务表格 activeTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId")); activeTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title")); // 进度条列 activeTaskProgressColumn.setCellFactory(column -> new TableCell<>() { private final ProgressBar progressBar = new ProgressBar(); @Override protected void updateItem(ProgressBar item, boolean empty) { super.updateItem(item, empty); if (empty || getTableRow().getItem() == null) { setGraphic(null); } else { MonitoredTask task = getTableRow().getItem(); progressBar.progressProperty().bind(task.progressProperty()); setGraphic(progressBar); } } }); // 状态列 activeTaskStatusColumn.setCellValueFactory(cellData -> { MonitoredTask task = cellData.getValue(); String status; if (task.isRunning()) { status = "运行中"; } else if (task.isCancelled()) { status = "已取消"; } else if (task.isFailed()) { status = "失败"; } else if (task.isSucceeded()) { status = "成功"; } else { status = "等待中"; } return new SimpleStringProperty(status); }); // 初始化历史任务表格 historyTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId")); historyTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title")); historyTaskStatusColumn.setCellValueFactory(cellData -> new SimpleStringProperty(cellData.getValue().getStatus().getDescription())); historyTaskStartTimeColumn.setCellValueFactory(cellData -> new SimpleStringProperty(dateFormat.format(new Date(cellData.getValue().getStartTime())))); historyTaskExecutionTimeColumn.setCellValueFactory(cellData -> new SimpleStringProperty(cellData.getValue().getFormattedExecutionTime())); // 绑定数据 activeTasksTable.setItems(taskMonitorCenter.getActiveTasks()); historyTasksTable.setItems(taskMonitorCenter.getTaskHistory()); // 添加任务双击事件(显示详情) historyTasksTable.setRowFactory(tv -> { TableRow row = new TableRow<>(); row.setOnMouseClicked(event -> { if (event.getClickCount() == 2 && !row.isEmpty()) { TaskHistory history = row.getItem(); showTaskDetails(history); } }); return row; }); // 更新按钮状态 updateButtonStates(); activeTasksTable.getSelectionModel().selectedItemProperty().addListener((obs, oldVal, newVal) -> updateButtonStates()); } /** * 更新按钮状态 */ private void updateButtonStates() { cancelTaskButton.setDisable(activeTasksTable.getSelectionModel().getSelectedItem() == null); } /** * 取消选中的任务 */ @FXML private void onCancelTask() { MonitoredTask selectedTask = activeTasksTable.getSelectionModel().getSelectedItem(); if (selectedTask != null) { taskMonitorCenter.cancelTask(selectedTask.getTaskId()); } } /** * 清空历史记录 */ @FXML private void onClearHistory() { Alert alert = new Alert(Alert.AlertType.CONFIRMATION); alert.setTitle("确认操作"); alert.setHeaderText("清空任务历史"); alert.setContentText("确定要清空所有任务历史记录吗?此操作不可恢复。"); alert.showAndWait().ifPresent(response -> { if (response == ButtonType.OK) { taskMonitorCenter.getTaskHistory().clear(); } }); } /** * 显示任务详情 */ private void showTaskDetails(TaskHistory history) { Alert alert = new Alert(Alert.AlertType.INFORMATION); alert.setTitle("任务详情"); alert.setHeaderText(history.getTitle()); StringBuilder content = new StringBuilder(); content.append("任务ID: " + history.getTaskId() + "\n"); content.append("状态: " + history.getStatus().getDescription() + "\n"); content.append("开始时间: " + dateFormat.format(new Date(history.getStartTime())) + "\n"); content.append("结束时间: " + dateFormat.format(new Date(history.getEndTime())) + "\n"); content.append("执行耗时: " + history.getFormattedExecutionTime() + "\n"); if (history.getErrorMessage() != null) { content.append("\n错误信息: " + history.getErrorMessage()); alert.getDialogPane().setExpandableContent(new TextArea(history.getErrorMessage())); } alert.setContentText(content.toString()); alert.showAndWait(); } /** * 显示任务监控窗口 */ @SneakyThrows(IOException.class) public static void show() { FXMLLoader loader = new FXMLLoader(TaskMonitorViewController.class.getResource("/ui/task/TaskMonitorView.fxml")); // 设置Spring为控制器工厂 loader.setControllerFactory(SpringApp::getBean); Parent root = loader.load(); Stage stage = new Stage(); stage.setTitle("任务监控中心"); stage.setScene(new Scene(root, 800, 600)); stage.initModality(Modality.NONE); // 非模态窗口 stage.show(); } } ``` ### 3. 集成到现有系统 #### 3.1 修改UITools类 ```java package com.ecep.contract.manager.util; import com.ecep.contract.manager.SpringApp; import com.ecep.contract.manager.ui.Message; import com.ecep.contract.manager.ui.Tasker; import com.ecep.contract.manager.ui.task.TaskMonitorCenter; import com.ecep.contract.manager.ui.task.MonitoredTask; import javafx.application.Platform; import javafx.concurrent.Task; import javafx.scene.Node; import javafx.scene.control.*; import javafx.scene.layout.VBox; import java.util.Optional; import java.util.function.Consumer; public class UITools { // 其他方法保持不变... /** * 显示一个对话框, 并等待用户关闭对话框 * * @param title 对话框标题 * @param task 任务 * @param init 初始化 */ public static void showTaskDialogAndWait(String title, Task task, Consumer> init) { Dialog dialog = createDialog(); dialog.getDialogPane().getScene().getStylesheets().add("/ui/dialog.css"); dialog.setTitle(title); DialogPane dialogPane = dialog.getDialogPane(); VBox box = (VBox) dialogPane.getContent(); Label headerLabel = (Label) box.lookup("#header"); ListView listView = (ListView) box.lookup("#list-view"); listView.setCellFactory(param -> new Tasker.MessageListCell()); headerLabel.textProperty().bind(task.titleProperty()); Consumer consumer = message -> { if (Platform.isFxApplicationThread()) { listView.getItems().add(message); } else { Platform.runLater(() -> listView.getItems().add(message)); } }; if (task instanceof Tasker tasker) { tasker.setMessageHandler(consumer); } else { task.messageProperty().addListener((observable, oldValue, newValue) -> { consumer.accept(Message.info(newValue)); }); } // 加一个进度条 ProgressBar progressBar = new ProgressBar(0); progressBar.setPrefHeight(12); progressBar.setPrefWidth(200); progressBar.prefWidthProperty().bind(box.widthProperty().subtract(10)); progressBar.setMinHeight(12); progressBar.setVisible(true); VBox.setMargin(progressBar, new Insets(6, 0, 6, 0)); Platform.runLater(() -> { box.getChildren().add(progressBar); }); progressBar.visibleProperty().bind(task.runningProperty()); progressBar.progressProperty().bind(task.progressProperty()); // 使用任务监控中心启动任务 if (task instanceof Tasker tasker) { // 注册到监控中心,但不在这里启动,因为showAndWait会阻塞UI线程 TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class); MonitoredTask monitoredTask = monitorCenter.registerTask(tasker); } if (init != null) { init.accept(consumer); } dialog.showAndWait(); if (task.isRunning()) { task.cancel(); } } // 其他方法保持不变... } ``` #### 3.2 修改SpringApp配置 ```java package com.ecep.contract.manager; import com.ecep.contract.manager.ui.task.TaskMonitorCenter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.DomainScan; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import java.util.concurrent.ScheduledExecutorService; @SpringBootApplication @EnableJpaRepositories(basePackages = {"com.ecep.contract.manager.ds"}) @DomainScan(basePackages = {"com.ecep.contract.manager.ds"}) @EnableScheduling @EnableAsync public class SpringApp { private static ConfigurableApplicationContext context; // 其他方法保持不变... public static void main(String[] args) { context = SpringApplication.run(SpringApp.class, args); // 初始化任务监控中心 TaskMonitorCenter monitorCenter = context.getBean(TaskMonitorCenter.class); // 可以在这里添加一些初始化代码 } // 其他方法保持不变... } ``` ### 4. 超时处理机制 ```java package com.ecep.contract.manager.util; import java.util.concurrent.*; import java.util.function.Supplier; /** * 异步任务工具类 - 提供超时处理等增强功能 */ public class AsyncUtils { /** * 执行带超时的CompletableFuture任务 */ public static CompletableFuture supplyAsyncWithTimeout(Supplier supplier, long timeout, TimeUnit unit) { CompletableFuture future = CompletableFuture.supplyAsync(supplier); return withTimeout(future, timeout, unit); } /** * 执行带超时的CompletableFuture任务,并指定线程池 */ public static CompletableFuture supplyAsyncWithTimeout(Supplier supplier, Executor executor, long timeout, TimeUnit unit) { CompletableFuture future = CompletableFuture.supplyAsync(supplier, executor); return withTimeout(future, timeout, unit); } /** * 为已有的CompletableFuture添加超时处理 */ public static CompletableFuture withTimeout(CompletableFuture future, long timeout, TimeUnit unit) { CompletableFuture timeoutFuture = new CompletableFuture<>(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread thread = new Thread(r, "timeout-scheduler"); thread.setDaemon(true); return thread; }); scheduler.schedule(() -> { TimeoutException timeoutException = new TimeoutException("Task timed out after " + timeout + " " + unit.name()); timeoutFuture.completeExceptionally(timeoutException); }, timeout, unit); return future.applyToEither(timeoutFuture, t -> t) .whenComplete((t, ex) -> scheduler.shutdownNow()); } /** * 带重试机制的CompletableFuture任务 */ public static CompletableFuture supplyAsyncWithRetry(Supplier supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit) { return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null); } /** * 带重试机制的CompletableFuture任务,并指定哪些异常需要重试 */ public static CompletableFuture supplyAsyncWithRetry( Supplier supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit, Class[] retryableExceptions) { CompletableFuture resultFuture = new CompletableFuture<>(); // 执行任务的方法 executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions); return resultFuture; } private static void executeWithRetry( Supplier supplier, CompletableFuture resultFuture, int currentAttempt, int maxRetries, long delayBetweenRetries, TimeUnit unit, Class[] retryableExceptions) { CompletableFuture.supplyAsync(supplier) .thenAccept(resultFuture::complete) .exceptionally(ex -> { if (currentAttempt < maxRetries && shouldRetry(ex, retryableExceptions)) { // 延迟后重试 try { Thread.sleep(unit.toMillis(delayBetweenRetries)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); resultFuture.completeExceptionally(ie); return null; } // 递归调用,增加尝试次数 executeWithRetry(supplier, resultFuture, currentAttempt + 1, maxRetries, delayBetweenRetries, unit, retryableExceptions); } else { // 达到最大重试次数或异常不可重试,完成Future resultFuture.completeExceptionally(ex); } return null; }); } private static boolean shouldRetry(Throwable ex, Class[] retryableExceptions) { // 如果未指定重试异常类型,则所有异常都重试 if (retryableExceptions == null || retryableExceptions.length == 0) { return true; } // 检查异常是否属于可重试类型 for (Class exceptionClass : retryableExceptions) { if (exceptionClass.isInstance(ex.getCause())) { return true; } } return false; } } ``` ## 使用示例 ### 1. 基本任务监控 ```java // 在任何需要执行异步任务的地方 Tasker task = new ContractSyncTask(); TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class); // 注册任务,设置30秒超时 monitorCenter.registerTask(task, 30L); // 显示任务监控窗口 TaskMonitorViewController.show(); ``` ### 2. 带超时处理的CompletableFuture ```java // 使用AsyncUtils工具类执行带超时的异步任务 CompletableFuture future = AsyncUtils.supplyAsyncWithTimeout(() -> { // 执行耗时操作 Thread.sleep(5000); return "任务完成"; }, 3, TimeUnit.SECONDS); // 处理结果 future.thenAccept(result -> System.out.println("结果: " + result)) .exceptionally(ex -> { if (ex.getCause() instanceof TimeoutException) { System.out.println("任务执行超时"); } else { System.out.println("任务执行异常: " + ex.getMessage()); } return null; }); ``` ### 3. 带重试机制的任务 ```java // 使用AsyncUtils工具类执行带重试的异步任务 CompletableFuture future = AsyncUtils.supplyAsyncWithRetry(() -> { // 模拟一个可能失败的操作 if (Math.random() > 0.7) { throw new RuntimeException("随机失败"); } return "操作成功"; }, 3, 1, TimeUnit.SECONDS, new Class[]{RuntimeException.class}); // 处理结果 future.thenAccept(result -> System.out.println("结果: " + result)) .exceptionally(ex -> { System.out.println("多次重试后仍然失败: " + ex.getMessage()); return null; }); ``` ## 实现要点总结 1. **统一任务管理**:通过`TaskMonitorCenter`集中管理所有异步任务,提供注册、监控和取消功能 2. **完整生命周期监控**:跟踪任务从创建、运行到完成/失败/取消的整个生命周期 3. **超时处理机制**:为任务设置超时时间,防止任务无限期阻塞 4. **历史记录追踪**:保存任务执行历史,方便问题排查和性能分析 5. **可视化界面**:提供直观的任务监控界面,展示任务进度和状态 6. **重试策略**:针对失败任务提供可配置的重试机制 7. **与现有系统集成**:无缝集成到现有任务调度机制中 这套实现方案可以显著提升系统的稳定性和可维护性,防止长时间运行的任务占用系统资源,并为用户提供更好的任务执行状态反馈。