Created
February 24, 2014 17:22
-
-
Save ykmm/9192663 to your computer and use it in GitHub Desktop.
Collection+json python library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
#Based on http://amundsen.com/media-types/collection/format/ | |
import json | |
from collections import OrderedDict as odict | |
#from ContentType import ContentType | |
import logging | |
COLLECTION = 'collection' | |
VERSION = 'version' | |
HREF = 'href' | |
LINKS = 'links' | |
ITEMS = 'items' | |
QUERIES = 'queries' | |
TEMPLATE = 'template' | |
ERROR = 'error' | |
TITLE = 'title' | |
CODE = 'code' | |
MESSAGE = 'message' | |
DATA = 'data' | |
NAME = 'name' | |
VALUE = 'value' | |
PROMPT = 'prompt' | |
REL = 'rel' | |
RENDER = 'render' | |
required = 'required' | |
optional = 'optional' | |
STRUCT_DATA = [odict([(NAME,required), | |
(VALUE,optional), | |
(PROMPT,optional)])] | |
STRUCT_LINKS = [odict([(HREF,required), | |
(REL,required), | |
(NAME,optional), | |
(RENDER,optional), | |
(PROMPT,optional)])] | |
STRUCT_ITEMS = [odict([(HREF, required), | |
(DATA, STRUCT_DATA), | |
(LINKS, STRUCT_LINKS)])] | |
STRUCT_QUERIES = [odict([(HREF,required), | |
(REL,required), | |
(NAME,optional), | |
(PROMPT,optional), | |
(DATA,STRUCT_DATA)])] | |
STRUCT_TEMPLATE = odict([(DATA, STRUCT_DATA)]) | |
STRUCT_ERROR = odict([(TITLE,optional), | |
(CODE,optional), | |
(MESSAGE,optional)]) | |
STRUCTURE = odict([(COLLECTION, odict([(VERSION, required), | |
(HREF, required), | |
(LINKS, STRUCT_LINKS), | |
(ITEMS, STRUCT_ITEMS), | |
(QUERIES, STRUCT_QUERIES), | |
(TEMPLATE, STRUCT_TEMPLATE), | |
(ERROR, STRUCT_ERROR), | |
]) | |
)]) | |
#Test supporting extension | |
""" | |
REQUIRED = 'required' | |
MULTIVAL = 'multival' | |
OPTIONS = 'options' | |
URLOPTIONS = 'urloptions' | |
STRUCT_OPTIONS_MM = [odict([(VALUE,optional), | |
(PROMPT,optional)])] | |
STRUCT_DATA_MM = [odict([(NAME,required), | |
(VALUE,optional), | |
(PROMPT,optional), | |
(REQUIRED, optional), | |
(MULTIVAL, optional), | |
(OPTIONS, STRUCT_OPTIONS_MM), | |
(URLOPTIONS, optional) | |
])] | |
STRUCT_QUERIES = [odict([(HREF,required), | |
(REL,required), | |
(NAME,optional), | |
(PROMPT,optional), | |
(DATA,STRUCT_DATA_MM)])] | |
STRUCT_TEMPLATE = odict([(DATA, STRUCT_DATA_MM)]) | |
""" | |
class CJError(Exception): | |
pass | |
class CollectionJsonV10(object): | |
content_type = 'application/vnd.collection+json' | |
_cj_version = '1.0' | |
def __init__(self, text_or_dict = None): | |
"""Representes a Collection+JSON object | |
Accepts as optional parameter an input string/dict used to populate | |
the instance | |
""" | |
self.collobj = odict() | |
self.collobj[COLLECTION] = odict() | |
self.collobj[COLLECTION][VERSION] = self._cj_version | |
if text_or_dict is not None: | |
if isinstance(text_or_dict, basestring): | |
try: | |
init_dict = json.loads(text_or_dict) | |
except: | |
raise CJError, "invalid json string" | |
elif isinstance(text_or_dict, dict): | |
init_dict = text_or_dict | |
else: | |
raise CJError, "text_or_dict should be a json string or a json dictionary" | |
self.collobj = self._validate(init_dict) | |
def __repr__(self): | |
return "CollectionJson(%r)" % self.collobj | |
def _rec_validate(self, elemento_padre, data, structure): | |
"""recursive validy check of "data" based on "structure" | |
returns an ordered and cleaned up data structure | |
""" | |
tmpdata = data[elemento_padre] | |
tmpstructure = structure[elemento_padre] | |
#If the data structure is a list we create a fake data/struct so | |
#we can still use this recursive method for validation | |
if isinstance(tmpstructure,list): | |
tmpclean = list() | |
for riga in tmpdata: | |
fake_data = odict((('from_list',riga),)) | |
fake_struct = odict((('from_list',tmpstructure[0]),)) | |
tmplistelem = self._rec_validate('from_list', | |
fake_data, | |
fake_struct) | |
tmpclean.append(tmplistelem) | |
else: | |
tmpclean = odict() | |
for elem in tmpstructure: | |
tipo_elem = tmpstructure[elem] | |
#The element can be "required", "optional" or a structure | |
if tipo_elem == required: | |
if elem not in tmpdata or tmpdata[elem] == '': | |
raise CJError, "%s absent or empty \nin data %s" % (elem, data) | |
else: | |
tmpclean[elem] = tmpdata[elem] | |
elif tipo_elem == optional: | |
if elem in tmpdata: | |
tmpclean[elem] = tmpdata[elem] | |
else: | |
if elem in tmpdata: | |
tmpclean[elem] = self._rec_validate(elem, | |
tmpdata, | |
tmpstructure) | |
return tmpclean | |
def _validate(self, dati): | |
if COLLECTION not in dati: | |
raise CJError, "collection object is absent" | |
retclean = odict() | |
retclean[COLLECTION] = self._rec_validate(COLLECTION, | |
dati, | |
STRUCTURE) | |
if retclean[COLLECTION][VERSION] != self._cj_version: | |
raise CJError, "invalid collection+json version" | |
return retclean | |
def represent(self, pretty=False): | |
if pretty: | |
indent = 4 | |
else: | |
indent = None | |
self._validate(self.collobj) | |
return json.dumps(self.collobj, indent=indent) | |
def represent_template(self, pretty=False): | |
if pretty: | |
indent = 4 | |
else: | |
indent = None | |
self._validate_subtree(self.collobj[COLLECTION][TEMPLATE],STRUCT_TEMPLATE) | |
return json.dumps({TEMPLATE: self.collobj[COLLECTION][TEMPLATE]}, indent=indent) | |
def _validate_subtree(self, data, structure): | |
"""Used to validate a specific subtree based on "structure" | |
"structure" can be any of the above defined STRUCT_* | |
""" | |
tmpdata = odict([('tmpvalid',data)]) | |
tmpstruct = odict([('tmpvalid',structure)]) | |
return self._rec_validate('tmpvalid', tmpdata, tmpstruct) | |
def add_item_to_list(self, stuff, list_name, struct_name): | |
"""Adds "stuff" element to "list_name" validating against the structure | |
named in "struct_name" | |
""" | |
safe_stuff = self._validate_subtree(stuff,struct_name[0]) | |
#If we need to add/modify something | |
if len(safe_stuff) > 0: | |
#If the items are not already present, we assign all of safe_stuff | |
if list_name not in self.collobj[COLLECTION]: | |
self.collobj[COLLECTION][list_name] = [safe_stuff] | |
else: | |
self.collobj[COLLECTION][list_name].append(safe_stuff) | |
#HREF | |
@property | |
def href(self): | |
if HREF in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][HREF] | |
else: | |
return None | |
@href.setter | |
def href(self, value): | |
self.collobj[COLLECTION][HREF] = value | |
@href.deleter | |
def href(self): | |
del(self.collobj[COLLECTION][HREF]) | |
#HREF | |
@property | |
def version(self): | |
if VERSION in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][VERSION] | |
else: | |
return None | |
@href.setter | |
def href(self, value): | |
self.collobj[COLLECTION][HREF] = value | |
#LINKS | |
def add_links(self, links): | |
for link in links: | |
self.add_item_to_list(link, LINKS, STRUCT_LINKS) | |
@property | |
def links(self): | |
if LINKS in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][LINKS] | |
else: | |
return list() | |
@links.deleter | |
def links(self): | |
if LINKS in self.collobj[COLLECTION]: | |
del(self.collobj[COLLECTION][LINKS]) | |
#ITEMS | |
def add_items(self, items): | |
for item in items: | |
self.add_item_to_list(item, ITEMS, STRUCT_ITEMS) | |
@property | |
def items(self): | |
if ITEMS in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][ITEMS] | |
else: | |
return list() | |
@items.deleter | |
def items(self): | |
if ITEMS in self.collobj[COLLECTION]: | |
del(self.collobj[COLLECTION][ITEMS]) | |
@property | |
def items_grouped(self): | |
"""Returns a dict of items grouped by data elements""" | |
if ITEMS in self.collobj[COLLECTION]: | |
sets = dict() | |
for item in self.collobj[COLLECTION][ITEMS]: | |
itemkeys = [item_elem['name'] for item_elem in item['data']] | |
itemkeys = tuple(itemkeys) | |
if itemkeys not in sets: | |
sets[itemkeys] = [item] | |
else: | |
sets[itemkeys].append(item) | |
return sets | |
else: | |
return dict() | |
#QUERIES | |
def add_queries(self, queries): | |
for query in queries: | |
self.add_item_to_list(query, QUERIES, STRUCT_QUERIES) | |
@property | |
def queries(self): | |
if QUERIES in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][QUERIES] | |
else: | |
return list() | |
@queries.deleter | |
def queries(self): | |
if QUERIES in self.collobj[COLLECTION]: | |
del(self.collobj[COLLECTION][QUERIES]) | |
#TEMPLATE | |
@property | |
def template(self): | |
if TEMPLATE in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][TEMPLATE] | |
else: | |
return None | |
@template.setter | |
def template(self, stuff): | |
safe_stuff = self._validate_subtree(stuff, STRUCT_TEMPLATE) | |
self.collobj[COLLECTION][TEMPLATE] = safe_stuff | |
@template.deleter | |
def template(self): | |
if TEMPLATE in self.collobj[COLLECTION]: | |
del(self.collobj[COLLECTION][TEMPLATE]) | |
#ERROR | |
@property | |
def error(self): | |
if ERROR in self.collobj[COLLECTION]: | |
return self.collobj[COLLECTION][ERROR] | |
else: | |
return None | |
@error.setter | |
def error(self, stuff): | |
safe_stuff = self._validate_subtree(stuff, STRUCT_ERROR) | |
self.collobj[COLLECTION][ERROR] = safe_stuff | |
@error.deleter | |
def error(self): | |
if ERROR in self.collobj[COLLECTION]: | |
del(self.collobj[COLLECTION][ERROR]) | |
if __name__ == '__main__': | |
jc = CollectionJsonV10(None) | |
print jc | |
jc.template = {'data':[{'name':'nome','value':'valore','prompt':'prompoooot'}, {'name':'nome2','value':'valore2','prompt':'prompo2ooot'}]} | |
jc.href = 'http://nohref/' | |
print jc | |
print jc.represent_template(True) | |
print jc.represent(True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment