Created
May 10, 2019 16:37
-
-
Save TomFaulkner/6d445c85e55c97df920d8722d883109c to your computer and use it in GitHub Desktop.
DynamoDB (or other NoSQL) record upgrades
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class SchemaUpgradeException(Exception):
    """Raised when a record cannot be brought to the current schema version."""
def upgrade(data, versions=None):
    """Upgrade a record dict to the current schema version.

    The current version is ``len(versions)``.  The record's version is read
    from ``data['version']`` and defaults to 0 when the key is missing, so
    ``versions[i]`` is the function that migrates a record from version
    ``i`` to version ``i + 1``.

    Args:
        data: the record, a dict with an optional ``'version'`` key.
        versions: an iterable of functions that each receive and return a
            dict.  Each function must set ``'version'`` on the dict it
            returns.  ``None`` (the default) or an empty iterable means
            the current version is 0.

    Returns:
        A two-tuple ``(upgraded, data)``.  ``upgraded`` is False when the
        record was already at the current version (no function is called)
        and True when upgrades ran and brought it to the current version.

    Raises:
        SchemaUpgradeException: if the record's integer version is negative
            or greater than the current version, if an upgrade function
            raises, or if the record is not at the current version after
            all upgrades ran.

    >>> def v1(data):
    ...     data['new_key'] = True
    ...     data['version'] = 1
    ...     return data
    >>> def v2(data):
    ...     data['great_key'] = 3
    ...     data['version'] = 2
    ...     return data
    >>> upgrade({}, [v1, v2])
    (True, {'new_key': True, 'version': 2, 'great_key': 3})
    """
    # Materialize so that any iterable (e.g. a generator), not only a
    # sequence, supports the len() and slicing used below.
    versions = list(versions or ())
    current = len(versions)
    data_version = data.get('version', 0)

    if data_version == current:
        return False, data

    # A negative integer would be interpreted as a from-the-end slice index
    # and silently run only the last upgrade(s); a version above `current`
    # has no upgrade path at all.  Reject both explicitly.
    if isinstance(data_version, int) and not 0 <= data_version <= current:
        raise SchemaUpgradeException(
            'Data version (%s) is outside the supported range 0-%s.'
            % (data_version, current)
        )

    try:
        for version in versions[data_version:]:
            data = version(data)
    except Exception as e:
        LOGGER.exception('Failed to upgrade data to current schema.')
        # Chain explicitly so the original failure stays attached as the cause.
        raise SchemaUpgradeException(e) from e

    if data.get('version', 0) == current:
        return True, data

    raise SchemaUpgradeException(
        'Upgrades did not bring data to current version (%s).' % current
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from unittest import TestCase | |
from dynamo_upgrade import upgrade, SchemaUpgradeException | |
def v1(data):
    """Upgrade step to version 1: (re)sets ``narrativePagebreaks`` to []."""
    data['version'] = 1
    data['narrativePagebreaks'] = []
    return data
def v2(data):
    """Upgrade step to version 2: only bumps the version number."""
    data['version'] = 2
    return data
def v3(data):
    """Upgrade step to version 3: adds ``new_key`` set to True."""
    data['version'] = 3
    data['new_key'] = True
    return data
def fail_func(data=None):
    """Upgrade step that always fails with KeyError.

    Accepts the record like any other upgrade step so that the failure seen
    by the caller is the KeyError raised here, not a TypeError from calling
    a zero-argument function with one argument.
    """
    raise KeyError()
def fail_improper_version_after_upgrade(data):
    """Faulty upgrade step: returns the record with version reset to 0."""
    data['version'] = 0
    return data
# Ordered upgrade chain shared by the tests below; its length (3) is the
# current schema version.
versions = [
    v1,
    v2,
    v3,
]
class TestDynamoUpgrade(TestCase):
    """Tests for ``upgrade``."""

    def test_upgrade_from_none_versioned(self):
        """upgrade from none to versioned"""
        upgraded, res = upgrade({}, versions)
        self.assertDictEqual(
            {'version': 3, 'narrativePagebreaks': [], 'new_key': True},
            res
        )
        self.assertTrue(upgraded)

    def test_upgrade_from_non_zero_version(self):
        """upgrade non-zero to versioned doesn't perform earlier upgrades
        and overwrite data"""
        upgraded, res = upgrade(
            {'version': 1, 'narrativePagebreaks': ['1', '2']},
            versions
        )
        # v1 would have reset narrativePagebreaks to []; it must be skipped.
        self.assertDictEqual(
            {'version': 3, 'narrativePagebreaks': ['1', '2'], 'new_key': True},
            res,
        )
        self.assertTrue(upgraded)

    def test_upgrade_not_necessary(self):
        """no upgrade necessary doesn't run any upgrades"""
        upgraded, res = upgrade({'version': 3}, versions)
        self.assertDictEqual({'version': 3}, res)
        self.assertFalse(upgraded)

    def test_fail_during_upgrade_raises_exception(self):
        """fail during upgrade raises exception"""
        local_versions = [
            v1,
            fail_func,
        ]
        with self.assertRaises(SchemaUpgradeException):
            upgrade({'version': 1}, local_versions)

    def test_failed_upgrade_raises_exception(self):
        """failed upgrade raises exception"""
        local_versions = [
            v1,
            fail_improper_version_after_upgrade,
        ]
        with self.assertRaises(SchemaUpgradeException):
            upgrade({'version': 1}, local_versions)

    def test_handles_current_version_zero(self):
        """handles the current version being zero or no versions list"""
        local_versions = []
        upgraded, data = upgrade({}, local_versions)
        self.assertDictEqual(data, {})
        self.assertFalse(upgraded)

        upgraded, data = upgrade({})
        self.assertDictEqual(data, {})
        self.assertFalse(upgraded)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment