Created
July 12, 2013 20:43
-
-
Save snej/5987686 to your computer and use it in GitHub Desktop.
New implementation of memcached.CAS that doesn't reserve the client connection for the duration of the transaction.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type CASState struct { | |
initialized bool // false on the first call to CASNext, then true | |
Value []byte // Current value of key; update in place to new value | |
Cas uint64 // Current CAS value of key | |
Exists bool // Does a value exist for the key? (If not, Value will be nil) | |
Err error // Error, if any, after CASNext returns false | |
resp *gomemcached.MCResponse | |
} | |
// Non-callback, loop-based version of CAS method. Usage is like this: | |
// | |
// var state memcached.CASState | |
// for client.CASNext(vb, key, exp, &state) { | |
// state.Value = some_mutation(state.Value) | |
// } | |
// if state.Err != nil { ... } | |
func (client *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool { | |
if state.initialized { | |
if !state.Exists { | |
// Adding a new key: | |
if state.Value == nil { | |
state.Cas = 0 | |
return false // no-op (delete of non-existent value) | |
} | |
state.resp, state.Err = UnwrapMemcachedError(client.Add(vb, k, 0, exp, state.Value)) | |
if state.Err != nil { | |
return false | |
} else if state.resp.Status == gomemcached.SUCCESS { | |
return false // Successful add | |
} | |
} else { | |
// Updating / deleting a key: | |
req := &gomemcached.MCRequest{ | |
Opcode: gomemcached.DELETE, | |
VBucket: vb, | |
Key: []byte(k), | |
Cas: state.Cas} | |
if state.Value != nil { | |
req.Opcode = gomemcached.SET | |
req.Opaque = 0 | |
req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0} | |
req.Body = state.Value | |
flags := 0 | |
exp := 0 // ??? Should we use initialexp here instead? | |
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp)) | |
} | |
if state.resp, state.Err = client.Send(req); state.Err == nil { | |
return false // Successful update or delete! | |
} | |
} | |
} | |
// Initial call, or after a conflict: GET the current value and CAS and return them: | |
state.initialized = true | |
if state.resp, state.Err = client.Get(vb, k); state.Err == nil { | |
state.Exists = true | |
state.Value = state.resp.Body | |
state.Cas = state.resp.Cas | |
} else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT { | |
state.Err = nil | |
state.Exists = false | |
state.Value = nil | |
state.Cas = 0 | |
} else { | |
return false | |
} | |
return true // keep going... | |
} | |
// A function to perform a CAS transform. | |
// Input is the current value, or nil if no value exists. | |
// The function should return the new value (if any) to set, and the store/quit/delete operation. | |
type CasFunc func(current []byte) ([]byte, CasOp) | |
// Perform a CAS transform with the given function. | |
// | |
// If the value does not exist, a nil current value will be sent to f. | |
func (client *Client) CAS(vb uint16, k string, f CasFunc, | |
initexp int) (*gomemcached.MCResponse, error) { | |
var state CASState | |
for client.CASNext(vb, k, initexp, &state) { | |
newValue, operation := f(state.Value) | |
if operation == CASQuit || (operation == CASDelete && state.Value == nil) { | |
return nil, operation | |
} | |
state.Value = newValue | |
} | |
return state.resp, state.Err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment