wangguangwu
wangguangwu
发布于 2024-12-30 / 21 阅读
0
0

基于 Java 定时任务及定时任务管理器的设计与实现

基于 Java 定时任务及定时任务管理器的设计与实现

在现代软件系统中,经常会遇到需要 定时周期性执行 某些操作的场景,例如:

  • 定期检查数据状态并更新或归档
  • 发送周期性通知或报表
  • 对外部系统进行轮询,获取最新结果
  • 监控队列或数据库,执行清理操作

针对这些场景,Java 提供了多种可选方案,如 TimerScheduledExecutorServiceSpring @Scheduled 以及第三方调度框架 (Quartz、XXL-Job 等)。
本篇文章将重点介绍一种 基于 ScheduledExecutorService 封装的定时任务管理器 的实现思路,以及如何将业务逻辑与调度逻辑拆分,从而达到 高内聚、低耦合 、易维护的目的。

2. 定时任务管理器设计思路

为了解决定时任务中的复杂度和扩展性问题,我们将定时任务划分为两个核心模块:

  1. 业务逻辑(BusinessTask

    • 定义任务执行逻辑,例如:定期查询状态、处理业务数据。
    • 抽象接口 BusinessTask,对外提供 doTask() 方法,用于实现具体的业务逻辑。
  2. 调度服务(TaskScheduler

    • 提供统一的任务管理接口,通过线程池管理和调度 BusinessTask 的执行。
    • 支持任务的超时处理、间隔执行,以及手动停止。

3. 代码实现

以下是定时任务和任务管理器的完整实现代码。

3.1 定义任务接口

/**
 * 定义任务接口,用于实现不同的业务逻辑
 */
public interface BusinessTask {
    /**
     * 执行任务的核心逻辑
     * @return true 表示任务完成;false 表示任务未完成,需要继续调度
     */
    boolean doTask();
}

3.2 实现业务任务类

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 示例任务:处理未提交影像资料的逻辑
 */
@Slf4j
public class UnCommittedImagesHandleTask implements BusinessTask {

    private final AtomicInteger executionCount;   // 当前执行次数
    private final AtomicBoolean isTaskSuccessful; // 任务完成标志
    private final int maxExecutionCount;          // 最大允许执行次数

    public UnCommittedImagesHandleTask(int maxExecutionCount) {
        this.maxExecutionCount = maxExecutionCount;
        this.executionCount = new AtomicInteger(0);
        this.isTaskSuccessful = new AtomicBoolean(false);
    }

    @Override
    public boolean doTask() {
        if (isTaskSuccessful.get()) {
            log.info("任务已完成,不再执行...");
            return true;
        }

        int count = executionCount.incrementAndGet();
        log.info("当前执行次数: {}", count);

        if (count >= maxExecutionCount) {
            log.info("达到最大执行次数,任务完成");
            isTaskSuccessful.set(true);
            return true;
        }

        log.info("任务尚未完成,等待下一次执行...");
        return false;
    }
}

3.3 定时任务管理器

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 定时任务管理器,封装调度逻辑
 */
@Slf4j
public class TaskScheduler {

    private final ScheduledExecutorService scheduledExecutorService;

    public TaskScheduler() {
        this.scheduledExecutorService = Executors.newScheduledThreadPool(4);
    }

    /**
     * 提交定时任务
     *
     * @param task 业务任务
     * @param initialDelay 初始延迟时间(秒)
     * @param period 执行间隔(秒)
     * @param timeout 超时时间(秒)
     * @return 返回 ScheduledFuture,用于控制任务
     */
    public ScheduledFuture<?> scheduleTask(BusinessTask task, long initialDelay, long period, long timeout) {
        AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
        Runnable taskWrapper = () -> {
            try {
                boolean done = task.doTask();
                if (done) {
                    log.info("任务完成,取消调度...");
                    cancelTask(futureRef.get());
                }
            } catch (Exception e) {
                log.error("任务执行异常", e);
            }
        };

        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
                taskWrapper,
                initialDelay,
                period,
                TimeUnit.SECONDS
        );

        futureRef.set(scheduledFuture);

        scheduledExecutorService.schedule(() -> {
            if (!scheduledFuture.isDone()) {
                log.warn("任务超时,自动取消...");
                cancelTask(scheduledFuture);
            }
        }, timeout, TimeUnit.SECONDS);

        return scheduledFuture;
    }

    /**
     * 取消任务
     */
    public void cancelTask(ScheduledFuture<?> future) {
        if (future != null && !future.isCancelled()) {
            future.cancel(true);
            log.info("任务已手动取消");
        }
    }

    /**
     * 关闭调度器
     */
    public void shutdown() {
        log.info("关闭任务调度器...");
        scheduledExecutorService.shutdown();
    }
}

3.4 测试入口

public class Main {
    public static void main(String[] args) {
        TaskScheduler scheduler = new TaskScheduler();
        UnCommittedImagesHandleTask task = new UnCommittedImagesHandleTask(3);

        ScheduledFuture<?> future = scheduler.scheduleTask(task, 1, 2, 10);

        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        scheduler.cancelTask(future);
        scheduler.shutdown();
    }
}

4. 优化与扩展

  1. 线程池配置

    • 支持通过配置文件调整线程池大小和队列容量。
  2. 监控与日志

    • 集成监控系统,记录任务运行状态、错误次数和超时事件。
  3. 分布式调度

    • 使用 Quartz、Elastic-Job 等框架实现跨节点任务调度。

5. 总结

通过将任务调度与业务逻辑解耦,我们构建了一个灵活的定时任务管理模型:

  • 任务管理器 负责调度、取消和超时处理。
  • 业务任务类 专注于具体逻辑实现。
    这种设计既提高了代码的可读性,也为任务管理扩展提供了更高的灵活性。

评论