Last active
December 22, 2015 10:28
-
-
Save steveyen/6458663 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Fake pseudocode for an incremental backup client that uses libupr
# and a mythical memcached-binary network library (mcb).
import sys | |
import sqlite | |
import uuid | |
import libupr | |
import mcb | |
# Command-line arguments.  sys.argv[0] is the path of this script, so the
# user-supplied arguments start at index 1:
#   sys.argv[1]  value handed to mcb.connect()
#   sys.argv[2]  value handed to c.authenticate()
#   sys.argv[3]  value handed to sqlite.open() (the backup store)
c = mcb.connect(sys.argv[1])
c.authenticate(sys.argv[2])
d = sqlite.open(sys.argv[3])
d.execute("CREATE TABLE blob ...; CREATE TABLE ops ...;")
# Callbacks that libupr invokes on behalf of a session.  Every callback
# receives the handle that was given to libupr.start_session(); they use
# the module-level connection `c` and database `d`.

def _close_handle(handle):
    """Do nothing: this client has nothing to release per handle."""


def _load_bytes(handle, key):
    """Return the first `blob` row stored under `key` for the handle's vbucket."""
    return d.execute(
        "SELECT * FROM blob WHERE key = %0 AND vbucketId = %1",
        key, handle.vbucketId)[0]


def _store_bytes(handle, key, blob):
    """Persist `blob` under `key` for the handle's vbucket."""
    d.execute(
        "UPDATE OR INSERT INTO blob VALUES (%0, %1, %2)",
        key, handle.vbucketId, blob)


def _send_message(handle, opcode, body):
    """Forward a libupr message to the peer over connection `c`."""
    c.sendMessage(handle.vbucketId, opcode, body)


def _item_op_received(handle, op, seqNum, item):
    """Record one received item operation in the `ops` table.

    op is MUTATION, DELETION.
    """
    # The column names go in the column list and the values are bound
    # through placeholders, like every other statement in this file.
    d.execute(
        "INSERT INTO ops (vbucketId, op, seqNum, item, snapshotId)"
        " VALUES (%0, %1, %2, %3, %4)",
        handle.vbucketId, op, seqNum, item, handle.snapshotId)


def _snapshot_start_received(handle):
    """Tag the ops that follow with a fresh snapshot id."""
    handle.snapshotId = str(uuid.uuid4())


def _snapshot_end_received(handle):
    """No-op here, since the backup use-case is simple."""


def _rollback(handle, seqNum):
    """Delete the recorded ops of this vbucket whose seqNum is above `seqNum`."""
    d.execute("DELETE FROM ops WHERE vbucketId = %0 and seqNum > %1",
              handle.vbucketId, seqNum)


binding = {
    "close_handle": _close_handle,
    "load_bytes": _load_bytes,
    "store_bytes": _store_bytes,
    "send_message": _send_message,
    "item_op_received": _item_op_received,
    "snapshot_start_received": _snapshot_start_received,
    "snapshot_end_received": _snapshot_end_received,
    "rollback": _rollback,
}
def createHandle(vbucketId):
    """Create the per-vbucket handle passed to libupr.

    A handle is opaque callback data that libupr passes back to us
    on any binding callback function invocations.

    Args:
        vbucketId: id of the vbucket this handle belongs to.

    Returns:
        An object with attributes `vbucketId` and `snapshotId` (a fresh
        UUID string).  Attributes are used rather than dict keys because
        the callbacks read and assign `handle.vbucketId` and
        `handle.snapshotId`.
    """
    import types  # local import keeps this block self-contained

    # We could put c and d in the handle too,
    # to get rid of global variables, but
    # keeping it simple instead.
    return types.SimpleNamespace(
        vbucketId=vbucketId,
        snapshotId=str(uuid.uuid4()),
    )
m = {}  # map from vbucketId to libupr-session
for vbucketId in c.getVBucketIds():
    handle = createHandle(vbucketId)
    # I'm a replica, and the peer is the master.
    m[vbucketId] = libupr.start_session(handle, binding)

# A message pump loop: receive one message at a time from the peer and
# hand it to the libupr session of the vbucket it is addressed to.
while True:
    err, vbucketId, opcode, body = c.receiveMsg()
    if err:
        # Iterating `m` directly would yield vbucketIds; the sessions
        # are the values of the map.
        for s in m.values():
            libupr.terminate_session(s)
        sys.exit(err)
    libupr.handle_message(m[vbucketId], opcode, body)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Fake pseudocode for a restartable restore client that uses libupr and a
# mythical memcached-binary networking library (mcb).
import sys | |
import sqlite | |
import libupr | |
import mcb | |
# Command-line arguments.  sys.argv[0] is the path of this script, so the
# user-supplied arguments start at index 1:
#   sys.argv[1]  value handed to mcb.listen()
#   sys.argv[2]  value handed to sqlite.open() (the backup store)
c = mcb.listen(sys.argv[1]).accept()
d = sqlite.open(sys.argv[2])
# Callbacks that libupr invokes on behalf of a master session.  Every
# callback receives the handle that was given to
# libupr.start_master_session(); they use the module-level connection `c`
# and database `d`.

def _close_handle(handle):
    """Do nothing: this client has nothing to release per handle."""


def _load_bytes(handle, key):
    """Return the bytes for `key`, preferring the in-memory overlay.

    Values written through store_bytes live only in `handle.overlay`;
    anything else is read from the `blob` table.
    """
    # Membership test instead of `overlay[key] or ...`: indexing a missing
    # key raises KeyError, and `or` would skip an empty stored blob.
    if key in handle.overlay:
        return handle.overlay[key]
    # NOTE(review): this path returns the first row of `SELECT *`, while
    # the overlay path returns the stored blob itself -- confirm the shape
    # libupr expects.
    return d.execute(
        "SELECT * FROM blob WHERE key = %0 AND vbucketId = %1",
        key, handle.vbucketId)[0]


def _store_bytes(handle, key, blob):
    """Remember `blob` in the overlay only (not mutating our backup file)."""
    handle.overlay[key] = blob


def _send_message(handle, opcode, body):
    """Forward a libupr message to the peer over connection `c`."""
    c.sendMessage(handle.vbucketId, opcode, body)


def _ok_to_handle_snapshots_and_items(handle, lastSeqNum):
    """Start a thread that pushes the ops after `lastSeqNum` to libupr."""
    import threading  # local import keeps this block self-contained

    # handle.s is read inside the thread, as in the original closure.
    threading.Thread(
        target=lambda: pushSnapshotsToLibUPR(
            handle.s, handle.vbucketId, lastSeqNum),
    ).start()


binding = {
    "close_handle": _close_handle,
    "load_bytes": _load_bytes,
    "store_bytes": _store_bytes,
    "send_message": _send_message,
    "ok_to_handle_snapshots_and_items": _ok_to_handle_snapshots_and_items,
}
def pushSnapshotsToLibUPR(s, vbucketId, lastSeqNum):
    """Replay the backed-up ops of one vbucket into a libupr session.

    Reads the rows of `ops` for `vbucketId` whose seqNum is greater than
    `lastSeqNum`, in ascending seqNum order, and hands each one to
    libupr.handle_next_item().  Consecutive rows that share a snapshotId
    are wrapped in one start_snapshot / end_snapshot pair.

    Args:
        s: libupr session to push into.
        vbucketId: vbucket whose ops are replayed.
        lastSeqNum: only ops with a larger seqNum are replayed.
    """
    inSnapshot = False
    lastSnapshotId = None
    cursor = d.execute("SELECT op, seqNum, item, snapshotId FROM ops"
                       " WHERE vbucketId = %0 AND seqNum > %1"
                       " ORDER BY seqNum ASC",
                       vbucketId, lastSeqNum)
    while True:
        done, op, seqNum, item, snapshotId = cursor.next()
        if done:
            # Need a clean way to shutdown c when we're done restoring all vbuckets.
            break
        if not inSnapshot or snapshotId != lastSnapshotId:
            if inSnapshot:
                libupr.end_snapshot(s)
            libupr.start_snapshot(s)
            inSnapshot = True
            # Remember the current snapshot; otherwise every item would
            # open a new one.
            lastSnapshotId = snapshotId
        libupr.handle_next_item(s, op, seqNum, item)
    # Close the snapshot that was still open when the rows ran out.
    # NOTE(review): assumes libupr wants every start_snapshot matched by
    # an end_snapshot -- confirm.
    if inSnapshot:
        libupr.end_snapshot(s)
def createHandle(vbucketId):
    """Create the per-vbucket handle passed to libupr.

    Args:
        vbucketId: id of the vbucket this handle belongs to.

    Returns:
        An object with attributes `vbucketId` and `overlay` (a new, empty
        dict per handle).  Attributes are used rather than dict keys
        because the rest of the file reads `handle.vbucketId` and
        `handle.overlay` and later assigns `handle.s`.
    """
    import types  # local import keeps this block self-contained

    # We could put c and d in the handle too,
    # to get rid of global variables, but
    # keeping it simple instead.
    return types.SimpleNamespace(
        vbucketId=vbucketId,
        overlay={},  # Using an overlay so as not to mutate the backup file.
    )
m = {}  # map from vbucketId to libupr-session
for vbucketId in d.execute("SELECT DISTINCT vbucketId FROM ops"):
    mcb.addVBucket(vbucketId)
    handle = createHandle(vbucketId)
    s = libupr.start_master_session(handle, binding, force=True)
    # The ok_to_handle_snapshots_and_items callback reads the session
    # back from the handle.
    handle.s = s
    m[vbucketId] = s

# A message pump loop: receive one message at a time from the peer and
# hand it to the libupr session of the vbucket it is addressed to.
while True:
    err, vbucketId, opcode, body = c.receiveMsg()
    if err:
        # Iterating `m` directly would yield vbucketIds; the sessions
        # are the values of the map.
        for s in m.values():
            libupr.terminate_session(s)
        sys.exit(err)
    if opcode == mcb.AUTH:
        c.sendResponse(mcb.SUCCESS)
        continue
    # `m` holds sessions, not handles, so the looked-up value is passed
    # to libupr directly.
    libupr.handle_message(m[vbucketId], opcode, body)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment