Files
contract-manager/异步任务监控实现方案.md
2025-08-22 19:55:19 +08:00

885 lines
29 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 异步任务监控实现方案
## 现状分析
通过分析项目代码,我们发现当前系统的异步任务管理存在以下特点和不足:
1. **任务调度机制**:系统使用JavaFX的`Task`和Spring的`ScheduledExecutorService`进行异步任务调度
2. **任务基类**:`Tasker`类作为所有任务的基类,实现了基本的任务生命周期管理
3. **进度展示**:`UITools.showTaskDialogAndWait()`方法提供了简单的任务进度对话框
4. **异步编程**:项目广泛使用`CompletableFuture`进行异步操作
5. **主要不足**:
- 缺乏统一的任务监控中心
- 任务执行状态缺乏集中管理
- 大部分任务没有超时处理机制
- 任务执行历史无法追溯
- 缺少任务失败后的重试策略
## 实现方案
### 1. 任务监控核心组件
#### 1.1 任务监控中心
```java
package com.ecep.contract.manager.ui.task;
import com.ecep.contract.manager.ui.Tasker;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 任务监控中心 - 统一管理所有异步任务
*/
@Component
public class TaskMonitorCenter {
private static final Logger logger = LoggerFactory.getLogger(TaskMonitorCenter.class);
// 正在运行的任务列表
@Getter
private final ObservableList<MonitoredTask<?>> activeTasks = FXCollections.observableArrayList();
// 任务历史记录
@Getter
private final ObservableList<TaskHistory> taskHistory = FXCollections.observableArrayList();
// 任务ID映射
private final Map<String, MonitoredTask<?>> taskMap = new ConcurrentHashMap<>();
// 任务计数器
private final AtomicInteger taskCounter = new AtomicInteger(0);
/**
* 注册并启动任务
*/
public <T> MonitoredTask<T> registerTask(Tasker<T> task) {
return registerTask(task, null);
}
/**
* 注册并启动任务,支持设置超时时间
*/
public <T> MonitoredTask<T> registerTask(Tasker<T> task, Long timeoutSeconds) {
String taskId = generateTaskId();
MonitoredTask<T> monitoredTask = new MonitoredTask<>(task, taskId);
// 设置超时处理
if (timeoutSeconds != null && timeoutSeconds > 0) {
monitoredTask.setTimeout(timeoutSeconds);
}
// 添加任务状态变更监听器
monitoredTask.setOnSucceeded(event -> {
logger.info("任务[{}]执行成功: {}", taskId, task.getTitle());
recordHistory(monitoredTask, TaskStatus.SUCCEEDED);
removeActiveTask(monitoredTask);
});
monitoredTask.setOnFailed(event -> {
logger.error("任务[{}]执行失败: {}", taskId, task.getTitle(), task.getException());
recordHistory(monitoredTask, TaskStatus.FAILED);
removeActiveTask(monitoredTask);
});
monitoredTask.setOnCancelled(event -> {
logger.info("任务[{}]被取消: {}", taskId, task.getTitle());
recordHistory(monitoredTask, TaskStatus.CANCELLED);
removeActiveTask(monitoredTask);
});
// 保存任务引用并启动
taskMap.put(taskId, monitoredTask);
activeTasks.add(monitoredTask);
monitoredTask.start();
logger.info("任务[{}]已注册并启动: {}", taskId, task.getTitle());
return monitoredTask;
}
/**
* 取消任务
*/
public boolean cancelTask(String taskId) {
MonitoredTask<?> task = taskMap.get(taskId);
if (task != null && task.isRunning()) {
task.cancel();
return true;
}
return false;
}
/**
* 取消所有任务
*/
public void cancelAllTasks() {
for (MonitoredTask<?> task : new ArrayList<>(activeTasks)) {
task.cancel();
}
}
/**
* 从活动列表中移除任务
*/
private void removeActiveTask(MonitoredTask<?> task) {
Platform.runLater(() -> activeTasks.remove(task));
taskMap.remove(task.getTaskId());
}
/**
* 记录任务历史
*/
private void recordHistory(MonitoredTask<?> task, TaskStatus status) {
TaskHistory history = new TaskHistory(
task.getTaskId(),
task.getTitle(),
status,
task.getStartTime(),
System.currentTimeMillis()
);
if (task.getException() != null) {
history.setErrorMessage(task.getException().getMessage());
}
Platform.runLater(() -> {
taskHistory.add(0, history);
// 只保留最近100条历史记录
if (taskHistory.size() > 100) {
taskHistory.remove(taskHistory.size() - 1);
}
});
}
/**
* 生成任务ID
*/
private String generateTaskId() {
return "task-" + UUID.randomUUID().toString().substring(0, 8) + "-" + taskCounter.incrementAndGet();
}
}
```
#### 1.2 监控任务包装类
```java
package com.ecep.contract.manager.ui.task;
import com.ecep.contract.manager.ui.Tasker;
import javafx.concurrent.Task;
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Wrapper that executes a {@link Tasker} as a JavaFX {@link Task}, adding an id,
 * a creation timestamp and an optional timeout.
 *
 * @param <T> result type of the wrapped task
 */
public class MonitoredTask<T> extends Task<T> {
    /** The wrapped task whose {@code call()} does the real work. */
    @Getter
    private final Tasker<T> delegate;

    /** Identifier assigned by the caller that created this wrapper. */
    @Getter
    private final String taskId;

    /** Wall-clock time (epoch millis) at which this wrapper was constructed. */
    @Getter
    private final long startTime;

    /** Timeout in seconds; {@code null} or non-positive disables the timeout. Set before {@link #start()}. */
    @Setter
    private Long timeoutSeconds;

    private ScheduledExecutorService timeoutExecutor;

    /** Flipped exactly once, by the timeout monitor, when it cancels the task. */
    private final AtomicBoolean timeoutHandled = new AtomicBoolean(false);

    public MonitoredTask(Tasker<T> delegate, String taskId) {
        this.delegate = delegate;
        this.taskId = taskId;
        this.startTime = System.currentTimeMillis();

        // Task exposes title/message/progress as read-only properties, so they
        // cannot be bound. Mirror the delegate's values through the protected
        // update*() methods instead, which may be called from any thread.
        updateTitle(delegate.titleProperty().get());
        updateMessage(delegate.messageProperty().get());
        delegate.titleProperty().addListener((obs, oldTitle, newTitle) -> updateTitle(newTitle));
        delegate.messageProperty().addListener((obs, oldMessage, newMessage) -> updateMessage(newMessage));
        // The delegate's progress is already a ratio (or -1 for indeterminate),
        // so report it against a total of 1.
        delegate.progressProperty().addListener(
                (obs, oldProgress, newProgress) -> updateProgress(newProgress.doubleValue(), 1.0));
    }

    /**
     * Runs the delegate, with the timeout monitor armed for the duration of the call.
     */
    @Override
    protected T call() throws Exception {
        if (timeoutSeconds != null && timeoutSeconds > 0) {
            setupTimeoutMonitor();
        }
        try {
            // NOTE(review): requires Tasker.call() to be accessible from this
            // package (i.e. declared public in Tasker) — confirm.
            return delegate.call();
        } finally {
            // The work is over either way; stop the pending timeout, if any.
            if (timeoutExecutor != null) {
                timeoutExecutor.shutdownNow();
            }
        }
    }

    /**
     * Arms a one-shot timer that cancels this task once the timeout elapses.
     */
    private void setupTimeoutMonitor() {
        timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "task-timeout-monitor-" + taskId);
            thread.setDaemon(true);
            return thread;
        });
        timeoutExecutor.schedule(() -> {
            // isDone() comes from FutureTask and is safe on any thread, whereas
            // Task.isRunning() throws when called off the FX application thread.
            if (!isDone() && timeoutHandled.compareAndSet(false, true)) {
                updateMessage("任务执行超时,正在取消...");
                cancel(true);
            }
        }, timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Cancels this wrapper and, when that takes effect, the delegate as well.
     *
     * <p>{@code Task.cancel()} is final and forwards to this overload, so this is
     * the single place where cancellation can be intercepted.
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled) {
            delegate.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    /**
     * Starts the task on a new daemon thread named after the task id.
     */
    public void start() {
        Thread thread = new Thread(this, "task-" + taskId);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * @return {@code true} if the timeout monitor cancelled this task
     */
    public boolean isTimedOut() {
        return timeoutHandled.get();
    }

    /**
     * @return milliseconds elapsed since this wrapper was constructed
     */
    public long getExecutionTime() {
        return System.currentTimeMillis() - startTime;
    }
}
```
#### 1.3 任务状态和历史记录类
```java
package com.ecep.contract.manager.ui.task;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * States a task can be reported in, each carrying a human-readable label.
 */
public enum TaskStatus {
    SCHEDULED("已调度"),
    RUNNING("运行中"),
    SUCCEEDED("成功"),
    FAILED("失败"),
    CANCELLED("已取消"),
    TIMED_OUT("超时");

    /** Display text for this status. */
    private final String label;

    TaskStatus(String label) {
        this.label = label;
    }

    /**
     * @return the display text for this status
     */
    public String getDescription() {
        return this.label;
    }
}
/**
 * History entry describing one finished task.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskHistory {
    private String taskId;
    private String title;
    private TaskStatus status;
    /** Start time, epoch milliseconds. */
    private long startTime;
    /** End time, epoch milliseconds. */
    private long endTime;
    /** Failure description, or {@code null} when there is none. */
    private String errorMessage;

    /**
     * Creates an entry without an error message.
     *
     * <p>{@code @AllArgsConstructor} only generates the six-argument form; this
     * overload serves callers that supply the error message afterwards through
     * the setter.
     */
    public TaskHistory(String taskId, String title, TaskStatus status, long startTime, long endTime) {
        this.taskId = taskId;
        this.title = title;
        this.status = status;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * @return the execution time in milliseconds ({@code endTime - startTime})
     */
    public long getExecutionTime() {
        return endTime - startTime;
    }

    /**
     * Formats the execution time as {@code 123ms}, {@code 1.50s} or {@code 2m 3.40s}
     * depending on its magnitude.
     *
     * @return the formatted execution time
     */
    public String getFormattedExecutionTime() {
        long ms = getExecutionTime();
        if (ms < 1000) {
            return ms + "ms";
        } else if (ms < 60000) {
            return String.format("%.2fs", ms / 1000.0);
        } else {
            return String.format("%dm %.2fs", ms / 60000, (ms % 60000) / 1000.0);
        }
    }
}
```
### 2. 任务监控界面
```java
package com.ecep.contract.manager.ui.task;
import com.ecep.contract.manager.SpringApp;
import com.ecep.contract.manager.ui.Tasker;
import javafx.application.Platform;
import javafx.beans.property.SimpleStringProperty;
import javafx.collections.ListChangeListener;
import javafx.fxml.FXML;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.control.*;
import javafx.scene.control.cell.PropertyValueFactory;
import javafx.stage.Modality;
import javafx.stage.Stage;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Controller of the task monitor window: one table for active tasks, one for
 * the task history, plus cancel / clear-history actions.
 *
 * <p>NOTE(review): this class is a {@code @Component} and {@link #show()} obtains
 * the controller from Spring, so unless the bean is prototype-scoped every
 * window would share one controller instance — confirm the intended scope.
 */
@Component
public class TaskMonitorViewController {
    @FXML
    private TableView<MonitoredTask<?>> activeTasksTable;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskIdColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskTitleColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, ProgressBar> activeTaskProgressColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskStatusColumn;
    @FXML
    private TableView<TaskHistory> historyTasksTable;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskIdColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskTitleColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskStatusColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskStartTimeColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskExecutionTimeColumn;
    @FXML
    private Button cancelTaskButton;
    @FXML
    private Button clearHistoryButton;

    private final TaskMonitorCenter taskMonitorCenter;

    /** Formatter for start/end timestamps; only used from UI callbacks in this class. */
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public TaskMonitorViewController() {
        this.taskMonitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
    }

    /**
     * Wires the table columns, binds both tables to the monitor center's lists
     * and installs the row / selection listeners.
     */
    @FXML
    public void initialize() {
        // Active-task table
        activeTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
        activeTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));

        // Progress column: one progress bar per cell, bound to the row's task.
        activeTaskProgressColumn.setCellFactory(column -> new TableCell<>() {
            private final ProgressBar progressBar = new ProgressBar();

            @Override
            protected void updateItem(ProgressBar item, boolean empty) {
                super.updateItem(item, empty);
                // Cells are recycled: always release the previous binding so an
                // emptied or reused cell never keeps tracking an old task.
                progressBar.progressProperty().unbind();
                MonitoredTask<?> task = (empty || getTableRow() == null) ? null : getTableRow().getItem();
                if (task == null) {
                    setGraphic(null);
                } else {
                    progressBar.progressProperty().bind(task.progressProperty());
                    setGraphic(progressBar);
                }
            }
        });

        // Status column: derived from the task's state property so the text
        // follows state changes instead of freezing at the value seen when the
        // cell was first populated.
        activeTaskStatusColumn.setCellValueFactory(cellData -> {
            MonitoredTask<?> task = cellData.getValue();
            return javafx.beans.binding.Bindings.createStringBinding(
                    () -> describeState(task), task.stateProperty());
        });

        // History table
        historyTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
        historyTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));
        historyTaskStatusColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(cellData.getValue().getStatus().getDescription()));
        historyTaskStartTimeColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(dateFormat.format(new Date(cellData.getValue().getStartTime()))));
        historyTaskExecutionTimeColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(cellData.getValue().getFormattedExecutionTime()));

        // Data
        activeTasksTable.setItems(taskMonitorCenter.getActiveTasks());
        historyTasksTable.setItems(taskMonitorCenter.getTaskHistory());

        // Double-click on a history row opens its details.
        historyTasksTable.setRowFactory(tv -> {
            TableRow<TaskHistory> row = new TableRow<>();
            row.setOnMouseClicked(event -> {
                if (event.getClickCount() == 2 && !row.isEmpty()) {
                    showTaskDetails(row.getItem());
                }
            });
            return row;
        });

        // Button state follows the selection.
        updateButtonStates();
        activeTasksTable.getSelectionModel().selectedItemProperty().addListener(
                (obs, oldVal, newVal) -> updateButtonStates());
    }

    /**
     * Maps the task's worker state to the text shown in the status column.
     * Task offers no isFailed()/isSucceeded() accessors, so the state enum is
     * the only way to tell those outcomes apart.
     */
    private static String describeState(MonitoredTask<?> task) {
        return switch (task.getState()) {
            case RUNNING -> "运行中";
            case CANCELLED -> "已取消";
            case FAILED -> "失败";
            case SUCCEEDED -> "成功";
            default -> "等待中"; // READY or SCHEDULED
        };
    }

    /**
     * Enables the cancel button only while an active task is selected.
     */
    private void updateButtonStates() {
        cancelTaskButton.setDisable(activeTasksTable.getSelectionModel().getSelectedItem() == null);
    }

    /**
     * Cancels the selected active task, if any.
     */
    @FXML
    private void onCancelTask() {
        MonitoredTask<?> selectedTask = activeTasksTable.getSelectionModel().getSelectedItem();
        if (selectedTask != null) {
            taskMonitorCenter.cancelTask(selectedTask.getTaskId());
        }
    }

    /**
     * Clears the task history after the user confirms.
     */
    @FXML
    private void onClearHistory() {
        Alert alert = new Alert(Alert.AlertType.CONFIRMATION);
        alert.setTitle("确认操作");
        alert.setHeaderText("清空任务历史");
        alert.setContentText("确定要清空所有任务历史记录吗?此操作不可恢复。");
        alert.showAndWait().ifPresent(response -> {
            if (response == ButtonType.OK) {
                taskMonitorCenter.getTaskHistory().clear();
            }
        });
    }

    /**
     * Shows an information dialog with the details of one history entry.
     */
    private void showTaskDetails(TaskHistory history) {
        Alert alert = new Alert(Alert.AlertType.INFORMATION);
        alert.setTitle("任务详情");
        alert.setHeaderText(history.getTitle());

        StringBuilder content = new StringBuilder()
                .append("任务ID: ").append(history.getTaskId()).append("\n")
                .append("状态: ").append(history.getStatus().getDescription()).append("\n")
                .append("开始时间: ").append(dateFormat.format(new Date(history.getStartTime()))).append("\n")
                .append("结束时间: ").append(dateFormat.format(new Date(history.getEndTime()))).append("\n")
                .append("执行耗时: ").append(history.getFormattedExecutionTime()).append("\n");
        if (history.getErrorMessage() != null) {
            content.append("\n错误信息: ").append(history.getErrorMessage());
            alert.getDialogPane().setExpandableContent(new TextArea(history.getErrorMessage()));
        }
        alert.setContentText(content.toString());
        alert.showAndWait();
    }

    /**
     * Opens the task monitor as a non-modal window.
     */
    @SneakyThrows(IOException.class)
    public static void show() {
        FXMLLoader loader = new FXMLLoader(
                TaskMonitorViewController.class.getResource("/ui/task/TaskMonitorView.fxml"));
        // Let Spring supply the controller instance.
        loader.setControllerFactory(SpringApp::getBean);
        Parent root = loader.load();

        Stage stage = new Stage();
        stage.setTitle("任务监控中心");
        stage.setScene(new Scene(root, 800, 600));
        stage.initModality(Modality.NONE); // non-modal
        stage.show();
    }
}
```
### 3. 集成到现有系统
#### 3.1 修改UITools类
```java
package com.ecep.contract.manager.util;
import com.ecep.contract.manager.SpringApp;
import com.ecep.contract.manager.ui.Message;
import com.ecep.contract.manager.ui.Tasker;
import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
import com.ecep.contract.manager.ui.task.MonitoredTask;
import javafx.application.Platform;
import javafx.concurrent.Task;
import javafx.scene.Node;
import javafx.scene.control.*;
import javafx.scene.layout.VBox;
import java.util.Optional;
import java.util.function.Consumer;
public class UITools {
// 其他方法保持不变...
/**
* 显示一个对话框, 并等待用户关闭对话框
*
* @param title 对话框标题
* @param task 任务
* @param init 初始化
*/
public static void showTaskDialogAndWait(String title, Task<?> task, Consumer<Consumer<Message>> init) {
Dialog<Message> dialog = createDialog();
dialog.getDialogPane().getScene().getStylesheets().add("/ui/dialog.css");
dialog.setTitle(title);
DialogPane dialogPane = dialog.getDialogPane();
VBox box = (VBox) dialogPane.getContent();
Label headerLabel = (Label) box.lookup("#header");
ListView<Message> listView = (ListView<Message>) box.lookup("#list-view");
listView.setCellFactory(param -> new Tasker.MessageListCell());
headerLabel.textProperty().bind(task.titleProperty());
Consumer<Message> consumer = message -> {
if (Platform.isFxApplicationThread()) {
listView.getItems().add(message);
} else {
Platform.runLater(() -> listView.getItems().add(message));
}
};
if (task instanceof Tasker<?> tasker) {
tasker.setMessageHandler(consumer);
} else {
task.messageProperty().addListener((observable, oldValue, newValue) -> {
consumer.accept(Message.info(newValue));
});
}
// 加一个进度条
ProgressBar progressBar = new ProgressBar(0);
progressBar.setPrefHeight(12);
progressBar.setPrefWidth(200);
progressBar.prefWidthProperty().bind(box.widthProperty().subtract(10));
progressBar.setMinHeight(12);
progressBar.setVisible(true);
VBox.setMargin(progressBar, new Insets(6, 0, 6, 0));
Platform.runLater(() -> {
box.getChildren().add(progressBar);
});
progressBar.visibleProperty().bind(task.runningProperty());
progressBar.progressProperty().bind(task.progressProperty());
// 使用任务监控中心启动任务
if (task instanceof Tasker<?> tasker) {
// 注册到监控中心但不在这里启动因为showAndWait会阻塞UI线程
TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
MonitoredTask<?> monitoredTask = monitorCenter.registerTask(tasker);
}
if (init != null) {
init.accept(consumer);
}
dialog.showAndWait();
if (task.isRunning()) {
task.cancel();
}
}
// 其他方法保持不变...
}
```
#### 3.2 修改SpringApp配置
```java
package com.ecep.contract.manager;
import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.DomainScan;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.concurrent.ScheduledExecutorService;
@SpringBootApplication
@EnableJpaRepositories(basePackages = {"com.ecep.contract.manager.ds"})
@DomainScan(basePackages = {"com.ecep.contract.manager.ds"})
@EnableScheduling
@EnableAsync
public class SpringApp {
    /** Application context created in {@link #main(String[])}. */
    private static ConfigurableApplicationContext context;

    // Other methods unchanged...

    /**
     * Boots the Spring context and resolves the task monitor center once.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        context = SpringApplication.run(SpringApp.class, args);
        // Resolve the monitor center at startup; the result is not needed here,
        // so it is not kept in a local. Additional initialisation can go below.
        context.getBean(TaskMonitorCenter.class);
    }

    // Other methods unchanged...
}
```
### 4. 超时处理机制
```java
package com.ecep.contract.manager.util;
import java.util.concurrent.*;
import java.util.function.Supplier;
/**
 * Helpers that add timeout and retry behaviour on top of {@link CompletableFuture}.
 */
public class AsyncUtils {
    /**
     * Shared daemon scheduler that fires timeouts. One thread serves every call,
     * instead of creating a new executor (and thread) per future.
     */
    private static final ScheduledExecutorService TIMEOUT_SCHEDULER = createTimeoutScheduler();

    private static ScheduledExecutorService createTimeoutScheduler() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r, "timeout-scheduler");
            thread.setDaemon(true);
            return thread;
        });
        // Drop cancelled timers from the queue right away rather than keeping
        // them (and the futures they reference) until their deadline.
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

    /**
     * Runs the supplier asynchronously on the default executor with a timeout.
     *
     * @return a future that yields the supplier's result, or fails with a
     *         {@link TimeoutException} if the timeout elapses first
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Runs the supplier asynchronously on the given executor with a timeout.
     *
     * @return a future that yields the supplier's result, or fails with a
     *         {@link TimeoutException} if the timeout elapses first
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Adds a timeout to an existing future.
     *
     * <p>The returned future completes with whichever happens first: the result
     * of {@code future}, or a {@link TimeoutException}. The original future is
     * not cancelled on timeout. When the timeout wins, dependent stages run on
     * the shared scheduler thread, so keep non-async callbacks short.
     *
     * @param future  the future to guard
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return a new future with the timeout applied
     */
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        ScheduledFuture<?> timer = TIMEOUT_SCHEDULER.schedule(() -> {
            TimeoutException timeoutException =
                    new TimeoutException("Task timed out after " + timeout + " " + unit.name());
            timeoutFuture.completeExceptionally(timeoutException);
        }, timeout, unit);
        return future.applyToEither(timeoutFuture, t -> t)
                // Whichever side won, the timer is no longer needed.
                .whenComplete((t, ex) -> timer.cancel(false));
    }

    /**
     * Runs the supplier asynchronously, retrying on any exception.
     *
     * @param maxRetries          number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit) {
        return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
    }

    /**
     * Runs the supplier asynchronously, retrying only on the given exception types.
     *
     * @param maxRetries          number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     * @param retryableExceptions exception types worth retrying; {@code null} or empty retries on everything
     * @return a future that yields the first successful result, or fails with the
     *         exception thrown by the last attempt
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions);
        return resultFuture;
    }

    /**
     * Runs one attempt and, on a retryable failure, schedules the next one.
     */
    private static <T> void executeWithRetry(
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture.supplyAsync(supplier).whenComplete((value, ex) -> {
            if (ex == null) {
                resultFuture.complete(value);
                return;
            }
            // supplyAsync reports failures wrapped in CompletionException;
            // callers care about the exception the supplier actually threw.
            Throwable cause = unwrap(ex);
            boolean retry = currentAttempt < maxRetries
                    && !resultFuture.isDone() // caller may have cancelled/completed it meanwhile
                    && shouldRetry(cause, retryableExceptions);
            if (retry) {
                // Schedule the next attempt instead of sleeping, so no pool
                // thread is blocked while waiting out the delay.
                CompletableFuture.delayedExecutor(delayBetweenRetries, unit).execute(() ->
                        executeWithRetry(supplier, resultFuture, currentAttempt + 1,
                                maxRetries, delayBetweenRetries, unit, retryableExceptions));
            } else {
                // Retries exhausted or exception not retryable.
                resultFuture.completeExceptionally(cause);
            }
        });
    }

    /**
     * Strips {@link CompletionException}/{@link ExecutionException} wrappers that carry a cause.
     */
    private static Throwable unwrap(Throwable ex) {
        Throwable current = ex;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Decides whether the (already unwrapped) failure is worth another attempt.
     */
    private static boolean shouldRetry(Throwable cause, Class<? extends Exception>[] retryableExceptions) {
        // No filter given: retry on everything.
        if (retryableExceptions == null || retryableExceptions.length == 0) {
            return true;
        }
        for (Class<? extends Exception> exceptionClass : retryableExceptions) {
            if (exceptionClass.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
```
## 使用示例
### 1. 基本任务监控
```java
// Usable from any place that needs to run an asynchronous task
TaskMonitorCenter center = SpringApp.getBean(TaskMonitorCenter.class);
Tasker<?> syncTask = new ContractSyncTask();
// Register the task with a 30-second timeout
center.registerTask(syncTask, 30L);
// Open the task monitor window
TaskMonitorViewController.show();
```
### 2. 带超时处理的CompletableFuture
```java
// Run an asynchronous job with a timeout through AsyncUtils
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithTimeout(() -> {
    // Simulate a slow operation. Supplier.get() cannot throw checked exceptions,
    // so InterruptedException has to be handled inside the lambda.
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
    return "任务完成";
}, 3, TimeUnit.SECONDS);
// Handle the outcome
future.thenAccept(result -> System.out.println("结果: " + result))
        .exceptionally(ex -> {
            // Failures reach a dependent stage wrapped; look at the cause when there is one.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            if (cause instanceof TimeoutException) {
                System.out.println("任务执行超时");
            } else {
                System.out.println("任务执行异常: " + ex.getMessage());
            }
            return null;
        });
```
### 3. 带重试机制的任务
```java
// Run an asynchronous job with retries through AsyncUtils
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithRetry(() -> {
    // Simulated flaky operation: succeeds unless the random draw exceeds 0.7
    if (Math.random() <= 0.7) {
        return "操作成功";
    }
    throw new RuntimeException("随机失败");
}, 3, 1, TimeUnit.SECONDS, new Class[]{RuntimeException.class});
// Handle the outcome
future.thenAccept(value -> System.out.println("结果: " + value))
        .exceptionally(failure -> {
            System.out.println("多次重试后仍然失败: " + failure.getMessage());
            return null;
        });
```
## 实现要点总结
1. **统一任务管理**:通过`TaskMonitorCenter`集中管理所有异步任务,提供注册、监控和取消功能
2. **完整生命周期监控**:跟踪任务从创建、运行到完成/失败/取消的整个生命周期
3. **超时处理机制**:为任务设置超时时间,防止任务无限期阻塞
4. **历史记录追踪**:保存任务执行历史,方便问题排查和性能分析
5. **可视化界面**:提供直观的任务监控界面,展示任务进度和状态
6. **重试策略**:针对失败任务提供可配置的重试机制
7. **与现有系统集成**:无缝集成到现有任务调度机制中
这套实现方案可以显著提升系统的稳定性和可维护性,防止长时间运行的任务占用系统资源,并为用户提供更好的任务执行状态反馈。