Files
contract-manager/docs/异步任务监控实现方案.md

29 KiB
Raw Blame History

异步任务监控实现方案

现状分析

通过分析项目代码,我们发现当前系统的异步任务管理存在以下特点和不足:

  1. 任务调度机制:系统使用JavaFX的Task和Spring的ScheduledExecutorService进行异步任务调度
  2. 任务基类:Tasker类作为所有任务的基类,实现了基本的任务生命周期管理
  3. 进度展示:UITools.showTaskDialogAndWait()方法提供了简单的任务进度对话框
  4. 异步编程:项目广泛使用CompletableFuture进行异步操作
  5. 主要不足:
    • 缺乏统一的任务监控中心
    • 任务执行状态缺乏集中管理
    • 大部分任务没有超时处理机制
    • 任务执行历史无法追溯
    • 缺少任务失败后的重试策略

实现方案

1. 任务监控核心组件

1.1 任务监控中心

package com.ecep.contract.manager.ui.task; 

import com.ecep.contract.manager.ui.Tasker;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Task monitor center: the single place where asynchronous tasks are registered,
 * started, cancelled and recorded into a bounded history.
 *
 * <p>{@link #getActiveTasks()} and {@link #getTaskHistory()} are observable lists
 * intended for UI binding, so every mutation of them is performed on the JavaFX
 * Application Thread.
 */
@Component
public class TaskMonitorCenter {
    private static final Logger logger = LoggerFactory.getLogger(TaskMonitorCenter.class);

    /** Maximum number of history entries kept; the oldest entries are dropped first. */
    private static final int MAX_HISTORY_SIZE = 100;

    /** Tasks that have been registered and have not finished yet. */
    @Getter
    private final ObservableList<MonitoredTask<?>> activeTasks = FXCollections.observableArrayList();

    /** Finished tasks, newest first. */
    @Getter
    private final ObservableList<TaskHistory> taskHistory = FXCollections.observableArrayList();

    /** Lookup of unfinished tasks by task id. */
    private final Map<String, MonitoredTask<?>> taskMap = new ConcurrentHashMap<>();

    /** Monotonic counter appended to generated task ids. */
    private final AtomicInteger taskCounter = new AtomicInteger(0);

    /**
     * Registers the task and starts it immediately, without a timeout.
     *
     * @param task the task to run
     * @return the monitored wrapper that is actually executing
     */
    public <T> MonitoredTask<T> registerTask(Tasker<T> task) {
        return registerTask(task, null);
    }

    /**
     * Registers the task and starts it immediately, optionally with a timeout.
     *
     * @param task           the task to run
     * @param timeoutSeconds timeout in seconds; {@code null} or a non-positive value disables it
     * @return the monitored wrapper that is actually executing
     */
    public <T> MonitoredTask<T> registerTask(Tasker<T> task, Long timeoutSeconds) {
        String taskId = generateTaskId();
        MonitoredTask<T> monitoredTask = new MonitoredTask<>(task, taskId);

        if (timeoutSeconds != null && timeoutSeconds > 0) {
            // The Lombok @Setter on MonitoredTask.timeoutSeconds generates setTimeoutSeconds(Long).
            monitoredTask.setTimeoutSeconds(timeoutSeconds);
        }

        monitoredTask.setOnSucceeded(event -> {
            logger.info("任务[{}]执行成功: {}", taskId, task.getTitle());
            recordHistory(monitoredTask, TaskStatus.SUCCEEDED);
            removeActiveTask(monitoredTask);
        });

        monitoredTask.setOnFailed(event -> {
            // The wrapper is the Task that ran, so the failure is stored on it, not on the delegate.
            logger.error("任务[{}]执行失败: {}", taskId, task.getTitle(), monitoredTask.getException());
            recordHistory(monitoredTask, TaskStatus.FAILED);
            removeActiveTask(monitoredTask);
        });

        monitoredTask.setOnCancelled(event -> {
            logger.info("任务[{}]被取消: {}", taskId, task.getTitle());
            recordHistory(monitoredTask, TaskStatus.CANCELLED);
            removeActiveTask(monitoredTask);
        });

        taskMap.put(taskId, monitoredTask);
        // Queued (or run) before the task starts, so it always precedes the removal
        // that the completion handlers queue later on the same thread.
        runOnFxThread(() -> activeTasks.add(monitoredTask));
        monitoredTask.start();

        logger.info("任务[{}]已注册并启动: {}", taskId, task.getTitle());
        return monitoredTask;
    }

    /**
     * Cancels the unfinished task with the given id.
     *
     * @param taskId id returned by {@link MonitoredTask#getTaskId()}; may be {@code null}
     * @return {@code true} if a task with that id was found and its cancellation succeeded
     */
    public boolean cancelTask(String taskId) {
        if (taskId == null) {
            return false;
        }
        MonitoredTask<?> task = taskMap.get(taskId);
        // cancel() itself reports false for a task that has already completed.
        return task != null && task.cancel();
    }

    /**
     * Cancels every task currently in the active list.
     */
    public void cancelAllTasks() {
        // Iterate over a snapshot: cancellation handlers remove entries from activeTasks.
        for (MonitoredTask<?> task : new ArrayList<>(activeTasks)) {
            task.cancel();
        }
    }

    /**
     * Drops a finished task from the active list and from the id lookup.
     */
    private void removeActiveTask(MonitoredTask<?> task) {
        Platform.runLater(() -> activeTasks.remove(task));
        taskMap.remove(task.getTaskId());
    }

    /**
     * Adds a history entry for a finished task, keeping at most
     * {@link #MAX_HISTORY_SIZE} entries.
     */
    private void recordHistory(MonitoredTask<?> task, TaskStatus status) {
        Throwable failure = task.getException();
        String errorMessage = null;
        if (failure != null) {
            // Some exceptions carry no message; fall back to their string form.
            errorMessage = failure.getMessage() != null ? failure.getMessage() : failure.toString();
        }

        TaskHistory history = new TaskHistory(
            task.getTaskId(),
            task.getTitle(),
            status,
            task.getStartTime(),
            System.currentTimeMillis(),
            errorMessage
        );

        Platform.runLater(() -> {
            taskHistory.add(0, history);
            while (taskHistory.size() > MAX_HISTORY_SIZE) {
                taskHistory.remove(taskHistory.size() - 1);
            }
        });
    }

    /**
     * Runs the action immediately when already on the JavaFX Application Thread,
     * otherwise queues it there.
     */
    private static void runOnFxThread(Runnable action) {
        if (Platform.isFxApplicationThread()) {
            action.run();
        } else {
            Platform.runLater(action);
        }
    }

    /**
     * Builds a task id of the form {@code task-<8 hex chars>-<sequence number>}.
     */
    private String generateTaskId() {
        return "task-" + UUID.randomUUID().toString().substring(0, 8) + "-" + taskCounter.incrementAndGet();
    }
}

1.2 监控任务包装类

package com.ecep.contract.manager.ui.task; 

import com.ecep.contract.manager.ui.Tasker; 
import javafx.concurrent.Task; 
import lombok.Getter; 
import lombok.Setter; 

import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

/** 
 * 被监控的任务包装类 
 */ 
public class MonitoredTask<T> extends Task<T> { 
    @Getter 
    private final Tasker<T> delegate; 
    @Getter 
    private final String taskId; 
    @Getter 
    private final long startTime; 
    @Setter 
    private Long timeoutSeconds; 
    private ScheduledExecutorService timeoutExecutor; 
    private final AtomicBoolean timeoutHandled = new AtomicBoolean(false); 
    
    public MonitoredTask(Tasker<T> delegate, String taskId) { 
        this.delegate = delegate; 
        this.taskId = taskId; 
        this.startTime = System.currentTimeMillis(); 
        
        // 绑定属性 
        titleProperty().bind(delegate.titleProperty()); 
        messageProperty().bind(delegate.messageProperty()); 
        progressProperty().bind(delegate.progressProperty()); 
    } 
    
    @Override 
    protected T call() throws Exception { 
        // 设置超时监控 
        if (timeoutSeconds != null) { 
            setupTimeoutMonitor(); 
        } 
        
        try { 
            return delegate.call(); 
        } finally { 
            // 取消超时监控 
            if (timeoutExecutor != null) { 
                timeoutExecutor.shutdownNow(); 
            } 
        } 
    } 
    
    /** 
     * 设置超时监控 
     */ 
    private void setupTimeoutMonitor() { 
        timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> { 
            Thread thread = new Thread(r, "task-timeout-monitor-" + taskId); 
            thread.setDaemon(true); 
            return thread; 
        }); 
        
        timeoutExecutor.schedule(() -> { 
            if (isRunning() && timeoutHandled.compareAndSet(false, true)) { 
                updateMessage("任务执行超时,正在取消..."); 
                cancel(); 
            } 
        }, timeoutSeconds, TimeUnit.SECONDS); 
    } 
    
    @Override 
    public boolean cancel() { 
        boolean cancelled = super.cancel(); 
        if (cancelled) { 
            delegate.cancel(); 
        } 
        return cancelled; 
    } 
    
    /** 
     * 启动任务 
     */ 
    public void start() { 
        Thread thread = new Thread(this, "task-" + taskId); 
        thread.setDaemon(true); 
        thread.start(); 
    } 
    
    /** 
     * 获取任务执行耗时 
     */ 
    public long getExecutionTime() { 
        return System.currentTimeMillis() - startTime; 
    } 
} 

1.3 任务状态和历史记录类

package com.ecep.contract.manager.ui.task; 

import lombok.AllArgsConstructor; 
import lombok.Data; 
import lombok.NoArgsConstructor; 

/**
 * States a task can be recorded in, each paired with the label returned by
 * {@link #getDescription()}.
 */
public enum TaskStatus {
    SCHEDULED("已调度"),
    RUNNING("运行中"),
    SUCCEEDED("成功"),
    FAILED("失败"),
    CANCELLED("已取消"),
    TIMED_OUT("超时");

    /** Display label of this state. */
    private final String description;

    TaskStatus(String label) {
        description = label;
    }

    /**
     * @return the display label of this state
     */
    public String getDescription() {
        return this.description;
    }
}

/**
 * Record of one finished task: identity, final status, start/end timestamps
 * (epoch milliseconds) and an optional error message.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskHistory {
    private String taskId;
    private String title;
    private TaskStatus status;
    private long startTime;
    private long endTime;
    private String errorMessage;

    /**
     * Creates a history entry without an error message.
     *
     * @param taskId    id of the task
     * @param title     title of the task
     * @param status    final status of the task
     * @param startTime start time in epoch milliseconds
     * @param endTime   end time in epoch milliseconds
     */
    public TaskHistory(String taskId, String title, TaskStatus status, long startTime, long endTime) {
        this.taskId = taskId;
        this.title = title;
        this.status = status;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * @return the elapsed time between start and end, in milliseconds
     */
    public long getExecutionTime() {
        return endTime - startTime;
    }

    /**
     * Formats the elapsed time as milliseconds below one second, as seconds with
     * two decimals below one minute, and as minutes plus seconds otherwise.
     *
     * @return the formatted elapsed time, e.g. {@code 850ms}, {@code 12.30s} or {@code 2m 5.00s}
     */
    public String getFormattedExecutionTime() {
        long ms = getExecutionTime();
        if (ms < 1000) {
            return ms + "ms";
        }
        if (ms < 60000) {
            return String.format("%.2fs", ms / 1000.0);
        }
        return String.format("%dm %.2fs", ms / 60000, (ms % 60000) / 1000.0);
    }
}

2. 任务监控界面

package com.ecep.contract.manager.ui.task; 

import com.ecep.contract.manager.SpringApp; 
import com.ecep.contract.manager.ui.Tasker; 
import javafx.application.Platform; 
import javafx.beans.property.SimpleStringProperty; 
import javafx.collections.ListChangeListener; 
import javafx.fxml.FXML; 
import javafx.fxml.FXMLLoader; 
import javafx.scene.Parent; 
import javafx.scene.Scene; 
import javafx.scene.control.*; 
import javafx.scene.control.cell.PropertyValueFactory; 
import javafx.stage.Modality; 
import javafx.stage.Stage; 
import lombok.SneakyThrows; 
import org.springframework.stereotype.Component; 

import java.io.IOException; 
import java.text.SimpleDateFormat; 
import java.util.Date; 

/** 
 * 任务监控界面控制器 
 */ 
@Component 
public class TaskMonitorViewController { 
    @FXML 
    private TableView<MonitoredTask<?>> activeTasksTable; 
    @FXML 
    private TableColumn<MonitoredTask<?>, String> activeTaskIdColumn; 
    @FXML 
    private TableColumn<MonitoredTask<?>, String> activeTaskTitleColumn; 
    @FXML 
    private TableColumn<MonitoredTask<?>, ProgressBar> activeTaskProgressColumn; 
    @FXML 
    private TableColumn<MonitoredTask<?>, String> activeTaskStatusColumn; 
    
    @FXML 
    private TableView<TaskHistory> historyTasksTable; 
    @FXML 
    private TableColumn<TaskHistory, String> historyTaskIdColumn; 
    @FXML 
    private TableColumn<TaskHistory, String> historyTaskTitleColumn; 
    @FXML 
    private TableColumn<TaskHistory, String> historyTaskStatusColumn; 
    @FXML 
    private TableColumn<TaskHistory, String> historyTaskStartTimeColumn; 
    @FXML 
    private TableColumn<TaskHistory, String> historyTaskExecutionTimeColumn; 
    
    @FXML 
    private Button cancelTaskButton; 
    @FXML 
    private Button clearHistoryButton; 
    
    private final TaskMonitorCenter taskMonitorCenter; 
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    
    public TaskMonitorViewController() { 
        this.taskMonitorCenter = SpringApp.getBean(TaskMonitorCenter.class); 
    } 
    
    @FXML 
    public void initialize() { 
        // 初始化活动任务表格 
        activeTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId")); 
        activeTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title")); 
        
        // 进度条列 
        activeTaskProgressColumn.setCellFactory(column -> new TableCell<>() { 
            private final ProgressBar progressBar = new ProgressBar(); 
            
            @Override 
            protected void updateItem(ProgressBar item, boolean empty) { 
                super.updateItem(item, empty); 
                if (empty || getTableRow().getItem() == null) { 
                    setGraphic(null); 
                } else { 
                    MonitoredTask<?> task = getTableRow().getItem(); 
                    progressBar.progressProperty().bind(task.progressProperty()); 
                    setGraphic(progressBar); 
                } 
            } 
        }); 
        
        // 状态列 
        activeTaskStatusColumn.setCellValueFactory(cellData -> { 
            MonitoredTask<?> task = cellData.getValue(); 
            String status; 
            if (task.isRunning()) { 
                status = "运行中"; 
            } else if (task.isCancelled()) { 
                status = "已取消"; 
            } else if (task.isFailed()) { 
                status = "失败"; 
            } else if (task.isSucceeded()) { 
                status = "成功"; 
            } else { 
                status = "等待中"; 
            } 
            return new SimpleStringProperty(status); 
        }); 
        
        // 初始化历史任务表格 
        historyTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId")); 
        historyTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title")); 
        historyTaskStatusColumn.setCellValueFactory(cellData -> 
            new SimpleStringProperty(cellData.getValue().getStatus().getDescription())); 
        historyTaskStartTimeColumn.setCellValueFactory(cellData -> 
            new SimpleStringProperty(dateFormat.format(new Date(cellData.getValue().getStartTime())))); 
        historyTaskExecutionTimeColumn.setCellValueFactory(cellData -> 
            new SimpleStringProperty(cellData.getValue().getFormattedExecutionTime())); 
        
        // 绑定数据 
        activeTasksTable.setItems(taskMonitorCenter.getActiveTasks()); 
        historyTasksTable.setItems(taskMonitorCenter.getTaskHistory()); 
        
        // 添加任务双击事件(显示详情) 
        historyTasksTable.setRowFactory(tv -> { 
            TableRow<TaskHistory> row = new TableRow<>(); 
            row.setOnMouseClicked(event -> { 
                if (event.getClickCount() == 2 && !row.isEmpty()) { 
                    TaskHistory history = row.getItem(); 
                    showTaskDetails(history); 
                } 
            }); 
            return row; 
        }); 
        
        // 更新按钮状态 
        updateButtonStates(); 
        activeTasksTable.getSelectionModel().selectedItemProperty().addListener((obs, oldVal, newVal) -> 
            updateButtonStates()); 
    } 
    
    /** 
     * 更新按钮状态 
     */ 
    private void updateButtonStates() { 
        cancelTaskButton.setDisable(activeTasksTable.getSelectionModel().getSelectedItem() == null); 
    } 
    
    /** 
     * 取消选中的任务 
     */ 
    @FXML 
    private void onCancelTask() { 
        MonitoredTask<?> selectedTask = activeTasksTable.getSelectionModel().getSelectedItem(); 
        if (selectedTask != null) { 
            taskMonitorCenter.cancelTask(selectedTask.getTaskId()); 
        } 
    } 
    
    /** 
     * 清空历史记录 
     */ 
    @FXML 
    private void onClearHistory() { 
        Alert alert = new Alert(Alert.AlertType.CONFIRMATION); 
        alert.setTitle("确认操作"); 
        alert.setHeaderText("清空任务历史"); 
        alert.setContentText("确定要清空所有任务历史记录吗?此操作不可恢复。"); 
        
        alert.showAndWait().ifPresent(response -> { 
            if (response == ButtonType.OK) { 
                taskMonitorCenter.getTaskHistory().clear(); 
            } 
        }); 
    } 
    
    /** 
     * 显示任务详情 
     */ 
    private void showTaskDetails(TaskHistory history) { 
        Alert alert = new Alert(Alert.AlertType.INFORMATION); 
        alert.setTitle("任务详情"); 
        alert.setHeaderText(history.getTitle()); 
        
        StringBuilder content = new StringBuilder(); 
        content.append("任务ID: " + history.getTaskId() + "\n"); 
        content.append("状态: " + history.getStatus().getDescription() + "\n"); 
        content.append("开始时间: " + dateFormat.format(new Date(history.getStartTime())) + "\n"); 
        content.append("结束时间: " + dateFormat.format(new Date(history.getEndTime())) + "\n"); 
        content.append("执行耗时: " + history.getFormattedExecutionTime() + "\n"); 
        
        if (history.getErrorMessage() != null) { 
            content.append("\n错误信息: " + history.getErrorMessage()); 
            alert.getDialogPane().setExpandableContent(new TextArea(history.getErrorMessage())); 
        } 
        
        alert.setContentText(content.toString()); 
        alert.showAndWait(); 
    } 
    
    /** 
     * 显示任务监控窗口 
     */ 
    @SneakyThrows(IOException.class) 
    public static void show() { 
        FXMLLoader loader = new FXMLLoader(TaskMonitorViewController.class.getResource("/ui/task/TaskMonitorView.fxml")); 
        // 设置Spring为控制器工厂 
        loader.setControllerFactory(SpringApp::getBean); 
        
        Parent root = loader.load(); 
        Stage stage = new Stage(); 
        stage.setTitle("任务监控中心"); 
        stage.setScene(new Scene(root, 800, 600)); 
        stage.initModality(Modality.NONE); // 非模态窗口 
        stage.show(); 
    } 
} 

3. 集成到现有系统

3.1 修改UITools类

package com.ecep.contract.manager.util; 

import com.ecep.contract.manager.SpringApp; 
import com.ecep.contract.manager.ui.Message; 
import com.ecep.contract.manager.ui.Tasker; 
import com.ecep.contract.manager.ui.task.TaskMonitorCenter; 
import com.ecep.contract.manager.ui.task.MonitoredTask; 
import javafx.application.Platform; 
import javafx.concurrent.Task; 
import javafx.scene.Node; 
import javafx.scene.control.*; 
import javafx.scene.layout.VBox; 

import java.util.Optional; 
import java.util.function.Consumer; 

public class UITools { 
    // 其他方法保持不变... 
    
    /** 
     * 显示一个对话框, 并等待用户关闭对话框 
     * 
     * @param title 对话框标题 
     * @param task  任务 
     * @param init  初始化 
     */ 
    public static void showTaskDialogAndWait(String title, Task<?> task, Consumer<Consumer<Message>> init) { 
        Dialog<Message> dialog = createDialog(); 
        dialog.getDialogPane().getScene().getStylesheets().add("/ui/dialog.css"); 
        dialog.setTitle(title); 

        DialogPane dialogPane = dialog.getDialogPane(); 
        VBox box = (VBox) dialogPane.getContent(); 
        Label headerLabel = (Label) box.lookup("#header"); 
        ListView<Message> listView = (ListView<Message>) box.lookup("#list-view"); 
        listView.setCellFactory(param -> new Tasker.MessageListCell()); 

        headerLabel.textProperty().bind(task.titleProperty()); 

        Consumer<Message> consumer = message -> { 
            if (Platform.isFxApplicationThread()) { 
                listView.getItems().add(message); 
            } else { 
                Platform.runLater(() -> listView.getItems().add(message)); 
            } 
        }; 
        if (task instanceof Tasker<?> tasker) { 
            tasker.setMessageHandler(consumer); 
        } else { 
            task.messageProperty().addListener((observable, oldValue, newValue) -> { 
                consumer.accept(Message.info(newValue)); 
            }); 
        } 

        // 加一个进度条 
        ProgressBar progressBar = new ProgressBar(0); 
        progressBar.setPrefHeight(12); 
        progressBar.setPrefWidth(200); 
        progressBar.prefWidthProperty().bind(box.widthProperty().subtract(10)); 
        progressBar.setMinHeight(12); 
        progressBar.setVisible(true); 
        VBox.setMargin(progressBar, new Insets(6, 0, 6, 0)); 

        Platform.runLater(() -> { 
            box.getChildren().add(progressBar); 
        }); 

        progressBar.visibleProperty().bind(task.runningProperty()); 
        progressBar.progressProperty().bind(task.progressProperty()); 

        // 使用任务监控中心启动任务 
        if (task instanceof Tasker<?> tasker) { 
            // 注册到监控中心但不在这里启动因为showAndWait会阻塞UI线程 
            TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class); 
            MonitoredTask<?> monitoredTask = monitorCenter.registerTask(tasker); 
        } 
        
        if (init != null) { 
            init.accept(consumer); 
        } 
        
        dialog.showAndWait(); 
        if (task.isRunning()) { 
            task.cancel(); 
        } 
    } 
    
    // 其他方法保持不变... 
} 

3.2 修改SpringApp配置

package com.ecep.contract.manager; 

import com.ecep.contract.manager.ui.task.TaskMonitorCenter; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.autoconfigure.domain.DomainScan; 
import org.springframework.cache.annotation.EnableCaching; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.data.jpa.repository.config.EnableJpaRepositories; 
import org.springframework.scheduling.annotation.EnableAsync; 
import org.springframework.scheduling.annotation.EnableScheduling; 

import java.util.concurrent.ScheduledExecutorService; 

@SpringBootApplication
@EnableJpaRepositories(basePackages = {"com.ecep.contract.manager.ds"})
@DomainScan(basePackages = {"com.ecep.contract.manager.ds"})
@EnableScheduling
@EnableAsync
public class SpringApp {
    private static ConfigurableApplicationContext context;

    // other methods unchanged...

    /**
     * Starts the Spring application context and looks up the task monitor center.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        context = SpringApplication.run(SpringApp.class, args);

        // Resolve the monitor center once at startup; the result is not needed here,
        // so it is not kept in a local variable.
        context.getBean(TaskMonitorCenter.class);
        // Additional initialization can be added here.
    }

    // other methods unchanged...
}

4. 超时处理机制

package com.ecep.contract.manager.util; 

import java.util.concurrent.*; 
import java.util.function.Supplier; 

/**
 * Helpers that add timeout and retry behaviour to {@link CompletableFuture} based work.
 */
public class AsyncUtils {
    /**
     * Shared scheduler for timeout timers and retry delays. It runs a single daemon
     * thread, so it never keeps the JVM alive, and only ever executes short,
     * non-blocking callbacks.
     */
    private static final ScheduledExecutorService SCHEDULER = createScheduler();

    private static ScheduledExecutorService createScheduler() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r, "timeout-scheduler");
            thread.setDaemon(true);
            return thread;
        });
        // Timers are cancelled as soon as the guarded future completes; drop them from the queue then.
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }

    /**
     * Runs the supplier asynchronously and fails the returned future with a
     * {@link TimeoutException} if no result is available within the timeout.
     *
     * @param supplier the work to run
     * @param timeout  maximum time to wait
     * @param unit     unit of {@code timeout}
     * @return a future with the supplier's result, or failed on timeout or on the supplier's failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Same as {@link #supplyAsyncWithTimeout(Supplier, long, TimeUnit)}, running the
     * supplier on the given executor.
     *
     * @param supplier the work to run
     * @param executor executor that runs the supplier
     * @param timeout  maximum time to wait
     * @param unit     unit of {@code timeout}
     * @return a future with the supplier's result, or failed on timeout or on the supplier's failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Returns a future that completes like {@code future}, unless the timeout elapses
     * first, in which case it fails with a {@link TimeoutException}.
     *
     * <p>The given future itself is neither cancelled nor completed on timeout.
     *
     * @param future  the future to guard
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the guarded future
     */
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();

        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> {
            TimeoutException timeoutException = new TimeoutException("Task timed out after " + timeout + " " + unit.name());
            timeoutFuture.completeExceptionally(timeoutException);
        }, timeout, unit);

        return future.applyToEither(timeoutFuture, t -> t)
                .whenComplete((t, ex) -> timer.cancel(false));
    }

    /**
     * Runs the supplier asynchronously and retries it after any failure.
     *
     * @param supplier            the work to run
     * @param maxRetries          maximum number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     * @return a future with the first successful result, or failed with the last failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit) {
        return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
    }

    /**
     * Runs the supplier asynchronously and retries it after failures of the given types.
     *
     * <p>The delay between attempts is scheduled rather than slept, so no thread is
     * blocked while waiting. Retrying stops as soon as the returned future is completed
     * or cancelled by the caller.
     *
     * @param supplier            the work to run
     * @param maxRetries          maximum number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     * @param retryableExceptions exception types that trigger a retry; {@code null} or empty retries on every failure
     * @return a future with the first successful result, or failed with the last failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {

        java.util.Objects.requireNonNull(supplier, "supplier");
        CompletableFuture<T> resultFuture = new CompletableFuture<>();

        executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions);

        return resultFuture;
    }

    /**
     * Runs one attempt and either completes {@code resultFuture} or schedules the next attempt.
     */
    private static <T> void executeWithRetry(
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {

        if (resultFuture.isDone()) {
            // The caller cancelled or completed the result; further attempts are pointless.
            return;
        }

        CompletableFuture.supplyAsync(supplier).whenComplete((value, ex) -> {
            if (ex == null) {
                resultFuture.complete(value);
                return;
            }

            boolean retry = currentAttempt < maxRetries
                    && !resultFuture.isDone()
                    && shouldRetry(ex, retryableExceptions);
            if (!retry) {
                // Retries exhausted or failure not retryable: report the failure as received.
                resultFuture.completeExceptionally(ex);
                return;
            }

            try {
                SCHEDULER.schedule(
                        () -> executeWithRetry(supplier, resultFuture, currentAttempt + 1,
                                maxRetries, delayBetweenRetries, unit, retryableExceptions),
                        delayBetweenRetries, unit);
            } catch (RuntimeException schedulingFailure) {
                // Never leave the result pending when the retry cannot be scheduled.
                schedulingFailure.addSuppressed(ex);
                resultFuture.completeExceptionally(schedulingFailure);
            }
        });
    }

    /**
     * Decides whether a failure qualifies for a retry.
     *
     * @param ex                  the failure, possibly wrapped in a {@link CompletionException}
     * @param retryableExceptions retryable types; {@code null} or empty means every failure is retryable
     */
    private static boolean shouldRetry(Throwable ex, Class<? extends Exception>[] retryableExceptions) {
        if (retryableExceptions == null || retryableExceptions.length == 0) {
            return true;
        }

        // Failures of an async stage arrive wrapped; compare against the underlying cause.
        Throwable failure = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;

        for (Class<? extends Exception> exceptionClass : retryableExceptions) {
            if (exceptionClass != null && exceptionClass.isInstance(failure)) {
                return true;
            }
        }

        return false;
    }
}

使用示例

1. 基本任务监控

// Anywhere an asynchronous task needs to be executed
Tasker<?> task = new ContractSyncTask(); 
TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class); 
// Register the task with a 30-second timeout; registerTask also starts it
monitorCenter.registerTask(task, 30L); 

// Show the task monitor window
TaskMonitorViewController.show(); 

2. 带超时处理的CompletableFuture

// Run an asynchronous task with a timeout through AsyncUtils
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithTimeout(() -> {
    // Simulate a slow operation
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Supplier.get() cannot throw a checked exception: restore the interrupt flag
        // and rethrow unchecked.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
    return "任务完成";
}, 3, TimeUnit.SECONDS);

// Handle the result; in a dependent stage the timeout arrives as the cause of the failure
future.thenAccept(result -> System.out.println("结果: " + result))
      .exceptionally(ex -> {
          if (ex.getCause() instanceof TimeoutException) {
              System.out.println("任务执行超时");
          } else {
              System.out.println("任务执行异常: " + ex.getMessage());
          }
          return null;
      });

3. 带重试机制的任务

// Run an asynchronous task with retries through AsyncUtils:
// up to 3 retries, 1 second apart, retrying only on RuntimeException
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithRetry(() -> { 
    // Simulate an operation that may fail
    if (Math.random() > 0.7) { 
        throw new RuntimeException("随机失败"); 
    } 
    return "操作成功"; 
}, 3, 1, TimeUnit.SECONDS, new Class[]{RuntimeException.class}); 

// Handle the result
future.thenAccept(result -> System.out.println("结果: " + result)) 
      .exceptionally(ex -> { 
          System.out.println("多次重试后仍然失败: " + ex.getMessage()); 
          return null; 
      }); 

实现要点总结

  1. 统一任务管理:通过TaskMonitorCenter集中管理所有异步任务,提供注册、监控和取消功能
  2. 完整生命周期监控:跟踪任务从创建、运行到完成/失败/取消的整个生命周期
  3. 超时处理机制:为任务设置超时时间,防止任务无限期阻塞
  4. 历史记录追踪:保存任务执行历史,方便问题排查和性能分析
  5. 可视化界面:提供直观的任务监控界面,展示任务进度和状态
  6. 重试策略:针对失败任务提供可配置的重试机制
  7. 与现有系统集成:无缝集成到现有任务调度机制中

这套实现方案可以显著提升系统的稳定性和可维护性,防止长时间运行的任务占用系统资源,并为用户提供更好的任务执行状态反馈。