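Async Task Monitoring Implementation Plan
Current State Analysis
An analysis of the project code shows that asynchronous task management in the current system has the following characteristics and shortcomings:
- Task scheduling: the system uses JavaFX's Task and a ScheduledExecutorService configured through Spring to schedule asynchronous tasks
- Task base class: the Tasker class is the base class of all tasks and implements basic task lifecycle management
- Progress display: the UITools.showTaskDialogAndWait() method provides a simple task progress dialog
- Asynchronous programming: the project makes extensive use of CompletableFuture for asynchronous operations
- Main shortcomings:
  - No unified task monitoring center
  - No centralized management of task execution state
  - Most tasks have no timeout handling
  - Task execution history cannot be traced
  - No retry strategy after a task fails
Implementation Plan
1. Core Task Monitoring Components
1.1 Task Monitoring Center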
package com.ecep.contract.manager.ui.task;

import com.ecep.contract.manager.ui.Tasker;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Task monitoring center: manages all asynchronous tasks in one place.
 * The observable lists back UI tables, so they are only modified on the
 * JavaFX Application Thread.
 */
@Component
public class TaskMonitorCenter {
    private static final Logger logger = LoggerFactory.getLogger(TaskMonitorCenter.class);
    // Maximum number of history records to keep
    private static final int MAX_HISTORY = 100;
    // Tasks that are currently running
    @Getter
    private final ObservableList<MonitoredTask<?>> activeTasks = FXCollections.observableArrayList();
    // Task history, newest first
    @Getter
    private final ObservableList<TaskHistory> taskHistory = FXCollections.observableArrayList();
    // Task ID to task mapping
    private final Map<String, MonitoredTask<?>> taskMap = new ConcurrentHashMap<>();
    // Task counter
    private final AtomicInteger taskCounter = new AtomicInteger(0);

    /**
     * Registers and starts a task.
     */
    public <T> MonitoredTask<T> registerTask(Tasker<T> task) {
        return registerTask(task, null);
    }

    /**
     * Registers and starts a task with an optional timeout in seconds.
     */
    public <T> MonitoredTask<T> registerTask(Tasker<T> task, Long timeoutSeconds) {
        String taskId = generateTaskId();
        MonitoredTask<T> monitoredTask = new MonitoredTask<>(task, taskId);
        // Configure the timeout (Lombok's @Setter on timeoutSeconds generates setTimeoutSeconds)
        if (timeoutSeconds != null && timeoutSeconds > 0) {
            monitoredTask.setTimeoutSeconds(timeoutSeconds);
        }
        // State-change handlers; JavaFX invokes them on the FX Application Thread
        monitoredTask.setOnSucceeded(event -> {
            logger.info("Task [{}] succeeded: {}", taskId, task.getTitle());
            recordHistory(monitoredTask, TaskStatus.SUCCEEDED);
            removeActiveTask(monitoredTask);
        });
        monitoredTask.setOnFailed(event -> {
            // The exception is recorded on the wrapper that actually ran, not on the delegate
            logger.error("Task [{}] failed: {}", taskId, task.getTitle(), monitoredTask.getException());
            recordHistory(monitoredTask, TaskStatus.FAILED);
            removeActiveTask(monitoredTask);
        });
        monitoredTask.setOnCancelled(event -> {
            logger.info("Task [{}] was cancelled: {}", taskId, task.getTitle());
            recordHistory(monitoredTask, TaskStatus.CANCELLED);
            removeActiveTask(monitoredTask);
        });
        // Keep a reference to the task and start it
        taskMap.put(taskId, monitoredTask);
        runOnFxThread(() -> activeTasks.add(monitoredTask));
        monitoredTask.start();
        logger.info("Task [{}] registered and started: {}", taskId, task.getTitle());
        return monitoredTask;
    }

    /**
     * Cancels a task. Uses isDone() rather than isRunning(), because isRunning()
     * may only be called on the FX Application Thread and misses scheduled tasks.
     */
    public boolean cancelTask(String taskId) {
        MonitoredTask<?> task = taskMap.get(taskId);
        if (task != null && !task.isDone()) {
            return task.cancel();
        }
        return false;
    }

    /**
     * Cancels all tasks.
     */
    public void cancelAllTasks() {
        for (MonitoredTask<?> task : new ArrayList<>(taskMap.values())) {
            task.cancel();
        }
    }

    /**
     * Removes a task from the active list.
     */
    private void removeActiveTask(MonitoredTask<?> task) {
        runOnFxThread(() -> activeTasks.remove(task));
        taskMap.remove(task.getTaskId());
    }

    /**
     * Records a task in the history. Called from the state-change handlers,
     * that is, on the FX Application Thread.
     */
    private void recordHistory(MonitoredTask<?> task, TaskStatus status) {
        Throwable exception = task.getException();
        TaskHistory history = new TaskHistory(
                task.getTaskId(),
                task.getTitle(),
                status,
                task.getStartTime(),
                System.currentTimeMillis(),
                exception != null ? exception.getMessage() : null
        );
        runOnFxThread(() -> {
            taskHistory.add(0, history);
            // Keep only the most recent records
            if (taskHistory.size() > MAX_HISTORY) {
                taskHistory.remove(taskHistory.size() - 1);
            }
        });
    }

    /**
     * Runs the action immediately on the FX Application Thread, or queues it there.
     */
    private static void runOnFxThread(Runnable action) {
        if (Platform.isFxApplicationThread()) {
            action.run();
        } else {
            Platform.runLater(action);
        }
    }

    /**
     * Generates a task ID.
     */
    private String generateTaskId() {
        return "task-" + UUID.randomUUID().toString().substring(0, 8) + "-" + taskCounter.incrementAndGet();
    }
}
1.2 Monitored Task Wrapper
package com.ecep.contract.manager.ui.task;

import com.ecep.contract.manager.ui.Tasker;
import javafx.concurrent.Task;
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Wrapper around a task that is being monitored.
 */
public class MonitoredTask<T> extends Task<T> {
    @Getter
    private final Tasker<T> delegate;
    @Getter
    private final String taskId;
    @Getter
    private final long startTime;
    @Setter
    private Long timeoutSeconds;
    private ScheduledExecutorService timeoutExecutor;
    private final AtomicBoolean timeoutHandled = new AtomicBoolean(false);

    public MonitoredTask(Tasker<T> delegate, String taskId) {
        this.delegate = delegate;
        this.taskId = taskId;
        this.startTime = System.currentTimeMillis();
        // Mirror the delegate's properties. Task exposes title, message and progress
        // as read-only properties, so they cannot be bound; forward changes instead.
        updateTitle(delegate.getTitle());
        delegate.titleProperty().addListener((obs, oldValue, newValue) -> updateTitle(newValue));
        delegate.messageProperty().addListener((obs, oldValue, newValue) -> updateMessage(newValue));
        delegate.progressProperty().addListener((obs, oldValue, newValue) ->
                updateProgress(newValue.doubleValue(), 1.0));
    }

    @Override
    protected T call() throws Exception {
        // Start the timeout monitor
        if (timeoutSeconds != null) {
            setupTimeoutMonitor();
        }
        try {
            // Assumes Tasker makes call() accessible from this package (for example by
            // overriding it as public); Task.call() itself is protected
            return delegate.call();
        } finally {
            // Stop the timeout monitor
            if (timeoutExecutor != null) {
                timeoutExecutor.shutdownNow();
            }
        }
    }

    /**
     * Starts the timeout monitor.
     */
    private void setupTimeoutMonitor() {
        timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "task-timeout-monitor-" + taskId);
            thread.setDaemon(true);
            return thread;
        });
        timeoutExecutor.schedule(() -> {
            // isDone() and cancel() may be called from any thread; isRunning() may not
            if (!isDone() && timeoutHandled.compareAndSet(false, true)) {
                updateMessage("Task timed out, cancelling...");
                cancel();
            }
        }, timeoutSeconds, TimeUnit.SECONDS);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Task.cancel() is final and delegates to this overload, so override this one
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled) {
            delegate.cancel();
        }
        return cancelled;
    }

    /**
     * Starts the task on a dedicated daemon thread.
     */
    public void start() {
        Thread thread = new Thread(this, "task-" + taskId);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Returns the elapsed time since the task was created, in milliseconds.
     */
    public long getExecutionTime() {
        return System.currentTimeMillis() - startTime;
    }
}
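Cancelling a task, whether by the user or by the timeout monitor, only interrupts the worker thread; the work stops early only if it responds to that interruption. The following is a minimal JDK-only sketch of that behavior, using a plain FutureTask in place of a JavaFX Task. The class name CooperativeCancelSketch and its method are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration class, not part of the project
final class CooperativeCancelSketch {
    /** Starts interruptible work, cancels it, and reports whether the worker stopped early. */
    static boolean cancelAndObserve() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                // Stand-in for a blocking operation that responds to interruption
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            } finally {
                finished.countDown();
            }
            return null;
        });
        Thread worker = new Thread(task, "cancel-sketch");
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();
            // cancel(true) interrupts the worker thread
            task.cancel(true);
            boolean stopped = finished.await(2, TimeUnit.SECONDS);
            return stopped && sawInterrupt.get() && task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A delegate that loops without blocking would need to poll isCancelled() itself; otherwise it keeps running after the timeout even though the wrapper reports it as cancelled.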
1.3 Task Status and History Record Classes
// TaskStatus.java (each public type needs its own source file)
package com.ecep.contract.manager.ui.task;

/**
 * Task status enum.
 */
public enum TaskStatus {
    SCHEDULED("Scheduled"),
    RUNNING("Running"),
    SUCCEEDED("Succeeded"),
    FAILED("Failed"),
    CANCELLED("Cancelled"),
    TIMED_OUT("Timed out");

    private final String description;

    TaskStatus(String description) {
        this.description = description;
    }

    public String getDescription() {
        return description;
    }
}

// TaskHistory.java
package com.ecep.contract.manager.ui.task;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Task history record.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskHistory {
    private String taskId;
    private String title;
    private TaskStatus status;
    private long startTime;
    private long endTime;
    private String errorMessage;

    /**
     * Returns the task execution time in milliseconds.
     */
    public long getExecutionTime() {
        return endTime - startTime;
    }

    /**
     * Returns the execution time formatted for display.
     */
    public String getFormattedExecutionTime() {
        long ms = getExecutionTime();
        if (ms < 1000) {
            return ms + "ms";
        } else if (ms < 60000) {
            return String.format("%.2fs", ms / 1000.0);
        } else {
            return String.format("%dm %.2fs", ms / 60000, (ms % 60000) / 1000.0);
        }
    }
}
2. Task Monitoring UI
package com.ecep.contract.manager.ui.task;

import com.ecep.contract.manager.SpringApp;
import javafx.beans.binding.Bindings;
import javafx.beans.property.SimpleStringProperty;
import javafx.fxml.FXML;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.control.*;
import javafx.stage.Modality;
import javafx.stage.Stage;
import javafx.scene.control.cell.PropertyValueFactory;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Controller for the task monitoring view.
 */
@Component
public class TaskMonitorViewController {
    @FXML
    private TableView<MonitoredTask<?>> activeTasksTable;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskIdColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskTitleColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, ProgressBar> activeTaskProgressColumn;
    @FXML
    private TableColumn<MonitoredTask<?>, String> activeTaskStatusColumn;
    @FXML
    private TableView<TaskHistory> historyTasksTable;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskIdColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskTitleColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskStatusColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskStartTimeColumn;
    @FXML
    private TableColumn<TaskHistory, String> historyTaskExecutionTimeColumn;
    @FXML
    private Button cancelTaskButton;
    @FXML
    private Button clearHistoryButton;

    private final TaskMonitorCenter taskMonitorCenter;
    // Only used on the FX Application Thread (SimpleDateFormat is not thread-safe)
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Constructor injection: a static SpringApp.getBean() lookup here would run while
    // the application context is still starting up
    public TaskMonitorViewController(TaskMonitorCenter taskMonitorCenter) {
        this.taskMonitorCenter = taskMonitorCenter;
    }

    @FXML
    public void initialize() {
        // Set up the active tasks table
        activeTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
        activeTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));
        // Progress bar column
        activeTaskProgressColumn.setCellFactory(column -> new TableCell<>() {
            private final ProgressBar progressBar = new ProgressBar();

            @Override
            protected void updateItem(ProgressBar item, boolean empty) {
                super.updateItem(item, empty);
                // Cells are reused, so drop the binding to the previous row's task first
                progressBar.progressProperty().unbind();
                MonitoredTask<?> task = (empty || getTableRow() == null) ? null : getTableRow().getItem();
                if (task == null) {
                    setGraphic(null);
                } else {
                    progressBar.progressProperty().bind(task.progressProperty());
                    setGraphic(progressBar);
                }
            }
        });
        // Status column. Task has no isFailed()/isSucceeded(); derive the label from
        // stateProperty() so that it also updates when the state changes.
        activeTaskStatusColumn.setCellValueFactory(cellData -> {
            MonitoredTask<?> task = cellData.getValue();
            return Bindings.createStringBinding(() -> switch (task.getState()) {
                case RUNNING -> "Running";
                case CANCELLED -> "Cancelled";
                case FAILED -> "Failed";
                case SUCCEEDED -> "Succeeded";
                default -> "Waiting";
            }, task.stateProperty());
        });
        // Set up the history table
        historyTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
        historyTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));
        historyTaskStatusColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(cellData.getValue().getStatus().getDescription()));
        historyTaskStartTimeColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(dateFormat.format(new Date(cellData.getValue().getStartTime()))));
        historyTaskExecutionTimeColumn.setCellValueFactory(cellData ->
                new SimpleStringProperty(cellData.getValue().getFormattedExecutionTime()));
        // Bind the data
        activeTasksTable.setItems(taskMonitorCenter.getActiveTasks());
        historyTasksTable.setItems(taskMonitorCenter.getTaskHistory());
        // Double-click a history row to show its details
        historyTasksTable.setRowFactory(tv -> {
            TableRow<TaskHistory> row = new TableRow<>();
            row.setOnMouseClicked(event -> {
                if (event.getClickCount() == 2 && !row.isEmpty()) {
                    showTaskDetails(row.getItem());
                }
            });
            return row;
        });
        // Keep the button state in sync with the selection
        updateButtonStates();
        activeTasksTable.getSelectionModel().selectedItemProperty().addListener((obs, oldVal, newVal) ->
                updateButtonStates());
    }

    /**
     * Updates the button states.
     */
    private void updateButtonStates() {
        cancelTaskButton.setDisable(activeTasksTable.getSelectionModel().getSelectedItem() == null);
    }

    /**
     * Cancels the selected task.
     */
    @FXML
    private void onCancelTask() {
        MonitoredTask<?> selectedTask = activeTasksTable.getSelectionModel().getSelectedItem();
        if (selectedTask != null) {
            taskMonitorCenter.cancelTask(selectedTask.getTaskId());
        }
    }

    /**
     * Clears the history.
     */
    @FXML
    private void onClearHistory() {
        Alert alert = new Alert(Alert.AlertType.CONFIRMATION);
        alert.setTitle("Confirm");
        alert.setHeaderText("Clear task history");
        alert.setContentText("Clear all task history records? This cannot be undone.");
        alert.showAndWait().ifPresent(response -> {
            if (response == ButtonType.OK) {
                taskMonitorCenter.getTaskHistory().clear();
            }
        });
    }

    /**
     * Shows the details of a task.
     */
    private void showTaskDetails(TaskHistory history) {
        Alert alert = new Alert(Alert.AlertType.INFORMATION);
        alert.setTitle("Task details");
        alert.setHeaderText(history.getTitle());
        StringBuilder content = new StringBuilder()
                .append("Task ID: ").append(history.getTaskId()).append('\n')
                .append("Status: ").append(history.getStatus().getDescription()).append('\n')
                .append("Start time: ").append(dateFormat.format(new Date(history.getStartTime()))).append('\n')
                .append("End time: ").append(dateFormat.format(new Date(history.getEndTime()))).append('\n')
                .append("Execution time: ").append(history.getFormattedExecutionTime()).append('\n');
        if (history.getErrorMessage() != null) {
            content.append("\nError message: ").append(history.getErrorMessage());
            alert.getDialogPane().setExpandableContent(new TextArea(history.getErrorMessage()));
        }
        alert.setContentText(content.toString());
        alert.showAndWait();
    }

    /**
     * Shows the task monitoring window.
     */
    @SneakyThrows(IOException.class)
    public static void show() {
        FXMLLoader loader = new FXMLLoader(TaskMonitorViewController.class.getResource("/ui/task/TaskMonitorView.fxml"));
        // Let Spring supply the controller
        loader.setControllerFactory(SpringApp::getBean);
        Parent root = loader.load();
        Stage stage = new Stage();
        stage.setTitle("Task Monitoring Center");
        stage.setScene(new Scene(root, 800, 600));
        stage.initModality(Modality.NONE); // non-modal window
        stage.show();
    }
}
3. Integration with the Existing System
3.1 Changes to the UITools Class
package com.ecep.contract.manager.util;

import com.ecep.contract.manager.SpringApp;
import com.ecep.contract.manager.ui.Message;
import com.ecep.contract.manager.ui.Tasker;
import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
import javafx.application.Platform;
import javafx.concurrent.Task;
import javafx.geometry.Insets;
import javafx.scene.Node;
import javafx.scene.control.*;
import javafx.scene.layout.VBox;

import java.util.Optional;
import java.util.function.Consumer;

public class UITools {
    // Other methods stay unchanged...

    /**
     * Shows a dialog and waits for the user to close it.
     *
     * @param title dialog title
     * @param task  the task
     * @param init  initialization callback
     */
    public static void showTaskDialogAndWait(String title, Task<?> task, Consumer<Consumer<Message>> init) {
        Dialog<Message> dialog = createDialog();
        dialog.getDialogPane().getScene().getStylesheets().add("/ui/dialog.css");
        dialog.setTitle(title);
        DialogPane dialogPane = dialog.getDialogPane();
        VBox box = (VBox) dialogPane.getContent();
        Label headerLabel = (Label) box.lookup("#header");
        ListView<Message> listView = (ListView<Message>) box.lookup("#list-view");
        listView.setCellFactory(param -> new Tasker.MessageListCell());
        headerLabel.textProperty().bind(task.titleProperty());
        Consumer<Message> consumer = message -> {
            if (Platform.isFxApplicationThread()) {
                listView.getItems().add(message);
            } else {
                Platform.runLater(() -> listView.getItems().add(message));
            }
        };
        if (task instanceof Tasker<?> tasker) {
            tasker.setMessageHandler(consumer);
        } else {
            task.messageProperty().addListener((observable, oldValue, newValue) -> {
                consumer.accept(Message.info(newValue));
            });
        }
        // Run Tasker instances through the monitoring center. registerTask() starts the
        // wrapper immediately, so it is called only after the message handler is in place.
        // The wrapper is the task that actually runs, so the dialog tracks its running state.
        Task<?> observed = task;
        if (task instanceof Tasker<?> tasker) {
            observed = SpringApp.getBean(TaskMonitorCenter.class).registerTask(tasker);
        }
        // Add a progress bar
        ProgressBar progressBar = new ProgressBar(0);
        progressBar.setPrefHeight(12);
        progressBar.prefWidthProperty().bind(box.widthProperty().subtract(10));
        progressBar.setMinHeight(12);
        VBox.setMargin(progressBar, new Insets(6, 0, 6, 0));
        Platform.runLater(() -> {
            box.getChildren().add(progressBar);
        });
        progressBar.visibleProperty().bind(observed.runningProperty());
        progressBar.progressProperty().bind(task.progressProperty());
        if (init != null) {
            init.accept(consumer);
        }
        dialog.showAndWait();
        // Closing the dialog cancels a task that is still running
        if (observed.isRunning()) {
            observed.cancel();
        }
    }
    // Other methods stay unchanged...
}
3.2 Changes to the SpringApp Configuration
package com.ecep.contract.manager;

import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJpaRepositories(basePackages = {"com.ecep.contract.manager.ds"})
// Spring Boot's entity-scanning annotation in this package is @EntityScan
@EntityScan(basePackages = {"com.ecep.contract.manager.ds"})
@EnableScheduling
@EnableAsync
public class SpringApp {
    private static ConfigurableApplicationContext context;
    // Other methods stay unchanged...

    public static void main(String[] args) {
        context = SpringApplication.run(SpringApp.class, args);
        // The TaskMonitorCenter singleton already exists once the context has started;
        // it is fetched here only as a hook for additional initialization code
        TaskMonitorCenter monitorCenter = context.getBean(TaskMonitorCenter.class);
    }
    // Other methods stay unchanged...
}
4. Timeout Handling Mechanism
package com.ecep.contract.manager.util;

import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * Async task utilities: timeout handling, retries and related helpers.
 */
public class AsyncUtils {
    // One shared daemon scheduler for all timeouts, instead of a new thread per call
    private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, "timeout-scheduler");
                thread.setDaemon(true);
                return thread;
            });

    /**
     * Runs a CompletableFuture task with a timeout.
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Runs a CompletableFuture task with a timeout on the given executor.
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Adds timeout handling to an existing CompletableFuture. The returned future
     * fails with a TimeoutException on timeout; the underlying work is not interrupted.
     */
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        ScheduledFuture<?> timer = TIMEOUT_SCHEDULER.schedule(() -> {
            timeoutFuture.completeExceptionally(
                    new TimeoutException("Task timed out after " + timeout + " " + unit.name()));
        }, timeout, unit);
        return future.applyToEither(timeoutFuture, t -> t)
                // Cancel the pending timer once either side has completed
                .whenComplete((t, ex) -> timer.cancel(false));
    }

    /**
     * Runs a CompletableFuture task with retries.
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit) {
        return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
    }

    /**
     * Runs a CompletableFuture task with retries, restricted to the given exception types.
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        // Start with the first attempt
        executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions);
        return resultFuture;
    }

    private static <T> void executeWithRetry(
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture.supplyAsync(supplier).whenComplete((result, ex) -> {
            if (ex == null) {
                resultFuture.complete(result);
                return;
            }
            Throwable cause = unwrap(ex);
            if (currentAttempt < maxRetries && shouldRetry(cause, retryableExceptions)) {
                // Schedule the next attempt after the delay (delayedExecutor requires Java 9+);
                // unlike Thread.sleep, this does not block a pool thread while waiting
                CompletableFuture.delayedExecutor(delayBetweenRetries, unit).execute(() ->
                        executeWithRetry(supplier, resultFuture, currentAttempt + 1,
                                maxRetries, delayBetweenRetries, unit, retryableExceptions));
            } else {
                // Retries exhausted or the exception is not retryable: fail the future
                resultFuture.completeExceptionally(cause);
            }
        });
    }

    /**
     * Returns the original exception behind a CompletionException wrapper.
     */
    private static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    private static boolean shouldRetry(Throwable cause, Class<? extends Exception>[] retryableExceptions) {
        // With no retryable types specified, every exception is retried
        if (retryableExceptions == null || retryableExceptions.length == 0) {
            return true;
        }
        // Check whether the exception is one of the retryable types
        for (Class<? extends Exception> exceptionClass : retryableExceptions) {
            if (exceptionClass.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
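On Java 9 or later, the JDK itself offers CompletableFuture.orTimeout and CompletableFuture.completeOnTimeout, which may be enough where a custom scheduler is not needed. The following is a minimal sketch under that assumption; the class JdkTimeoutSketch and its method names are hypothetical and not part of the project. As with withTimeout, the work already submitted keeps running in the background after the timeout fires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not part of the project
final class JdkTimeoutSketch {
    /** Simulates slow work; the caller chooses how long it sleeps. */
    static String slowCall(long millis) {
        try {
            Thread.sleep(millis);
            return "done";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Fails with a TimeoutException if the work takes longer than the limit. */
    static CompletableFuture<String> failOnTimeout(long workMillis, long limitMillis) {
        return CompletableFuture.supplyAsync(() -> slowCall(workMillis))
                .orTimeout(limitMillis, TimeUnit.MILLISECONDS);
    }

    /** Completes with a fallback value instead of failing when the limit is exceeded. */
    static CompletableFuture<String> fallbackOnTimeout(long workMillis, long limitMillis) {
        return CompletableFuture.supplyAsync(() -> slowCall(workMillis))
                .completeOnTimeout("fallback", limitMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(fallbackOnTimeout(2000, 100).join());
        try {
            failOnTimeout(2000, 100).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException
            System.out.println(e.getCause() instanceof TimeoutException);
        }
        System.out.println(failOnTimeout(10, 2000).join());
    }
}
```

Note that orTimeout completes the future it is called on, whereas AsyncUtils.withTimeout leaves the caller's original future untouched and returns a new one.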
Usage Examples
1. Basic task monitoring
// Anywhere an asynchronous task needs to run
Tasker<?> task = new ContractSyncTask();
TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
// Register the task with a 30-second timeout
monitorCenter.registerTask(task, 30L);
// Show the task monitoring window
TaskMonitorViewController.show();
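2. CompletableFuture with timeout handling
// Use AsyncUtils to run an asynchronous task with a timeout
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithTimeout(() -> {
    // Simulate a long-running operation. Supplier.get() cannot throw checked
    // exceptions, so InterruptedException has to be handled here.
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
    return "Task finished";
}, 3, TimeUnit.SECONDS);
// Handle the result
future.thenAccept(result -> System.out.println("Result: " + result))
        .exceptionally(ex -> {
            if (ex.getCause() instanceof TimeoutException) {
                System.out.println("Task timed out");
            } else {
                System.out.println("Task failed: " + ex.getMessage());
            }
            return null;
        });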
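3. Task with retries
// Use AsyncUtils to run an asynchronous task with retries:
// up to 3 retries, 1 second apart, only for RuntimeException
@SuppressWarnings("unchecked") // generic array creation for the retryable types
Class<? extends Exception>[] retryable = new Class[]{RuntimeException.class};
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithRetry(() -> {
    // Simulate an operation that may fail
    if (Math.random() > 0.7) {
        throw new RuntimeException("Random failure");
    }
    return "Operation succeeded";
}, 3, 1, TimeUnit.SECONDS, retryable);
// Handle the result
future.thenAccept(result -> System.out.println("Result: " + result))
        .exceptionally(ex -> {
            System.out.println("Still failing after retries: " + ex.getMessage());
            return null;
        });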
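Because the example above fails at random, the number of attempts is hard to observe. The following self-contained sketch uses a supplier that fails a fixed number of times to make the counting explicit: maxRetries counts the retries after the first attempt, so at most maxRetries + 1 calls are made. RetryCountSketch and its methods are hypothetical stand-ins for illustration rather than the project's AsyncUtils, and delayedExecutor assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of the project
final class RetryCountSketch {
    /** Retries the supplier up to maxRetries times after the first attempt. */
    static <T> CompletableFuture<T> retry(Supplier<T> supplier, int maxRetries, long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(supplier, result, 0, maxRetries, delayMillis);
        return result;
    }

    private static <T> void attempt(Supplier<T> supplier, CompletableFuture<T> result,
                                    int attemptNo, int maxRetries, long delayMillis) {
        CompletableFuture.supplyAsync(supplier).whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
            } else if (attemptNo < maxRetries) {
                // Wait for the delay on a scheduler instead of blocking a pool thread
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
                        .execute(() -> attempt(supplier, result, attemptNo + 1, maxRetries, delayMillis));
            } else {
                // Report the original exception rather than the CompletionException wrapper
                result.completeExceptionally(ex.getCause() != null ? ex.getCause() : ex);
            }
        });
    }

    /** Runs a supplier that fails `failures` times before succeeding; returns the calls made. */
    static int callsMade(int failures, int maxRetries) {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<String> future = retry(() -> {
            if (calls.incrementAndGet() <= failures) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, maxRetries, 10);
        try {
            future.join();
        } catch (RuntimeException e) {
            // Retries exhausted; the call count is still meaningful
        }
        return calls.get();
    }
}
```

With two failures and three allowed retries the supplier is called three times; with permanent failure and two allowed retries it is also called three times and the future then fails.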
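Summary of Key Points
- Unified task management: TaskMonitorCenter manages all asynchronous tasks in one place and provides registration, monitoring and cancellation
- Full lifecycle monitoring: each task is tracked from creation through running to completion, failure or cancellation
- Timeout handling: tasks can be given a timeout so that they do not block indefinitely
- History tracking: task execution history is kept for troubleshooting and performance analysis
- Visual interface: a task monitoring view shows task progress and state
- Retry strategy: failed tasks can be retried with a configurable policy
- Integration with the existing system: the design plugs into the existing task scheduling mechanism
This plan is intended to improve the stability and maintainability of the system: it keeps long-running tasks from holding system resources indefinitely and gives users clearer feedback on task execution state.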