Last active
April 29, 2024 22:53
-
-
Save Blquinn/119df3a4decb70cdd9b1175df6e6ff72 to your computer and use it in GitHub Desktop.
Global scheduler lock implementation using redis, daily scheduled task using redis + db
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/go-redis/redis" | |
"github.com/jmoiron/sqlx" | |
_ "github.com/lib/pq" | |
"github.com/robfig/cron" | |
"github.com/sirupsen/logrus" | |
"time" | |
) | |
func init() { | |
logrus.SetFormatter(&logrus.TextFormatter{}) | |
logrus.SetLevel(logrus.DebugLevel) | |
} | |
func main() { | |
db := sqlx.MustConnect("postgres", "host=localhost port=5432 user=ben password=password dbname=globalcron sslmode=disable") | |
_, err := db.Exec(` delete from task_log; `) | |
if err != nil { | |
panic(err) | |
} | |
client := redis.NewClient(&redis.Options{ | |
Addr: "localhost:6379", | |
Password: "", // no password set | |
DB: 0, // use default DB | |
}) | |
c := cron.New() | |
err = c.AddJob("*/5 * * * * *", newAtomicCronTask( | |
"print", | |
client, | |
func() { | |
logrus.Infof("Running job: the time is %s", time.Now().Format(time.RFC3339)) | |
}, | |
2 * time.Second)) | |
err = c.AddJob("*/5 * * * * *", newAtomicCronTask( | |
"daily", | |
client, | |
onceDailyCron(db, "daily", func() { | |
logrus.Println("Running once daily cron") | |
}), | |
2 * time.Second)) | |
if err != nil { | |
panic(err) | |
} | |
c.Run() | |
} | |
// AtomicCronTask is a cron task that uses a schedule lock to | |
// ensure that it only runs 1 time across the entire cluster. | |
type AtomicCronTask struct { | |
redis *redis.Client | |
name string | |
lockDur time.Duration | |
fn func() | |
} | |
func newAtomicCronTask(name string, r *redis.Client, fun func(), lockDur time.Duration) *AtomicCronTask { | |
return &AtomicCronTask{ | |
redis: r, | |
name: "atomic_job:" + name, | |
fn: fun, | |
lockDur: lockDur, | |
} | |
} | |
func (s *AtomicCronTask) Run() { | |
err := s.redis.Watch(func(tx *redis.Tx) error { | |
res := tx.Get(s.name) | |
_, err := res.Result() | |
if err == redis.Nil { | |
pipe := tx.TxPipeline() | |
pipe.Set(s.name, time.Now().Format(time.RFC3339Nano), s.lockDur) | |
_, err := pipe.Exec() | |
if err != nil { | |
return err | |
} | |
logrus.Debugf("set key running task %s", s.name) | |
s.fn() | |
return nil | |
} else if err != nil { | |
logrus.Errorf("got err while getting key from redis: %v", err) | |
return nil | |
} else { | |
logrus.Debugf("key exists, skipping task %s", s.name) | |
} | |
return nil | |
}, s.name) | |
if err != nil { | |
if err == redis.TxFailedErr { | |
logrus.Debugln("race condition avoided via transaction") | |
} else { | |
logrus.Errorf("got error during redis transaction: %v", err) | |
} | |
} | |
} | |
var _ cron.Job = &AtomicCronTask{} | |
func onceDailyCron(db *sqlx.DB, name string, fn func()) func() { | |
return func() { | |
var n int | |
err := db.Get(&n, ` | |
select count(*) | |
from task_log | |
where task_name = $1 | |
and run_time > $2 | |
`, name, time.Now().Truncate(24*time.Hour)) | |
if err != nil { | |
logrus.Errorf("Got err while getting task log entry: %v", err) | |
return | |
} | |
if n < 1 { | |
_, execErr := db.Exec(` | |
insert into task_log (task_name, run_time) | |
values ($1, $2); | |
`, name, time.Now()) | |
if execErr != nil { | |
logrus.Errorln("Failed to insert daily cron entry: %v", execErr) | |
return | |
} | |
fn() | |
return | |
} | |
logrus.Debugln("Skipping daily task due to existing entry") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment