Skip to content

Instantly share code, notes, and snippets.

@c4pt0r
Last active May 9, 2022 01:48
Show Gist options
  • Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.
Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.
Job Queue using TiDB
package main
import (
"database/sql"
"fmt"
"log"
"math/rand"
"time"
"github.com/fatih/color"
_ "github.com/go-sql-driver/mysql"
)
// JobStatus is the integer status code of a job. It is the value written to
// and compared against the status column of the jobs table.
type JobStatus int
// Job status codes. The numeric values come from iota, starting at 0, and are
// what ends up stored in the database.
const (
// JobStatusInitialized (0) is the status jobs are created with and the status
// workerFetchJob selects on when looking for work.
JobStatusInitialized JobStatus = iota
// JobStatusRunning (1) is set by workerFetchJob when a worker claims the job.
JobStatusRunning
// JobStatusFinished (2) is set by the worker loop once the job is done.
JobStatusFinished
// JobStatusFailed (3) is declared but not assigned anywhere in this file.
JobStatusFailed
)
// Job is the in-memory form of one row of the jobs table.
type Job struct {
// ID is the row's primary key; createJob fills it in from the insert result.
ID int64
// CreatedAt, UpdatedAt and FinishedAt are stored as plain integers; the code
// in this file fills them with Unix seconds (time.Now().Unix()).
CreatedAt int64
UpdatedAt int64
// FinishedAt is 0 for jobs created by main until a worker finishes them.
FinishedAt int64
// Status is the job's current state code.
Status JobStatus
// WorkerID is the id of the worker recorded on the row; main creates jobs
// with 0.
WorkerID int64
// Data is the job payload, stored in a TEXT column.
Data string
}
// Worker is the in-memory form of one row of the workers table.
type Worker struct {
// ID is the row's primary key; createWorker fills it in from the insert result.
ID int64
// Name is the worker's display name, used in log output.
Name string
}
// createDB makes sure the two tables used by the queue exist: jobs (with
// secondary indexes on finished_at, created_at, status and worker_id) and
// workers. Both use CREATE TABLE IF NOT EXISTS, so calling it against an
// already-initialised database is harmless.
//
// The statements run in order and the first error is returned as-is; later
// statements are not attempted after a failure.
func createDB(db *sql.DB) error {
	schema := []string{
		`
CREATE TABLE IF NOT EXISTS jobs (
id BIGINT AUTO_RANDOM,
created_at BIGINT,
updated_at BIGINT,
status BIGINT,
worker_id BIGINT,
finished_at BIGINT,
data TEXT,
PRIMARY KEY (id),
KEY(finished_at),
KEY(created_at),
KEY(status),
KEY(worker_id)
);`,
		`
CREATE TABLE IF NOT EXISTS workers (
id BIGINT AUTO_RANDOM,
name TEXT,
PRIMARY KEY (id)
);`,
	}
	for _, ddl := range schema {
		if _, err := db.Exec(ddl); err != nil {
			return err
		}
	}
	return nil
}
// createJob inserts job as a new row of the jobs table inside its own
// transaction and, once the transaction has committed, stores the generated
// primary key in job.ID.
//
// Every field of job except ID is written to the row. On any error the
// transaction is rolled back, job is left untouched and the error is returned
// unchanged.
func createJob(db *sql.DB, job *Job) error {
	txn, err := db.Begin()
	if err != nil {
		// txn is nil here; there is nothing to roll back.
		return err
	}
	// Registered only after Begin succeeded: deferring Rollback on a nil *sql.Tx
	// would panic. After a successful Commit this call just returns
	// sql.ErrTxDone, so it only has an effect on the error paths below.
	defer txn.Rollback()

	const insertStmt = `
INSERT INTO jobs (
created_at,
updated_at,
status,
worker_id,
finished_at,
data
) VALUES (
?,
?,
?,
?,
?,
?
)`
	res, err := txn.Exec(insertStmt, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data)
	if err != nil {
		return err
	}
	jobID, err := res.LastInsertId()
	if err != nil {
		return err
	}
	if err := txn.Commit(); err != nil {
		return err
	}
	// Publish the id only once the row is durable.
	job.ID = jobID
	return nil
}
// createWorker inserts worker as a new row of the workers table inside its own
// transaction and, once the transaction has committed, stores the generated
// primary key in worker.ID.
//
// Only worker.Name is written. On any error the transaction is rolled back,
// worker is left untouched and the error is returned unchanged.
func createWorker(db *sql.DB, worker *Worker) error {
	txn, err := db.Begin()
	if err != nil {
		// txn is nil here; there is nothing to roll back.
		return err
	}
	// Registered only after Begin succeeded: deferring Rollback on a nil *sql.Tx
	// would panic. After a successful Commit this call just returns
	// sql.ErrTxDone, so it only has an effect on the error paths below.
	defer txn.Rollback()

	const insertStmt = `
INSERT INTO workers (
name
) VALUES (
?
)`
	res, err := txn.Exec(insertStmt, worker.Name)
	if err != nil {
		return err
	}
	workerID, err := res.LastInsertId()
	if err != nil {
		return err
	}
	if err := txn.Commit(); err != nil {
		return err
	}
	// Publish the id only once the row is durable.
	worker.ID = workerID
	return nil
}
// workerFetchJob claims one pending job for worker.
//
// Inside a single transaction it selects one job whose status is
// JobStatusInitialized with SELECT ... FOR UPDATE, then marks that row as
// JobStatusRunning, owned by worker.ID, with updated_at set to the current
// Unix time. The row lock taken by FOR UPDATE is what keeps two workers from
// claiming the same job.
//
// Background from the original author: TiDB supports pessimistic
// transactions, and since TiDB 6.0 pessimistic locks are kept in memory,
// which makes lock acquisition by concurrent transactions cheaper. See
// https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md
//
// Returns:
//   - (job, nil) when a job was claimed; the returned Job reflects the row as
//     committed (Status, WorkerID and UpdatedAt carry the new values).
//   - (nil, nil) when no pending job exists.
//   - (nil, err) on any database error; the transaction is rolled back.
//
// NOTE(review): ORDER BY created_at DESC hands out the newest job first, so
// older jobs wait while newer ones keep arriving — confirm this LIFO order is
// intended for the queue.
func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) {
	const selectStmt = `
SELECT
id,
created_at,
updated_at,
status,
worker_id,
finished_at,
data
FROM jobs
WHERE status = ?
ORDER BY created_at DESC
LIMIT 1
FOR UPDATE`
	const claimStmt = `
UPDATE jobs
SET worker_id = ?, updated_at = ?, status = ?
WHERE id = ?`

	txn, err := db.Begin()
	if err != nil {
		// txn is nil here; there is nothing to roll back.
		return nil, err
	}
	// Registered only after Begin succeeded: deferring Rollback on a nil *sql.Tx
	// would panic. On the "no job" and error paths this releases the
	// transaction; after a successful Commit it just returns sql.ErrTxDone.
	defer txn.Rollback()

	var job Job
	err = txn.QueryRow(selectStmt, JobStatusInitialized).Scan(
		&job.ID,
		&job.CreatedAt,
		&job.UpdatedAt,
		&job.Status,
		&job.WorkerID,
		&job.FinishedAt,
		&job.Data,
	)
	if err == sql.ErrNoRows {
		// Empty queue is not an error for the caller.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	now := time.Now().Unix()
	if _, err := txn.Exec(claimStmt, worker.ID, now, JobStatusRunning, job.ID); err != nil {
		return nil, err
	}
	if err := txn.Commit(); err != nil {
		return nil, err
	}

	// Mirror the committed UPDATE so the caller does not see stale values.
	job.WorkerID = worker.ID
	job.UpdatedAt = now
	job.Status = JobStatusRunning
	return &job, nil
}
// updateJob writes job's UpdatedAt, Status, FinishedAt and Data back to the
// jobs row whose id equals job.ID. It runs as a single statement outside any
// explicit transaction and returns the error from Exec, if any. The worker_id
// and created_at columns are not touched.
func updateJob(db *sql.DB, job *Job) error {
	const query = `
UPDATE jobs
SET updated_at = ?, status = ?, finished_at = ?, data = ?
WHERE id = ?`
	_, err := db.Exec(query, job.UpdatedAt, job.Status, job.FinishedAt, job.Data, job.ID)
	return err
}
// randomString returns a string of exactly length characters, each drawn
// independently from the 52 ASCII letters a-z and A-Z using the package-level
// math/rand source. A length of 0 yields the empty string.
func randomString(length int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	out := make([]byte, length)
	for pos := 0; pos < length; pos++ {
		out[pos] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}
// workLoop registers n workers named "worker-0" .. "worker-<n-1>" in the
// workers table and starts one goroutine per worker. It returns as soon as
// all goroutines have been started; the goroutines themselves never exit.
//
// Each goroutine repeatedly tries to claim a job with workerFetchJob. When
// none is pending it sleeps 10ms and retries; otherwise it simulates 500ms of
// work and then marks the job finished via updateJob.
//
// Any database error panics, which terminates the whole program.
func workLoop(db *sql.DB, n int) {
	for i := 0; i < n; i++ {
		// Declared inside the loop body, so every goroutine below captures its
		// own worker value.
		worker := Worker{
			Name: fmt.Sprintf("worker-%d", i),
		}
		if err := createWorker(db, &worker); err != nil {
			panic(err)
		}
		go func() {
			for {
				job, err := workerFetchJob(db, &worker)
				if err != nil {
					panic(err)
				}
				if job == nil {
					// Nothing pending: back off briefly before polling again.
					time.Sleep(10 * time.Millisecond)
					continue
				}
				log.Printf("%s %s get job id: %d\n", color.YellowString("[FETCH]"), worker.Name, job.ID)
				// simulate work
				time.Sleep(500 * time.Millisecond)

				// updateJob writes UpdatedAt back to the row, so it must be
				// refreshed here; otherwise the row's updated_at would be
				// overwritten with whatever stale value the Job struct carried.
				now := time.Now().Unix()
				job.Status = JobStatusFinished
				job.FinishedAt = now
				job.UpdatedAt = now
				job.WorkerID = 0
				if err := updateJob(db, job); err != nil {
					panic(err)
				}
				log.Printf("%s %s finish job id: %d\n", color.GreenString("[DONE]"), worker.Name, job.ID)
			}
		}()
	}
}
// main wires the demo together: it opens a connection pool to the TiDB
// instance on localhost:4000 (database "test", user root, empty password),
// creates the schema, starts 100 workers, and then produces one job with a
// 1024-letter random payload every 100ms, forever. Any error panics.
func main() {
	db, err := sql.Open("mysql", "root:@tcp(localhost:4000)/test")
	if err != nil {
		panic(err)
	}

	// Connection pool limits.
	db.SetConnMaxLifetime(3 * time.Minute)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(50)

	if err := createDB(db); err != nil {
		panic(err)
	}

	// create 100 workers
	workLoop(db, 100)

	// create random jobs, every 100ms
	for {
		next := Job{
			CreatedAt: time.Now().Unix(),
			UpdatedAt: time.Now().Unix(),
			Status:    JobStatusInitialized,
			Data:      randomString(1024),
		}
		if err := createJob(db, &next); err != nil {
			panic(err)
		}
		log.Printf("%s create job id: %d\n", color.RedString("[NEWJOB]"), next.ID)
		time.Sleep(100 * time.Millisecond)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment