Created
May 16, 2012 11:22
-
-
Save santiagobasulto/2709634 to your computer and use it in GitHub Desktop.
ModelResourceTastypie
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ModelResource(Resource):
    """
    A ``Resource`` specialised for Django ``Model`` classes.

    The fields of the configured ``Model`` are inspected and turned into a
    matching list of API fields; relational fields are left out of that
    automatic list. Since the class knows about Django's ORM, it also
    implements the create/read/update/delete data operations of the
    resource.
    """
    # Python 2 style metaclass declaration.
    __metaclass__ = ModelDeclarativeMetaclass
@classmethod
def should_skip_field(cls, field):
    """
    Tell whether a Django model field must be left out of the contributed
    ApiFields.

    Relational fields are skipped. Older Django releases expose the
    relation on ``field.rel``, newer ones on ``field.remote_field``, so
    both attributes are consulted.

    Returns ``True`` when the field should be skipped and ``False`` when it
    should be included.
    """
    # A default is passed to ``getattr`` so that an object lacking the
    # attribute counts as non-relational instead of raising
    # ``AttributeError``.
    relation = getattr(field, 'remote_field', None) or getattr(field, 'rel', None)
    return bool(relation)
@classmethod
def api_field_from_django_field(cls, f, default=fields.CharField):
    """
    Pick the API field class that most likely fits a Django model field.

    ``f`` is asked for its internal type name through
    ``get_internal_type()``; that name selects one of the classes of the
    ``fields`` module. Any internal type that is not listed below yields
    ``default``.
    """
    # Internal Django type name -> name of the class to fetch from
    # ``fields``. Names are stored rather than classes so that only the
    # class that is actually selected gets looked up.
    #
    # TODO: Perhaps enable ``ForeignKey`` and ``ManyToManyField`` via
    #       introspection. They are not enabled by default because their
    #       ``__init__`` is very different from the other fields'.
    api_class_names = {
        'DateField': 'DateTimeField',
        'DateTimeField': 'DateTimeField',
        'BooleanField': 'BooleanField',
        'NullBooleanField': 'BooleanField',
        'FloatField': 'FloatField',
        'DecimalField': 'DecimalField',
        'IntegerField': 'IntegerField',
        'PositiveIntegerField': 'IntegerField',
        'PositiveSmallIntegerField': 'IntegerField',
        'SmallIntegerField': 'IntegerField',
        'AutoField': 'IntegerField',
        'FileField': 'FileField',
        'ImageField': 'FileField',
        'TimeField': 'TimeField',
    }

    class_name = api_class_names.get(f.get_internal_type())

    if class_name is None:
        return default

    return getattr(fields, class_name)
@classmethod
def get_fields(cls, fields=None, excludes=None):
    """
    Build the API fields contributed by the associated model.

    ``fields`` is an optional list of model field names to include and
    ``excludes`` an optional list of names to leave out. A model field is
    ignored when its name is already in ``cls.base_fields``, when either
    list rejects it, or when ``should_skip_field`` says so.

    Returns a dictionary mapping model field names to API field instances.
    The dictionary is empty when no ``object_class`` is configured.
    """
    wanted = fields or []
    unwanted = excludes or []
    built = {}

    model = cls._meta.object_class

    if not model:
        return built

    for model_field in model._meta.fields:
        name = model_field.name

        # Already declared, filtered out by the caller, or not a field we
        # expose automatically.
        if (
            name in cls.base_fields
            or (wanted and name not in wanted)
            or (unwanted and name in unwanted)
            or cls.should_skip_field(model_field)
        ):
            continue

        api_field_class = cls.api_field_from_django_field(model_field)

        options = {
            'attribute': name,
            'help_text': model_field.help_text,
        }

        if model_field.null is True:
            options['null'] = True

        options['unique'] = model_field.unique

        if not model_field.null and model_field.blank is True:
            options['default'] = ''
            options['blank'] = True

        # The assignments below run in this order on purpose: each later
        # match overrides the ``default`` chosen by an earlier one.
        if model_field.get_internal_type() == 'TextField':
            options['default'] = ''

        if model_field.has_default():
            options['default'] = model_field.default

        for flag_name in ('auto_now', 'auto_now_add'):
            flag_value = getattr(model_field, flag_name, False)

            if flag_value:
                options['default'] = flag_value

        api_field = api_field_class(**options)
        api_field.instance_name = name
        built[name] = api_field

    return built
def check_filtering(self, field_name, filter_type='exact', filter_bits=None):
    """
    Decide whether filtering on ``field_name`` is permitted.

    ``filter_type`` is the lookup type being requested and ``filter_bits``
    an optional list of further relation names to follow.

    Raises ``InvalidFilterError`` when the filter does not meet the needed
    conditions. Otherwise returns a list of attribute names (not field
    names).
    """
    remaining_bits = [] if filter_bits is None else filter_bits

    if field_name not in self._meta.filtering:
        raise InvalidFilterError("The '%s' field does not allow filtering." % field_name)

    allowed = self._meta.filtering[field_name]

    # Anything other than the two constants must be an explicit whitelist
    # of lookup types.
    if allowed not in (ALL, ALL_WITH_RELATIONS) and filter_type not in allowed:
        raise InvalidFilterError("'%s' is not an allowed filter on the '%s' field." % (filter_type, field_name))

    field = self.fields[field_name]

    if field.attribute is None:
        raise InvalidFilterError("The '%s' field has no 'attribute' for searching with." % field_name)

    # No relational part left: the field's own attribute is the answer.
    if len(remaining_bits) == 0:
        return [field.attribute]

    if not getattr(field, 'is_related', False):
        raise InvalidFilterError("The '%s' field does not support relations." % field_name)

    if not allowed == ALL_WITH_RELATIONS:
        raise InvalidFilterError("Lookups are not allowed more than one level deep on the '%s' field." % field_name)

    # Hand the rest of the lookup to the related resource so that it can
    # check, level by level, that filtering is allowed there as well.
    related_resource = field.get_related_resource(None)
    nested = related_resource.check_filtering(remaining_bits[0], filter_type, remaining_bits[1:])
    return [field.attribute] + nested
def filter_value_to_python(self, value, field_name, filters, filter_expr,
                           filter_type):
    """
    Turn the string ``value`` into a python object.

    The strings ``true``/``True``, ``false``/``False`` and
    ``nil``/``none``/``None`` become ``True``, ``False`` and ``None``.

    For the ``in`` and ``range`` filter types a non-empty value is turned
    into a list: when ``filters`` offers ``getlist`` every value stored
    under ``filter_expr`` is split on ``,``, otherwise ``value`` itself is
    split. Values that cannot be split (booleans, ``None``, sequences that
    are already lists) are returned unchanged.
    """
    # Simple values
    if value in ['true', 'True', True]:
        value = True
    elif value in ['false', 'False', False]:
        value = False
    elif value in ('nil', 'none', 'None', None):
        value = None

    if filter_type not in ('in', 'range'):
        return value

    # Booleans and ``None`` (possibly produced just above) have no length;
    # calling ``len`` on them would raise ``TypeError``. An empty value is
    # left alone as well.
    if not hasattr(value, '__len__') or not len(value):
        return value

    if hasattr(filters, 'getlist'):
        parts = []

        for part in filters.getlist(filter_expr):
            parts.extend(part.split(','))

        return parts

    if hasattr(value, 'split'):
        return value.split(',')

    # Already a sequence of values: nothing to split.
    return value
def build_filters(self, filters=None):
    """
    Translate a dictionary of filters into ORM-level filters.

    Keys should be resource fields, **NOT** model fields. Keys whose first
    part is not a known resource field are ignored.

    What may be filtered is declared at the declarative level, where valid
    values are either a list of Django filter types (i.e.
    ``['startswith', 'exact', 'lte']``), the ``ALL`` constant or the
    ``ALL_WITH_RELATIONS`` constant::

        filtering = {
            'resource_field_name': ['exact', 'startswith', 'endswith', 'contains'],
            'resource_field_name_2': ['exact', 'gt', 'gte', 'lt', 'lte', 'range'],
            'resource_field_name_3': ALL,
            'resource_field_name_4': ALL_WITH_RELATIONS,
            ...
        }
    """
    # ``None`` (the default) means no filters at all.
    if filters is None:
        filters = {}

    if hasattr(self._meta, 'queryset'):
        # Ask the current QuerySet which query terms are possible.
        query_terms = self._meta.queryset.query.query_terms.keys()
    else:
        query_terms = QUERY_TERMS.keys()

    orm_filters = {}

    for filter_expr, raw_value in filters.items():
        pieces = filter_expr.split(LOOKUP_SEP)
        field_name = pieces[0]
        relation_bits = pieces[1:]

        if field_name not in self.fields:
            # It's not a field we know about. Move along citizen.
            continue

        # A trailing query term names the lookup type; the default is an
        # exact match.
        filter_type = 'exact'

        if len(relation_bits) and relation_bits[-1] in query_terms:
            filter_type = relation_bits.pop()

        lookup_bits = self.check_filtering(field_name, filter_type, relation_bits)
        value = self.filter_value_to_python(raw_value, field_name, filters, filter_expr, filter_type)

        db_field_name = LOOKUP_SEP.join(lookup_bits)
        orm_filters[LOOKUP_SEP.join((db_field_name, filter_type))] = value

    return dict_strip_unicode_keys(orm_filters)
def apply_sorting(self, obj_list, options=None):
    """
    Apply ORM-level sorting to ``obj_list`` according to ``options``.

    The ``order_by`` key of ``options`` is read (``sort_by`` is still
    accepted, with a deprecation warning). Each entry is either a field
    name, for ascending order, or a field name with a ``-`` in front, for
    descending order. The field name should be the resource field, **NOT**
    the model field.

    Returns ``obj_list`` untouched when neither key is present; raises
    ``InvalidSortError`` for fields that cannot be ordered on.
    """
    if options is None:
        options = {}

    if 'order_by' in options:
        parameter_name = 'order_by'
    elif 'sort_by' in options:
        warnings.warn("'sort_by' is a deprecated parameter. Please use 'order_by' instead.")
        parameter_name = 'sort_by'
    else:
        # Nothing to alter the order. Return what we've got.
        return obj_list

    if hasattr(options, 'getlist'):
        order_bits = options.getlist(parameter_name)
    else:
        order_bits = options.get(parameter_name)

        if not isinstance(order_bits, (list, tuple)):
            order_bits = [order_bits]

    order_by_args = []

    for order_by in order_bits:
        pieces = order_by.split(LOOKUP_SEP)
        head = pieces[0]

        if head.startswith('-'):
            field_name, direction = head[1:], '-'
        else:
            field_name, direction = head, ''

        if field_name not in self.fields:
            # It's not a field we know about. Move along citizen.
            raise InvalidSortError("No matching '%s' field for ordering on." % field_name)

        if field_name not in self._meta.ordering:
            raise InvalidSortError("The '%s' field does not allow ordering." % field_name)

        attribute = self.fields[field_name].attribute

        if attribute is None:
            raise InvalidSortError("The '%s' field has no 'attribute' for ordering with." % field_name)

        # The resource field name is swapped for its attribute; any
        # remaining lookup parts are kept as they are.
        order_by_args.append("%s%s" % (direction, LOOKUP_SEP.join([attribute] + pieces[1:])))

    return obj_list.order_by(*order_by_args)
def apply_filters(self, request, applicable_filters):
    """
    An ORM-specific implementation of ``apply_filters``.

    By default ``applicable_filters`` is simply passed as ``**kwargs`` to
    ``filter`` on the object list; overriding this method makes more
    advanced things possible.
    """
    object_list = self.get_object_list(request)
    return object_list.filter(**applicable_filters)
def get_object_list(self, request):
    """
    An ORM-specific implementation of ``get_object_list``.

    Hands back a clone of the queryset configured on ``self._meta``;
    overrides may have limited that queryset.
    """
    queryset = self._meta.queryset
    return queryset._clone()
def obj_get_list(self, request=None, **kwargs):
    """
    A ORM-specific implementation of ``obj_get_list``.

    ``request`` is optional; when it has a ``GET`` dictionary, that
    dictionary is used to narrow the query. ``kwargs`` are merged on top
    of it.

    Raises ``BadRequest`` when the lookup fails with a ``ValueError``.
    """
    # Work on a copy of ``GET`` so that it can be updated with the kwargs.
    filters = request.GET.copy() if hasattr(request, 'GET') else {}
    filters.update(kwargs)

    applicable_filters = self.build_filters(filters=filters)

    try:
        matching = self.apply_filters(request, applicable_filters)
        return self.apply_authorization_limits(request, matching)
    except ValueError:
        raise BadRequest("Invalid resource lookup data provided (mismatched type).")
def obj_get(self, request=None, **kwargs):
    """
    A ORM-specific implementation of ``obj_get``.

    The optional ``kwargs`` narrow the query down to the one instance that
    is wanted.

    Raises the model's ``DoesNotExist`` when nothing matches,
    ``MultipleObjectsReturned`` when more than one instance matches and
    ``NotFound`` when the lookup fails with a ``ValueError``.
    """
    try:
        candidates = self.get_object_list(request).filter(**kwargs)
        object_list = self.apply_authorization_limits(request, candidates)
        stringified_kwargs = ', '.join("%s=%s" % pair for pair in kwargs.items())
        match_count = len(object_list)

        if match_count <= 0:
            raise self._meta.object_class.DoesNotExist("Couldn't find an instance of '%s' which matched '%s'." % (self._meta.object_class.__name__, stringified_kwargs))

        if match_count > 1:
            raise MultipleObjectsReturned("More than '%s' matched '%s'." % (self._meta.object_class.__name__, stringified_kwargs))

        return object_list[0]
    except ValueError:
        raise NotFound("Invalid resource lookup data provided (mismatched type).")
def obj_create(self, bundle, request=None, **kwargs):
    """
    A ORM-specific implementation of ``obj_create``.

    A fresh instance of the object class is put on ``bundle.obj``, gets
    ``kwargs`` assigned as attributes, is hydrated and validated, and is
    then saved together with its related data. Returns the bundle.
    """
    instance = self._meta.object_class()
    bundle.obj = instance

    for attr_name, attr_value in kwargs.items():
        setattr(instance, attr_name, attr_value)

    bundle = self.full_hydrate(bundle)
    self.is_valid(bundle, request)

    if bundle.errors:
        self.error_response(bundle.errors, request)

    # Related (FK) objects are saved first, just in case, then the parent
    # itself.
    self.save_related(bundle)
    bundle.obj.save()

    # The M2M bits are picked up last, once the parent has been saved.
    self.save_m2m(self.hydrate_m2m(bundle))
    return bundle
def obj_update(self, bundle, request=None, skip_errors=False, **kwargs):
    """
    A ORM-specific implementation of ``obj_update``.

    When ``bundle.obj`` is missing or has no primary key, the instance to
    update is first looked up through ``obj_get`` using ``kwargs``. The
    bundle is then hydrated and validated, and the object is saved
    together with its related data.

    With ``skip_errors`` set, validation errors are not passed on to
    ``error_response``.

    Raises ``NotFound`` when the lookup finds no matching instance.
    Returns the updated bundle.
    """
    if not bundle.obj or not bundle.obj.pk:
        # Attempt to hydrate data from kwargs before doing a lookup for the object.
        # This step is needed so certain values (like datetime) will pass model validation.
        try:
            bundle.obj = self.get_object_list(bundle.request).model()
            bundle.data.update(kwargs)
            bundle = self.full_hydrate(bundle)
            lookup_kwargs = kwargs.copy()

            for key in kwargs:
                if key == 'pk':
                    continue
                elif getattr(bundle.obj, key, NOT_AVAILABLE) is not NOT_AVAILABLE:
                    lookup_kwargs[key] = getattr(bundle.obj, key)
                else:
                    del lookup_kwargs[key]
        except Exception:
            # If there is trouble hydrating the data, fall back to just
            # using kwargs by itself (usually it only contains a "pk" key
            # and this will work fine).
            # ``Exception`` rather than a bare ``except`` so that
            # ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
            lookup_kwargs = kwargs

        try:
            bundle.obj = self.obj_get(bundle.request, **lookup_kwargs)
        except ObjectDoesNotExist:
            raise NotFound("A model instance matching the provided arguments could not be found.")

    bundle = self.full_hydrate(bundle)
    self.is_valid(bundle, request)

    if bundle.errors and not skip_errors:
        self.error_response(bundle.errors, request)

    # Save FKs just in case.
    self.save_related(bundle)

    # Save the main object.
    bundle.obj.save()

    # Now pick up the M2M bits.
    m2m_bundle = self.hydrate_m2m(bundle)
    self.save_m2m(m2m_bundle)
    return bundle
def obj_delete_list(self, request=None, **kwargs):
    """
    A ORM-specific implementation of ``obj_delete_list``.

    The optional ``kwargs`` narrow down which objects get deleted; the
    authorization limits are applied before anything is removed.
    """
    candidates = self.get_object_list(request).filter(**kwargs)
    doomed = self.apply_authorization_limits(request, candidates)

    if not hasattr(doomed, 'delete'):
        # A plain iterable: remove the objects one at a time.
        for item in doomed:
            item.delete()
        return

    # It's likely a ``QuerySet``. Call ``.delete()`` for efficiency.
    doomed.delete()
