Created
March 22, 2011 21:04
-
-
Save ericmoritz/882055 to your computer and use it in GitHub Desktop.
A counter stored in Riak that handles conflict resolutions on read
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import riak | |
log = logging.getLogger(__name__) | |
class RiakCounter(object): | |
def __init__(self, bucket, key): | |
self.bucket = bucket | |
self.bucket.set_allow_multiples(True) | |
self.key = key | |
def add(self, step, obj=None): | |
if obj is None: | |
obj = self._get_with_resolution() | |
data = obj.get_data() | |
data['last'] = data['last'] + data['step'] | |
data['step'] = step | |
log.debug(data) | |
obj.set_data(data) | |
obj.store() | |
def reset(self, start=0): | |
obj = self._get_with_resolution() | |
data = obj.get_data() | |
data['last'] = start | |
data['step'] = 0 | |
log.debug(data) | |
obj.set_data(data) | |
obj.store() | |
def _get_with_resolution(self): | |
obj = self.bucket.get(self.key) | |
# If the object doesn't exist, return a new object with | |
# a value of zero | |
# If the object has siblings, resolve the conflict | |
if obj.has_siblings(): | |
first_obj = obj.get_sibling(0) | |
first = first_obj.get_data() | |
# For each sibling, apply the steps to the first's steps | |
for sibling_obj in map(obj.get_sibling, | |
range(1, obj.get_sibling_count())): | |
sibling = sibling_obj.get_data() | |
first['step'] += sibling['step'] | |
# Update the data with the new last and step values | |
first_obj.set_data(first) | |
log.debug("300: %s" % (first_obj.get_data())) | |
return first_obj | |
elif not obj.exists(): | |
obj = self.bucket.new(self.key, data={'last': 0, | |
'step': 0}) | |
log.debug("404: %s" % (obj.get_data())) | |
return obj | |
else: | |
log.debug("200: %s" % (obj.get_data())) | |
return obj | |
@property | |
def value(self): | |
obj = self._get_with_resolution() | |
data = obj.get_data() | |
return data['last'] + data['step'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from counter import RiakCounter | |
import logging | |
import riak | |
logging.basicConfig(level=logging.DEBUG) | |
# Create 3 independent counters | |
c1 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter") | |
c2 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter") | |
c3 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter") | |
# Counter 1 will increment the counter to three | |
c1.reset() # 0 | |
c1.add(1) # 1 | |
c1.add(1) # 2 | |
c1.add(1) # 3 | |
assert c1.value == 3, c1.value | |
# Counter 2 and Counter 3 will increment the counter in parallel | |
# Split reality by fetching two copies of the some object | |
o2 = c2.bucket.get(c2.key) | |
o3 = c3.bucket.get(c3.key) | |
# Attempt to update the values using that copy, this should break the | |
# space/time continuum. | |
c2.add(1, obj=o2) # 4 | |
c3.add(10, obj=o3) # 14 | |
# c1 should get a conflicted value and the conflict should be resolved with the value returned. | |
assert c1.value == 14, c1.value |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment