Last active
April 13, 2017 18:56
-
-
Save ankitml/143db3e5a1b3b56bca16d362f9f7a3d0 to your computer and use it in GitHub Desktop.
Interactive Django migrations. Supports +1/-1 interface for django migrations, similar to Alembic for SQLAlchemy. Also supports fast forward and fast rewind facility"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.core.management.base import BaseCommand, CommandError | |
from django_redis import get_redis_connection | |
from django.db import DEFAULT_DB_ALIAS, connections | |
from django.db.migrations.loader import MigrationLoader | |
from django.core.management import call_command | |
class Command(BaseCommand): | |
help = """Interactive Django migrations. Supports +1/-1 interface for django migrations, | |
similar to Alembic for SQLAlchemy. Also supports fast forward and fast rewind | |
facility for forwarding and undoing all migrations. Clear for starting a fresh deployment""" | |
cache_key = "command:imigrate" | |
blacklisted_apps = ["djcelery"] | |
def add_arguments(self, parser): | |
parser.add_argument( | |
'--database', action='store', dest='database', default=DEFAULT_DB_ALIAS, | |
help='Nominates a database to synchronize. Defaults to the "default" database.', | |
) | |
parser.add_argument( | |
'action', nargs='?', | |
help='Action - list, forward, backward, clear', | |
) | |
def setup(self): | |
self.redis = get_redis_connection("default") | |
self.pending_migrations = self.get_pending_migrations(self.database) | |
self.cached_migrations = self.get_cached_migrations() | |
self.done_migrations = self.get_done_migrations(self.database) | |
def handle(self, *args, **kwargs): | |
self.database = kwargs['database'] | |
action = kwargs['action'] | |
self.setup() | |
call_map = { | |
'list': self.handle_list, | |
'forward': self.handle_forward, | |
'rewind': self.handle_rollback, | |
'fast-forward': self.handle_forward_all, | |
'fast-rewind': self.handle_rollback_all, | |
'clear': self.handle_clear | |
} | |
try: | |
call_map[action]() | |
except KeyError: | |
return "bad input, 'list, forward, fast-forward, rewind, fast-rewind and clear' are correct commands" | |
def end(self): | |
self.setup() | |
self.handle_list() | |
def handle_list(self): | |
formatters = { | |
'RED': '\033[91m', | |
'GREEN': '\033[92m', | |
'END': '\033[0m', | |
} | |
print() | |
print() | |
print('{GREEN}-------------- TO MIGRATE -----------{END}'.format(**formatters)) | |
for index,migration in enumerate(self.pending_migrations): | |
# print('Master is currently {GREEN}red{END}!'.format(**formatters)) | |
print("{i}. {GREEN}{0}{END}: {1}".format(migration[0].ljust(18), migration[1], **formatters, i=index+1)) | |
print() | |
print() | |
for index,migration in enumerate(self.cached_migrations): | |
if index ==0: | |
print('{RED}-------------- CAN BE ROLLED BACK -----------{END}'.format(**formatters)) | |
print("{i}. {RED}{0}{END}: {1}".format(migration[0].ljust(18), migration[1], **formatters, i=index+1)) | |
def handle_rollback(self, *args, **kwargs): | |
try: | |
app_name, migration_name = self.cached_migrations[0] | |
except IndexError: | |
print('No migrations to rollback') | |
return False | |
old_app_migrations = [node for node in self.done_migrations if node[0] == app_name] | |
old_app_migrations = [node for node in old_app_migrations if node[1] != migration_name] | |
previous_migration_state = old_app_migrations[-1] | |
try: | |
call_command('migrate', previous_migration_state[0], previous_migration_state[1]) | |
rollbacked = True | |
except CommandError as e: | |
print(e) | |
if rollbacked: | |
self.redis.lpop(self.cache_key) | |
self.end() | |
return True | |
def handle_forward(self, *args, **kwargs): | |
try: | |
app_name, migration_name = self.pending_migrations[0] | |
except IndexError: | |
print('No migration to apply') | |
return False | |
try: | |
call_command('migrate', app_name, migration_name) | |
migrated = True | |
except CommandError as e: | |
print(e) | |
if migrated: | |
migration_string = "{0}:{1}".format(app_name, migration_name) | |
self.redis.lpush(self.cache_key,migration_string) | |
self.end() | |
return True | |
def handle_forward_all(self, *args, **kwargs): | |
while True: | |
result = self.handle_forward() | |
if not result: | |
break | |
def handle_rollback_all(self, *args, **kwargs): | |
while True: | |
result = self.handle_rollback() | |
if not result: | |
break | |
def handle_clear(self): | |
self.redis.delete(self.cache_key) | |
self.end() | |
def get_done_migrations(self, database, app_name=None): | |
plan, done = self.get_all_migrations(database) | |
applied_migrations = [] | |
for node in plan: | |
if node.key in done: | |
applied_migrations.append((node[0], node[1])) | |
return applied_migrations | |
def get_pending_migrations(self, database, app_name=None): | |
plan, done = self.get_all_migrations(database) | |
unapplied_migrations = [] | |
for node in plan: | |
if node.key not in done: | |
unapplied_migrations.append((node[0], node[1])) | |
return unapplied_migrations | |
def get_all_migrations(self, database): | |
connection = connections[database] | |
loader = MigrationLoader(connection) | |
graph = loader.graph | |
#ignore app names for now | |
targets = graph.leaf_nodes() | |
plan = [] | |
seen = set() | |
for target in targets: | |
for migration in graph.forwards_plan(target): | |
if migration not in seen and migration[0] not in self.blacklisted_apps: | |
node = graph.node_map[migration] | |
plan.append(node) | |
seen.add(migration) | |
return plan, loader.applied_migrations | |
def get_cached_migrations(self, app_name=None): | |
migration_bytes = self.redis.lrange(self.cache_key, 0, -1) | |
migration_strings = [i.decode("utf-8") for i in migration_bytes] | |
return [i.split(":") for i in migration_strings] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment