Last active
May 26, 2020 07:47
-
-
Save maleck13/b04e143fabd5e4c51e349ca5fc795549 to your computer and use it in GitHub Desktop.
job queue postgres
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var done = sync.Map{} | |
func main() { | |
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+ | |
"password=%s dbname=%s sslmode=disable", | |
host, port, user, password, dbname) | |
db, err := sql.Open("postgres", psqlInfo) | |
if err != nil { | |
panic(err) | |
} | |
defer db.Close() | |
go func() { | |
i := 0 | |
for { | |
i++ | |
clusterID := fmt.Sprintf("cid-%v", i) | |
l := Log{ClusterUUID: clusterID, Description: "rhmi added"} | |
time.Sleep(200 * time.Millisecond) | |
if err := handleLog(db, l); err != nil { | |
fmt.Println("handle log err ", err) | |
} | |
} | |
}() | |
time.Sleep(5*time.Second) | |
rand.Seed(time.Now().UnixNano()) | |
min := 10 | |
max := 500 | |
go func() { | |
for { | |
delay := rand.Intn(max - min + 1) + min | |
id, err := doWork(db,delay) | |
if err != nil { | |
fmt.Println("worker 1 failed for id ", id, err) | |
continue | |
} | |
fmt.Println("worker 1 completed for id ", id) | |
if _,ok := done.Load(id); ok{ | |
fmt.Println("duplicate entry for ", id ) | |
continue | |
} | |
done.Store(id,"true") | |
} | |
}() | |
go func() { | |
for { | |
delay := rand.Intn(max - min + 1) + min | |
id, err := doWork(db,delay) | |
if err != nil { | |
fmt.Println("worker 2 failed for id ", id, err) | |
continue | |
} | |
fmt.Println("worker 2 completed for id ", id) | |
if _,ok := done.Load(id); ok{ | |
fmt.Println("duplicate entry for ", id ) | |
continue | |
} | |
done.Store(id,"true") | |
} | |
}() | |
v := make(chan bool, 1) | |
<-v | |
} | |
func handleLog(db *sql.DB, log Log) error { | |
if strings.Contains(log.Description, "rhmi added") { | |
if err := handleNewCluster(log, db); err != nil { | |
return err | |
} | |
} | |
if strings.Contains(log.Description, "rhmi deleted") { | |
if err := handleDeletedCluster(log, db); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func doWork(db *sql.DB, delay int) (string, error) { | |
ctx := context.TODO() | |
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) | |
if err != nil { | |
return "", fmt.Errorf("failed to start transaction %v ", err) | |
} | |
id := "" | |
// FOR UPDATE LOCKS THE ROW. SKIP LOCKED ENSURES IT MOVES PAST ANY ALREADY LOCKED ROWS | |
err = tx.QueryRow("SELECT clusterid from smtp where completed=0 ORDER BY created FOR UPDATE SKIP LOCKED LIMIT 1").Scan(&id) | |
if err != nil { | |
tx.Rollback() | |
return "", fmt.Errorf("failed to find job %v ", err) | |
} | |
time.Sleep(time.Duration(delay) * time.Millisecond) | |
n := time.Now().Unix() | |
if _, err := tx.Exec("update smtp set completed=$1 where clusterid=$2", n, id); err != nil { | |
tx.Rollback() | |
return id, fmt.Errorf("failed to update job %v ", err) | |
} | |
return id, tx.Commit() | |
} | |
func handleNewCluster(l Log, db *sql.DB) error { | |
tx, err := db.Begin() | |
if err != nil { | |
return fmt.Errorf("failed to start transaction %v ", err) | |
} | |
if _, err := tx.Exec("insert into smtp (clusterid,remove,created,completed) VALUES($1,false,CURRENT_TIMESTAMP,0) ", l.ClusterUUID); err != nil { | |
tx.Rollback() | |
return fmt.Errorf("failed to insert row %v", err) | |
} | |
if err := tx.Commit(); err != nil { | |
return fmt.Errorf("failed to commit %v", err) | |
} | |
return nil | |
} | |
func handleDeletedCluster(l Log, db *sql.DB) error { | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment