Created
September 18, 2020 05:51
-
-
Save SpainTrain/1460db52bc4cf4126eb2f6b8158d59ff to your computer and use it in GitHub Desktop.
Approach to migrations in Google Cloud Datastore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# From https://cloud.google.com/appengine/articles/deferred | |
import logging | |
from google.appengine.ext import deferred, ndb | |
from google.appengine.runtime import DeadlineExceededError | |
class Mapper(object):
    '''Base class for processing every entity of one NDB kind in batches.

    Subclasses set KIND (and optionally FILTERS) and override map(). The
    mapper walks the matching entities in key order, buffering the puts and
    deletes that map() asks for and flushing them with _batch_write(). If a
    DeadlineExceededError is raised, pending work is flushed and the rest of
    the run is re-queued with deferred.defer(), resuming after the last
    entity that was fully processed.
    '''

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to
    # filter by; each pair becomes an equality filter (property == value).
    FILTERS = []

    def __init__(self):
        # Entities waiting to be written by the next _batch_write().
        self.to_put = []
        # Items waiting to be removed by the next _batch_write(); they are
        # passed to ndb.delete_multi(), which expects keys.
        self.to_delete = []
        # Running total of entities handed to map(). The bound method
        # self._continue (and so this instance) is what gets re-queued, so
        # the count carries over between deferred slices.
        self.entities_processed_count = 0

    def map(self, entity):  # pylint: disable=unused-argument
        '''Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete): entities to be written with ndb.put_multi()
        and keys to be removed with ndb.delete_multi(). The default
        implementation requests no changes.

        *IMPORTANT*: When used for migrations, this function should be
        idempotent! An entity can be mapped more than once, e.g. when a slice
        is interrupted by a deadline or the whole migration is re-run.
        (https://en.wikipedia.org/wiki/Idempotence#Computer_science_meaning)
        '''
        return ([], [])

    def finish(self):
        '''Called when the mapper has finished, to allow for any final work to be done.'''

    def _key_property(self):
        '''Returns KIND's `_key` attribute, used to order and filter queries by key.'''
        # Looked up via getattr(), presumably to avoid pylint's
        # protected-access warning on `_key`.
        return getattr(self.KIND, '_key')

    def get_query(self):
        '''Returns a query over KIND with FILTERS applied, ordered by key.

        The key ordering is what allows _continue() to resume with a
        "key > last processed key" filter.
        '''
        query = self.KIND.query()
        for prop, value in self.FILTERS:
            query = query.filter(prop == value)
        return query.order(self._key_property())

    def run(self, batch_size=100):
        '''Starts the mapper running.

        Args:
            batch_size: number of entities to map between datastore flushes.
                Must be at least 1.

        Raises:
            ValueError: if batch_size is less than 1. Checked up front because
                the per-entity `% batch_size` test would otherwise fail only
                after the first entity had already been mapped.
        '''
        if batch_size < 1:
            raise ValueError('batch_size must be at least 1, got %r' % (batch_size,))
        self._continue(None, batch_size)

    def _batch_write(self):
        '''Writes updates and deletes entities in a batch.

        Each buffer is cleared only after its datastore call returns, so a
        failed call leaves the work buffered for a later retry.
        '''
        if self.to_put:
            ndb.put_multi(self.to_put)
            logging.info('[%s] Updated %d entities', self.__class__.__name__, len(self.to_put))
            self.to_put = []
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            logging.info('[%s] Deleted %d entities', self.__class__.__name__, len(self.to_delete))
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        '''Maps entities after start_key until the query is exhausted or time runs out.

        Args:
            start_key: key of the last entity fully processed by an earlier
                slice, or None to start from the beginning.
            batch_size: number of entities to map between datastore flushes.
        '''
        query = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key:
            query = query.filter(self._key_property() > start_key)

        # Keep updating records until we run out of time.
        try:
            # Steps over the results, returning each entity and its index.
            for i, entity in enumerate(query):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
                # Record the last entity we processed. This only advances once
                # the entity is done, so an entity interrupted mid-way is
                # mapped again by the next slice (hence map() idempotence).
                start_key = entity.key
                self.entities_processed_count += 1
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return

        logging.info(
            '[%s] Finished processing %d %s entities',
            self.__class__.__name__,
            self.entities_processed_count,
            self.KIND.__name__,
        )
        self.finish()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SomeModelMigration(Mapper):
    '''Example migration: rewrites SomeModel.some_field from one value to another.'''

    KIND = SomeModel

    def map(self, data):
        '''Returns ([data], []) if data still holds the old value, else ([], []).

        Idempotent: once some_field has been rewritten the comparison no
        longer matches, so mapping the same entity again requests no writes.
        '''
        # Compare by value with ==. `is` tests object identity, which is not
        # guaranteed for equal strings (e.g. values loaded from the datastore),
        # so the old check could silently skip entities that need migrating.
        if data.some_field == 'magic string':
            data.some_field = 'different magic string'
            return ([data], [])
        return ([], [])
def run_some_migration():
    '''Queues SomeModelMigration.run() as a deferred task with its default batch size.'''
    deferred.defer(SomeModelMigration().run)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that the `map` function contains only business logic and touches nothing but the entity it is given, so unit testing it is straightforward. Idempotence is critical, so unit tests should include idempotence tests. All an idempotence test needs to check is that, when `map` is applied to an entity that has already been migrated, both the update and delete buffers it returns are empty. This is what allows the entire migration to be rerun safely if it is interrupted.