def obj_delete(self, request=None, **kwargs):
    """
    A ORM-specific implementation of ``obj_delete``.

    An object passed in as ``_obj`` is deleted directly when it has a
    ``delete`` attribute. Otherwise the remaining ``kwargs`` are used to
    narrow the query down to the instance to delete.

    Raises ``NotFound`` when no matching instance exists.
    """
    target = kwargs.pop('_obj', None)

    if hasattr(target, 'delete'):
        target.delete()
        return

    try:
        target = self.obj_get(request, **kwargs)
    except ObjectDoesNotExist:
        raise NotFound("A model instance matching the provided arguments could not be found.")

    target.delete()
def patch_list(self, request, **kwargs):
    """
    An ORM-specific implementation of ``patch_list``.

    PATCH should be atomic (all-success or all-fail), and the only neat
    way to get that is at the database level, so the parent
    implementation is run inside ``transaction.commit_on_success()``.
    """
    with transaction.commit_on_success():
        response = super(ModelResource, self).patch_list(request, **kwargs)

    return response
def rollback(self, bundles):
    """
    A ORM-specific implementation of ``rollback``.

    Goes through ``bundles`` and deletes the model of every bundle whose
    object is set and has a truthy ``pk``.
    """
    for bundle in bundles:
        instance = bundle.obj

        if not instance:
            continue

        if getattr(instance, 'pk', None):
            instance.delete()
def save_related(self, bundle):
    """
    Handles the saving of related non-M2M data.

    Assigning ``child.parent = parent`` & then calling ``Child.save``
    isn't good enough to make sure the ``parent`` is saved.

    To get around this, we go through all our related fields &
    call ``save`` on them if they have related, non-M2M data.
    M2M data is handled by the ``ModelResource.save_m2m`` method.
    """
    for field_name, field_object in self.fields.items():
        if not getattr(field_object, 'is_related', False):
            continue

        if getattr(field_object, 'is_m2m', False):
            continue

        if not field_object.attribute:
            continue

        # A blank-able field for which no data was supplied is left alone.
        # The ``in`` operator is used because ``dict.has_key`` is
        # deprecated and no longer exists on Python 3.
        if field_object.blank and field_name not in bundle.data:
            continue

        # Get the object.
        try:
            related_obj = getattr(bundle.obj, field_object.attribute)
        except ObjectDoesNotExist:
            related_obj = None

        # Because sometimes it's ``None`` & that's OK.
        if related_obj:
            if field_object.related_name:
                # The main object is saved first when it has no ``pk`` yet,
                # before it is assigned onto the related object.
                if not bundle.obj.pk:
                    bundle.obj.save()

                setattr(related_obj, field_object.related_name, bundle.obj)

            related_obj.save()
            setattr(bundle.obj, field_object.attribute, related_obj)
def save_m2m(self, bundle):
    """
    Handles the saving of related M2M data.

    Because of the way Django works, M2M data has to be dealt with after
    the main instance; that is why it is not part of the main ``save``
    bits.

    Currently slightly inefficient: the whole relation is cleared out and
    the related data is recreated as needed.
    """
    for field_name, field_object in self.fields.items():
        # Only M2M fields that have an attribute and are not read-only are
        # written.
        is_writable_m2m = (
            getattr(field_object, 'is_m2m', False)
            and field_object.attribute
            and not field_object.readonly
        )

        if not is_writable_m2m:
            continue

        manager = getattr(bundle.obj, field_object.attribute)

        if hasattr(manager, 'clear'):
            # Clear it out, just to be safe.
            manager.clear()

        related_objs = []

        for related_bundle in bundle.data[field_name]:
            related = related_bundle.obj
            related.save()
            related_objs.append(related)

        manager.add(*related_objs)
def get_resource_uri(self, bundle_or_obj):
    """
    Handles generating a resource URI for a single resource.

    Uses the model's ``pk`` in order to create the URI. ``bundle_or_obj``
    is either a ``Bundle``, whose ``obj`` is used, or the object itself.
    Objects that have no ``pk`` attribute fall back to their ``id``.
    """
    if isinstance(bundle_or_obj, Bundle):
        obj = bundle_or_obj.obj
    else:
        obj = bundle_or_obj

    kwargs = {
        'resource_name': self._meta.resource_name,
        # ``pk`` is preferred over ``id`` for bundles and plain objects
        # alike: a model with a custom primary key has no ``id``.
        'pk': obj.pk if hasattr(obj, 'pk') else obj.id,
    }

    if self._meta.api_name is not None:
        kwargs['api_name'] = self._meta.api_name

    return self._build_reverse_url("api_dispatch_detail", kwargs=kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment