Last active
June 28, 2024 05:28
-
-
Save dsoprea/6ebd1e63e613349c9f4ddc4cf0cd575c to your computer and use it in GitHub Desktop.
Redis: Transaction Queued Operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import collections

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Per-operation metadata record. One instance is stored per operation name:
#
#   reader_input_metadata:  keyword arguments supplied when the operation was
#                           registered
#   reader_output_metadata: dictionary that collects what the read callback
#                           returns
#   writer_output_metadata: dictionary that collects what the write callback
#                           returns
_OPERATION_METADATA_COLLECTION = \
    collections.namedtuple(
        '_OPERATION_METADATA_COLLECTION', [
            'reader_input_metadata',
            'reader_output_metadata',
            'writer_output_metadata',
        ])
class TransactionQueuedOperations(object):
    """Queue named read/write callback pairs and run them in one transaction.

    Each operation contributes a set of keys to watch, a read callback and a
    write callback. `run()` hands a single callable to
    `self._resource.transaction()`; that callable invokes every read callback
    in registration order, calls `multi()` on the object it was given, and
    then invokes the write callbacks.

    `r` is presumably a redis-py client (anything exposing a compatible
    `transaction(fn, *watch_keys, value_from_callable=True)` works).
    """

    def __init__(self, r, metadata=None):
        """Store the resource and initialize empty operation bookkeeping.

        r:        Object whose `transaction()` method is called by `run()`.
        metadata: Optional caller-owned dictionary exposed unchanged via the
                  `metadata` property. A fresh dictionary is used when
                  omitted.
        """

        if metadata is None:
            metadata = {}

        self._resource = r
        self._metadata = metadata

        # Union of the watched keys of all registered operations
        self._watch_keys_s = set()

        # List of (name, read_fn, write_fn) in registration order
        self._callbacks_and_metadata = []

        # name -> _OPERATION_METADATA_COLLECTION
        self._operations_metadata = {}

        self._has_run = False

    def add_operation(
            self, name, watch_keys, read_fn, write_fn, **reader_input_metadata):
        """Register an operation to be executed by `run()`.

        name:       Unique name of the operation.
        watch_keys: Non-empty iterable of keys added to the watched set.
        read_fn:    Called as `read_fn(self, p, om)` during the read phase.
        write_fn:   Called as `write_fn(self, p, om)` during the write phase.

        Any additional keyword arguments are stored as the operation's
        `reader_input_metadata`.

        Raises AssertionError if `run()` was already called, if `watch_keys`
        is empty, or if `name` is already registered.
        """

        # TODO(dustin): Add test

        # NOTE(review): These checks are `assert` statements and are therefore
        # skipped when Python runs with -O. They are kept as-is so that the
        # AssertionError contract does not change.

        assert \
            self._has_run is False, \
            "Can only add operations before running."

        assert \
            watch_keys, \
            "At least one watched-key is necessary per operation."

        assert \
            name not in self._operations_metadata, \
            "Name [{}] already used for another operation.".format(name)

        self._watch_keys_s.update(watch_keys)

        self._callbacks_and_metadata.append((
            name,
            read_fn,
            write_fn,
        ))

        # Keep separate dictionary for callback metadata, reader metadata, and
        # writer metadata

        self._operations_metadata[name] = \
            _OPERATION_METADATA_COLLECTION(
                reader_input_metadata=reader_input_metadata,
                reader_output_metadata={},
                writer_output_metadata={})

    @property
    def metadata(self):
        """The dictionary given to (or created by) the constructor."""

        return self._metadata

    def get_operation_metadata(self, name):
        """Return the metadata record of the named operation.

        Raises KeyError if no operation with that name was registered.
        """

        return self._operations_metadata[name]

    def _get_function_reference(self, fn):
        """Return a list of name components that identifies a callable.

        The list is `[module, (owner class), *enclosing names, name]`. The
        enclosing names come from `__qualname__` (classes, `<locals>` of
        closures). The owner class is the class that a bound method is bound
        to and is only added when the qualified name does not already end
        with it, e.g. for a method inherited from a base class.
        """

        # TODO(dustin): Add test

        # Add module name

        scope = [
            fn.__module__,
        ]

        # Enclosing names (class, or outer functions if closure)

        try:
            enclosing = fn.__qualname__.split('.')[:-1]
        except AttributeError:
            enclosing = []

        # If method bound to an object (or, for a classmethod, to a class)

        try:
            o = fn.__self__
        except AttributeError:
            pass
        else:
            # A classmethod is bound to the class itself. Report that class
            # rather than its metaclass.
            owner = o if isinstance(o, type) else o.__class__

            # The qualified name of a method already ends with its defining
            # class. Do not repeat it.
            if not enclosing or enclosing[-1] != owner.__name__:
                scope.append(owner.__name__)

        scope += enclosing

        # Add function name

        scope.append(fn.__name__)

        return scope

    def dump(self):
        """Return a dictionary that describes the registered operations.

        The result has the single key 'operations' with one entry per
        operation, in registration order. Each entry has the keys 'name',
        'read_fn', 'write_fn' (both as returned by
        `_get_function_reference()`) and 'metadata'. Only the non-empty
        metadata dictionaries are included in 'metadata'.
        """

        # TODO(dustin): Add test

        metadata_keys = (
            'reader_input_metadata',
            'reader_output_metadata',
            'writer_output_metadata',
        )

        exported_operations = []
        for name, read_fn, write_fn in self._callbacks_and_metadata:
            om = self._operations_metadata[name]

            # Omit the empty dictionaries
            metadata = {}
            for key in metadata_keys:
                value = getattr(om, key)
                if value:
                    metadata[key] = value

            exported_operation = {
                'name': name,
                'read_fn': self._get_function_reference(read_fn),
                'write_fn': self._get_function_reference(write_fn),
                'metadata': metadata,
            }

            exported_operations.append(exported_operation)

        exported = {
            'operations': exported_operations,
        }

        return exported

    def run(self, intermediate_cb=None):
        """Execute all registered operations via `self._resource.transaction`.

        intermediate_cb: Optional callable invoked with the aggregated read
                         results after all reads and before `multi()`.

        Returns whatever `self._resource.transaction()` returns. With
        `value_from_callable=True`, redis-py presumably returns the value
        returned by the callable, i.e. the aggregated dictionary of all
        reader and writer outputs (confirm against the redis-py version in
        use).

        Raises AssertionError if no operations were added. An exception from
        a read callback is logged and re-raised as-is; an exception from a
        write callback is wrapped in a generic `Exception` that is chained to
        the original.
        """

        assert \
            self._callbacks_and_metadata, \
            "No operations have been added."

        # NOTE(review): This is set before any reader executes, so a reader
        # that calls `add_operation()` trips the "before running" assertion,
        # which seems to contradict the comment in the read loop below.
        # Confirm the intended behavior.
        self._has_run = True

        def fn(p):
            # NOTE(review): redis-py may invoke this callable more than once
            # (it retries on WatchError). `aggregate_context` is rebuilt on
            # every invocation, but the per-operation output dictionaries
            # accumulate across invocations.

            # Perform read operations

            q = []
            aggregate_context = {}
            for i, (name, read_fn, write_fn) \
                    in enumerate(self._callbacks_and_metadata):

                om = self._operations_metadata[name]

                _LOGGER.debug("TRANSACTION: Executing operation ({}) [{}] "
                              "{}".format(i, name, om.reader_input_metadata))

                # The reader can add additional operations. Note that if the
                # transaction is rerun then the added operation will already
                # be present and the add will fail.

                try:
                    reader_output_metadata = read_fn(self, p, om)
                except Exception as e:
                    _LOGGER.exception("Operation ({}) [{}] failed: [{}]".format(
                                      i, name, str(e)))

                    raise

                if reader_output_metadata:
                    # Stash output
                    om.reader_output_metadata.update(reader_output_metadata)

                    # Collect the read results to return in the overall result
                    aggregate_context.update(reader_output_metadata)

                # Queue corresponding writer function
                q.append((name, write_fn))

            # Invoke intermediate callback. This supports faulting the
            # transaction from tests.
            if intermediate_cb is not None:
                intermediate_cb(aggregate_context)

            # Perform write operations

            p.multi()

            for i, (name, write_fn) in enumerate(q):
                om = self._operations_metadata[name]

                try:
                    writer_output_metadata = write_fn(self, p, om)
                except Exception as e:
                    raise \
                        Exception(
                            "Operation ({}) [{}] failed: [{}]".format(
                                i, name, str(e))) \
                        from e

                if writer_output_metadata is not None:
                    # Stash output
                    om.writer_output_metadata.update(writer_output_metadata)

                    # Collect the write results to return in the overall
                    # result
                    aggregate_context.update(writer_output_metadata)

            return aggregate_context

        watch_keys = list(self._watch_keys_s)

        action = \
            self._resource.transaction(
                fn,
                *watch_keys,
                value_from_callable=True)

        return action
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment