Created
March 6, 2014 14:51
-
-
Save shri/9391400 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#begin serializer https://github.com/Strangemother/django-djasoner | |
from io import StringIO | |
try: | |
from django.db.models import Model | |
from django.db.models.query import QuerySet | |
from django.utils.encoding import smart_unicode | |
from django.utils.simplejson import dumps | |
from django.utils import simplejson | |
from datetime import datetime, date | |
from django.http import HttpResponse | |
except ImportError: | |
django_lib = False | |
else: | |
django_lib = True | |
# no django | |
class Error(Exception): | |
''' Base class for exceptions in this module''' | |
pass | |
class NoDjangoError(Error): | |
'''Exception raised for missing django modules''' | |
def __init__(self, value): | |
self.value = value | |
""" | |
from serializers import to_json(data, **options) | |
""" | |
def model_json(queryset, fields_tuple=None): | |
if django_lib: | |
from django.core import serializers as srl | |
return srl.serialize('json', queryset, fields=fields_tuple) | |
else: | |
raise NoDjangoError('''Cannot use django.core.serializers as Django is | |
not installed.''') | |
def model_json_response(queryset, fields_tuple=None): | |
data = model_json(queryset, fields_tuple) | |
return HttpResponse( | |
data, | |
content_type = 'application/javascript; charset=utf8' | |
) | |
def json_response(something): | |
'''return Simple json dumps() wrapped by a django HttpResponse''' | |
o = [] | |
o.append(something) | |
return HttpResponse( | |
simplejson.dumps(o), | |
content_type = 'application/javascript; charset=utf8' | |
) | |
def json_serialize(object, nested=False): | |
'''serialize a python object''' | |
js = JSONSerializer() | |
j = js.serialize(object, nested=nested) | |
return j | |
def json_serialize_response(object, *args, **kwargs): | |
''' Eveoke the serializer and return a django HttpResponse''' | |
r = json_serialize(object, *args, **kwargs) | |
return HttpResponse(r, mimetype='application/json') | |
def to_json(data, **options): | |
''' Convery the data to JSON | |
return is JSON''' | |
js = JSONSerializer() | |
j = js.serialize(object, options) | |
return j | |
class UnableToSerializeError(Exception): | |
""" Error for not implemented classes """ | |
def __init__(self, value): | |
self.value = value | |
Exception.__init__(self) | |
def __str__(self): | |
return repr(self.value) | |
''' | |
If there are fields requiring serialization, | |
pass a list of SerializeEntities to | |
''' | |
class SerializeEntity(): | |
def __init__(self, search, replacement): | |
self.needle = search | |
self.replacement = replacement | |
def __unicode__(self): | |
return u'%s' % self.needle | |
''' | |
>>> from json import loads | |
>>> from pprint import pprint | |
>>> from core.serializers import json_serialize | |
>>> x = { 'bananas': 'blue' } | |
>>> s= json_serialize(x) | |
>>> cs = loads(s) | |
>>> pprint(cs) | |
''' | |
class JSONSerializer(): | |
boolean_fields = ['BooleanField', 'NullBooleanField'] | |
datetime_fields = ['DatetimeField', 'DateField', 'TimeField'] | |
number_fields = ['IntegerField', 'AutoField', 'DecimalField', 'FloatField', 'PositiveSmallIntegerField'] | |
def serialize(self, obj, *args, **options): | |
self.options = options | |
self.stream = options.pop("stream", StringIO()) | |
self.selectedFields = options.pop("fields", options.get('fields', None)) | |
self.ignoredFields = options.pop("ignored", options.get('ignored', None)) | |
self.use_natural_keys = options.pop("use_natural_keys", options.get("use_natural_keys", False)) | |
self.transcript = options.pop("transcript", options.get("transcript", {})) | |
self.nested = options.pop('nested', False) | |
self.currentLoc = '' | |
# Placeholder for the later used parser | |
self.html_parser = None | |
# Escape all string information to be written to the output | |
# stream though HTML parser, making it safe to transport. | |
# Remove this feature for speed - warn; your JSON may be invalid if False | |
self.parse_html = options.pop('parse_html', False) | |
# Stack recursive list. | |
self.recurse_list = [] | |
self.level = 0 | |
self.start_serialization() | |
self.handle_object(obj) | |
self.end_serialization() | |
return self.getvalue() | |
def get_string_value(self, obj, field): | |
"""Convert a field's value to a string.""" | |
return smart_unicode(field.value_to_string(obj)) | |
def start_serialization(self): | |
"""Called when serializing of the queryset starts.""" | |
pass | |
def end_serialization(self): | |
"""Called when serializing of the queryset ends.""" | |
pass | |
def start_array(self): | |
"""Called when serializing of an array starts.""" | |
self.stream.write(u'[') | |
def end_array(self): | |
"""Called when serializing of an array ends.""" | |
self.stream.write(u']') | |
def start_object(self): | |
"""Called when serializing of an object starts.""" | |
self.stream.write(u'{') | |
def end_object(self): | |
"""Called when serializing of an object ends.""" | |
self.stream.write(u'}') | |
def handle_object(self, object): | |
""" Called to handle everything, looks for the correct handling """ | |
if isinstance(object, dict): | |
self.handle_dictionary(object) | |
elif isinstance(object, list): | |
self.handle_list(object) | |
elif isinstance(object, Model): | |
self.handle_model(object) | |
elif isinstance(object, QuerySet): | |
self.handle_queryset(object) | |
elif isinstance(object, bool): | |
self.handle_simple(object) | |
elif isinstance(object, int) or isinstance(object, float) or isinstance(object, long): | |
self.handle_simple(object) | |
elif isinstance(object, unicode): | |
self.handle_simple(object) | |
elif isinstance(object, basestring): | |
self.handle_simple(object) | |
elif isinstance(object, tuple): | |
self.handle_simple(object) | |
elif isinstance(object, datetime): | |
self.handle_datetime(object) | |
elif isinstance(object, date): | |
self.handle_date(object) | |
elif object == None: | |
self.handle_none(object) | |
else: | |
# If this a seralize entity | |
try: | |
self.handle_string(object) | |
except: | |
raise UnableToSerializeError(type(object)) | |
def handle_dictionary(self, d): | |
"""Called to handle a Dictionary""" | |
i = 0 | |
self.start_object() | |
for key, value in d.iteritems(): | |
self.currentLoc += key+'.' | |
#self.stream.write(unicode(self.currentLoc)) | |
i += 1 | |
self.handle_simple(key) | |
self.stream.write(u': ') | |
# recursive care. | |
# if id(value) not in self.recurse_list: | |
self.recurse_list.append(id(value)) | |
self.handle_object(value) | |
# else: | |
# self.stream.write(u'-1') | |
if i != len(d): | |
self.stream.write(u', ') | |
self.currentLoc = self.currentLoc[0:(len(self.currentLoc)-len(key)-1)] | |
self.end_object() | |
def handle_list(self, l): | |
"""Called to handle a list""" | |
self.start_array() | |
for value in l: | |
self.handle_object(value) | |
if l.index(value) != len(l) -1: | |
self.stream.write(u', ') | |
self.end_array() | |
def handle_datetime(self, d): | |
self.start_object() | |
self.stream.write(u'"date" : "%s", ' % str(d)) | |
self.stream.write(u'"day" : "%s", ' % d.day ) | |
self.stream.write(u'"hour" : "%s", ' % d.hour) | |
self.stream.write(u'"microsecond" : "%s", ' % d.microsecond ) | |
self.stream.write(u'"minute" : "%s", ' % d.minute ) | |
self.stream.write(u'"month" : "%s", ' % d.month) | |
self.stream.write(u'"second" : "%s", ' % d.second) | |
self.stream.write(u'"weekday" : "%s", ' % d.weekday()) | |
self.stream.write(u'"year" : "%s"' % d.year ) | |
self.end_object() | |
def handle_date(self, d): | |
self.start_object() | |
self.stream.write(u'"day" : %s, ' % d.day ) | |
self.stream.write(u'"month" : %s, ' % d.month) | |
self.stream.write(u'"year" : %s,' % d.year ) | |
self.stream.write(u'"weekday" : %s, ' % d.weekday()) | |
self.stream.write(u'"isoformat" : "%s", ' % d.isoformat()) | |
self.stream.write(u'"ctime" : "%s"' % d.ctime()) | |
self.end_object() | |
def handle_model(self, mod): | |
"""Called to handle a django Model""" | |
try: | |
_meta = mod._meta | |
except AttributeError as e: | |
_meta = None | |
if _meta is None: | |
# Try a standard object | |
self.handle_simple(mod) | |
else: | |
self.start_object() | |
for field in mod._meta.local_fields: | |
if field.rel is None: | |
if self.selectedFields is None or field.attname in self.selectedFields or field.attname: | |
if self.ignoredFields is None or self.currentLoc + field.attname not in self.ignoredFields: | |
self.handle_field(mod, field) | |
else: | |
if self.selectedFields is None or field.attname[:-3] in self.selectedFields: | |
if self.ignoredFields is None or self.currentLoc + field.attname[:-3] not in self.ignoredFields: | |
self.handle_fk_field(mod, field) | |
for field in mod._meta.many_to_many: | |
if self.selectedFields is None or field.attname in self.selectedFields: | |
if self.ignoredFields is None or self.currentLoc + field.attname not in self.ignoredFields: | |
self.handle_m2m_field(mod, field) | |
self.stream.seek(self.stream.tell()-2) | |
self.end_object() | |
def handle_queryset(self, queryset): | |
"""Called to handle a django queryset""" | |
self.start_array() | |
it = 0 | |
for mod in queryset: | |
it += 1 | |
self.handle_model(mod) | |
if queryset.count() != it: | |
self.stream.write(u', ') | |
self.end_array() | |
def handle_field(self, mod, field): | |
"""Called to handle each individual (non-relational) field on an object.""" | |
self.handle_simple(field.name) | |
if field.get_internal_type() in self.boolean_fields: | |
if field.value_to_string(mod) == 'True': | |
self.stream.write(u': true') | |
elif field.value_to_string(mod) == 'False': | |
self.stream.write(u': false') | |
else: | |
self.stream.write(u': undefined') | |
else: | |
self.stream.write(u': ') | |
self.handle_simple(field.value_to_string(mod)) | |
self.stream.write(u', ') | |
def handle_fk_field(self, mod, field): | |
"""Called to handle a ForeignKey field.""" | |
related = getattr(mod, field.name) | |
if related is not None: | |
if field.rel.field_name == related._meta.pk.name: | |
# Related to remote object via primary key | |
pk = related._get_pk_val() | |
else: | |
# Related to remote object via other field | |
pk = getattr(related, field.rel.field_name) | |
d = { | |
'pk': pk, | |
} | |
if self.use_natural_keys and hasattr(related, 'natural_key'): | |
d.update({'natural_key': related.natural_key()}) | |
if type(d['pk']) == str and d['pk'].isdigit(): | |
d.update({'pk': int(d['pk'])}) | |
self.handle_simple(field.name) | |
self.stream.write(u': ') | |
if self.nested: | |
self.handle_model(related) | |
else: | |
self.handle_object(d) | |
self.stream.write(u', ') | |
def handle_m2m_field(self, mod, field): | |
"""Called to handle a ManyToManyField.""" | |
if field.rel.through._meta.auto_created: | |
self.handle_simple(field.name) | |
self.stream.write(u': ') | |
self.start_array() | |
hasRelationships = False | |
#import pdb;pdb.set_trace() | |
for relobj in getattr(mod, field.name).iterator(): | |
hasRelationships = True | |
pk = relobj._get_pk_val() | |
d = { | |
'pk': pk, | |
} | |
if self.use_natural_keys and hasattr(relobj, 'natural_key'): | |
d.update({'natural_key': relobj.natural_key()}) | |
if type(d['pk']) == str and d['pk'].isdigit(): | |
d.update({'pk': int(d['pk'])}) | |
if self.nested: | |
self.handle_model(relobj) | |
else: | |
self.handle_simple(d) | |
self.stream.write(u', ') | |
if hasRelationships: | |
self.stream.seek(self.stream.tell()-2) | |
self.end_array() | |
self.stream.write(u', ') | |
def safe_string(self, strin): | |
''' Returns a transport ready string. ''' | |
esc = strin | |
# UTF-8 save convert | |
if type(strin) != unicode: | |
esc = unicode('%s' % strin, "utf-8", errors="replace") | |
esc = esc.replace('\n', '') | |
# if to parse as HTML | |
if self.parse_html: | |
esc = cgiesc(esc).encode('ascii', 'xmlcharrefreplace') | |
return unicode(esc) | |
def handle_simple(self, simple): | |
""" Called to handle values that can be handled via simplejson """ | |
esc = self.safe_string( dumps(simple) ) | |
self.stream.write(esc) | |
def handle_string(self, simple): | |
esc = self.safe_string(simple) | |
self.stream.write( u'"%s"' % esc) | |
def handle_none(self, object): | |
o = '' | |
self.stream.write(u'"%s"' % o)#unicode(dumps(o))) | |
def getvalue(self): | |
"""Return the fully serialized object (or None if the output stream is not seekable).sss """ | |
if callable(getattr(self.stream, 'getvalue', None)): | |
return self.stream.getvalue() | |
#end serializer | |
def serialize(data): | |
jsonSerializer = JSONSerializer() | |
return jsonSerializer.serialize(data, use_natural_keys=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment