Created
June 29, 2012 12:54
-
-
Save oduvan/3017754 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django import db | |
from django.conf import settings | |
from nose.plugins import Plugin | |
from django.test import TransactionTestCase | |
import uuid | |
import warnings | |
TEST_DB_PREFIX = settings.DATABASES['default']['NAME'] + '__' | |
def set_cur_db(db_name): | |
settings.DATABASES['default']['NAME'] =\ | |
settings.DATABASES['default']['TEST_NAME'] =\ | |
db_name | |
set_cur_db(settings.DATABASES['default']['NAME']) | |
TEST_DB_0 = TEST_DB_PREFIX + '0' | |
def psql(sql): | |
db.close_connection() | |
cursor = db.connection.cursor() | |
db.connection.creation._prepare_for_test_db_ddl() | |
sql = sql.rstrip(';').split(';') | |
for item in sql: | |
cursor.execute(item) | |
print(item) | |
db.close_connection() | |
def psql_create_db(name, tmpl_name): | |
psql( | |
'DROP DATABASE IF EXISTS "{0}";' | |
'CREATE DATABASE "{0}" WITH ENCODING=\'UTF8\' TEMPLATE {1};' | |
.format(name, tmpl_name) | |
) | |
class MagicDbWrapper(Plugin): | |
'''Wrap each test with clear db, has two modes: | |
1. transaction mode (default mode); | |
2. unique db mode, enable: | |
class TestMain(TestCase): | |
_test_unique_db = True | |
''' | |
name = 'magic-db' | |
@property | |
def db_conf(self): | |
return TestSuiteRunner.db_conf | |
def get_db_name(self, pattern_db_name): | |
db_name = TEST_DB_PREFIX + pattern_db_name | |
cursor = db.connection.cursor() | |
cursor.execute('select count(*) from pg_database where datname=%s',[db_name]) | |
if cursor.fetchone()[0]: | |
return db_name | |
psql_create_db(db_name, pattern_db_name) | |
return db_name | |
def prepareTestCase(self, test): | |
test = test.test | |
# don't use django wraps of database | |
if isinstance(test, TransactionTestCase): | |
test._fixture_setup = lambda: None | |
test._fixture_teardown = lambda: None | |
self.db_src = self.db_conf['NAME'] | |
if hasattr(test, '_test_db'): | |
db_name = self.get_db_name(test._test_db) | |
else: | |
db_name = TEST_DB_0 | |
set_cur_db(db_name) | |
if hasattr(test, '_test_unique_db'): | |
self.start_unique_db() | |
else: | |
self.start_transaction() | |
def afterTest(self, test): | |
if hasattr(test.test, '_test_unique_db'): | |
self.stop_unique_db() | |
else: | |
self.stop_transaction() | |
set_cur_db(self.db_src) | |
db.close_connection() | |
def start_unique_db(self): | |
src = self.db_conf['TEST_NAME'] | |
self.db_new = new = '%s_%s' % (src, uuid.uuid4().hex) | |
psql_create_db(new, src) | |
set_cur_db(new) | |
def stop_unique_db(self): | |
set_cur_db(self.db_src) | |
psql('DROP DATABASE "{0}";'.format(self.db_new)) | |
def start_transaction(self): | |
self.db_expects = self.get_tables_count() | |
db.transaction.enter_transaction_management() | |
db.transaction.managed(True) | |
def stop_transaction(self): | |
db.transaction.rollback() | |
db.transaction.leave_transaction_management() | |
db_last = self.get_tables_count() | |
if self.db_expects != db_last: | |
db_diff = diff(self.db_expects, db_last) | |
warnings.warn( | |
'Has difference of database after test:\n %s' % db_diff, | |
RuntimeWarning | |
) | |
def get_tables_count(self): | |
cursor = db.connection.cursor() | |
cursor.execute( | |
'SELECT relname,n_live_tup ' | |
'FROM pg_stat_user_tables ' | |
'ORDER BY relname' | |
) | |
return cursor.fetchall() | |
from django_nose import NoseTestSuiteRunner | |
class TestSuiteRunner(NoseTestSuiteRunner): | |
db_conf = settings.DATABASES['default'] | |
db_name = db_conf['NAME'] | |
def __init__(self, *args, **kwargs): | |
if not getattr(settings, 'TESTING', False): | |
raise SystemExit( | |
'Wrong settings, try again with --settings=settings.testing' | |
) | |
super(TestSuiteRunner, self).__init__(*args, **kwargs) | |
def setup_databases(self, *args, **kw): | |
cursor = db.connection.cursor() | |
cursor.execute("select datname from pg_database where datname LIKE '"+TEST_DB_PREFIX+"%'") | |
dbs = map(lambda a:a[0], cursor.fetchall()) | |
for test_db in dbs: | |
psql('DROP DATABASE "{0}"'.format(test_db)) | |
psql_create_db(TEST_DB_0, settings.DATABASES['default']['NAME']) | |
set_cur_db(TEST_DB_0) | |
def teardown_databases(self, *args, **kw): | |
self.db_conf['NAME'] = TEST_DB_0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment