Last active
April 30, 2024 11:44
-
-
Save aclisp/f7e8843fc14f388b03b55bbbc03e7bb9 to your computer and use it in GitHub Desktop.
一个分布式调度器,能执行持续10小时以上的长时间定时任务
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cloud.youxin.logserver.controller; | |
import cloud.youxin.logserver.config.SchedulerConfig; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.boot.context.event.ApplicationReadyEvent; | |
import org.springframework.context.event.EventListener; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.data.redis.core.script.DefaultRedisScript; | |
import org.springframework.data.redis.core.script.RedisScript; | |
import org.springframework.scheduling.TaskScheduler; | |
import org.springframework.scheduling.support.CronTrigger; | |
import org.springframework.scheduling.support.SimpleTriggerContext; | |
import org.springframework.stereotype.Component; | |
import oryx.utils.widget.ShortUUID; | |
import tech.rongxin.oryx.wechatbot.core.MessageSender; | |
import javax.annotation.PreDestroy; | |
import java.time.Instant; | |
import java.time.temporal.ChronoUnit; | |
import java.util.*; | |
import java.util.concurrent.ScheduledFuture; | |
/** | |
* 这一个类就实现了一个分布式调度器,能执行持续10小时以上的长时间定时任务 | |
*/ | |
@Component | |
@Slf4j | |
public class LongLivedScheduler { | |
// 依赖一个自己的调度器,不跟Spring框架里的混淆 | |
@Autowired @Qualifier(SchedulerConfig.LONG_LIVED_TASK_SCHEDULER) | |
private TaskScheduler scheduler; | |
// 依赖Redis做分布式协调 | |
@Autowired | |
private RedisTemplate<String, String> redis; | |
// 发告警用 | |
@Autowired | |
private MessageSender messageSender; | |
// 用来协调的redis脚本 | |
private final RedisScript<Boolean> SET_GREATER_THAN_SCRIPT = new DefaultRedisScript<>("\n" + | |
" local key = KEYS[1]\n" + | |
" local value = tonumber(ARGV[1])\n" + | |
"\n" + | |
" if redis.call(\"EXISTS\", key) == 1 then\n" + | |
" local oldValue = tonumber(redis.call('GET', key))\n" + | |
"\n" + | |
" if value <= oldValue then\n" + | |
" return false\n" + | |
" end\n" + | |
" end\n" + | |
"\n" + | |
" redis.call('SET', key, value, 'EX', 600)\n" + | |
"\n" + | |
" return true\n", Boolean.class); | |
private final String REDIS_NAMESPACE = SchedulerConfig.LONG_LIVED_TASK_SCHEDULER + "-sync"; | |
// 活跃的定时任务列表,每个定时任务(有自己的线程)自行维护此列表,所以用线程安全的Vector | |
private final Vector<LongLivedTask> activeTasks = new Vector<>(); | |
// 活跃的定时任务的上限,超过则本节点不调度,让渡给分布式的它节点调度 | |
public final int MAX_ACTIVE = 4; | |
@EventListener(ApplicationReadyEvent.class) | |
public void onApplicationReady() { | |
log.info("开始准备分布式调度器"); | |
// TODO 从数据库加载(反复加载)定时任务 | |
// 示例:如何调度一个任务 | |
runCron("*/10 * * * * *", "cronTest"); | |
runAt(Instant.now().plus(13, ChronoUnit.SECONDS), "runOnceTest"); | |
} | |
@PreDestroy | |
public void onDestroy() { | |
log.info("开始停止分布式调度器: 当前活跃任务数 {}", countActiveTask()); | |
if (countActiveTask() > 0) { | |
messageSender.fatal("定时流分布式调度器", "有一个服务节点["+ System.getenv("HOSTNAME") +"]重启了,强行结束了"+countActiveTask()+"个正在执行的定时任务"); | |
} | |
activeTasks.forEach(LongLivedTask::stop); | |
} | |
private LongLivedTask runCron(String cron, String id) { | |
CronTrigger cronTrigger = new CronTrigger(cron); | |
LongLivedTask task = new LongLivedTask(this, cronTrigger, id); | |
ScheduledFuture<?> future = scheduler.schedule(task::start, cronTrigger); | |
task.setScheduledFuture(future); | |
return task; | |
} | |
private LongLivedTask runAt(Instant at, String id) { | |
LongLivedTask task = new LongLivedTask(this, at, id); | |
ScheduledFuture<?> future = scheduler.schedule(task::start, at); | |
task.setScheduledFuture(future); | |
return task; | |
} | |
/** | |
* 分布式协调核心函数:每个节点执行任务之前用 setClock 设置下一次执行的时间戳,设置成功的节点获得执行任务的资格。 | |
* 由于每个节点都加载同样的任务,而同样的任务其下一次执行的时间戳肯定一样,因此必然只有一个节点获得资格。 | |
*/ | |
public boolean setClock(String key, long timestamp) { | |
Boolean ret = redis.execute(SET_GREATER_THAN_SCRIPT, Collections.singletonList( | |
REDIS_NAMESPACE + ":clock:" + key | |
), String.valueOf(timestamp)); | |
return Boolean.TRUE.equals(ret); | |
} | |
public void resetClock(String key) { | |
redis.delete(REDIS_NAMESPACE + ":clock:" + key); | |
} | |
public void addActiveTask(LongLivedTask task) { | |
activeTasks.add(task); | |
} | |
public void removeActiveTask(LongLivedTask task) { | |
activeTasks.remove(task); | |
} | |
public int countActiveTask() { | |
return activeTasks.size(); | |
} | |
public boolean canStart() { | |
return countActiveTask() < MAX_ACTIVE; | |
} | |
// TODO 要查数据库里的活跃任务,而不是本地 | |
public boolean hasActiveTask(String taskId) { | |
List<Boolean> holder = new ArrayList<>(1); | |
holder.add(false); | |
activeTasks.forEach(task -> { | |
if (taskId.equals(task.getTaskId())) { | |
holder.set(0, true); | |
} | |
}); | |
return holder.get(0); | |
} | |
} | |
@Slf4j | |
class LongLivedTask { | |
private final String taskId; // 每个任务的ID | |
private final LongLivedScheduler scheduler; // 每个任务都持有调度器的引用 | |
private Thread thread; // 长时间定时任务的执行线程,每个任务都有自己的 | |
private ScheduledFuture<?> scheduledFuture; // 调度句柄,用于取消调度 | |
private CronTrigger cronTrigger; // 重复任务才有 | |
private Instant instant; // 一次性任务才有 | |
public LongLivedTask(LongLivedScheduler scheduler, CronTrigger cron, String id) { | |
this.scheduler = scheduler; | |
this.cronTrigger = cron; | |
this.taskId = id; | |
} | |
public LongLivedTask(LongLivedScheduler scheduler, Instant at, String id) { | |
this.scheduler = scheduler; | |
this.instant = at; | |
this.taskId = id; | |
} | |
public void start() { | |
if (!scheduler.canStart()) { | |
log.info("本节点[{}]容量满了,任务[{}]无法启动: active={}, max={}", System.getenv("HOSTNAME"), getTaskKey(), scheduler.countActiveTask(), scheduler.MAX_ACTIVE); | |
return; | |
} | |
if (thread != null && thread.isAlive()) { | |
log.info("存在执行中的任务[{}],不再重复启动", getTaskKey()); | |
return; | |
} | |
if (scheduler.hasActiveTask(taskId)) { | |
log.info("存在执行中的任务[{}],不再重复启动", getTaskKey()); | |
return; | |
} | |
Date nextDate = nextExecutionTime(); | |
boolean wasSet = scheduler.setClock(getTaskKey(), nextDate.getTime()); | |
if (!wasSet) { | |
log.info("本节点[{}]没有获得执行任务[{}]的资格", System.getenv("HOSTNAME"), getTaskKey()); | |
return; | |
} | |
String execId = ShortUUID.generate(); | |
log.info("任务在节点[{}]上启动: {} ({})", System.getenv("HOSTNAME"), getDetail(), execId); | |
thread = new Thread(() -> { | |
try { | |
this.doWork(); | |
log.info("任务结束 ({})", execId); | |
} catch (Throwable t) { | |
log.error("任务异常 ({})", execId, t); | |
} finally { | |
scheduler.removeActiveTask(this); | |
} | |
}, "LLT" + execId); | |
thread.setDaemon(true); | |
thread.start(); | |
scheduler.addActiveTask(this); | |
} | |
private void doWork() { | |
try {Thread.sleep(68000);} catch (InterruptedException e) { /**/ } | |
} | |
public void stop() { | |
scheduledFuture.cancel(false); | |
if (thread != null && thread.isAlive()) { | |
thread.interrupt(); | |
try {thread.join(100);} catch (InterruptedException e) { /**/ } | |
thread = null; | |
} | |
scheduler.resetClock(getTaskKey()); | |
} | |
public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) { | |
this.scheduledFuture = scheduledFuture; | |
} | |
public Date nextExecutionTime() { | |
if (cronTrigger != null) { | |
return cronTrigger.nextExecutionTime(new SimpleTriggerContext()); | |
} | |
if (instant != null) { | |
return Date.from(instant); | |
} | |
throw new IllegalStateException("LongLivedTask既不是定时任务也不是一次性任务"); | |
} | |
public String getTaskId() { | |
return taskId; | |
} | |
public String getTaskKey() { | |
return taskId + ":" + getRule(); | |
} | |
public String getRule() { | |
if (cronTrigger != null) { | |
return cronTrigger.getExpression(); | |
} | |
if (instant != null) { | |
return instant.toString(); | |
} | |
return ""; | |
} | |
public String getDetail() { | |
if (cronTrigger != null) { | |
return "定时任务[" + getTaskKey() + "]"; | |
} | |
if (instant != null) { | |
return "一次性任务[" + getTaskKey() + "]"; | |
} | |
return "错误"; | |
} | |
} | |
/** | |
* Intended to manage the scheduled tasks loaded from the database. | |
* Currently an empty placeholder with no members. | |
*/ | |
class DatabaseTaskLoader { | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment