基于 Java 定时任务及定时任务管理器的设计与实现
在现代软件系统中,经常会遇到需要 定时 或 周期性执行 某些操作的场景,例如:
- 定期检查数据状态并更新或归档
- 发送周期性通知或报表
- 对外部系统进行轮询,获取最新结果
- 监控队列或数据库,执行清理操作
针对这些场景,Java 提供了多种可选方案,如 Timer
、ScheduledExecutorService
、Spring @Scheduled
以及第三方调度框架 (Quartz、XXL-Job 等)。
本篇文章将重点介绍一种 基于 ScheduledExecutorService
封装的定时任务管理器 的实现思路,以及如何将业务逻辑与调度逻辑拆分,从而达到 高内聚、低耦合 、易维护的目的。
2. 定时任务管理器设计思路
为了解决定时任务中的复杂度和扩展性问题,我们将定时任务划分为两个核心模块:
-
业务逻辑(
BusinessTask
)- 定义任务执行逻辑,例如:定期查询状态、处理业务数据。
- 抽象接口
BusinessTask
,对外提供doTask()
方法,用于实现具体的业务逻辑。
-
调度服务(
TaskScheduler
)- 提供统一的任务管理接口,通过线程池管理和调度
BusinessTask
的执行。 - 支持任务的超时处理、间隔执行,以及手动停止。
- 提供统一的任务管理接口,通过线程池管理和调度
3. 代码实现
以下是定时任务和任务管理器的完整实现代码。
3.1 定义任务接口
/**
* 定义任务接口,用于实现不同的业务逻辑
*/
public interface BusinessTask {
/**
* 执行任务的核心逻辑
* @return true 表示任务完成;false 表示任务未完成,需要继续调度
*/
boolean doTask();
}
3.2 实现业务任务类
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 示例任务:处理未提交影像资料的逻辑
*/
@Slf4j
public class UnCommittedImagesHandleTask implements BusinessTask {
private final AtomicInteger executionCount; // 当前执行次数
private final AtomicBoolean isTaskSuccessful; // 任务完成标志
private final int maxExecutionCount; // 最大允许执行次数
public UnCommittedImagesHandleTask(int maxExecutionCount) {
this.maxExecutionCount = maxExecutionCount;
this.executionCount = new AtomicInteger(0);
this.isTaskSuccessful = new AtomicBoolean(false);
}
@Override
public boolean doTask() {
if (isTaskSuccessful.get()) {
log.info("任务已完成,不再执行...");
return true;
}
int count = executionCount.incrementAndGet();
log.info("当前执行次数: {}", count);
if (count >= maxExecutionCount) {
log.info("达到最大执行次数,任务完成");
isTaskSuccessful.set(true);
return true;
}
log.info("任务尚未完成,等待下一次执行...");
return false;
}
}
3.3 定时任务管理器
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
* 定时任务管理器,封装调度逻辑
*/
@Slf4j
public class TaskScheduler {
private final ScheduledExecutorService scheduledExecutorService;
public TaskScheduler() {
this.scheduledExecutorService = Executors.newScheduledThreadPool(4);
}
/**
* 提交定时任务
*
* @param task 业务任务
* @param initialDelay 初始延迟时间(秒)
* @param period 执行间隔(秒)
* @param timeout 超时时间(秒)
* @return 返回 ScheduledFuture,用于控制任务
*/
public ScheduledFuture<?> scheduleTask(BusinessTask task, long initialDelay, long period, long timeout) {
AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
Runnable taskWrapper = () -> {
try {
boolean done = task.doTask();
if (done) {
log.info("任务完成,取消调度...");
cancelTask(futureRef.get());
}
} catch (Exception e) {
log.error("任务执行异常", e);
}
};
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
taskWrapper,
initialDelay,
period,
TimeUnit.SECONDS
);
futureRef.set(scheduledFuture);
scheduledExecutorService.schedule(() -> {
if (!scheduledFuture.isDone()) {
log.warn("任务超时,自动取消...");
cancelTask(scheduledFuture);
}
}, timeout, TimeUnit.SECONDS);
return scheduledFuture;
}
/**
* 取消任务
*/
public void cancelTask(ScheduledFuture<?> future) {
if (future != null && !future.isCancelled()) {
future.cancel(true);
log.info("任务已手动取消");
}
}
/**
* 关闭调度器
*/
public void shutdown() {
log.info("关闭任务调度器...");
scheduledExecutorService.shutdown();
}
}
3.4 测试入口
public class Main {
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
UnCommittedImagesHandleTask task = new UnCommittedImagesHandleTask(3);
ScheduledFuture<?> future = scheduler.scheduleTask(task, 1, 2, 10);
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.cancelTask(future);
scheduler.shutdown();
}
}
4. 优化与扩展
-
线程池配置
- 支持通过配置文件调整线程池大小和队列容量。
-
监控与日志
- 集成监控系统,记录任务运行状态、错误次数和超时事件。
-
分布式调度
- 使用 Quartz、Elastic-Job 等框架实现跨节点任务调度。
5. 总结
通过将任务调度与业务逻辑解耦,我们构建了一个灵活的定时任务管理模型:
- 任务管理器 负责调度、取消和超时处理。
- 业务任务类 专注于具体逻辑实现。
这种设计既提高了代码的可读性,也为任务管理扩展提供了更高的灵活性。