Skip to content

Instantly share code, notes, and snippets.

@dsoprea
Last active June 28, 2024 05:28
Show Gist options
  • Save dsoprea/6ebd1e63e613349c9f4ddc4cf0cd575c to your computer and use it in GitHub Desktop.
Save dsoprea/6ebd1e63e613349c9f4ddc4cf0cd575c to your computer and use it in GitHub Desktop.
Redis: Transaction Queued Operations
import logging
import collections
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Per-operation metadata record. Each field is populated with a dictionary by
# `TransactionQueuedOperations`:
#
#   reader_input_metadata:  keyword arguments given when the operation is added
#   reader_output_metadata: values returned by the operation's read callback
#   writer_output_metadata: values returned by the operation's write callback
_OPERATION_METADATA_COLLECTION = collections.namedtuple(
    '_OPERATION_METADATA_COLLECTION',
    (
        'reader_input_metadata',
        'reader_output_metadata',
        'writer_output_metadata',
    ),
)
class TransactionQueuedOperations(object):
def __init__(self, r, metadata=None):
if metadata is None:
metadata = {}
self._resource = r
self._metadata = metadata
self._watch_keys_s = set()
self._callbacks_and_metadata = []
self._operations_metadata = {}
self._has_run = False
def add_operation(
self, name, watch_keys, read_fn, write_fn, **reader_input_metadata):
# TODO(dustin): Add test
assert \
self._has_run is False, \
"Can only add operations before running."
assert \
watch_keys, \
"At least one watched-key is necessary per operation."
assert \
name not in self._operations_metadata, \
"Name [{}] already used for another operation.".format(name)
self._watch_keys_s.update(watch_keys)
self._callbacks_and_metadata.append((
name,
read_fn,
write_fn,
))
# Keep separate dictionary for callback metadata, reader metadata, and
# writer metadata
self._operations_metadata[name] = \
_OPERATION_METADATA_COLLECTION(
reader_input_metadata=reader_input_metadata,
reader_output_metadata={},
writer_output_metadata={})
@property
def metadata(self):
return self._metadata
def get_operation_metadata(self, name):
return self._operations_metadata[name]
def _get_function_reference(self, fn):
# TODO(dustin): Add test
# Add module name
scope = [
fn.__module__,
]
# If method in an object
try:
o = fn.__self__
except AttributeError:
pass
else:
scope += [
o.__class__.__name__,
]
# If closure
try:
parts = fn.__qualname__.split('.')
except AttributeError:
pass
else:
scope += parts[:-1]
# Add function name
scope += [
fn.__name__,
]
return scope
def dump(self):
# TODO(dustin): Add test
exported_operations = []
for name, read_fn, write_fn in self._callbacks_and_metadata:
metadata = {}
om = self._operations_metadata[name]
if om.reader_input_metadata:
metadata['reader_input_metadata'] = om.reader_input_metadata
if om.reader_output_metadata:
metadata['reader_output_metadata'] = om.reader_output_metadata
if om.writer_output_metadata:
metadata['writer_output_metadata'] = om.writer_output_metadata
exported_operation = {
'name': name,
'read_fn': self._get_function_reference(read_fn),
'write_fn': self._get_function_reference(write_fn),
'metadata': metadata,
}
exported_operations.append(exported_operation)
exported = {
'operations': exported_operations,
}
return exported
def run(self, intermediate_cb=None):
assert \
self._callbacks_and_metadata, \
"No operations have been added."
self._has_run = True
def fn(p):
# Perform read operations
q = []
aggregate_context = {}
for i, (name, read_fn, write_fn) \
in enumerate(self._callbacks_and_metadata):
om = self._operations_metadata[name]
_LOGGER.debug("TRANSACTION: Executing operation ({}) [{}] "
"{}".format(i, name, om.reader_input_metadata))
# the reader cna add additional operations. Note that if the
# transaction is rerun then the added operation will already be
# present and the add will fail.
try:
reader_output_metadata = read_fn(self, p, om)
except Exception as e:
_LOGGER.exception("Operation ({}) [{}] failed: [{}]".format(
i, name, str(e)))
raise
if reader_output_metadata:
# Stash output
om.reader_output_metadata.update(reader_output_metadata)
# Collect the read results to return in the overall result
aggregate_context.update(reader_output_metadata)
# Queue corresponding writer function
q.append((name, write_fn))
# Invoke intermediate callback. This supports faulting the
# transaction from tests.
if intermediate_cb is not None:
intermediate_cb(aggregate_context)
# Perform write operations
p.multi()
for i, (name, write_fn) in enumerate(q):
om = self._operations_metadata[name]
try:
writer_output_metadata = write_fn(self, p, om)
except Exception as e:
raise \
Exception(
"Operation ({}) [{}] failed: [{}]".format(
i, name, str(e))) \
from e
raise
if writer_output_metadata is not None:
# Stash output
om.writer_output_metadata.update(writer_output_metadata)
# Collect the write results to return in the overall result
aggregate_context.update(writer_output_metadata)
return aggregate_context
watch_keys = list(self._watch_keys_s)
action = \
self._resource.transaction(
fn,
*watch_keys,
value_from_callable=True)
return action
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment