Files
contract-manager/client/src/main/java/com/ecep/contract/util/AsyncUtils.java
2025-09-03 20:56:44 +08:00

122 lines
4.6 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ecep.contract.util;
import java.util.concurrent.*;
import java.util.function.Supplier;
import com.ecep.contract.Desktop;
/**
 * Asynchronous task utilities: timeout and retry helpers built on {@link CompletableFuture}.
 */
public class AsyncUtils {
    /**
     * Runs {@code supplier} asynchronously on the default executor of
     * {@link CompletableFuture#supplyAsync(Supplier)} and applies a timeout to the result.
     *
     * @param supplier the task producing the value
     * @param timeout  the maximum time to wait
     * @param unit     the unit of {@code timeout}
     * @return a future behaving as described in {@link #withTimeout(CompletableFuture, long, TimeUnit)}
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Runs {@code supplier} asynchronously on the given executor and applies a timeout to the result.
     *
     * @param supplier the task producing the value
     * @param executor the executor that runs {@code supplier}
     * @param timeout  the maximum time to wait
     * @param unit     the unit of {@code timeout}
     * @return a future behaving as described in {@link #withTimeout(CompletableFuture, long, TimeUnit)}
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor, long timeout,
            TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Adds a timeout to an existing future.
     *
     * <p>The returned future completes with the outcome of {@code future}, or, if the timeout
     * elapses first, exceptionally with a {@link TimeoutException} (seen as the cause of the
     * exception thrown by {@code get()} / {@code join()}). The timer is scheduled on
     * {@code Desktop.instance.getExecutorService()} and cancelled as soon as the returned future
     * completes. The given {@code future} itself is not cancelled when the timeout fires.
     *
     * @param future  the future to guard
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return a new future that yields the result of {@code future} or fails on timeout
     */
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        ScheduledExecutorService executor = Desktop.instance.getExecutorService();
        ScheduledFuture<?> schedule = executor.schedule(() -> {
            TimeoutException timeoutException = new TimeoutException(
                    "Task timed out after " + timeout + " " + unit.name());
            timeoutFuture.completeExceptionally(timeoutException);
        }, timeout, unit);
        // Whichever of the two futures finishes first decides the outcome; the pending timer
        // is cancelled once that outcome is known.
        return future.applyToEither(timeoutFuture, t -> t)
                .whenComplete((t, ex) -> schedule.cancel(true));
    }

    /**
     * Runs {@code supplier} asynchronously and retries it on any failure.
     *
     * @param supplier            the task producing the value
     * @param maxRetries          the maximum number of retries after the first attempt
     * @param delayBetweenRetries the delay before each retry; values {@code <= 0} retry immediately
     * @param unit                the unit of {@code delayBetweenRetries}
     * @return a future completed with the first successful result, or exceptionally with the last failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries,
            long delayBetweenRetries, TimeUnit unit) {
        return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
    }

    /**
     * Runs {@code supplier} asynchronously and retries it when it fails with one of the given
     * exception types.
     *
     * <p>Retries are scheduled without blocking any thread. If the returned future is cancelled
     * or completed by the caller, no further attempts are started.
     *
     * @param supplier            the task producing the value
     * @param maxRetries          the maximum number of retries after the first attempt
     * @param delayBetweenRetries the delay before each retry; values {@code <= 0} retry immediately
     * @param unit                the unit of {@code delayBetweenRetries}
     * @param retryableExceptions the exception types that trigger a retry; {@code null} or empty
     *                            means every failure is retried
     * @return a future completed with the first successful result, or exceptionally with the
     *         failure that ended the retry sequence
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        // Start the first attempt; later attempts are chained from the failure handler.
        executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit, retryableExceptions);
        return resultFuture;
    }

    /**
     * Starts one attempt of {@code supplier} and wires its outcome into {@code resultFuture}:
     * success completes it, failure is passed to the retry decision.
     */
    private static <T> void executeWithRetry(
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        CompletableFuture.supplyAsync(supplier)
                .thenAccept(resultFuture::complete)
                .exceptionally(ex -> {
                    handleFailure(ex, supplier, resultFuture, currentAttempt, maxRetries,
                            delayBetweenRetries, unit, retryableExceptions);
                    return null;
                });
    }

    /**
     * Decides what happens after a failed attempt: schedule the next attempt or fail
     * {@code resultFuture}. Any error raised while deciding or scheduling is also routed into
     * {@code resultFuture} so that it can never be left incomplete.
     */
    private static <T> void handleFailure(
            Throwable ex,
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Exception>[] retryableExceptions) {
        try {
            // The caller may have cancelled or completed the result in the meantime.
            if (resultFuture.isDone()) {
                return;
            }
            // Retry budget exhausted or failure not retryable: report the failure as-is.
            if (currentAttempt >= maxRetries || !shouldRetry(ex, retryableExceptions)) {
                resultFuture.completeExceptionally(ex);
                return;
            }
            Runnable nextAttempt = () -> {
                try {
                    executeWithRetry(supplier, resultFuture, currentAttempt + 1,
                            maxRetries, delayBetweenRetries, unit, retryableExceptions);
                } catch (RuntimeException startFailure) {
                    resultFuture.completeExceptionally(startFailure);
                }
            };
            if (delayBetweenRetries <= 0) {
                nextAttempt.run();
            } else {
                // Use the shared scheduler instead of Thread.sleep so that neither a pool
                // worker nor the calling thread is blocked while waiting for the retry.
                Desktop.instance.getExecutorService().schedule(nextAttempt, delayBetweenRetries, unit);
            }
        } catch (RuntimeException handlerFailure) {
            resultFuture.completeExceptionally(handlerFailure);
        }
    }

    /**
     * Tells whether a failure qualifies for a retry.
     *
     * @param ex                  the failure as reported by the failed stage
     * @param retryableExceptions the retryable types; {@code null} or empty means retry everything
     * @return {@code true} if the underlying failure is an instance of any listed type
     */
    private static boolean shouldRetry(Throwable ex, Class<? extends Exception>[] retryableExceptions) {
        // No filter configured: every failure is retried.
        if (retryableExceptions == null || retryableExceptions.length == 0) {
            return true;
        }
        Throwable failure = unwrap(ex);
        for (Class<? extends Exception> exceptionClass : retryableExceptions) {
            if (exceptionClass != null && exceptionClass.isInstance(failure)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Strips {@link CompletionException} / {@link ExecutionException} wrappers that carry a
     * cause, returning the underlying failure (or {@code ex} itself when it is not wrapped).
     */
    private static Throwable unwrap(Throwable ex) {
        Throwable current = ex;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}