Created
November 3, 2011 20:18
-
-
Save jacobian/1337664 to your computer and use it in GitHub Desktop.
Creating models on-the-fly
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
NB: this is works-for-me quality code, completely not suitable for production. | |
Please use it as inspiration, but please test it better than I have before | |
you use it in your own projects! | |
""" | |
from django import db | |
from django.conf import settings | |
from django.db import models | |
class DB(object):
    """
    An ad-hoc SQLite database registered with Django under ``alias``.

    Constructing an instance adds an entry to ``settings.DATABASES``; the
    remaining methods expose each table of that database as a model class
    generated on the fly from the table's column description.
    """

    def __init__(self, alias, path):
        """
        Register the SQLite file at ``path`` under the connection ``alias``.
        """
        self.alias = alias

        # Create a new entry in settings.DATABASES for this new db. We don't
        # need to change django.db.connections.databases because that points
        # to the same dict as settings.DATABASES. However, we might need to
        # clear out the connection cache -- it might have something left over
        # from a previous connection to this same database.
        settings.DATABASES[self.alias] = {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': path,
        }
        # NOTE(review): ``_connections`` is a private attribute of the
        # connection handler and is treated as a mapping here -- confirm this
        # still holds for the Django version in use.
        if self.alias in db.connections._connections:
            del db.connections._connections[self.alias]

        # Cache of models loaded from this DB, keyed by table name. ``None``
        # means "not loaded yet".
        self._model_cache = None

    def model_names(self):
        """
        Get a list of model/table names in this database.
        """
        introspection = db.connections[self.alias].introspection
        # Build a real list: a bare map() is a one-shot iterator on Python 3.
        return [str(table) for table in introspection.table_names()]

    def models(self):
        """
        Get a list of all models (i.e. tables) in this database.
        """
        if self._model_cache is None:
            self._fill_cache()
        # Return a list snapshot rather than a live view of the cache.
        return list(self._model_cache.values())

    def model(self, name):
        """
        Get an individual model by table name.

        Raises ``KeyError`` if the database has no table called ``name``.
        """
        if self._model_cache is None:
            self._fill_cache()
        return self._model_cache[name]

    def _fill_cache(self):
        """
        Build a model for every table and store them in the model cache.
        """
        # Build into a local dict and publish it only once complete, so an
        # error part-way through doesn't leave a half-filled cache that later
        # calls would mistake for the full set of models.
        cache = {}
        for table in self.model_names():
            cache[table] = self.model_for_table(table)
        self._model_cache = cache

    def model_for_table(self, table_name, name=None, **namespace):
        """
        Build and return a model class for the table ``table_name``.

        ``name`` is the class name to use (defaults to the table name).
        Extra keyword arguments become attributes of the generated class;
        ``__module__``, ``Meta`` and ``_adhocdb_alias`` are always overwritten,
        as is any attribute that shares its name with a column.
        """
        connection = db.connections[self.alias]
        introspection = connection.introspection

        namespace.update({
            '__module__': __name__,
            'Meta': type("Meta", (object,), {'db_table': table_name}),
            # Tag the model with its connection alias for later lookup.
            '_adhocdb_alias': self.alias,
        })

        cursor = connection.cursor()
        try:
            for row in introspection.get_table_description(cursor, table_name):
                # Index instead of unpacking a fixed 7-tuple so rows carrying
                # extra trailing columns are accepted too. Positions used:
                # 0 = column name, 1 = type code, 6 = null_ok.
                rname, rtype, nullok = row[0], row[1], row[6]
                try:
                    # NOTE(review): if the introspection object has no
                    # DATA_TYPES_REVERSE attribute, the AttributeError is
                    # caught below and every column becomes a TextField --
                    # confirm against the Django version in use.
                    fieldclass = getattr(
                        models, introspection.DATA_TYPES_REVERSE[rtype])
                except (KeyError, AttributeError):
                    # Unknown type code or field class: fall back to text.
                    fieldclass = models.TextField
                namespace[rname] = fieldclass(null=nullok)
        finally:
            # Always release the cursor, even if introspection fails.
            cursor.close()

        return type(name if name else table_name, (models.Model,), namespace)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AdHocRouter(object):
    """
    Database router that routes a model to the alias stored on it.

    A model carrying an ``_adhocdb_alias`` attribute is routed to that alias;
    for any other model the router returns ``None``.
    """

    def db_for_read(self, model, **hints):
        """
        Return ``model._adhocdb_alias``, or ``None`` if it has no such attribute.
        """
        try:
            alias = model._adhocdb_alias
        except AttributeError:
            alias = None
        return alias

    # Writes are routed exactly like reads.
    db_for_write = db_for_read
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
*g*