Created
May 5, 2016 08:34
-
-
Save kshvakov/5c7f3c405fc41d122841b7ebff200e2b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ time ./try_advisory_lock
Waiting
[5] Done, processed 1274 rows.
[7] Done, processed 1236 rows.
[2] Done, processed 1252 rows.
[4] Done, processed 1237 rows.
[8] Done, processed 1257 rows.
[6] Done, processed 1257 rows.
[3] Done, processed 1244 rows.
[1] Done, processed 1243 rows.
Done!
real 0m1.228s
user 0m1.104s
sys 0m1.024s
$ time ./skip_locked
Waiting
[5] Done, processed 1233 rows.
[7] Done, processed 1273 rows.
[8] Done, processed 1260 rows.
[6] Done, processed 1249 rows.
[1] Done, processed 1228 rows.
[4] Done, processed 1306 rows.
[2] Done, processed 1220 rows.
[3] Done, processed 1231 rows.
Done!
real 0m1.146s
user 0m1.008s
sys 0m1.072s
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/jmoiron/sqlx" | |
_ "github.com/lib/pq" | |
"crypto/md5" | |
"database/sql" | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
// Item is one row read from the queue table. The db tags name the columns
// that sqlx maps onto each field when scanning a result row.
type Item struct {
	// Job is the payload that processItem hashes.
	Job string `db:"job"`
	// Priority is used for ordering (highest first) and, with AddedOn,
	// to identify the row when it is deleted.
	Priority int16 `db:"priority"`
	// AddedOn is the secondary ordering key (oldest first).
	AddedOn time.Time `db:"added_on"`
}
// query picks a single queue row, highest priority first and oldest first
// within a priority, and row-locks it with FOR UPDATE. SKIP LOCKED makes the
// statement pass over rows that another transaction has already locked
// instead of waiting for them, so concurrent workers each get a different row.
const query = `
select *
from queue
order by priority desc, added_on asc limit 1 for update skip locked
`
func main() { | |
connect := sqlx.MustConnect("postgres", "host=127.0.0.1 user=kshvakov dbname=postgres") | |
var wg sync.WaitGroup | |
wg.Add(8) | |
for i := 1; i <= 8; i++ { | |
go func(num int) { | |
var processed int | |
for { | |
var item Item | |
tx, _ := connect.Beginx() | |
if err := tx.Get(&item, query); err != nil { | |
if err != sql.ErrNoRows { | |
log.Fatal(err) | |
} | |
break | |
} | |
processItem(&item) | |
tx.Exec("delete from queue where priority = $1 and added_on = $2", item.Priority, item.AddedOn) | |
tx.Commit() | |
processed++ | |
} | |
fmt.Printf("[%d] Done, processed %d rows.\n", num, processed) | |
wg.Done() | |
}(i) | |
} | |
fmt.Println("Waiting") | |
wg.Wait() | |
fmt.Println("Done!") | |
} | |
func processItem(item *Item) string { | |
return fmt.Sprintf("%x", md5.Sum([]byte(item.Job))) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/jmoiron/sqlx" | |
_ "github.com/lib/pq" | |
"crypto/md5" | |
"database/sql" | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
// Item is one row read from the queue table. The db tags name the columns
// that sqlx maps onto each field when scanning a result row.
type Item struct {
	// Job is the payload that processItem hashes.
	Job string `db:"job"`
	// Priority is used for ordering (highest first) and, with AddedOn,
	// to build the advisory-lock key and to identify the row on delete.
	Priority int16 `db:"priority"`
	// AddedOn is the secondary ordering key (oldest first).
	AddedOn time.Time `db:"added_on"`
}
// query picks a single queue row, highest priority first and oldest first
// within a priority, keeping only rows for which this transaction obtains a
// transaction-level advisory lock. The lock key is the pair (123, hashtext of
// priority and added_on rendered as text); the selected row is additionally
// row-locked with FOR UPDATE.
//
// NOTE(review): the PostgreSQL documentation warns that with LIMIT a locking
// function in the WHERE clause may be evaluated for more rows than are
// returned, depending on the plan — confirm the plan used for this table.
const query = `
select *
from queue
where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
order by priority desc, added_on asc limit 1 for update
`
func main() { | |
connect := sqlx.MustConnect("postgres", "host=127.0.0.1 user=kshvakov dbname=postgres") | |
var wg sync.WaitGroup | |
wg.Add(8) | |
for i := 1; i <= 8; i++ { | |
go func(num int) { | |
var processed int | |
for { | |
var item Item | |
tx, _ := connect.Beginx() | |
if err := tx.Get(&item, query); err != nil { | |
if err != sql.ErrNoRows { | |
log.Fatal(err) | |
} | |
break | |
} | |
processItem(&item) | |
tx.Exec("delete from queue where priority = $1 and added_on = $2", item.Priority, item.AddedOn) | |
tx.Commit() | |
processed++ | |
} | |
fmt.Printf("[%d] Done, processed %d rows.\n", num, processed) | |
wg.Done() | |
}(i) | |
} | |
fmt.Println("Waiting") | |
wg.Wait() | |
fmt.Println("Done!") | |
} | |
func processItem(item *Item) string { | |
return fmt.Sprintf("%x", md5.Sum([]byte(item.Job))) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment