package com.ecep.contract.manager.util;

import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Supplier;

import com.ecep.contract.manager.Desktop;

/**
 * Asynchronous task utilities that add timeout and retry handling on top of
 * {@link CompletableFuture}.
 *
 * <p>Timeout timers and retry delays are scheduled on the executor returned by
 * {@code Desktop.instance.getExecutorService()}.
 */
public class AsyncUtils {

    /**
     * Runs {@code supplier} through {@link CompletableFuture#supplyAsync(Supplier)} and bounds
     * the wait for its result with a timeout.
     *
     * @param supplier the task to run
     * @param timeout  maximum time to wait for the result
     * @param unit     unit of {@code timeout}
     * @param <T>      result type
     * @return a future that yields the task's result, or fails with a {@link TimeoutException}
     *         if the timeout elapses first (see {@link #withTimeout})
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout,
            TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Runs {@code supplier} on the given {@code executor} and bounds the wait for its result
     * with a timeout.
     *
     * @param supplier the task to run
     * @param executor the executor that runs the task
     * @param timeout  maximum time to wait for the result
     * @param unit     unit of {@code timeout}
     * @param <T>      result type
     * @return a future that yields the task's result, or fails with a {@link TimeoutException}
     *         if the timeout elapses first (see {@link #withTimeout})
     */
    public static <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, Executor executor,
            long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, executor);
        return withTimeout(future, timeout, unit);
    }

    /**
     * Adds a timeout to an existing future.
     *
     * <p>The returned future completes with whichever happens first: the outcome of
     * {@code future}, or a {@link TimeoutException} raised by a timer scheduled for
     * {@code timeout}. The timer is cancelled as soon as the returned future completes.
     * {@code future} itself is never cancelled here, so a task that timed out keeps running.
     *
     * @param future  the future to guard
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @param <T>     result type
     * @return a new future carrying either the original outcome or the timeout failure
     * @throws NullPointerException if {@code future} is null
     */
    public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        // Validate before scheduling so a bad argument cannot leave an orphaned timer behind.
        Objects.requireNonNull(future, "future");

        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        ScheduledExecutorService executor = Desktop.instance.getExecutorService();
        ScheduledFuture<?> schedule = executor.schedule(() -> {
            TimeoutException timeoutException = new TimeoutException(
                    "Task timed out after " + timeout + " " + unit.name());
            timeoutFuture.completeExceptionally(timeoutException);
        }, timeout, unit);

        // cancel(false): when the timeout itself fired, this callback runs inside the timer
        // task, and cancel(true) would interrupt the very thread that is still running the
        // downstream callbacks.
        return future.applyToEither(timeoutFuture, result -> result)
                .whenComplete((result, ex) -> schedule.cancel(false));
    }

    /**
     * Runs {@code supplier} asynchronously and retries it after any failure.
     *
     * @param supplier            the task to run
     * @param maxRetries          maximum number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     * @param <T>                 result type
     * @return a future completed with the first successful result, or with the last failure
     *         once the retries are exhausted
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(Supplier<T> supplier, int maxRetries,
            long delayBetweenRetries, TimeUnit unit) {
        return supplyAsyncWithRetry(supplier, maxRetries, delayBetweenRetries, unit, null);
    }

    /**
     * Runs {@code supplier} asynchronously and retries it after failures of the given types.
     *
     * <p>The task is attempted at most {@code maxRetries + 1} times. Retry delays are
     * scheduled rather than slept, so no thread is blocked while waiting.
     *
     * @param supplier            the task to run
     * @param maxRetries          maximum number of retries after the first attempt
     * @param delayBetweenRetries delay before each retry
     * @param unit                unit of {@code delayBetweenRetries}
     * @param retryableExceptions exception types that trigger a retry; {@code null} or empty
     *                            means every failure is retried
     * @param <T>                 result type
     * @return a future completed with the first successful result, or exceptionally with the
     *         failure that ended the retries
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Throwable>[] retryableExceptions) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        // Kick off the first attempt; later attempts chain themselves.
        executeWithRetry(supplier, resultFuture, 0, maxRetries, delayBetweenRetries, unit,
                retryableExceptions);
        return resultFuture;
    }

    /**
     * Runs one attempt and, on a retryable failure, schedules the next attempt after the delay.
     * Every path completes {@code resultFuture} or hands it on to the next attempt.
     */
    private static <T> void executeWithRetry(
            Supplier<T> supplier,
            CompletableFuture<T> resultFuture,
            int currentAttempt,
            int maxRetries,
            long delayBetweenRetries,
            TimeUnit unit,
            Class<? extends Throwable>[] retryableExceptions) {
        CompletableFuture.supplyAsync(supplier).whenComplete((value, ex) -> {
            if (ex == null) {
                resultFuture.complete(value);
                return;
            }
            if (currentAttempt >= maxRetries || !shouldRetry(ex, retryableExceptions)) {
                // Retries exhausted or failure not retryable: surface the failure.
                resultFuture.completeExceptionally(ex);
                return;
            }
            try {
                // Schedule instead of Thread.sleep: this callback may run on a pool worker
                // or on the caller's own thread, and neither should be blocked.
                ScheduledExecutorService scheduler = Desktop.instance.getExecutorService();
                scheduler.schedule(
                        () -> executeWithRetry(supplier, resultFuture, currentAttempt + 1, maxRetries,
                                delayBetweenRetries, unit, retryableExceptions),
                        delayBetweenRetries, unit);
            } catch (RuntimeException schedulingFailure) {
                // Never leave the result future hanging if the retry cannot be scheduled.
                schedulingFailure.addSuppressed(ex);
                resultFuture.completeExceptionally(schedulingFailure);
            }
        });
    }

    /**
     * Decides whether a failed attempt should be retried.
     *
     * @param ex                  the failure as delivered by the async stage
     * @param retryableExceptions accepted exception types; {@code null} or empty accepts all
     * @return {@code true} if the failure matches one of the accepted types
     */
    private static boolean shouldRetry(Throwable ex, Class<? extends Throwable>[] retryableExceptions) {
        // No filter configured: every failure is retried.
        if (retryableExceptions == null || retryableExceptions.length == 0) {
            return true;
        }
        // The async stage delivers the failure wrapped, so inspect the cause; fall back to
        // the exception itself when it carries no cause.
        Throwable failure = ex.getCause() != null ? ex.getCause() : ex;
        for (Class<? extends Throwable> exceptionClass : retryableExceptions) {
            if (exceptionClass != null && exceptionClass.isInstance(failure)) {
                return true;
            }
        }
        return false;
    }
}