Skip to content

Instantly share code, notes, and snippets.

@oduvan
Created June 29, 2012 12:54
Show Gist options
  • Save oduvan/3017754 to your computer and use it in GitHub Desktop.
Save oduvan/3017754 to your computer and use it in GitHub Desktop.
from django import db
from django.conf import settings
from nose.plugins import Plugin
from django.test import TransactionTestCase
import uuid
import warnings
# Name prefix for the test databases handled by this module: the configured
# default database name (as read at import time) followed by '__'.
TEST_DB_PREFIX = settings.DATABASES['default']['NAME'] + '__'
def set_cur_db(db_name):
    '''Point the default database settings at ``db_name``.

    Both the ``NAME`` and the ``TEST_NAME`` entries of
    ``settings.DATABASES['default']`` are overwritten with the same value.
    '''
    default_conf = settings.DATABASES['default']
    default_conf['NAME'] = db_name
    default_conf['TEST_NAME'] = db_name
# Re-apply the configured name through set_cur_db so that TEST_NAME is set to
# the same value as NAME.
set_cur_db(settings.DATABASES['default']['NAME'])
# Name of the base test database: the test prefix followed by '0'.
TEST_DB_0 = TEST_DB_PREFIX + '0'
def psql(sql):
    '''Execute one or more ``;``-separated SQL statements on a fresh connection.

    The current Django connection is closed first so that the statements run
    on a newly opened one, and it is closed again afterwards -- also when a
    statement fails, so a broken statement does not leave a session open.

    The text is split naively on ``;``; statements containing a literal
    semicolon are not supported.  Fragments that are empty or contain only
    whitespace are skipped instead of being sent to the server.

    :param sql: SQL text, statements separated by ``;``.
    '''
    db.close_connection()
    try:
        cursor = db.connection.cursor()
        # Let the backend prepare the connection for database-level DDL.
        db.connection.creation._prepare_for_test_db_ddl()
        for statement in sql.split(';'):
            statement = statement.strip()
            if not statement:
                continue
            cursor.execute(statement)
            print(statement)
    finally:
        db.close_connection()
def psql_create_db(name, tmpl_name):
    '''(Re)create database ``name`` as a UTF8 copy of ``tmpl_name``.

    Any existing database called ``name`` is dropped first.  Both identifiers
    are double-quoted so that the template is looked up with exactly the same
    spelling (case, punctuation) that was used when it was created; an
    unquoted template name would be case-folded by PostgreSQL.

    :param name: name of the database to create.
    :param tmpl_name: name of the existing database to use as template.
    '''
    psql(
        'DROP DATABASE IF EXISTS "{0}";'
        'CREATE DATABASE "{0}" WITH ENCODING=\'UTF8\' TEMPLATE "{1}";'
        .format(name, tmpl_name)
    )
class MagicDbWrapper(Plugin):
    '''Wrap each test with a clean database. There are two modes:

    1. transaction mode (the default): the test runs inside a transaction
       that is rolled back afterwards;
    2. unique db mode: the test gets its own freshly copied database, which
       is dropped afterwards. Enable it with a class attribute:

        class TestMain(TestCase):
            _test_unique_db = True

    A test may also define ``_test_db`` to name the database that should be
    used as the template for its test database.
    '''
    name = 'magic-db'

    @property
    def db_conf(self):
        '''Return the database settings dict shared with ``TestSuiteRunner``.'''
        return TestSuiteRunner.db_conf

    def get_db_name(self, pattern_db_name):
        '''Return the prefixed test database for ``pattern_db_name``.

        The database is created from the ``pattern_db_name`` template the
        first time it is requested; later calls reuse the existing one.
        '''
        db_name = TEST_DB_PREFIX + pattern_db_name
        cursor = db.connection.cursor()
        cursor.execute(
            'select count(*) from pg_database where datname=%s', [db_name]
        )
        already_exists = cursor.fetchone()[0]
        if not already_exists:
            psql_create_db(db_name, pattern_db_name)
        return db_name

    def prepareTestCase(self, test):
        '''Select the database for ``test`` and start its isolation mode.'''
        # Work on the wrapped test object, where the marker attributes live.
        test = test.test
        # don't use django wraps of database
        if isinstance(test, TransactionTestCase):
            test._fixture_setup = lambda: None
            test._fixture_teardown = lambda: None
        # Remember the current database name so afterTest can restore it.
        self.db_src = self.db_conf['NAME']
        if hasattr(test, '_test_db'):
            db_name = self.get_db_name(test._test_db)
        else:
            db_name = TEST_DB_0
        set_cur_db(db_name)
        if hasattr(test, '_test_unique_db'):
            self.start_unique_db()
        else:
            self.start_transaction()

    def afterTest(self, test):
        '''Undo what ``prepareTestCase`` set up and restore the database name.'''
        if hasattr(test.test, '_test_unique_db'):
            self.stop_unique_db()
        else:
            self.stop_transaction()
        set_cur_db(self.db_src)
        db.close_connection()

    def start_unique_db(self):
        '''Copy the current test database under a unique name and switch to it.'''
        src = self.db_conf['TEST_NAME']
        self.db_new = new = '%s_%s' % (src, uuid.uuid4().hex)
        psql_create_db(new, src)
        set_cur_db(new)

    def stop_unique_db(self):
        '''Switch back to the remembered database and drop the unique copy.'''
        # Switch away first, before psql opens the connection that drops it.
        set_cur_db(self.db_src)
        psql('DROP DATABASE "{0}";'.format(self.db_new))

    def start_transaction(self):
        '''Record the per-table row counts and open a managed transaction.'''
        self.db_expects = self.get_tables_count()
        db.transaction.enter_transaction_management()
        db.transaction.managed(True)

    def stop_transaction(self):
        '''Roll the test transaction back and warn if row counts changed.

        The counts recorded by ``start_transaction`` are compared with the
        counts after the rollback; any difference is reported through a
        ``RuntimeWarning`` listing the affected tables.
        '''
        db.transaction.rollback()
        db.transaction.leave_transaction_management()
        db_last = self.get_tables_count()
        if self.db_expects != db_last:
            db_diff = self._diff_table_counts(self.db_expects, db_last)
            warnings.warn(
                'Has difference of database after test:\n %s' % db_diff,
                RuntimeWarning
            )

    @staticmethod
    def _diff_table_counts(before, after):
        '''Describe the tables whose counts differ between two snapshots.

        Both arguments are sequences of ``(table_name, count)`` rows as
        returned by ``get_tables_count``.  The result has one
        ``name: before -> after`` line per changed table; a table missing
        from one snapshot is shown with ``None`` on that side.
        '''
        before_counts = dict(before)
        after_counts = dict(after)
        changed = []
        for table in sorted(set(before_counts) | set(after_counts)):
            was = before_counts.get(table)
            now = after_counts.get(table)
            if was != now:
                changed.append('%s: %s -> %s' % (table, was, now))
        return '\n '.join(changed)

    def get_tables_count(self):
        '''Return ``(relname, n_live_tup)`` rows for all user tables.

        Rows come from ``pg_stat_user_tables`` ordered by table name.
        NOTE(review): ``n_live_tup`` is a statistics value and presumably may
        lag behind the real row count -- confirm against the PostgreSQL docs.
        '''
        cursor = db.connection.cursor()
        cursor.execute(
            'SELECT relname,n_live_tup '
            'FROM pg_stat_user_tables '
            'ORDER BY relname'
        )
        return cursor.fetchall()
from django_nose import NoseTestSuiteRunner
class TestSuiteRunner(NoseTestSuiteRunner):
    '''Nose test runner that manages the test databases itself.

    Instead of Django's usual test database creation, ``setup_databases``
    drops every leftover database whose name starts with ``TEST_DB_PREFIX``
    and creates the base test database ``TEST_DB_0`` as a copy of the
    configured default database.
    '''
    # Settings dict of the default database; shared (same object) with the
    # Django settings, so changes made through set_cur_db are visible here.
    db_conf = settings.DATABASES['default']
    # Name of the default database at class-definition time.
    db_name = db_conf['NAME']

    def __init__(self, *args, **kwargs):
        '''Refuse to run unless the active settings define a true ``TESTING``.

        :raises SystemExit: when ``settings.TESTING`` is missing or false.
        '''
        if not getattr(settings, 'TESTING', False):
            raise SystemExit(
                'Wrong settings, try again with --settings=settings.testing'
            )
        super(TestSuiteRunner, self).__init__(*args, **kwargs)

    def setup_databases(self, *args, **kw):
        '''Drop stale test databases and create a fresh ``TEST_DB_0``.'''
        cursor = db.connection.cursor()
        cursor.execute('select datname from pg_database')
        # Match the prefix literally in Python. A SQL LIKE pattern would
        # treat the underscores in the prefix as single-character wildcards
        # and could select (and then drop) unrelated databases.
        stale_dbs = [
            row[0] for row in cursor.fetchall()
            if row[0].startswith(TEST_DB_PREFIX)
        ]
        for test_db in stale_dbs:
            psql('DROP DATABASE "{0}"'.format(test_db))
        psql_create_db(TEST_DB_0, settings.DATABASES['default']['NAME'])
        set_cur_db(TEST_DB_0)

    def teardown_databases(self, *args, **kw):
        '''Leave the test databases in place; only reset ``NAME`` to ``TEST_DB_0``.'''
        self.db_conf['NAME'] = TEST_DB_0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment