Skip to content

Instantly share code, notes, and snippets.

@maleck13
Last active May 26, 2020 07:47
Show Gist options
  • Save maleck13/b04e143fabd5e4c51e349ca5fc795549 to your computer and use it in GitHub Desktop.
Save maleck13/b04e143fabd5e4c51e349ca5fc795549 to your computer and use it in GitHub Desktop.
job queue postgres
var done = sync.Map{}
func main() {
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
"password=%s dbname=%s sslmode=disable",
host, port, user, password, dbname)
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
panic(err)
}
defer db.Close()
go func() {
i := 0
for {
i++
clusterID := fmt.Sprintf("cid-%v", i)
l := Log{ClusterUUID: clusterID, Description: "rhmi added"}
time.Sleep(200 * time.Millisecond)
if err := handleLog(db, l); err != nil {
fmt.Println("handle log err ", err)
}
}
}()
time.Sleep(5*time.Second)
rand.Seed(time.Now().UnixNano())
min := 10
max := 500
go func() {
for {
delay := rand.Intn(max - min + 1) + min
id, err := doWork(db,delay)
if err != nil {
fmt.Println("worker 1 failed for id ", id, err)
continue
}
fmt.Println("worker 1 completed for id ", id)
if _,ok := done.Load(id); ok{
fmt.Println("duplicate entry for ", id )
continue
}
done.Store(id,"true")
}
}()
go func() {
for {
delay := rand.Intn(max - min + 1) + min
id, err := doWork(db,delay)
if err != nil {
fmt.Println("worker 2 failed for id ", id, err)
continue
}
fmt.Println("worker 2 completed for id ", id)
if _,ok := done.Load(id); ok{
fmt.Println("duplicate entry for ", id )
continue
}
done.Store(id,"true")
}
}()
v := make(chan bool, 1)
<-v
}
func handleLog(db *sql.DB, log Log) error {
if strings.Contains(log.Description, "rhmi added") {
if err := handleNewCluster(log, db); err != nil {
return err
}
}
if strings.Contains(log.Description, "rhmi deleted") {
if err := handleDeletedCluster(log, db); err != nil {
return err
}
}
return nil
}
func doWork(db *sql.DB, delay int) (string, error) {
ctx := context.TODO()
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
return "", fmt.Errorf("failed to start transaction %v ", err)
}
id := ""
// FOR UPDATE LOCKS THE ROW. SKIP LOCKED ENSURES IT MOVES PAST ANY ALREADY LOCKED ROWS
err = tx.QueryRow("SELECT clusterid from smtp where completed=0 ORDER BY created FOR UPDATE SKIP LOCKED LIMIT 1").Scan(&id)
if err != nil {
tx.Rollback()
return "", fmt.Errorf("failed to find job %v ", err)
}
time.Sleep(time.Duration(delay) * time.Millisecond)
n := time.Now().Unix()
if _, err := tx.Exec("update smtp set completed=$1 where clusterid=$2", n, id); err != nil {
tx.Rollback()
return id, fmt.Errorf("failed to update job %v ", err)
}
return id, tx.Commit()
}
func handleNewCluster(l Log, db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("failed to start transaction %v ", err)
}
if _, err := tx.Exec("insert into smtp (clusterid,remove,created,completed) VALUES($1,false,CURRENT_TIMESTAMP,0) ", l.ClusterUUID); err != nil {
tx.Rollback()
return fmt.Errorf("failed to insert row %v", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit %v", err)
}
return nil
}
func handleDeletedCluster(l Log, db *sql.DB) error {
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment