Last active
May 9, 2022 01:48
-
-
Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.
Job Queue using TiDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"fmt" | |
"log" | |
"math/rand" | |
"time" | |
"github.com/fatih/color" | |
_ "github.com/go-sql-driver/mysql" | |
) | |
// JobStatus is the lifecycle state of a job. Its integer value is what the
// program writes to, and reads from, the jobs.status column.
type JobStatus int

// Job lifecycle states. The numeric values are persisted, so they are
// written out explicitly rather than derived.
const (
	// JobStatusInitialized marks a job that has been created and not yet claimed.
	JobStatusInitialized JobStatus = 0
	// JobStatusRunning marks a job that a worker has claimed.
	JobStatusRunning JobStatus = 1
	// JobStatusFinished marks a job whose work completed.
	JobStatusFinished JobStatus = 2
	// JobStatusFailed is declared for failed jobs.
	JobStatusFailed JobStatus = 3
)
type Job struct { | |
ID int64 | |
CreatedAt int64 | |
UpdatedAt int64 | |
FinishedAt int64 | |
Status JobStatus | |
WorkerID int64 | |
Data string | |
} | |
// Worker mirrors one row of the workers table.
type Worker struct {
	// ID is the row's primary key; createWorker fills it in after the insert.
	ID int64
	// Name is the worker's display name, used in log output.
	Name string
}
// createDB creates the jobs and workers tables if they do not already exist.
// The statements run in order and the first failure is returned unchanged.
func createDB(db *sql.DB) error {
	const jobsDDL = `
	CREATE TABLE IF NOT EXISTS jobs (
		id BIGINT AUTO_RANDOM,
		created_at BIGINT,
		updated_at BIGINT,
		status BIGINT,
		worker_id BIGINT,
		finished_at BIGINT,
		data TEXT,
		PRIMARY KEY (id),
		KEY(finished_at),
		KEY(created_at),
		KEY(status),
		KEY(worker_id)
	);`

	const workersDDL = `
	CREATE TABLE IF NOT EXISTS workers (
		id BIGINT AUTO_RANDOM,
		name TEXT,
		PRIMARY KEY (id)
	);`

	for _, ddl := range []string{jobsDDL, workersDDL} {
		if _, err := db.Exec(ddl); err != nil {
			return err
		}
	}
	return nil
}
func createJob(db *sql.DB, job *Job) error { | |
txn, err := db.Begin() | |
defer txn.Rollback() | |
if err != nil { | |
return err | |
} | |
sql := ` | |
INSERT INTO jobs ( | |
created_at, | |
updated_at, | |
status, | |
worker_id, | |
finished_at, | |
data | |
) VALUES ( | |
?, | |
?, | |
?, | |
?, | |
?, | |
? | |
)` | |
res, err := txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data) | |
if err != nil { | |
return err | |
} | |
jobID, err := res.LastInsertId() | |
if err != nil { | |
return err | |
} | |
err = txn.Commit() | |
if err != nil { | |
txn.Rollback() | |
return err | |
} | |
job.ID = jobID | |
return nil | |
} | |
func createWorker(db *sql.DB, worker *Worker) error { | |
txn, err := db.Begin() | |
defer txn.Rollback() | |
if err != nil { | |
return err | |
} | |
stmt := ` | |
INSERT INTO workers ( | |
name | |
) VALUES ( | |
? | |
)` | |
res, err := txn.Exec(stmt, worker.Name) | |
if err != nil { | |
return err | |
} | |
workerID, err := res.LastInsertId() | |
if err != nil { | |
return err | |
} | |
err = txn.Commit() | |
if err != nil { | |
txn.Rollback() | |
return err | |
} | |
worker.ID = workerID | |
return nil | |
} | |
func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) { | |
/* | |
Here's the magic. | |
TiDB supports pessimistic transactions, | |
And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory, | |
which means that concurrent transactions become less costly to acquire locks. | |
for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md | |
*/ | |
stmt := ` | |
SELECT | |
id, | |
created_at, | |
updated_at, | |
status, | |
worker_id, | |
finished_at, | |
data | |
FROM jobs | |
WHERE status = ? | |
ORDER BY created_at DESC | |
LIMIT 1 | |
FOR UPDATE` | |
txn, err := db.Begin() | |
defer txn.Rollback() | |
if err != nil { | |
return nil, err | |
} | |
var job Job | |
err = txn.QueryRow(stmt, JobStatusInitialized).Scan( | |
&job.ID, | |
&job.CreatedAt, | |
&job.UpdatedAt, | |
&job.Status, | |
&job.WorkerID, | |
&job.FinishedAt, | |
&job.Data, | |
) | |
if err != nil && err != sql.ErrNoRows { | |
return nil, err | |
} | |
if job.ID == 0 { | |
return nil, nil | |
} | |
stmt = ` | |
UPDATE jobs | |
SET worker_id = ?, updated_at = ?, status = ? | |
WHERE id = ?` | |
_, err = txn.Exec(stmt, worker.ID, time.Now().Unix(), JobStatusRunning, job.ID) | |
if err != nil { | |
return nil, err | |
} | |
err = txn.Commit() | |
if err != nil { | |
return nil, err | |
} | |
return &job, nil | |
} | |
func updateJob(db *sql.DB, job *Job) error { | |
stmt := ` | |
UPDATE jobs | |
SET updated_at = ?, status = ?, finished_at = ?, data = ? | |
WHERE id = ?` | |
_, err := db.Exec(stmt, job.UpdatedAt, job.Status, job.FinishedAt, job.Data, job.ID) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
// randomString returns a string of the given length made of ASCII letters
// (a-z, A-Z), each drawn independently with math/rand.
func randomString(length int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

	out := make([]byte, length)
	for pos := 0; pos < length; pos++ {
		out[pos] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}
func workLoop(db *sql.DB, n int) { | |
for i := 0; i < n; i++ { | |
worker := Worker{ | |
Name: fmt.Sprintf("worker-%d", i), | |
} | |
err := createWorker(db, &worker) | |
if err != nil { | |
panic(err) | |
} | |
go func() { | |
for { | |
job, err := workerFetchJob(db, &worker) | |
if err != nil { | |
panic(err) | |
} | |
if job == nil { | |
time.Sleep(10 * time.Millisecond) | |
continue | |
} | |
log.Printf("%s %s get job id: %d\n", color.YellowString("[FETCH]"), worker.Name, job.ID) | |
// simulate work | |
time.Sleep(500 * time.Millisecond) | |
job.Status = JobStatusFinished | |
job.FinishedAt = time.Now().Unix() | |
job.WorkerID = 0 | |
err = updateJob(db, job) | |
if err != nil { | |
panic(err) | |
} | |
log.Printf("%s %s finish job id: %d\n", color.GreenString("[DONE]"), worker.Name, job.ID) | |
} | |
}() | |
} | |
} | |
func main() { | |
db, err := sql.Open("mysql", "root:@tcp(localhost:4000)/test") | |
if err != nil { | |
panic(err) | |
} | |
db.SetConnMaxLifetime(time.Minute * 3) | |
db.SetMaxOpenConns(50) | |
db.SetMaxIdleConns(50) | |
err = createDB(db) | |
if err != nil { | |
panic(err) | |
} | |
// create 100 workers | |
workLoop(db, 100) | |
// create random jobs, every 100ms | |
for { | |
job := Job{ | |
CreatedAt: time.Now().Unix(), | |
UpdatedAt: time.Now().Unix(), | |
Status: JobStatusInitialized, | |
WorkerID: 0, | |
FinishedAt: 0, | |
Data: randomString(1024), | |
} | |
err = createJob(db, &job) | |
if err != nil { | |
panic(err) | |
} | |
log.Printf("%s create job id: %d\n", color.RedString("[NEWJOB]"), job.ID) | |
time.Sleep(100 * time.Millisecond) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment