Skip to content

Instantly share code, notes, and snippets.

@steveyen
Last active December 22, 2015 10:28
Show Gist options
  • Save steveyen/6458663 to your computer and use it in GitHub Desktop.
Save steveyen/6458663 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# fake, pseudocode for an incremental backup client that uses libupr
# and a mythical memcached-binary network library (mcb).
import sys
import sqlite
import uuid
import libupr
import mcb
# Command line: <server> <credentials> <backup-file>.
# sys.argv[0] is the path of this script, so the real arguments start at 1.
if len(sys.argv) != 4:
    sys.exit("usage: %s <server> <credentials> <backup-file>" % sys.argv[0])

# c: connection to the peer; d: the local backup database.  Both are module
# globals that the binding callbacks and the message pump use directly.
c = mcb.connect(sys.argv[1])
c.authenticate(sys.argv[2])
d = sqlite.open(sys.argv[3])

# IF NOT EXISTS: an incremental backup reopens the file written by an
# earlier run, so the tables may already be present.
d.execute("CREATE TABLE IF NOT EXISTS blob ...; CREATE TABLE IF NOT EXISTS ops ...;")
# Callbacks handed to libupr.  Each receives the opaque handle that was
# passed when the session was started, and uses the module-level
# connection `c` and database `d`.

def _close_handle(handle):
    """No-op: this client has nothing to release for a handle."""


def _load_bytes(handle, key):
    """Return the first `blob` row stored for `key` in the handle's vbucket."""
    # NOTE(review): SELECT * yields the whole row, not only the blob column;
    # confirm which of the two libupr expects back.
    return d.execute(
        "SELECT * FROM blob WHERE key = %0 AND vbucketId = %1",
        key, handle.vbucketId)[0]


def _store_bytes(handle, key, blob):
    """Persist `blob` under `key` for the handle's vbucket."""
    # "UPDATE OR INSERT" is not SQLite syntax; the upsert form is
    # INSERT OR REPLACE.  It replaces an existing row only when the table
    # has a uniqueness constraint covering (key, vbucketId) -- the schema is
    # elided in this file, so confirm that constraint exists.
    d.execute(
        "INSERT OR REPLACE INTO blob VALUES (%0, %1, %2)",
        key, handle.vbucketId, blob)


def _send_message(handle, opcode, body):
    """Forward a message to the peer, tagged with the handle's vbucket."""
    c.sendMessage(handle.vbucketId, opcode, body)


def _item_op_received(handle, op, seqNum, item):
    """Record one received item operation in the `ops` table."""
    # op is MUTATION, DELETION.
    # The values are bound through numbered placeholders, as in every other
    # statement in this file; the column list keeps the binding explicit.
    d.execute(
        "INSERT INTO ops (vbucketId, op, seqNum, item, snapshotId)"
        " VALUES (%0, %1, %2, %3, %4)",
        handle.vbucketId, op, seqNum, item, handle.snapshotId)


def _snapshot_start_received(handle):
    """Give the handle a fresh snapshot id for the ops that follow."""
    # The standard uuid module has no gen(); uuid4() makes a random UUID.
    # Stored as text so it can be written to the database as-is.
    handle.snapshotId = str(uuid.uuid4())


def _snapshot_end_received(handle):
    """No-op here, since the backup use-case is simple."""


def _rollback(handle, seqNum):
    """Drop every recorded op of this vbucket newer than `seqNum`."""
    d.execute("DELETE FROM ops WHERE vbucketId = %0 and seqNum > %1",
              handle.vbucketId, seqNum)


binding = {
    "close_handle": _close_handle,
    "load_bytes": _load_bytes,
    "store_bytes": _store_bytes,
    "send_message": _send_message,
    "item_op_received": _item_op_received,
    "snapshot_start_received": _snapshot_start_received,
    "snapshot_end_received": _snapshot_end_received,
    "rollback": _rollback,
}
def createHandle(vbucketId):
    """Build the per-vbucket handle given to libupr.

    A handle is opaque callback data that libupr passes back to us
    on any binding callback function invocations.

    The callbacks read `handle.vbucketId` and assign `handle.snapshotId`,
    so the handle has to support attribute access; a plain dict does not.

    Args:
        vbucketId: id of the vbucket this handle belongs to.

    Returns:
        An object with `vbucketId` and `snapshotId` attributes.
    """
    # Function-scope import so this definition needs no new file-level import.
    from types import SimpleNamespace

    # We could put c and d in the handle too,
    # to get rid of global variables, but
    # keeping it simple instead.
    return SimpleNamespace(
        vbucketId=vbucketId,
        # The standard uuid module has no gen(); uuid4() makes a random UUID.
        snapshotId=str(uuid.uuid4()),
    )
m = {}  # map from vbucketId to libupr-session
for vbucketId in c.getVBucketIds():
    handle = createHandle(vbucketId)
    # I'm a replica, and the peer is the master.
    s = libupr.start_session(handle, binding)
    m[vbucketId] = s

# A message pump loop.
while True:
    err, vbucketId, opcode, body = c.receiveMsg()
    if err:
        # Iterating the dict itself yields vbucket ids; the sessions are
        # its values.
        for s in m.values():
            libupr.terminate_session(s)
        sys.exit(err)
    s = m[vbucketId]
    libupr.handle_message(s, opcode, body)
#!/usr/bin/env python
# fake, pseudocode for a restartable restore client that uses libupr and a
# mythical memcached-binary networking library (mcb).
import sys
import sqlite
import libupr
import mcb
# Command line: <listen-address> <backup-file>.
# sys.argv[0] is the path of this script, so the real arguments start at 1.
if len(sys.argv) != 3:
    sys.exit("usage: %s <listen-address> <backup-file>" % sys.argv[0])

# c: the accepted peer connection; d: the backup database being restored.
# Both are module globals used by the binding callbacks and the message pump.
c = mcb.listen(sys.argv[1]).accept()
d = sqlite.open(sys.argv[2])
# Callbacks handed to libupr.  Each receives the opaque handle that was
# passed when the session was started, and uses the module-level
# connection `c` and database `d`.

def _close_handle(handle):
    """No-op: this client has nothing to release for a handle."""


def _load_bytes(handle, key):
    """Return the bytes for `key`, preferring the in-memory overlay."""
    # Membership test rather than `overlay[key] or ...`: a missing key must
    # fall through to the backup file instead of raising, and a stored but
    # falsy blob (e.g. empty bytes) must still win over the backup file.
    if key in handle.overlay:
        return handle.overlay[key]
    return d.execute(
        "SELECT * FROM blob WHERE key = %0 AND vbucketId = %1",
        key, handle.vbucketId)[0]


def _store_bytes(handle, key, blob):
    """Keep `blob` in the handle's overlay only."""
    handle.overlay[key] = blob  # Not mutating our backup file.


def _send_message(handle, opcode, body):
    """Forward a message to the peer, tagged with the handle's vbucket."""
    c.sendMessage(handle.vbucketId, opcode, body)


def _ok_to_handle_snapshots_and_items(handle, lastSeqNum):
    """Start a background thread that replays ops newer than `lastSeqNum`."""
    # Function-scope import so this definition needs no new file-level import.
    import threading

    def push():
        # handle.s is read when the thread runs, not when it is created.
        pushSnapshotsToLibUPR(handle.s, handle.vbucketId, lastSeqNum)

    threading.Thread(target=push).start()


binding = {
    "close_handle": _close_handle,
    "load_bytes": _load_bytes,
    "store_bytes": _store_bytes,
    "send_message": _send_message,
    "ok_to_handle_snapshots_and_items": _ok_to_handle_snapshots_and_items,
}
def pushSnapshotsToLibUPR(s, vbucketId, lastSeqNum):
    """Replay the backed-up ops of one vbucket into a libupr session.

    Reads the `ops` rows of `vbucketId` with seqNum greater than
    `lastSeqNum` in ascending seqNum order, and feeds them to libupr.
    Each run of consecutive rows sharing a snapshotId is bracketed by
    start_snapshot / end_snapshot.

    Args:
        s: the libupr session to push into.
        vbucketId: the vbucket whose ops are replayed.
        lastSeqNum: only ops with a larger seqNum are replayed.
    """
    lastSnapshotId = None
    cursor = d.execute("SELECT op, seqNum, item, snapshotId FROM ops"
                       " WHERE vbucketId = %0 AND seqNum > %1"
                       " ORDER BY seqNum ASC",
                       vbucketId, lastSeqNum)
    while True:
        done, op, seqNum, item, snapshotId = cursor.next()
        if done:
            # Close the snapshot that is still open, if any row was pushed.
            if lastSnapshotId is not None:
                libupr.end_snapshot(s)
            # Need a clean way to shut down c when we're done restoring
            # all vbuckets.
            break
        if lastSnapshotId != snapshotId:
            if lastSnapshotId is not None:
                libupr.end_snapshot(s)
            libupr.start_snapshot(s)
            # Remember the current snapshot so the boundary check fires once
            # per snapshot rather than once per item.
            lastSnapshotId = snapshotId
        libupr.handle_next_item(s, op, seqNum, item)
def createHandle(vbucketId):
    """Build the per-vbucket handle given to libupr.

    The callbacks and the setup loop read `handle.vbucketId`, index
    `handle.overlay` and assign `handle.s`, so the handle has to support
    attribute access; a plain dict does not.

    Args:
        vbucketId: id of the vbucket this handle belongs to.

    Returns:
        An object with `vbucketId`, `overlay` (a fresh empty dict) and
        `s` (None until the caller stores the libupr session in it).
    """
    # Function-scope import so this definition needs no new file-level import.
    from types import SimpleNamespace

    # We could put c and d in the handle too,
    # to get rid of global variables, but
    # keeping it simple instead.
    return SimpleNamespace(
        vbucketId=vbucketId,
        overlay={},  # Using an overlay so as not to mutate the backup file.
        s=None,
    )
m = {}  # map from vbucketId to libupr-session
# NOTE(review): assumes iterating the query result yields bare vbucket ids
# rather than row tuples -- confirm against the sqlite binding in use.
for vbucketId in d.execute("SELECT DISTINCT vbucketId FROM ops"):
    mcb.addVBucket(vbucketId)
    handle = createHandle(vbucketId)
    s = libupr.start_master_session(handle, binding, force=True)
    # The ok_to_handle_snapshots_and_items callback reads the session
    # back from the handle.
    handle.s = s
    m[vbucketId] = s

# A message pump loop.
while True:
    err, vbucketId, opcode, body = c.receiveMsg()
    if err:
        # Iterating the dict itself yields vbucket ids; the sessions are
        # its values.
        for s in m.values():
            libupr.terminate_session(s)
        sys.exit(err)
    if opcode == mcb.AUTH:
        c.sendResponse(mcb.SUCCESS)
        continue
    # m maps straight to sessions (see the loop above), not to handles.
    s = m[vbucketId]
    libupr.handle_message(s, opcode, body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment