Last active
          April 5, 2017 20:27 
        
      - 
      
- 
        Save tiberiuichim/d96b430ca80d753b4af28a5b4a10076a to your computer and use it in GitHub Desktop. 
    Simplistic Kinto Storage API to be used in event-based extensions
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | from kinto.authorization import RouteFactory | |
| from kinto.core.errors import raise_invalid | |
| from kinto.core.events import ACTIONS | |
| from kinto.core.utils import build_request | |
| from kinto.core.utils import view_lookup | |
| from kinto.views.records import Record | |
| from kinto.views.buckets import Bucket | |
| from kinto.views.collections import Collection | |
| from kinto.views.groups import Group | |
| # ACTIONS are: CREATE, READ, UPDATE, DELETE | |
def create_collection(bucket_id, collection_id, request, data=None):
    """Create a collection inside a bucket.

    Builds the collection URI from the two ids and delegates the actual
    creation to ``resource_create_object`` with the ``Collection`` resource.

    NOTE: unlike the other helpers of this module, ``request`` is the
    *third* positional parameter here.

    :param bucket_id: id of the parent bucket
    :param collection_id: id of the collection to create
    :param request: the current request
    :param data: optional dict of attributes for the new collection;
        ``None`` (the default) means no extra attributes
    :returns: whatever ``resource_create_object`` returns for the collection
    """
    # Use a fresh dict per call rather than a shared mutable default.
    payload = {} if data is None else data
    uri = '/buckets/{}/collections/{}'.format(bucket_id, collection_id)
    return resource_create_object(request, Collection, uri, data=payload)
def create_group(request, bucket_id, group_id, data=None):
    """Create a group inside a bucket.

    Builds the group URI (``/buckets/{bucket_id}/groups/{group_id}``) and
    delegates the creation to ``resource_create_object`` with the ``Group``
    resource. The id check is disabled (``check_id=False``).

    :param request: the current request
    :param bucket_id: id of the parent bucket
    :param group_id: id of the group to create
    :param data: optional dict of attributes for the new group; ``None``
        (the default) means no extra attributes
    :returns: whatever ``resource_create_object`` returns for the group
    """
    # Use a fresh dict per call rather than a shared mutable default.
    payload = {} if data is None else data
    uri = '/buckets/{}/groups/{}'.format(bucket_id, group_id)
    # Forward the caller's data; previously an empty dict was always sent,
    # silently dropping whatever the caller provided.
    return resource_create_object(request, Group, uri, data=payload,
                                  check_id=False)
| # TODO: create create_record? | |
def get_groups(request, bucket_id, **kwargs):
    """Return the groups of a bucket.

    :param request: the current request
    :param bucket_id: id of the bucket whose groups are listed
    :param kwargs: forwarded unchanged to ``model.get_records``
    :returns: the first element of the ``get_records`` result
    """
    # bucket_id must be handed to get_resource: without it no route matches
    # for Group and get_resource raises ValueError("No resource found").
    res = get_resource(request, Group, bucket_id=bucket_id)
    return res.model.get_records(**kwargs)[0]   # returns ([...], count)
def get_buckets(request, **kwargs):
    """Return the buckets.

    Keyword arguments are forwarded unchanged to ``model.get_records``;
    only the first element of its result is returned.
    """
    resource = get_resource(request, Bucket)
    found = resource.model.get_records(**kwargs)
    # get_records returns ([...], count); keep only the list part.
    return found[0]
def get_collections(request, bucket_id, **kwargs):
    """Return the collections of the given bucket.

    Keyword arguments are forwarded unchanged to ``model.get_records``;
    only the first element of its result is returned.
    """
    resource = get_resource(request, Collection, bucket_id=bucket_id)
    found = resource.model.get_records(**kwargs)
    return found[0]
def get_records(request, bucket_id, collection_id, **kwargs):
    """Return the records of the given collection.

    Keyword arguments are forwarded unchanged to ``model.get_records``
    (see kinto.core.resource.model for details on the accepted arguments);
    only the first element of its result is returned.
    """
    resource = get_resource(
        request,
        Record,
        bucket_id=bucket_id,
        collection_id=collection_id,
    )
    found = resource.model.get_records(**kwargs)
    return found[0]
def get_bucket(request, bucket_id):
    """Return the data of one bucket.

    The bucket is read with ``model.get_record`` and the result of passing
    it through the resource's ``postprocess`` is returned.
    """
    resource = get_resource(request, Bucket, bucket_id=bucket_id)
    bucket = resource.model.get_record(bucket_id)
    return resource.postprocess(bucket)
def get_collection(request, bucket_id, collection_id):
    """Return the data of one collection.

    The collection is read with ``model.get_record`` and the result of
    passing it through the resource's ``postprocess`` is returned.
    """
    resource = get_resource(
        request,
        Collection,
        bucket_id=bucket_id,
        collection_id=collection_id,
    )
    collection = resource.model.get_record(collection_id)
    return resource.postprocess(collection)
def get_record(request, bucket_id, collection_id, record_id):
    """Return the data of one record, in postprocessed form.

    Example shapes for a record with id ``email`` (the stored value is
    elided here).

    Unprocessed, as read from the model::

        {'id': 'email', 'value': '...', 'last_modified': 1491221954971,
         '__permissions__': {}}

    Postprocessed, as returned by this function::

        {'permissions': {}, 'data': {'id': 'email', 'last_modified':
         1491221954971, 'value': '...'}}
    """
    lookup = {
        'bucket_id': bucket_id,
        'collection_id': collection_id,
        'record_id': record_id,
    }
    resource = get_resource(request, Record, **lookup)
    record = resource.model.get_record(record_id)
    return resource.postprocess(record)
def get_group(request, bucket_id, group_id):
    """Return the data of one group.

    The group is read with ``model.get_record`` and the result of passing
    it through the resource's ``postprocess`` is returned.
    """
    resource = get_resource(
        request,
        Group,
        bucket_id=bucket_id,
        group_id=group_id,
    )
    group = resource.model.get_record(group_id)
    return resource.postprocess(group)
def resource_create_object(request, resource_cls, uri, data=None,
                           check_id=True):
    """Create an object through a resource instantiated on a fake request.

    The ``uri`` is resolved with ``view_lookup``, a fake request is built
    for it with ``build_fake_request``, and ``resource_cls`` is instantiated
    on that request with a ``RouteFactory`` context. The object is then
    created with ``resource.model.create_record(..., ignore_conflict=True)``
    and passed through ``resource.postprocess`` with ``ACTIONS.CREATE``.

    NOTE: code based on the implementation in kinto.plugins.default. It has
    been customized to use the build_fake_request from this module.

    :param request: the current request
    :param resource_cls: the resource class to instantiate
    :param uri: URI of the object to create; the object id is read from the
        ``'id'`` entry of the matchdict resolved for this URI
    :param data: optional dict of attributes for the new object; ``None``
        (the default) means no extra attributes. An ``'id'`` key in it is
        overridden by the id taken from the URI.
    :param check_id: when true, the id is matched against the resource's
        ``model.id_generator`` and ``raise_invalid`` is called on mismatch
    :returns: the created object
    :rtype: dict
    """
    resource_name, matchdict = view_lookup(request, uri)
    # NOTE(review): the fake request is built with build_fake_request's
    # default method ('GET'); confirm this is intended for a creation.
    req = build_fake_request(request, uri, matchdict,
                             resource_name=resource_name)
    obj_id = matchdict['id']

    # Fake context, required to instantiate a resource.
    context = RouteFactory(req)
    context.resource_name = resource_name
    resource = resource_cls(req, context=context)

    # Check that provided id is valid for this resource.
    if check_id and not resource.model.id_generator.match(obj_id):
        error_details = {
            'location': 'path',
            'description': "Invalid {} id".format(resource_name)
        }
        raise_invalid(resource.request, **error_details)

    # Build a new dict (the caller's dict is never mutated). The id from the
    # URI goes last so it always wins: it is the one that was validated
    # above and the one the fake request points at.
    extra = {} if data is None else data
    record = {**extra, 'id': obj_id}
    obj = resource.model.create_record(record, ignore_conflict=True)

    # Since the current request is not a resource (but a straight Service),
    # we simulate a request on a resource.
    # This will be used in the resource event payload.
    resource.postprocess(obj, action=ACTIONS.CREATE)
    return obj
def get_resource(request,
                 resource_cls,
                 bucket_id=None,
                 collection_id=None,
                 record_id=None,
                 group_id=None,
                 method='GET'):
    """Instantiate ``resource_cls`` for the URI selected by the given ids.

    A fixed, ordered list of URI templates is checked; each template comes
    with the values it requires (ids and, for listing endpoints, a test on
    ``resource_cls``). The first template whose required values are all
    truthy is used: its URI is resolved with ``view_lookup``, a fake request
    is built for it with ``build_fake_request`` using ``method``, and the
    resource is instantiated on that request with a ``RouteFactory`` context.

    :raises ValueError: when no template has all of its required values
    """
    wants_groups = resource_cls == Group
    wants_records = resource_cls == Record
    wants_collections = resource_cls == Collection
    wants_buckets = resource_cls == Bucket

    # Order matters: more specific URIs come before the shorter ones.
    candidates = (
        ('/buckets/{}/groups/{}',
         (bucket_id, group_id)),
        ('/buckets/{}/groups',
         (bucket_id, wants_groups)),
        ('/buckets/{}/collections/{}/records/{}',
         (bucket_id, collection_id, record_id)),
        ('/buckets/{}/collections/{}/records',
         (bucket_id, collection_id, wants_records)),
        ('/buckets/{}/collections/{}',
         (bucket_id, collection_id)),
        ('/buckets/{}/collections',
         (bucket_id, wants_collections)),
        ('/buckets/{}',
         (bucket_id, )),
        ('/buckets',
         (wants_buckets, )),
    )

    usable = [(template, values)
              for template, values in candidates
              if all(values)]
    if not usable:
        raise ValueError("No resource found")

    template, values = usable[0]
    # Surplus positional arguments (the class-test booleans) are simply not
    # referenced by the template.
    uri = template.format(*values)

    resource_name, matchdict = view_lookup(request, uri)
    fake_request = build_fake_request(request, uri, matchdict, method=method)
    context = RouteFactory(fake_request)
    context.resource_name = resource_name
    return resource_cls(fake_request, context=context)
def build_fake_request(request, uri, matchdict, resource_name='record',
                       method='GET'):
    """Build a fake request for ``uri`` out of an existing request.

    The fake request is mainly used to populate the events that will be
    triggered by the resource. It is created with ``build_request`` for the
    given ``method`` and ``uri``, receives ``matchdict`` and
    ``resource_name`` (as ``current_resource_name``), and takes over
    ``bound_data``, ``authn_type``, ``selected_userid`` and ``errors`` from
    the original request.

    :returns: the fake request
    """
    overrides = {'method': method, 'path': uri}
    fake = build_request(request, overrides)
    fake.matchdict = matchdict

    # Carry over the state of the originating request.
    for attr in ('bound_data', 'authn_type', 'selected_userid', 'errors'):
        setattr(fake, attr, getattr(request, attr))

    fake.current_resource_name = resource_name
    return fake
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment