885 lines
29 KiB
Markdown
885 lines
29 KiB
Markdown
# 异步任务监控实现方案
|
||
|
||
## 现状分析
|
||
|
||
通过分析项目代码,我们发现当前系统的异步任务管理存在以下特点和不足:
|
||
|
||
1. **任务调度机制**:系统使用JavaFX的`Task`和Spring的`ScheduledExecutorService`进行异步任务调度
|
||
2. **任务基类**:`Tasker`类作为所有任务的基类,实现了基本的任务生命周期管理
|
||
3. **进度展示**:`UITools.showTaskDialogAndWait()`方法提供了简单的任务进度对话框
|
||
4. **异步编程**:项目广泛使用`CompletableFuture`进行异步操作
|
||
5. **主要不足**:
|
||
- 缺乏统一的任务监控中心
|
||
- 任务执行状态缺乏集中管理
|
||
- 大部分任务没有超时处理机制
|
||
- 任务执行历史无法追溯
|
||
- 缺少任务失败后的重试策略
|
||
|
||
## 实现方案
|
||
|
||
### 1. 任务监控核心组件
|
||
|
||
#### 1.1 任务监控中心
|
||
|
||
```java
|
||
package com.ecep.contract.manager.ui.task;
|
||
|
||
import com.ecep.contract.manager.ui.Tasker;
|
||
import javafx.collections.FXCollections;
|
||
import javafx.collections.ObservableList;
|
||
import lombok.Getter;
|
||
import org.slf4j.Logger;
|
||
import org.slf4j.LoggerFactory;
|
||
import org.springframework.stereotype.Component;
|
||
|
||
import java.util.Map;
|
||
import java.util.UUID;
|
||
import java.util.concurrent.ConcurrentHashMap;
|
||
import java.util.concurrent.atomic.AtomicInteger;
|
||
|
||
/**
|
||
* 任务监控中心 - 统一管理所有异步任务
|
||
*/
|
||
@Component
|
||
public class TaskMonitorCenter {
|
||
private static final Logger logger = LoggerFactory.getLogger(TaskMonitorCenter.class);
|
||
|
||
// 正在运行的任务列表
|
||
@Getter
|
||
private final ObservableList<MonitoredTask<?>> activeTasks = FXCollections.observableArrayList();
|
||
|
||
// 任务历史记录
|
||
@Getter
|
||
private final ObservableList<TaskHistory> taskHistory = FXCollections.observableArrayList();
|
||
|
||
// 任务ID映射
|
||
private final Map<String, MonitoredTask<?>> taskMap = new ConcurrentHashMap<>();
|
||
|
||
// 任务计数器
|
||
private final AtomicInteger taskCounter = new AtomicInteger(0);
|
||
|
||
/**
|
||
* 注册并启动任务
|
||
*/
|
||
public <T> MonitoredTask<T> registerTask(Tasker<T> task) {
|
||
return registerTask(task, null);
|
||
}
|
||
|
||
/**
|
||
* 注册并启动任务,支持设置超时时间
|
||
*/
|
||
public <T> MonitoredTask<T> registerTask(Tasker<T> task, Long timeoutSeconds) {
|
||
String taskId = generateTaskId();
|
||
MonitoredTask<T> monitoredTask = new MonitoredTask<>(task, taskId);
|
||
|
||
// 设置超时处理
|
||
if (timeoutSeconds != null && timeoutSeconds > 0) {
|
||
monitoredTask.setTimeout(timeoutSeconds);
|
||
}
|
||
|
||
// 添加任务状态变更监听器
|
||
monitoredTask.setOnSucceeded(event -> {
|
||
logger.info("任务[{}]执行成功: {}", taskId, task.getTitle());
|
||
recordHistory(monitoredTask, TaskStatus.SUCCEEDED);
|
||
removeActiveTask(monitoredTask);
|
||
});
|
||
|
||
monitoredTask.setOnFailed(event -> {
|
||
logger.error("任务[{}]执行失败: {}", taskId, task.getTitle(), task.getException());
|
||
recordHistory(monitoredTask, TaskStatus.FAILED);
|
||
removeActiveTask(monitoredTask);
|
||
});
|
||
|
||
monitoredTask.setOnCancelled(event -> {
|
||
logger.info("任务[{}]被取消: {}", taskId, task.getTitle());
|
||
recordHistory(monitoredTask, TaskStatus.CANCELLED);
|
||
removeActiveTask(monitoredTask);
|
||
});
|
||
|
||
// 保存任务引用并启动
|
||
taskMap.put(taskId, monitoredTask);
|
||
activeTasks.add(monitoredTask);
|
||
monitoredTask.start();
|
||
|
||
logger.info("任务[{}]已注册并启动: {}", taskId, task.getTitle());
|
||
return monitoredTask;
|
||
}
|
||
|
||
/**
|
||
* 取消任务
|
||
*/
|
||
public boolean cancelTask(String taskId) {
|
||
MonitoredTask<?> task = taskMap.get(taskId);
|
||
if (task != null && task.isRunning()) {
|
||
task.cancel();
|
||
return true;
|
||
}
|
||
return false;
|
||
}
|
||
|
||
/**
|
||
* 取消所有任务
|
||
*/
|
||
public void cancelAllTasks() {
|
||
for (MonitoredTask<?> task : new ArrayList<>(activeTasks)) {
|
||
task.cancel();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 从活动列表中移除任务
|
||
*/
|
||
private void removeActiveTask(MonitoredTask<?> task) {
|
||
Platform.runLater(() -> activeTasks.remove(task));
|
||
taskMap.remove(task.getTaskId());
|
||
}
|
||
|
||
/**
|
||
* 记录任务历史
|
||
*/
|
||
private void recordHistory(MonitoredTask<?> task, TaskStatus status) {
|
||
TaskHistory history = new TaskHistory(
|
||
task.getTaskId(),
|
||
task.getTitle(),
|
||
status,
|
||
task.getStartTime(),
|
||
System.currentTimeMillis()
|
||
);
|
||
|
||
if (task.getException() != null) {
|
||
history.setErrorMessage(task.getException().getMessage());
|
||
}
|
||
|
||
Platform.runLater(() -> {
|
||
taskHistory.add(0, history);
|
||
// 只保留最近100条历史记录
|
||
if (taskHistory.size() > 100) {
|
||
taskHistory.remove(taskHistory.size() - 1);
|
||
}
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 生成任务ID
|
||
*/
|
||
private String generateTaskId() {
|
||
return "task-" + UUID.randomUUID().toString().substring(0, 8) + "-" + taskCounter.incrementAndGet();
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 1.2 监控任务包装类
|
||
|
||
```java
|
||
package com.ecep.contract.manager.ui.task;
|
||
|
||
import com.ecep.contract.manager.ui.Tasker;
|
||
import javafx.concurrent.Task;
|
||
import lombok.Getter;
|
||
import lombok.Setter;
|
||
|
||
import java.util.concurrent.Executors;
|
||
import java.util.concurrent.ScheduledExecutorService;
|
||
import java.util.concurrent.TimeUnit;
|
||
import java.util.concurrent.atomic.AtomicBoolean;
|
||
|
||
/**
|
||
* 被监控的任务包装类
|
||
*/
|
||
public class MonitoredTask<T> extends Task<T> {
|
||
@Getter
|
||
private final Tasker<T> delegate;
|
||
@Getter
|
||
private final String taskId;
|
||
@Getter
|
||
private final long startTime;
|
||
@Setter
|
||
private Long timeoutSeconds;
|
||
private ScheduledExecutorService timeoutExecutor;
|
||
private final AtomicBoolean timeoutHandled = new AtomicBoolean(false);
|
||
|
||
public MonitoredTask(Tasker<T> delegate, String taskId) {
|
||
this.delegate = delegate;
|
||
this.taskId = taskId;
|
||
this.startTime = System.currentTimeMillis();
|
||
|
||
// 绑定属性
|
||
titleProperty().bind(delegate.titleProperty());
|
||
messageProperty().bind(delegate.messageProperty());
|
||
progressProperty().bind(delegate.progressProperty());
|
||
}
|
||
|
||
@Override
|
||
protected T call() throws Exception {
|
||
// 设置超时监控
|
||
if (timeoutSeconds != null) {
|
||
setupTimeoutMonitor();
|
||
}
|
||
|
||
try {
|
||
return delegate.call();
|
||
} finally {
|
||
// 取消超时监控
|
||
if (timeoutExecutor != null) {
|
||
timeoutExecutor.shutdownNow();
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 设置超时监控
|
||
*/
|
||
private void setupTimeoutMonitor() {
|
||
timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
||
Thread thread = new Thread(r, "task-timeout-monitor-" + taskId);
|
||
thread.setDaemon(true);
|
||
return thread;
|
||
});
|
||
|
||
timeoutExecutor.schedule(() -> {
|
||
if (isRunning() && timeoutHandled.compareAndSet(false, true)) {
|
||
updateMessage("任务执行超时,正在取消...");
|
||
cancel();
|
||
}
|
||
}, timeoutSeconds, TimeUnit.SECONDS);
|
||
}
|
||
|
||
@Override
|
||
public boolean cancel() {
|
||
boolean cancelled = super.cancel();
|
||
if (cancelled) {
|
||
delegate.cancel();
|
||
}
|
||
return cancelled;
|
||
}
|
||
|
||
/**
|
||
* 启动任务
|
||
*/
|
||
public void start() {
|
||
Thread thread = new Thread(this, "task-" + taskId);
|
||
thread.setDaemon(true);
|
||
thread.start();
|
||
}
|
||
|
||
/**
|
||
* 获取任务执行耗时
|
||
*/
|
||
public long getExecutionTime() {
|
||
return System.currentTimeMillis() - startTime;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 1.3 任务状态和历史记录类
|
||
|
||
```java
|
||
package com.ecep.contract.manager.ui.task;
|
||
|
||
import lombok.AllArgsConstructor;
|
||
import lombok.Data;
|
||
import lombok.NoArgsConstructor;
|
||
|
||
/**
|
||
* 任务状态枚举
|
||
*/
|
||
public enum TaskStatus {
|
||
SCHEDULED("已调度"),
|
||
RUNNING("运行中"),
|
||
SUCCEEDED("成功"),
|
||
FAILED("失败"),
|
||
CANCELLED("已取消"),
|
||
TIMED_OUT("超时");
|
||
|
||
private final String description;
|
||
|
||
TaskStatus(String description) {
|
||
this.description = description;
|
||
}
|
||
|
||
public String getDescription() {
|
||
return description;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 任务历史记录
|
||
*/
|
||
@Data
|
||
@NoArgsConstructor
|
||
@AllArgsConstructor
|
||
public class TaskHistory {
|
||
private String taskId;
|
||
private String title;
|
||
private TaskStatus status;
|
||
private long startTime;
|
||
private long endTime;
|
||
private String errorMessage;
|
||
|
||
/**
|
||
* 获取任务执行耗时(毫秒)
|
||
*/
|
||
public long getExecutionTime() {
|
||
return endTime - startTime;
|
||
}
|
||
|
||
/**
|
||
* 获取格式化的执行耗时
|
||
*/
|
||
public String getFormattedExecutionTime() {
|
||
long ms = getExecutionTime();
|
||
if (ms < 1000) {
|
||
return ms + "ms";
|
||
} else if (ms < 60000) {
|
||
return String.format("%.2fs", ms / 1000.0);
|
||
} else {
|
||
return String.format("%dm %.2fs", ms / 60000, (ms % 60000) / 1000.0);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2. 任务监控界面
|
||
|
||
```java
|
||
package com.ecep.contract.manager.ui.task;
|
||
|
||
import com.ecep.contract.manager.SpringApp;
|
||
import com.ecep.contract.manager.ui.Tasker;
|
||
import javafx.application.Platform;
|
||
import javafx.beans.property.SimpleStringProperty;
|
||
import javafx.collections.ListChangeListener;
|
||
import javafx.fxml.FXML;
|
||
import javafx.fxml.FXMLLoader;
|
||
import javafx.scene.Parent;
|
||
import javafx.scene.Scene;
|
||
import javafx.scene.control.*;
|
||
import javafx.scene.control.cell.PropertyValueFactory;
|
||
import javafx.stage.Modality;
|
||
import javafx.stage.Stage;
|
||
import lombok.SneakyThrows;
|
||
import org.springframework.stereotype.Component;
|
||
|
||
import java.io.IOException;
|
||
import java.text.SimpleDateFormat;
|
||
import java.util.Date;
|
||
|
||
/**
|
||
* 任务监控界面控制器
|
||
*/
|
||
@Component
|
||
public class TaskMonitorViewController {
|
||
@FXML
|
||
private TableView<MonitoredTask<?>> activeTasksTable;
|
||
@FXML
|
||
private TableColumn<MonitoredTask<?>, String> activeTaskIdColumn;
|
||
@FXML
|
||
private TableColumn<MonitoredTask<?>, String> activeTaskTitleColumn;
|
||
@FXML
|
||
private TableColumn<MonitoredTask<?>, ProgressBar> activeTaskProgressColumn;
|
||
@FXML
|
||
private TableColumn<MonitoredTask<?>, String> activeTaskStatusColumn;
|
||
|
||
@FXML
|
||
private TableView<TaskHistory> historyTasksTable;
|
||
@FXML
|
||
private TableColumn<TaskHistory, String> historyTaskIdColumn;
|
||
@FXML
|
||
private TableColumn<TaskHistory, String> historyTaskTitleColumn;
|
||
@FXML
|
||
private TableColumn<TaskHistory, String> historyTaskStatusColumn;
|
||
@FXML
|
||
private TableColumn<TaskHistory, String> historyTaskStartTimeColumn;
|
||
@FXML
|
||
private TableColumn<TaskHistory, String> historyTaskExecutionTimeColumn;
|
||
|
||
@FXML
|
||
private Button cancelTaskButton;
|
||
@FXML
|
||
private Button clearHistoryButton;
|
||
|
||
private final TaskMonitorCenter taskMonitorCenter;
|
||
private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||
|
||
public TaskMonitorViewController() {
|
||
this.taskMonitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
|
||
}
|
||
|
||
@FXML
|
||
public void initialize() {
|
||
// 初始化活动任务表格
|
||
activeTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
|
||
activeTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));
|
||
|
||
// 进度条列
|
||
activeTaskProgressColumn.setCellFactory(column -> new TableCell<>() {
|
||
private final ProgressBar progressBar = new ProgressBar();
|
||
|
||
@Override
|
||
protected void updateItem(ProgressBar item, boolean empty) {
|
||
super.updateItem(item, empty);
|
||
if (empty || getTableRow().getItem() == null) {
|
||
setGraphic(null);
|
||
} else {
|
||
MonitoredTask<?> task = getTableRow().getItem();
|
||
progressBar.progressProperty().bind(task.progressProperty());
|
||
setGraphic(progressBar);
|
||
}
|
||
}
|
||
});
|
||
|
||
// 状态列
|
||
activeTaskStatusColumn.setCellValueFactory(cellData -> {
|
||
MonitoredTask<?> task = cellData.getValue();
|
||
String status;
|
||
if (task.isRunning()) {
|
||
status = "运行中";
|
||
} else if (task.isCancelled()) {
|
||
status = "已取消";
|
||
} else if (task.isFailed()) {
|
||
status = "失败";
|
||
} else if (task.isSucceeded()) {
|
||
status = "成功";
|
||
} else {
|
||
status = "等待中";
|
||
}
|
||
return new SimpleStringProperty(status);
|
||
});
|
||
|
||
// 初始化历史任务表格
|
||
historyTaskIdColumn.setCellValueFactory(new PropertyValueFactory<>("taskId"));
|
||
historyTaskTitleColumn.setCellValueFactory(new PropertyValueFactory<>("title"));
|
||
historyTaskStatusColumn.setCellValueFactory(cellData ->
|
||
new SimpleStringProperty(cellData.getValue().getStatus().getDescription()));
|
||
historyTaskStartTimeColumn.setCellValueFactory(cellData ->
|
||
new SimpleStringProperty(dateFormat.format(new Date(cellData.getValue().getStartTime()))));
|
||
historyTaskExecutionTimeColumn.setCellValueFactory(cellData ->
|
||
new SimpleStringProperty(cellData.getValue().getFormattedExecutionTime()));
|
||
|
||
// 绑定数据
|
||
activeTasksTable.setItems(taskMonitorCenter.getActiveTasks());
|
||
historyTasksTable.setItems(taskMonitorCenter.getTaskHistory());
|
||
|
||
// 添加任务双击事件(显示详情)
|
||
historyTasksTable.setRowFactory(tv -> {
|
||
TableRow<TaskHistory> row = new TableRow<>();
|
||
row.setOnMouseClicked(event -> {
|
||
if (event.getClickCount() == 2 && !row.isEmpty()) {
|
||
TaskHistory history = row.getItem();
|
||
showTaskDetails(history);
|
||
}
|
||
});
|
||
return row;
|
||
});
|
||
|
||
// 更新按钮状态
|
||
updateButtonStates();
|
||
activeTasksTable.getSelectionModel().selectedItemProperty().addListener((obs, oldVal, newVal) ->
|
||
updateButtonStates());
|
||
}
|
||
|
||
/**
|
||
* 更新按钮状态
|
||
*/
|
||
private void updateButtonStates() {
|
||
cancelTaskButton.setDisable(activeTasksTable.getSelectionModel().getSelectedItem() == null);
|
||
}
|
||
|
||
/**
|
||
* 取消选中的任务
|
||
*/
|
||
@FXML
|
||
private void onCancelTask() {
|
||
MonitoredTask<?> selectedTask = activeTasksTable.getSelectionModel().getSelectedItem();
|
||
if (selectedTask != null) {
|
||
taskMonitorCenter.cancelTask(selectedTask.getTaskId());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 清空历史记录
|
||
*/
|
||
@FXML
|
||
private void onClearHistory() {
|
||
Alert alert = new Alert(Alert.AlertType.CONFIRMATION);
|
||
alert.setTitle("确认操作");
|
||
alert.setHeaderText("清空任务历史");
|
||
alert.setContentText("确定要清空所有任务历史记录吗?此操作不可恢复。");
|
||
|
||
alert.showAndWait().ifPresent(response -> {
|
||
if (response == ButtonType.OK) {
|
||
taskMonitorCenter.getTaskHistory().clear();
|
||
}
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 显示任务详情
|
||
*/
|
||
private void showTaskDetails(TaskHistory history) {
|
||
Alert alert = new Alert(Alert.AlertType.INFORMATION);
|
||
alert.setTitle("任务详情");
|
||
alert.setHeaderText(history.getTitle());
|
||
|
||
StringBuilder content = new StringBuilder();
|
||
content.append("任务ID: " + history.getTaskId() + "\n");
|
||
content.append("状态: " + history.getStatus().getDescription() + "\n");
|
||
content.append("开始时间: " + dateFormat.format(new Date(history.getStartTime())) + "\n");
|
||
content.append("结束时间: " + dateFormat.format(new Date(history.getEndTime())) + "\n");
|
||
content.append("执行耗时: " + history.getFormattedExecutionTime() + "\n");
|
||
|
||
if (history.getErrorMessage() != null) {
|
||
content.append("\n错误信息: " + history.getErrorMessage());
|
||
alert.getDialogPane().setExpandableContent(new TextArea(history.getErrorMessage()));
|
||
}
|
||
|
||
alert.setContentText(content.toString());
|
||
alert.showAndWait();
|
||
}
|
||
|
||
/**
|
||
* 显示任务监控窗口
|
||
*/
|
||
@SneakyThrows(IOException.class)
|
||
public static void show() {
|
||
FXMLLoader loader = new FXMLLoader(TaskMonitorViewController.class.getResource("/ui/task/TaskMonitorView.fxml"));
|
||
// 设置Spring为控制器工厂
|
||
loader.setControllerFactory(SpringApp::getBean);
|
||
|
||
Parent root = loader.load();
|
||
Stage stage = new Stage();
|
||
stage.setTitle("任务监控中心");
|
||
stage.setScene(new Scene(root, 800, 600));
|
||
stage.initModality(Modality.NONE); // 非模态窗口
|
||
stage.show();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3. 集成到现有系统
|
||
|
||
#### 3.1 修改UITools类
|
||
|
||
```java
|
||
package com.ecep.contract.manager.util;
|
||
|
||
import com.ecep.contract.manager.SpringApp;
|
||
import com.ecep.contract.manager.ui.Message;
|
||
import com.ecep.contract.manager.ui.Tasker;
|
||
import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
|
||
import com.ecep.contract.manager.ui.task.MonitoredTask;
|
||
import javafx.application.Platform;
|
||
import javafx.concurrent.Task;
|
||
import javafx.scene.Node;
|
||
import javafx.scene.control.*;
|
||
import javafx.scene.layout.VBox;
|
||
|
||
import java.util.Optional;
|
||
import java.util.function.Consumer;
|
||
|
||
public class UITools {
|
||
// 其他方法保持不变...
|
||
|
||
/**
|
||
* 显示一个对话框, 并等待用户关闭对话框
|
||
*
|
||
* @param title 对话框标题
|
||
* @param task 任务
|
||
* @param init 初始化
|
||
*/
|
||
public static void showTaskDialogAndWait(String title, Task<?> task, Consumer<Consumer<Message>> init) {
|
||
Dialog<Message> dialog = createDialog();
|
||
dialog.getDialogPane().getScene().getStylesheets().add("/ui/dialog.css");
|
||
dialog.setTitle(title);
|
||
|
||
DialogPane dialogPane = dialog.getDialogPane();
|
||
VBox box = (VBox) dialogPane.getContent();
|
||
Label headerLabel = (Label) box.lookup("#header");
|
||
ListView<Message> listView = (ListView<Message>) box.lookup("#list-view");
|
||
listView.setCellFactory(param -> new Tasker.MessageListCell());
|
||
|
||
headerLabel.textProperty().bind(task.titleProperty());
|
||
|
||
Consumer<Message> consumer = message -> {
|
||
if (Platform.isFxApplicationThread()) {
|
||
listView.getItems().add(message);
|
||
} else {
|
||
Platform.runLater(() -> listView.getItems().add(message));
|
||
}
|
||
};
|
||
if (task instanceof Tasker<?> tasker) {
|
||
tasker.setMessageHandler(consumer);
|
||
} else {
|
||
task.messageProperty().addListener((observable, oldValue, newValue) -> {
|
||
consumer.accept(Message.info(newValue));
|
||
});
|
||
}
|
||
|
||
// 加一个进度条
|
||
ProgressBar progressBar = new ProgressBar(0);
|
||
progressBar.setPrefHeight(12);
|
||
progressBar.setPrefWidth(200);
|
||
progressBar.prefWidthProperty().bind(box.widthProperty().subtract(10));
|
||
progressBar.setMinHeight(12);
|
||
progressBar.setVisible(true);
|
||
VBox.setMargin(progressBar, new Insets(6, 0, 6, 0));
|
||
|
||
Platform.runLater(() -> {
|
||
box.getChildren().add(progressBar);
|
||
});
|
||
|
||
progressBar.visibleProperty().bind(task.runningProperty());
|
||
progressBar.progressProperty().bind(task.progressProperty());
|
||
|
||
// 使用任务监控中心启动任务
|
||
if (task instanceof Tasker<?> tasker) {
|
||
// 注册到监控中心,但不在这里启动,因为showAndWait会阻塞UI线程
|
||
TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
|
||
MonitoredTask<?> monitoredTask = monitorCenter.registerTask(tasker);
|
||
}
|
||
|
||
if (init != null) {
|
||
init.accept(consumer);
|
||
}
|
||
|
||
dialog.showAndWait();
|
||
if (task.isRunning()) {
|
||
task.cancel();
|
||
}
|
||
}
|
||
|
||
// 其他方法保持不变...
|
||
}
|
||
```
|
||
|
||
#### 3.2 修改SpringApp配置
|
||
|
||
```java
|
||
package com.ecep.contract.manager;
|
||
|
||
import com.ecep.contract.manager.ui.task.TaskMonitorCenter;
|
||
import org.springframework.boot.SpringApplication;
|
||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||
import org.springframework.boot.autoconfigure.domain.DomainScan;
|
||
import org.springframework.cache.annotation.EnableCaching;
|
||
import org.springframework.context.ConfigurableApplicationContext;
|
||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||
import org.springframework.scheduling.annotation.EnableAsync;
|
||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||
|
||
import java.util.concurrent.ScheduledExecutorService;
|
||
|
||
@SpringBootApplication
|
||
@EnableJpaRepositories(basePackages = {"com.ecep.contract.manager.ds"})
|
||
@DomainScan(basePackages = {"com.ecep.contract.manager.ds"})
|
||
@EnableScheduling
|
||
@EnableAsync
|
||
public class SpringApp {
|
||
private static ConfigurableApplicationContext context;
|
||
|
||
// 其他方法保持不变...
|
||
|
||
public static void main(String[] args) {
|
||
context = SpringApplication.run(SpringApp.class, args);
|
||
|
||
// 初始化任务监控中心
|
||
TaskMonitorCenter monitorCenter = context.getBean(TaskMonitorCenter.class);
|
||
// 可以在这里添加一些初始化代码
|
||
}
|
||
|
||
// 其他方法保持不变...
|
||
}
|
||
```
|
||
|
||
### 4. 超时处理机制
|
||
|
||
```java
|
||
package com.ecep.contract.manager.util;
|
||
|
||
import java.util.concurrent.*;
|
||
import java.util.function.Supplier;
|
||
|
||
/**
|
||
* 异步任务工具类 - 提供超时处理等增强功能
|
||
*/
|
||
public class AsyncUtils {
|
||
/**
|
||
* 执行带超时的CompletableFuture任务
|
||
*/
|
||
public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
|
||
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
|
||
return withTimeout(future, timeout, unit);
|
||
}
|
||
|
||
/**
|
||
* 执行带超时的CompletableFuture任务,并指定线程池
|
||
*/
|
||
public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor, long timeout, TimeUnit unit) {
|
||
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
|
||
return withTimeout(future, timeout, unit);
|
||
}
|
||
|
||
/**
|
||
* 为已有的CompletableFuture添加超时处理
|
||
*/
|
||
public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
|
||
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
|
||
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||
Thread thread = new Thread(r, "timeout-scheduler");
|
||
thread.setDaemon(true);
|
||
return thread;
|
||
});
|
||
|
||
scheduler.schedule(() -> {
|
||
TimeoutException timeoutException = new TimeoutException("Task timed out after " + timeout + " " + unit.name());
|
||
timeoutFuture.completeExceptionally(timeoutException);
|
||
}, timeout, unit);
|
||
|
||
return future.applyToEither(timeoutFuture, t -> t)
|
||
.whenComplete((t, ex) -> scheduler.shutdownNow());
|
||
}
|
||
|
||
/**
|
||
* 带重试机制的CompletableFuture任务
|
||
*/
|
||
public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries, long delayBetweenRetries, TimeUnit unit) {
|
||
return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
|
||
}
|
||
|
||
/**
|
||
* 带重试机制的CompletableFuture任务,并指定哪些异常需要重试
|
||
*/
|
||
public static <T> CompletableFuture<T> supplyAsyncWithRetry(
|
||
Supplier<T> supplier,
|
||
int maxRetries,
|
||
long delayBetweenRetries,
|
||
TimeUnit unit,
|
||
Class<? extends Exception>[] retryableExceptions) {
|
||
|
||
CompletableFuture<T> resultFuture = new CompletableFuture<>();
|
||
|
||
// 执行任务的方法
|
||
executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions);
|
||
|
||
return resultFuture;
|
||
}
|
||
|
||
private static <T> void executeWithRetry(
|
||
Supplier<T> supplier,
|
||
CompletableFuture<T> resultFuture,
|
||
int currentAttempt,
|
||
int maxRetries,
|
||
long delayBetweenRetries,
|
||
TimeUnit unit,
|
||
Class<? extends Exception>[] retryableExceptions) {
|
||
|
||
CompletableFuture.supplyAsync(supplier)
|
||
.thenAccept(resultFuture::complete)
|
||
.exceptionally(ex -> {
|
||
if (currentAttempt < maxRetries && shouldRetry(ex, retryableExceptions)) {
|
||
// 延迟后重试
|
||
try {
|
||
Thread.sleep(unit.toMillis(delayBetweenRetries));
|
||
} catch (InterruptedException ie) {
|
||
Thread.currentThread().interrupt();
|
||
resultFuture.completeExceptionally(ie);
|
||
return null;
|
||
}
|
||
|
||
// 递归调用,增加尝试次数
|
||
executeWithRetry(supplier, resultFuture, currentAttempt + 1,
|
||
maxRetries, delayBetweenRetries, unit, retryableExceptions);
|
||
} else {
|
||
// 达到最大重试次数或异常不可重试,完成Future
|
||
resultFuture.completeExceptionally(ex);
|
||
}
|
||
return null;
|
||
});
|
||
}
|
||
|
||
private static boolean shouldRetry(Throwable ex, Class<? extends Exception>[] retryableExceptions) {
|
||
// 如果未指定重试异常类型,则所有异常都重试
|
||
if (retryableExceptions == null || retryableExceptions.length == 0) {
|
||
return true;
|
||
}
|
||
|
||
// 检查异常是否属于可重试类型
|
||
for (Class<? extends Exception> exceptionClass : retryableExceptions) {
|
||
if (exceptionClass.isInstance(ex.getCause())) {
|
||
return true;
|
||
}
|
||
}
|
||
|
||
return false;
|
||
}
|
||
}
|
||
```
|
||
|
||
## 使用示例
|
||
|
||
### 1. 基本任务监控
|
||
|
||
```java
|
||
// 在任何需要执行异步任务的地方
|
||
Tasker<?> task = new ContractSyncTask();
|
||
TaskMonitorCenter monitorCenter = SpringApp.getBean(TaskMonitorCenter.class);
|
||
// 注册任务,设置30秒超时
|
||
monitorCenter.registerTask(task, 30L);
|
||
|
||
// 显示任务监控窗口
|
||
TaskMonitorViewController.show();
|
||
```
|
||
|
||
### 2. 带超时处理的CompletableFuture
|
||
|
||
```java
|
||
// 使用AsyncUtils工具类执行带超时的异步任务
|
||
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithTimeout(() -> {
|
||
// 执行耗时操作
|
||
Thread.sleep(5000);
|
||
return "任务完成";
|
||
}, 3, TimeUnit.SECONDS);
|
||
|
||
// 处理结果
|
||
future.thenAccept(result -> System.out.println("结果: " + result))
|
||
.exceptionally(ex -> {
|
||
if (ex.getCause() instanceof TimeoutException) {
|
||
System.out.println("任务执行超时");
|
||
} else {
|
||
System.out.println("任务执行异常: " + ex.getMessage());
|
||
}
|
||
return null;
|
||
});
|
||
```
|
||
|
||
### 3. 带重试机制的任务
|
||
|
||
```java
|
||
// 使用AsyncUtils工具类执行带重试的异步任务
|
||
CompletableFuture<String> future = AsyncUtils.supplyAsyncWithRetry(() -> {
|
||
// 模拟一个可能失败的操作
|
||
if (Math.random() > 0.7) {
|
||
throw new RuntimeException("随机失败");
|
||
}
|
||
return "操作成功";
|
||
}, 3, 1, TimeUnit.SECONDS, new Class[]{RuntimeException.class});
|
||
|
||
// 处理结果
|
||
future.thenAccept(result -> System.out.println("结果: " + result))
|
||
.exceptionally(ex -> {
|
||
System.out.println("多次重试后仍然失败: " + ex.getMessage());
|
||
return null;
|
||
});
|
||
```
|
||
|
||
## 实现要点总结
|
||
|
||
1. **统一任务管理**:通过`TaskMonitorCenter`集中管理所有异步任务,提供注册、监控和取消功能
|
||
2. **完整生命周期监控**:跟踪任务从创建、运行到完成/失败/取消的整个生命周期
|
||
3. **超时处理机制**:为任务设置超时时间,防止任务无限期阻塞
|
||
4. **历史记录追踪**:保存任务执行历史,方便问题排查和性能分析
|
||
5. **可视化界面**:提供直观的任务监控界面,展示任务进度和状态
|
||
6. **重试策略**:针对失败任务提供可配置的重试机制
|
||
7. **与现有系统集成**:无缝集成到现有任务调度机制中
|
||
|
||
这套实现方案可以显著提升系统的稳定性和可维护性,防止长时间运行的任务占用系统资源,并为用户提供更好的任务执行状态反馈。 |