Skip to content

Instantly share code, notes, and snippets.

@aclisp
Last active April 30, 2024 11:44
Show Gist options
  • Save aclisp/f7e8843fc14f388b03b55bbbc03e7bb9 to your computer and use it in GitHub Desktop.
Save aclisp/f7e8843fc14f388b03b55bbbc03e7bb9 to your computer and use it in GitHub Desktop.
一个分布式调度器,能执行持续10小时以上的长时间定时任务
package cloud.youxin.logserver.controller;
import cloud.youxin.logserver.config.SchedulerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.stereotype.Component;
import oryx.utils.widget.ShortUUID;
import tech.rongxin.oryx.wechatbot.core.MessageSender;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
 * Distributed scheduler for long-running scheduled jobs (designed for executions lasting 10+ hours).
 *
 * <p>Every node registers the same jobs. When a job fires, each node tries to advance that job's
 * "clock" in Redis to the job's next execution timestamp ({@link #setClock}); only the node whose
 * update succeeds runs the execution. A node already running {@link #MAX_ACTIVE} executions does not
 * compete, leaving the execution to the other nodes.
 */
@Component
@Slf4j
public class LongLivedScheduler {
    // Dedicated TaskScheduler, kept separate from the one used by the Spring framework.
    @Autowired @Qualifier(SchedulerConfig.LONG_LIVED_TASK_SCHEDULER)
    private TaskScheduler scheduler;
    // Redis is used for cross-node coordination.
    @Autowired
    private RedisTemplate<String, String> redis;
    // Used to send alerts.
    @Autowired
    private MessageSender messageSender;
    /**
     * Coordination script: stores ARGV[1] under KEYS[1] with a 600-second expiry only when the key is
     * absent or holds a strictly smaller number; the script returns true when it stored the value and
     * false otherwise.
     */
    private static final RedisScript<Boolean> SET_GREATER_THAN_SCRIPT = new DefaultRedisScript<>("\n" +
            " local key = KEYS[1]\n" +
            " local value = tonumber(ARGV[1])\n" +
            "\n" +
            " if redis.call(\"EXISTS\", key) == 1 then\n" +
            " local oldValue = tonumber(redis.call('GET', key))\n" +
            "\n" +
            " if value <= oldValue then\n" +
            " return false\n" +
            " end\n" +
            " end\n" +
            "\n" +
            " redis.call('SET', key, value, 'EX', 600)\n" +
            "\n" +
            " return true\n", Boolean.class);
    private static final String REDIS_NAMESPACE = SchedulerConfig.LONG_LIVED_TASK_SCHEDULER + "-sync";
    // Executions currently running on this node. Tasks register themselves when they start and their
    // worker threads deregister on completion, so the list must be thread-safe. Copy-on-write iteration
    // works on a snapshot and holds no lock, so onDestroy() can stop tasks (and wait for them) while
    // their worker threads deregister concurrently. A synchronized Vector.forEach would block those
    // deregistrations for the whole loop.
    private final List<LongLivedTask> activeTasks = new CopyOnWriteArrayList<>();
    // Futures of all schedules registered through runCron/runAt; cancelled on shutdown.
    private final List<ScheduledFuture<?>> scheduledFutures = new CopyOnWriteArrayList<>();
    // Upper bound of concurrently running executions on this node; beyond it this node yields
    // executions to the other nodes.
    public static final int MAX_ACTIVE = 4;

    /** Registers the scheduled jobs once the application is ready. */
    @EventListener(ApplicationReadyEvent.class)
    public void onApplicationReady() {
        log.info("开始准备分布式调度器");
        // TODO load (and periodically reload) scheduled tasks from the database
        // Example: how to schedule a task
        runCron("*/10 * * * * *", "cronTest");
        runAt(Instant.now().plus(13, ChronoUnit.SECONDS), "runOnceTest");
    }

    /**
     * Shutdown hook: cancels all registered schedules, sends an alert if executions are being killed,
     * and stops every execution running on this node.
     */
    @PreDestroy
    public void onDestroy() {
        // Read the count once so the log line and the alert agree.
        int active = countActiveTask();
        log.info("开始停止分布式调度器: 当前活跃任务数 {}", active);
        // Stop triggering new executions first: the injected TaskScheduler may still be alive while this
        // bean is being destroyed.
        scheduledFutures.forEach(future -> future.cancel(false));
        if (active > 0) {
            messageSender.fatal("定时流分布式调度器", "有一个服务节点["+ System.getenv("HOSTNAME") +"]重启了,强行结束了"+active+"个正在执行的定时任务");
        }
        activeTasks.forEach(LongLivedTask::stop);
    }

    private LongLivedTask runCron(String cron, String id) {
        CronTrigger cronTrigger = new CronTrigger(cron);
        LongLivedTask task = new LongLivedTask(this, cronTrigger, id);
        track(task, scheduler.schedule(task::start, cronTrigger));
        return task;
    }

    private LongLivedTask runAt(Instant at, String id) {
        LongLivedTask task = new LongLivedTask(this, at, id);
        track(task, scheduler.schedule(task::start, at));
        return task;
    }

    // Hands the schedule handle to the task and remembers it so onDestroy() can cancel it.
    private void track(LongLivedTask task, ScheduledFuture<?> future) {
        task.setScheduledFuture(future);
        if (future == null) {
            // TaskScheduler.schedule(Runnable, Trigger) may return null when the trigger never fires.
            return;
        }
        // Drop schedules that are already finished (fired one-shots) so the list does not grow
        // unboundedly when tasks are loaded repeatedly.
        scheduledFutures.removeIf(f -> f.isDone());
        scheduledFutures.add(future);
    }

    /**
     * Core of the cross-node coordination: before running an execution, each node tries to set the
     * task's clock to the timestamp of its next execution; the node that succeeds wins the right to run
     * it. Since every node loads the same tasks, they compute the same next timestamp, so only one node
     * succeeds. This assumes node clocks agree closely enough to compute the same next fire time.
     *
     * @param key       task key, namespaced under {@code REDIS_NAMESPACE + ":clock:"}
     * @param timestamp next execution time in epoch milliseconds
     * @return {@code true} only if this call advanced the clock
     */
    public boolean setClock(String key, long timestamp) {
        Boolean ret = redis.execute(SET_GREATER_THAN_SCRIPT, Collections.singletonList(
                REDIS_NAMESPACE + ":clock:" + key
        ), String.valueOf(timestamp));
        return Boolean.TRUE.equals(ret);
    }

    /** Deletes the task's clock key from Redis. */
    public void resetClock(String key) {
        redis.delete(REDIS_NAMESPACE + ":clock:" + key);
    }

    public void addActiveTask(LongLivedTask task) {
        activeTasks.add(task);
    }

    public void removeActiveTask(LongLivedTask task) {
        activeTasks.remove(task);
    }

    public int countActiveTask() {
        return activeTasks.size();
    }

    /** Whether this node has spare capacity for another execution. */
    public boolean canStart() {
        return countActiveTask() < MAX_ACTIVE;
    }

    /**
     * Whether an execution of the given task id is currently running on this node.
     * TODO should check active tasks recorded in the database, not just the local node.
     */
    public boolean hasActiveTask(String taskId) {
        return activeTasks.stream().anyMatch(task -> taskId.equals(task.getTaskId()));
    }
}
/**
 * One job managed by {@link LongLivedScheduler}: either a repeating cron job or a one-shot job at a
 * fixed instant. Each execution this node wins runs on its own daemon thread, so a long execution does
 * not tie up a thread of the underlying TaskScheduler.
 */
@Slf4j
class LongLivedTask {
    private final String taskId; // ID of this task
    private final LongLivedScheduler scheduler; // owning scheduler
    // Worker thread of the current/last execution. It is read and written from scheduler threads
    // (start) and the shutdown thread (stop), hence volatile.
    private volatile Thread thread;
    // Schedule handle, used to cancel further triggers.
    private volatile ScheduledFuture<?> scheduledFuture;
    private final CronTrigger cronTrigger; // set only for repeating tasks
    private final Instant instant; // set only for one-shot tasks

    public LongLivedTask(LongLivedScheduler scheduler, CronTrigger cron, String id) {
        this(scheduler, id, cron, null);
    }

    public LongLivedTask(LongLivedScheduler scheduler, Instant at, String id) {
        this(scheduler, id, null, at);
    }

    private LongLivedTask(LongLivedScheduler scheduler, String id, CronTrigger cron, Instant at) {
        this.scheduler = scheduler;
        this.taskId = id;
        this.cronTrigger = cron;
        this.instant = at;
    }

    /**
     * Trigger callback. Starts one execution on a new daemon thread only if all of these hold: this node
     * has spare capacity, the task is not already running here, and this node wins the Redis clock for
     * the next execution time. Otherwise it logs the reason and returns.
     *
     * <p>NOTE(review): canStart() and addActiveTask() are separate steps. If the TaskScheduler runs
     * triggers on several threads, concurrent starts of different tasks could briefly exceed MAX_ACTIVE.
     */
    public void start() {
        if (!scheduler.canStart()) {
            log.info("本节点[{}]容量满了,任务[{}]无法启动: active={}, max={}", hostname(), getTaskKey(), scheduler.countActiveTask(), scheduler.MAX_ACTIVE);
            return;
        }
        Thread previous = thread;
        if (previous != null && previous.isAlive()) {
            log.info("存在执行中的任务[{}],不再重复启动", getTaskKey());
            return;
        }
        if (scheduler.hasActiveTask(taskId)) {
            log.info("存在执行中的任务[{}],不再重复启动", getTaskKey());
            return;
        }
        Date nextDate = nextExecutionTime();
        boolean wasSet = scheduler.setClock(getTaskKey(), nextDate.getTime());
        if (!wasSet) {
            log.info("本节点[{}]没有获得执行任务[{}]的资格", hostname(), getTaskKey());
            return;
        }
        String execId = ShortUUID.generate();
        log.info("任务在节点[{}]上启动: {} ({})", hostname(), getDetail(), execId);
        Thread worker = new Thread(() -> runExecution(execId), "LLT" + execId);
        worker.setDaemon(true);
        thread = worker;
        // Register BEFORE starting the worker. The worker deregisters itself in a finally block; if it
        // finished before registration, the task would stay in the active list forever. It would then
        // permanently occupy a capacity slot and make hasActiveTask() block every future run.
        scheduler.addActiveTask(this);
        try {
            worker.start();
        } catch (RuntimeException | Error e) {
            // The worker never ran, so nothing will deregister it: roll back the registration.
            scheduler.removeActiveTask(this);
            throw e;
        }
    }

    // Body of the worker thread: runs the job, logs the outcome, and always deregisters the task.
    private void runExecution(String execId) {
        try {
            doWork();
            log.info("任务结束 ({})", execId);
        } catch (Throwable t) {
            log.error("任务异常 ({})", execId, t);
        } finally {
            scheduler.removeActiveTask(this);
        }
    }

    // Placeholder workload used by the demo tasks: sleeps 68 seconds or until interrupted.
    private void doWork() {
        try {
            Thread.sleep(68000);
        } catch (InterruptedException e) {
            // stop() interrupts the worker; preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Cancels further triggers of this task, interrupts a running execution, and deletes the task's
     * Redis clock key. After interrupting, it waits up to 100 ms for the execution to end.
     */
    public void stop() {
        ScheduledFuture<?> future = scheduledFuture;
        if (future != null) {
            future.cancel(false);
        }
        Thread running = thread;
        if (running != null && running.isAlive()) {
            running.interrupt();
            try {
                running.join(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            thread = null;
        }
        scheduler.resetClock(getTaskKey());
    }

    public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
        this.scheduledFuture = scheduledFuture;
    }

    /**
     * Next fire time. For cron tasks it is computed by the trigger with an empty trigger context (no
     * previous execution is passed in). For one-shot tasks it is the configured instant.
     *
     * @throws IllegalStateException if the task has neither a cron trigger nor an instant
     */
    public Date nextExecutionTime() {
        if (cronTrigger != null) {
            return cronTrigger.nextExecutionTime(new SimpleTriggerContext());
        }
        if (instant != null) {
            return Date.from(instant);
        }
        throw new IllegalStateException("LongLivedTask既不是定时任务也不是一次性任务");
    }

    public String getTaskId() {
        return taskId;
    }

    /** Key used for the Redis clock: {@code taskId + ":" + rule}. */
    public String getTaskKey() {
        return taskId + ":" + getRule();
    }

    /** The cron expression, the one-shot instant as a string, or "" if neither is set. */
    public String getRule() {
        if (cronTrigger != null) {
            return cronTrigger.getExpression();
        }
        if (instant != null) {
            return instant.toString();
        }
        return "";
    }

    /** Human-readable description used in log messages. */
    public String getDetail() {
        if (cronTrigger != null) {
            return "定时任务[" + getTaskKey() + "]";
        }
        if (instant != null) {
            return "一次性任务[" + getTaskKey() + "]";
        }
        return "错误";
    }

    private static String hostname() {
        return System.getenv("HOSTNAME");
    }
}
/**
 * Manages scheduled tasks loaded from the database.
 *
 * <p>Currently a placeholder with no members.
 */
class DatabaseTaskLoader {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment