Last active
January 13, 2016 22:37
-
-
Save zr40/e280a45efdb35e3a2c9b to your computer and use it in GitHub Desktop.
Upsert+delete voodoo test (implemented in Rust)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate postgres; | |
use postgres::{Connection,SslMode}; | |
use std::sync::mpsc::{channel,Receiver,TryRecvError}; | |
use std::thread; | |
/// Checks that the "MVCC voodoo" upsert+delete technique deletes exactly the
/// rows of a set that the current transaction neither inserted nor locked,
/// while other connections concurrently take and release row locks.
///
/// Requires a reachable PostgreSQL server (9.5 or later, for `ON CONFLICT`)
/// at the connection string used below.
#[test]
fn test_upsert_delete_mvcc_voodoo() {
    // This tests an upsert+delete technique to replace a set of rows with
    // another set of rows. The intended use case is to replicate an external
    // data source, of which an older version was previously replicated. Rows
    // of this data source may have been added, kept, updated, or deleted.
    //
    // PostgreSQL 9.5 introduces INSERT ... ON CONFLICT DO UPDATE, which is
    // sufficient for the add, keep, and update cases; however, it does not
    // handle the delete case. Ideally, the way to do this is proper, concise,
    // efficient, and without side-effects, but no method exists that
    // provides all four properties.
    //
    // The methods are:
    //
    // * DELETE everything, then INSERT everything.
    //   Although proper and concise, this will produce dead rows for unchanged
    //   data, and will trigger cascading foreign keys.
    //
    // * Add a change number column to recognize which rows are to be deleted.
    //   Proper, concise and side-effect free, but will produce dead rows like
    //   the previous method. It will also increase the size of a row.
    //
    // * Keep track of all keys provided to INSERT ... ON CONFLICT DO UPDATE,
    //   then exclude the same keys in the DELETE.
    //   This is proper, efficient and side-effect free, but the exclusion
    //   clause must list all keys within the set of rows.
    //
    // * Put the data in a temporary table, and use that as a source for the
    //   INSERT ... ON CONFLICT DO UPDATE and the DELETE.
    //   This is proper, concise and side-effect free, but requires temporary
    //   duplication of the entire set.
    //
    // * Abuse MVCC and PostgreSQL's row locking implementation.
    //   This is concise, efficient and side-effect free, but is obviously
    //   terribly improper, and could delete rows matching the add, keep and
    //   update cases.
    //
    // The improper method works as follows:
    //
    // 1. Begin a new transaction.
    //
    // 2. Perform an upsert as follows:
    //
    //      INSERT INTO tbl (key, set_id, col1, col2, ...)
    //      VALUES ($1, $2, $3, $4, ...)
    //      ON CONFLICT (key) DO UPDATE
    //      SET key = tbl.key,
    //          col1 = $3, col2 = $4, ...
    //      WHERE tbl.col1 <> $3 OR tbl.col2 <> $4 OR ...
    //
    //    The WHERE clause must be true when *any* column differs, hence OR;
    //    when nothing differs the row is only locked, not rewritten.
    //
    //    By setting the key to itself, the row will be locked in FOR UPDATE
    //    instead of FOR NO KEY UPDATE, which would not block FOR KEY SHARE.
    //
    // 3. Delete the remainder:
    //
    //      DELETE FROM tbl
    //      WHERE set_id = $1
    //      AND age(xmin) <> 0
    //      AND age(xmax) <> 0
    //
    //    When a row has previously been inserted or updated by this
    //    transaction, xmin will contain its xid. When it has only been locked
    //    by this transaction, xmax will contain its xid. Therefore, if both
    //    xmin and xmax differ from the transaction's xid, it must have been
    //    present at the start of the transaction and not locked.
    //
    //    For the four cases:
    //    * Add: row is newly inserted.
    //        age(xmin) = 0
    //    * Keep: row is locked.
    //        age(xmax) = 0 (assuming no MultiXact)
    //    * Update: old row version is not visible. New version is inserted.
    //        age(xmin) = 0
    //    * Delete: row is not inserted by this transaction and is not locked.
    //        age(xmin) <> 0 AND age(xmax) <> 0
    //
    // 4. Commit the transaction.
    //
    // This improper method assumes that xmax contains a regular xid, not a
    // MultiXact xid. This cannot be guaranteed in a general case; however, the
    // use of FOR UPDATE row locks removes one particular opportunity of a
    // MultiXact appearing. The effect of having a MultiXact xid in a row's
    // xmax is that the row will be deleted when it would otherwise be kept.

    let conn = "postgres://zr40@%2Ftmp/test";
    let db = Connection::connect(conn, &SslMode::None).unwrap();

    db.execute("drop table if exists test", &[]).unwrap();
    db.execute("create table test (id int4 primary key, set_id int4 not null)", &[]).unwrap();
    db.execute("create index on test (set_id)", &[]).unwrap();

    // Populate test table with 10000 rows, equally divided into 20 groups of
    // 500 rows (set_id = id % 20).
    //
    // A single generate_series is used on purpose: two set-returning
    // functions of different lengths in the SELECT list only cycle to their
    // least common multiple before PostgreSQL 10; from version 10 on the
    // shorter one is padded with NULL, which would violate NOT NULL here.
    db.execute(
        "insert into test (id, set_id) select i, i % 20 from generate_series(0, 9999) as i",
        &[],
    ).unwrap();

    // Prepared statement: lock 100 rows ($1..1980+$1 step 20). The rows
    // already exist, so the insert conflicts; `where false` suppresses the
    // update, leaving only the row lock.
    let upsert_100 = db.prepare(
        "insert into test (id, set_id) select generate_series($1, 1980 + $1, 20), 4 on conflict (id) do update set id = test.id where false",
    ).unwrap();

    // Prepared statement: lock 50 rows (9000+$1..9980+$1 step 20)
    let upsert_50 = db.prepare(
        "insert into test (id, set_id) select generate_series(9000 + $1, 9980 + $1, 20), 4 on conflict (id) do update set id = test.id where false",
    ).unwrap();

    // Prepared statement: insert 201 rows (ids 20000..=20200, outside the
    // populated range, so they never conflict)
    let insert_201 = db.prepare(
        "insert into test (id, set_id) select generate_series(20000, 20200), $1 on conflict (id) do update set id = test.id where false",
    ).unwrap();

    // Prepared statement: delete using MVCC voodoo
    let delete_remainder = db.prepare(
        "delete from test where set_id = $1 and age(xmin) <> 0 and age(xmax) <> 0",
    ).unwrap();

    // Start some concurrent lockers. Seven are spawned so that indices 4, 5
    // and 6 really do collapse onto the same starting offset; the other four
    // each get their own offset.
    let threads: Vec<_> = (0..7).map(|x| {
        let x = match x {
            // Start three at the same offset
            4 | 5 | 6 => 4,
            x => x,
        };

        // Each locker gets its own channel, used solely as a stop signal.
        let (tx, rx) = channel();
        let thread = thread::spawn(move || {
            concurrency(rx, x * 421);
        });

        (tx, thread)
    }).collect();

    for x in 0..2000 {
        let set_id = x % 20;

        let xact = db.transaction().unwrap();

        // The first upsert will lock 100 rows
        upsert_100.execute(&[&set_id]).unwrap();

        // The second upsert will lock another 50 rows
        upsert_50.execute(&[&set_id]).unwrap();

        // Add a bunch of non-conflicting rows.
        insert_201.execute(&[&set_id]).unwrap();

        // Out of the original 500 rows, 350 rows remain unlocked. This must be
        // the exact number of rows affected by the DELETE.
        let deleted = delete_remainder.execute(&[&set_id]).unwrap();
        assert_eq!(deleted, 350);

        // Rollback, so every iteration starts from the same table contents.
        xact.finish().unwrap();
    }

    // Stop the concurrent activity: signal every locker first, then wait for
    // all of them, so they wind down in parallel.
    for thread in threads.iter() {
        thread.0.send(()).unwrap();
    }
    for thread in threads.into_iter() {
        thread.1.join().unwrap();
    }

    db.execute("drop table test", &[]).unwrap();
}
fn concurrency(rx: Receiver<()>, start: i32) { | |
let conn = "postgres://zr40@%2Ftmp/test"; | |
let db = Connection::connect(conn, &SslMode::None).unwrap(); | |
// non-update row lock types | |
let lock0 = db.prepare("select from test where id = $1 for key share").unwrap(); | |
let lock1 = db.prepare("select from test where id = $1 for share").unwrap(); | |
let lock2 = db.prepare("select from test where id = $1 for no key update").unwrap(); | |
let lock3 = db.prepare("select from test where id = $1 for update").unwrap(); | |
// update row lock types | |
let lock4 = db.prepare("update test set set_id = set_id where id = $1").unwrap(); | |
let lock5 = db.prepare("update test set id = id where id = $1").unwrap(); | |
while rx.try_recv() == Err(TryRecvError::Empty) { | |
for x in 0..9999 { | |
let x = (x + start) % 10000; | |
let xact = db.transaction().unwrap(); | |
match x % 10 { | |
0 | 1 => &lock0, | |
2 | 3 => &lock1, | |
4 | 5 => &lock2, | |
6 | 7 => &lock3, | |
8 => &lock4, | |
9 => &lock5, | |
_ => panic!(), | |
}.execute(&[&x]).unwrap(); | |
// Test both commits and rollbacks of row locks. | |
// | |
// Committing an update (in cases 8 and 9) will make the old row | |
// version invisible to the main thread's DELETE, causing less rows | |
// to be deleted. This is not what's being tested, and is also | |
// acceptable, so don't commit in that case. | |
if x % 2 == 0 && x % 10 < 8 { | |
xact.commit().unwrap(); | |
} else { | |
xact.finish().unwrap(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment