Skip to content

Instantly share code, notes, and snippets.

@ankitml
Last active April 13, 2017 18:56
Show Gist options
  • Save ankitml/143db3e5a1b3b56bca16d362f9f7a3d0 to your computer and use it in GitHub Desktop.
Save ankitml/143db3e5a1b3b56bca16d362f9f7a3d0 to your computer and use it in GitHub Desktop.
Interactive Django migrations. Supports a +1/-1 interface for Django migrations, similar to Alembic for SQLAlchemy. Also supports fast-forward and fast-rewind facilities.
from django.core.management.base import BaseCommand, CommandError
from django_redis import get_redis_connection
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.migrations.loader import MigrationLoader
from django.core.management import call_command
class Command(BaseCommand):
    """Step through Django migrations one at a time, forwards or backwards.

    Migrations applied through this command are recorded on a Redis list
    (``cache_key``) that is used as a stack: ``forward`` pushes the applied
    migration onto the head of the list and ``rewind`` pops it again, so
    only migrations applied via this command can be rolled back by it.
    """

    help = """Interactive Django migrations. Supports +1/-1 interface for django migrations,
similar to Alembic for SQLAlchemy. Also supports fast forward and fast rewind
facility for forwarding and undoing all migrations. Clear for starting a fresh deployment"""
    # Redis key of the list that records migrations applied by this command.
    cache_key = "command:imigrate"
    # Apps whose migrations are left out of the plan entirely.
    blacklisted_apps = ["djcelery"]

    def add_arguments(self, parser):
        """Register the ``--database`` option and the positional ``action``."""
        parser.add_argument(
            '--database', action='store', dest='database', default=DEFAULT_DB_ALIAS,
            help='Nominates a database to synchronize. Defaults to the "default" database.',
        )
        parser.add_argument(
            'action', nargs='?',
            help='Action - list, forward, rewind, fast-forward, fast-rewind, clear',
        )

    def setup(self):
        """(Re)load the Redis connection and the three migration lists.

        Reads ``self.database``, which ``handle`` sets before calling this.
        """
        self.redis = get_redis_connection("default")
        self.pending_migrations = self.get_pending_migrations(self.database)
        self.cached_migrations = self.get_cached_migrations()
        self.done_migrations = self.get_done_migrations(self.database)

    def handle(self, *args, **kwargs):
        """Dispatch the requested action to its handler.

        Returns:
            A usage message when the action is missing or unknown, else None.
        """
        self.database = kwargs['database']
        action = kwargs['action']
        call_map = {
            'list': self.handle_list,
            'forward': self.handle_forward,
            'rewind': self.handle_rollback,
            'fast-forward': self.handle_forward_all,
            'fast-rewind': self.handle_rollback_all,
            'clear': self.handle_clear,
        }
        # Resolve the handler before calling it so that a KeyError raised
        # inside a handler is not misreported as bad user input.
        handler = call_map.get(action)
        if handler is None:
            return "bad input, 'list, forward, fast-forward, rewind, fast-rewind and clear' are correct commands"
        self.setup()
        handler()

    def end(self):
        """Refresh the migration state and print the updated listing."""
        self.setup()
        self.handle_list()

    def handle_list(self):
        """Print the pending migrations and the ones that can be rolled back."""
        formatters = {
            'RED': '\033[91m',
            'GREEN': '\033[92m',
            'END': '\033[0m',
        }
        print()
        print()
        print('{GREEN}-------------- TO MIGRATE -----------{END}'.format(**formatters))
        for position, (app_name, migration_name) in enumerate(self.pending_migrations, start=1):
            print("{i}. {GREEN}{0}{END}: {1}".format(
                app_name.ljust(18), migration_name, i=position, **formatters))
        print()
        print()
        # The heading is only shown when there is something to roll back.
        if self.cached_migrations:
            print('{RED}-------------- CAN BE ROLLED BACK -----------{END}'.format(**formatters))
        for position, (app_name, migration_name) in enumerate(self.cached_migrations, start=1):
            print("{i}. {RED}{0}{END}: {1}".format(
                app_name.ljust(18), migration_name, i=position, **formatters))

    def handle_rollback(self, *args, **kwargs):
        """Undo the most recently recorded migration.

        Migrates the app back to the last applied migration of the same app
        other than the one being undone, or to ``zero`` when there is none.

        Returns:
            True when a migration was rolled back; False when nothing is
            recorded or the ``migrate`` command failed.
        """
        if not self.cached_migrations:
            print('No migrations to rollback')
            return False
        app_name, migration_name = self.cached_migrations[0]
        earlier_names = [
            name for app, name in self.done_migrations
            if app == app_name and name != migration_name
        ]
        # Undoing an app's first migration has no predecessor to migrate to;
        # "zero" is Django's target for unapplying everything in the app.
        target = earlier_names[-1] if earlier_names else 'zero'
        try:
            call_command('migrate', app_name, target, database=self.database)
        except CommandError as e:
            print(e)
            return False
        self.redis.lpop(self.cache_key)
        self.end()
        return True

    def handle_forward(self, *args, **kwargs):
        """Apply the next pending migration and record it in Redis.

        Returns:
            True when a migration was applied; False when nothing is pending
            or the ``migrate`` command failed.
        """
        if not self.pending_migrations:
            print('No migration to apply')
            return False
        app_name, migration_name = self.pending_migrations[0]
        try:
            call_command('migrate', app_name, migration_name, database=self.database)
        except CommandError as e:
            print(e)
            return False
        migration_string = "{0}:{1}".format(app_name, migration_name)
        self.redis.lpush(self.cache_key, migration_string)
        self.end()
        return True

    def handle_forward_all(self, *args, **kwargs):
        """Apply pending migrations one by one until none is left or one fails."""
        while self.handle_forward():
            pass

    def handle_rollback_all(self, *args, **kwargs):
        """Roll back recorded migrations one by one until none is left or one fails."""
        while self.handle_rollback():
            pass

    def handle_clear(self):
        """Forget all recorded migrations (the database itself is not touched)."""
        self.redis.delete(self.cache_key)
        self.end()

    def get_done_migrations(self, database, app_name=None):
        """Return ``(app_label, name)`` for every planned migration already applied.

        ``app_name`` is accepted but currently unused.
        """
        plan, done = self.get_all_migrations(database)
        return [(node[0], node[1]) for node in plan if node.key in done]

    def get_pending_migrations(self, database, app_name=None):
        """Return ``(app_label, name)`` for every planned migration not yet applied.

        ``app_name`` is accepted but currently unused.
        """
        plan, done = self.get_all_migrations(database)
        return [(node[0], node[1]) for node in plan if node.key not in done]

    def get_all_migrations(self, database):
        """Build the full migration plan for ``database``.

        Returns:
            A ``(plan, applied)`` pair: ``plan`` is the list of graph nodes in
            forwards order with blacklisted apps removed, and ``applied`` is
            the loader's collection of applied migration keys.
        """
        connection = connections[database]
        loader = MigrationLoader(connection)
        graph = loader.graph
        # ignore app names for now
        targets = graph.leaf_nodes()
        plan = []
        seen = set()
        for target in targets:
            for migration in graph.forwards_plan(target):
                if migration not in seen and migration[0] not in self.blacklisted_apps:
                    plan.append(graph.node_map[migration])
                    seen.add(migration)
        return plan, loader.applied_migrations

    def get_cached_migrations(self, app_name=None):
        """Return the recorded migrations as ``[app_label, name]`` lists, newest first.

        ``app_name`` is accepted but currently unused.
        """
        raw_entries = self.redis.lrange(self.cache_key, 0, -1)
        # The client returns bytes unless it was configured to decode
        # responses, in which case entries are already str.
        entries = [
            entry.decode("utf-8") if isinstance(entry, bytes) else entry
            for entry in raw_entries
        ]
        # Split on the first colon only, so any later colon stays in the name.
        return [entry.split(":", 1) for entry in entries]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment