Last active
November 2, 2023 08:38
-
-
Save aclisp/34a242f069489a50de9ddffe5c4f3113 to your computer and use it in GitHub Desktop.
优雅结束的定时任务
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package job | |
import ( | |
"fmt" | |
"git.yy.com/ihago/yalert/v2" | |
"git.yy.com/ihago/ylog" | |
"git.yy.com/ihago/ylog/zap" | |
"git.yy.com/ihago/yruntime" | |
"github.com/robfig/cron/v3" | |
"hago-room-srv-metrics2/util" | |
"time" | |
) | |
// Cron 调度任务 | |
type Cron struct { | |
started bool | |
cron *cron.Cron | |
} | |
// Schedule 日程表 | |
type Schedule struct { | |
// Spec 是调度周期,遵循规范 https://en.wikipedia.org/wiki/Cron,参考 https://crontab.guru/ | |
Spec string | |
Cmd func() | |
} | |
// panicAlerter 抓住处理消息时的panic并告警 | |
func (c *Cron) panicAlerter(j cron.Job) cron.Job { | |
return cron.FuncJob(func() { | |
defer func() { | |
if r := recover(); r != nil { | |
s, _ := yruntime.CallStack(1, 10) // skip defer func() | |
yalert.Alert(yalert.DefaultMsgFormat(fmt.Sprintf("cron job panic: %v \n statck: %s", r, s)), yalert.WeChat(wechatRestrainConf)) | |
ylog.Error("cron job panic", zap.Any("error", r), zap.String("stack", s)) | |
} | |
}() | |
j.Run() | |
}) | |
} | |
// Start 启动调度任务,按日程表的设定来运行 | |
func (c *Cron) Start(ss []Schedule) { | |
if c.started { | |
return | |
} | |
c.started = true | |
c.cron = cron.New(cron.WithChain(c.panicAlerter)) | |
for _, s := range ss { | |
if _, err := c.cron.AddFunc(s.Spec, s.Cmd); err != nil { | |
util.AlertAndLog(fmt.Sprintf("cron.AddFunc %q: %v", s.Spec, err)) | |
} | |
} | |
c.cron.Start() | |
} | |
// Stop 停止调度任务 | |
func (c *Cron) Stop() { | |
if !c.started { | |
return | |
} | |
ctx := c.cron.Stop() | |
select { | |
case <-ctx.Done(): | |
case <-time.After(time.Second): | |
ylog.Error("running cron jobs is not completed") | |
} | |
c.started = false | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start . | |
func (m *RoomPlayerNum) Start() { | |
m.tick.Start(10*time.Second, dao.WithIsolation(m.everyMinute)) | |
m.cron.Start([]job.Schedule{{Spec: "* * * * *", Cmd: dao.WithLeader(m.everyDay)}}) | |
} | |
// Stop . | |
func (m *RoomPlayerNum) Stop() { | |
m.tick.Stop() | |
m.cron.Stop() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 用于装饰定时任务的 f func() | |
package dao | |
import ( | |
"fmt" | |
"git.yy.com/ihago/ycommon/elect" | |
"git.yy.com/ihago/ylog" | |
"git.yy.com/ihago/ylog/zap" | |
"hago-room-srv-metrics2/config" | |
"path/filepath" | |
"runtime" | |
"time" | |
) | |
// WithDistributedLock 基于redis分布式锁执行f,锁的名称为name,过期时间为d | |
func WithDistributedLock(name string, d time.Duration, f func()) func() { | |
return func() { | |
lockName := config.MyName + ":distributed_lock:" + name | |
ok, val, err := mRedis.Lock(lockName, d.Milliseconds()) | |
if err != nil { | |
ylog.Error("redis lock fail", zap.String("lockName", lockName), zap.Int64("lockTime", d.Milliseconds())) | |
return | |
} | |
if !ok { | |
return | |
} | |
defer func() { // unlock even if f panics | |
if err := mRedis.Unlock(lockName, val); err != nil { | |
ylog.Error("redis unlock fail", zap.String("lockName", lockName), zap.Int64("lockTime", d.Milliseconds())) | |
} | |
}() | |
f() | |
} | |
} | |
// WithLeader 只在多个服务实例的领导者实例上执行f | |
func WithLeader(f func()) func() { | |
lockName := lockNameByFileLine() | |
ylog.Debug("leader lock name", zap.String("lockName", lockName)) | |
f = WithDistributedLock(lockName, time.Minute, f) | |
return func() { | |
if elect.IsMaxNode(mService) { | |
f() | |
} | |
} | |
} | |
func lockNameByFileLine() string { | |
_, path, line, _ := runtime.Caller(2) | |
dir, file := filepath.Split(path) | |
dir = filepath.Base(dir) | |
lockName := fmt.Sprintf("%s:%s:%d", dir, file, line) | |
return lockName | |
} | |
// WithIsolation 多个服务实例,串行执行f | |
func WithIsolation(f func()) func() { | |
lockName := lockNameByFileLine() | |
ylog.Debug("isolation lock name", zap.String("lockName", lockName)) | |
return WithDistributedLock(lockName, time.Minute, f) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stop 停止每个指标计算模块 | |
func Stop() { | |
// 停止会阻塞等待定时任务结束,这里必须用并行停止,节省时间 | |
g := new(errgroup.Group) | |
for _, m := range metrics { | |
m := m // https://golang.org/doc/faq#closures_and_goroutines | |
g.Go(func() error { | |
m.Stop() | |
return nil | |
}) | |
} | |
g.Wait() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package job | |
import ( | |
"context" | |
"fmt" | |
"git.yy.com/ihago/yalert/v2" | |
"git.yy.com/ihago/ylog" | |
"git.yy.com/ihago/ylog/zap" | |
"git.yy.com/ihago/yruntime" | |
"time" | |
) | |
// Tick 定时任务 | |
type Tick struct { | |
started bool | |
ctx context.Context | |
cancel context.CancelFunc | |
} | |
// panicAlerter 抓住处理消息时的panic并告警 | |
func (t *Tick) panicAlerter(f func()) func() { | |
return func() { | |
defer func() { | |
if r := recover(); r != nil { | |
s, _ := yruntime.CallStack(1, 10) // skip defer func() | |
yalert.Alert(yalert.DefaultMsgFormat(fmt.Sprintf("tick job panic: %v \n statck: %s", r, s)), yalert.WeChat(wechatRestrainConf)) | |
ylog.Error("tick job panic", zap.Any("error", r), zap.String("stack", s)) | |
} | |
}() | |
f() | |
} | |
} | |
// Start 启动定时任务,以周期d运行f | |
func (t *Tick) Start(d time.Duration, f func()) { | |
if t.started { | |
return | |
} | |
t.started = true | |
var ctx context.Context | |
ctx, t.cancel = context.WithCancel(context.Background()) | |
var cancel context.CancelFunc | |
t.ctx, cancel = context.WithCancel(context.Background()) | |
go func() { | |
ticker := time.NewTicker(d) | |
defer ticker.Stop() | |
Loop: | |
for { | |
select { | |
case <-ticker.C: | |
t.panicAlerter(f)() | |
case <-ctx.Done(): | |
break Loop | |
} | |
if ctx.Err() != nil { | |
break | |
} | |
} | |
cancel() | |
}() | |
} | |
// Stop 停止定时任务 | |
func (t *Tick) Stop() { | |
if !t.started { | |
return | |
} | |
t.cancel() | |
select { | |
case <-t.ctx.Done(): | |
case <-time.After(time.Second): | |
ylog.Error("running tick jobs is not completed") | |
} | |
t.started = false | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment