Last active
September 26, 2015 13:29
-
-
Save ad-m/c8e190ea820797be5d6d to your computer and use it in GitHub Desktop.
Pythonowy klient API MojePanstwo.pl
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # pip install cached_property requests requests_cache | |
| from __future__ import division | |
| from math import ceil | |
| import requests | |
| import requests_cache | |
| from cached_property import cached_property | |
| # Import-time side effect: configures requests_cache with the name '/tmp/aaa'. | |
| # NOTE(review): presumably this enables a persistent HTTP cache stored under /tmp; confirm against the installed requests_cache version. | |
| requests_cache.configure('/tmp/aaa') | |
| # Endpoint requested by Storage.data to list the available datasets. | |
| INDEX_URL = 'https://api-v3.mojepanstwo.pl/dane/zbiory' | |
| # Endpoint template requested by Page.data; {dataset} is the dataset name. | |
| DATASET_URL = 'https://api-v3.mojepanstwo.pl/dane/{dataset}' | |
| # Endpoint template requested by DataObject.data; {id} is the object's key within {dataset}. | |
| OBJECT_URL = 'https://api-v3.mojepanstwo.pl/dane/{dataset}/{id}' | |
class DataObject(object):
    """Lazily fetched view of one object stored in a dataset.

    The JSON payload is downloaded on first access to ``data`` and then
    cached on the instance.  Unknown attributes are resolved against that
    payload, in this order:

    1. top-level keys of the payload,
    2. keys of ``payload['data']`` (write ``a__b`` to read the key ``a.b``),
    3. keys of ``payload['layers']``; a layer whose value is ``None`` is
       requested from the server by re-fetching the object with that layer
       added to the query.
       NOTE(review): this assumes the API reports not-yet-loaded layers
       as ``None`` -- confirm against the API documentation.

    :param dataset: dataset name used to build the object URL.
    :param pk: identifier of the object inside the dataset.
    :param layers: optional iterable of layer names to request.
    :param s: optional session-like object exposing ``get``; a new
        ``requests.Session`` is created when omitted.
    """

    # Names that must never be resolved through the payload.  Without this
    # guard a missing instance attribute (e.g. on a half-built instance)
    # would make __getattr__ -> data -> __getattr__ recurse forever.
    _RESERVED = frozenset(['dataset', 'pk', 'layers', 's', 'data'])

    def __init__(self, dataset, pk, layers=None, s=None):
        self.dataset = dataset
        self.pk = pk
        # Copy so that adding a layer here never mutates the caller's set.
        self.layers = set(layers) if layers else set()
        self.s = s or requests.Session()

    def _build_querystring(self):
        """Return the query parameters for the object request."""
        # Sorted list keeps the generated URL (and cache key) deterministic.
        return {'layers[]': sorted(self.layers)}

    def _add_layer(self, name):
        """Request an extra layer and drop the cached payload.

        The cached value lives in the instance ``__dict__`` under the
        property name, so removing that entry forces a re-fetch on the next
        access to ``data``.
        """
        self.layers.add(name)
        self.__dict__.pop('data', None)

    @cached_property
    def data(self):
        """Fetch and return the decoded JSON payload of the object.

        :raises Exception: when the server answers with a status other
            than 200; the message is the ``error_description`` field of
            the response when present.
        """
        r = self.s.get(url=OBJECT_URL.format(dataset=self.dataset, id=self.pk),
                       params=self._build_querystring())
        # Compare by value: ``is not 200`` relied on integer identity.
        if r.status_code != 200:
            raise Exception(r.json().get('error_description',
                                         "Unknown error at %s" % (r.url)))
        return r.json()

    def __dir__(self):
        """List the payload-backed attribute names."""
        payload = self.data
        names = list(payload)
        names.extend(key.replace('.', '__')
                     for key in (payload.get('data') or {}))
        names.extend(payload.get('layers') or {})
        return names

    def __getattr__(self, name):
        """Resolve ``name`` against the payload (see the class docstring).

        :raises AttributeError: when the name is reserved, is a dunder, or
            is not present anywhere in the payload.
        """
        # Dunder probes (copy, pickle, ...) must not trigger a network call.
        is_dunder = name.startswith('__') and name.endswith('__')
        if is_dunder or name in self._RESERVED:
            raise AttributeError(name)

        payload = self.data
        if name in payload:
            return payload[name]

        dotted = name.replace('__', '.')
        fields = payload.get('data') or {}
        if dotted in fields:
            return fields[dotted]

        layers = payload.get('layers') or {}
        if name in layers:
            if layers[name] is None:
                # Layer is known but not loaded: re-fetch including it.
                self._add_layer(name)
                layers = self.data.get('layers') or {}
            return layers.get(name)

        raise AttributeError("{cls!r} object has no attribute {name!r}".format(
            cls=self.__class__.__name__, name=name))

    def __repr__(self):
        return "{name} ({dataset}:{pk})".format(name=self.__class__.__name__,
                                                dataset=self.dataset,
                                                pk=self.pk)
class ResultObject(object):
    """Lightweight wrapper around one row of a dataset listing.

    :param row: decoded JSON row; ``row['dataset']`` and ``row['id']`` are
        read by ``evaluate`` and ``__repr__``.
    :param layers: optional iterable of layer names to request when the row
        is expanded into a full object.
    :param s: optional session-like object exposing ``get``; a new
        ``requests.Session`` is created when omitted.
    """

    def __init__(self, row, layers=None, s=None):
        self.row = row
        # Copy so later changes on either side do not leak to the other.
        self.layers = set(layers) if layers else set()
        self.s = s or requests.Session()

    def evaluate(self):
        """Return a ``DataObject`` for this row.

        The stored layers and session are forwarded; previously they were
        dropped, so the full object ignored the selected layers and opened
        a fresh session.
        """
        return DataObject(dataset=self.row['dataset'],
                          pk=self.row['id'],
                          layers=set(self.layers),
                          s=self.s)

    def __repr__(self):
        return "{name} ({dataset}:{pk})".format(name=self.__class__.__name__,
                                                dataset=self.row['dataset'],
                                                pk=self.row['id'])
class Page(object):
    """One page of results from a dataset listing.

    The listing is downloaded on first access to ``data`` and cached on the
    instance.  Iterating the page yields one ``ResultObject`` per row of
    ``data['Dataobject']``.

    :param dataset: dataset name used to build the listing URL.
    :param q: optional full-text query; sent only when non-empty.
    :param layers: optional iterable of layer names handed to every
        ``ResultObject`` produced by this page.
    :param conditions: optional mapping sent as ``conditions[<name>]``.
    :param page: page number sent as ``page``.
    :param per_page: page size sent as ``limit``.
    :param s: optional session-like object exposing ``get``; a new
        ``requests.Session`` is created when omitted.
    """

    def __init__(self, dataset, q='', layers=None, conditions=None, page=1,
                 per_page=50, s=None):
        self.dataset = dataset
        self.q = q
        # Snapshot the filters so later changes by the caller do not alter
        # a page that has not been fetched yet.
        self.conditions = dict(conditions) if conditions else {}
        self.page = page
        # The ``layers`` argument used to be ignored entirely.
        self.layers = set(layers) if layers else set()
        self.per_page = per_page
        self.s = s or requests.Session()

    def _build_conditions(self):
        """Return the filters encoded as ``conditions[<name>]`` keys."""
        return {"conditions[%s]" % name: value
                for name, value in self.conditions.items()}

    def _build_querystring(self):
        """Return the complete query parameters for the listing request."""
        querystring = self._build_conditions()
        querystring['page'] = self.page
        querystring['limit'] = self.per_page
        if self.q:
            querystring['q'] = self.q
        return querystring

    @cached_property
    def data(self):
        """Fetch and return the decoded JSON listing.

        :raises Exception: when the server answers with a status other
            than 200; the message is the ``error_description`` field of
            the response when present.
        """
        r = self.s.get(url=DATASET_URL.format(dataset=self.dataset),
                       params=self._build_querystring())
        # Compare by value: ``is not 200`` relied on integer identity.
        if r.status_code != 200:
            raise Exception(r.json().get('error_description',
                                         "Unknown error at %s" % (r.url)))
        return r.json()

    def __iter__(self):
        for row in self.data['Dataobject']:
            yield ResultObject(row=row, layers=self.layers, s=self.s)

    def __nonzero__(self):
        """Return True when the page holds at least one row."""
        return bool(self.data['Dataobject'])

    # Python 3 looks up __bool__; without it every page was truthy.
    __bool__ = __nonzero__
class QuerySet(object):
    """Chainable builder for queries against one dataset.

    ``query``, ``select_layers``, ``filter`` and ``per_page`` modify the
    query set in place and return it, so calls can be chained.  Iterating
    walks every result page in order.

    :param dataset: dataset name passed on to every ``Page``.
    :param s: optional session-like object exposing ``get``; a new
        ``requests.Session`` is created when omitted.
    """

    # Class-level defaults.  The mutable ones are copied per instance in
    # __init__; they used to be shared, so a filter applied to one query
    # set leaked into every other one.
    conditions = dict()
    q = None
    order = None
    _per_page = 50
    layers = set()

    def __init__(self, dataset, s=None):
        self.dataset = dataset
        self.s = s or requests.Session()
        self.conditions = dict(self.conditions)
        self.layers = set(self.layers)

    def query(self, q):
        """Set the full-text query and return ``self``."""
        self.q = q
        return self

    def select_layers(self, *names):
        """Add the given layers, or clear the selection when none are given.

        The previous ``for ... else`` always ran its ``else`` branch, which
        wiped the selection right after filling it (and replaced the set
        with a dict).
        """
        if names:
            self.layers.update(names)
        else:
            self.layers = set()
        return self

    def filter(self, **kwargs):
        """Add filters; ``a__b=value`` filters on the field ``a.b``."""
        for name, value in kwargs.items():
            self.conditions[name.replace('__', '.')] = value
        return self

    def per_page(self, num=50):
        """Set the default page size and return ``self``."""
        self._per_page = num
        return self

    def get_page(self, page=1, per_page=None):
        """Build the ``Page`` for ``page``; nothing is fetched here."""
        return Page(dataset=self.dataset,
                    q=self.q,
                    conditions=self.conditions,
                    per_page=per_page or self._per_page,
                    page=page,
                    layers=self.layers,
                    s=self.s)

    def __getitem__(self, key):
        """Return the single result found on page ``key`` of size one.

        NOTE(review): ``key`` is passed straight through as the page
        number, so indexing is effectively 1-based if the API numbers
        pages from 1 -- confirm before changing.

        :raises IndexError: when that page is empty.
        """
        return list(self.get_page(key, 1))[0]

    def __len__(self):
        """Return the total number of results reported by the API."""
        # Page is not subscriptable; the count lives in its payload.
        # __len__ must return an int, which ceil() does not on Python 2.
        return int(self.get_page(1).data['Count'])

    def __iter__(self):
        first = self.get_page(1)
        for item in first:
            yield item
        total = int(first.data['Count'])
        size = self._per_page
        # Integer ceiling division; the old floor division combined with
        # an exclusive range end skipped the trailing pages.
        last_page = (total + size - 1) // size
        for number in range(2, last_page + 1):
            for item in self.get_page(page=number):
                yield item
class Storage(object):
    """Entry point: dictionary-like access to the available datasets.

    ``storage[name]`` returns a ``QuerySet`` for a dataset listed by the
    index endpoint.

    :param s: optional session-like object exposing ``get``; a new
        ``requests.Session`` is created when omitted.
    """

    def __init__(self, s=None):
        self.s = s or requests.Session()

    @cached_property
    def data(self):
        """Fetch and return the decoded JSON dataset index.

        Only the first 200 entries are requested.

        :raises Exception: when the server answers with a status other
            than 200, matching the error handling of the other requests
            in this module.
        """
        r = self.s.get(INDEX_URL, params={'limit': 200})
        if r.status_code != 200:
            raise Exception(r.json().get('error_description',
                                         "Unknown error at %s" % (r.url)))
        return r.json()

    def keys(self):
        """Return the slugs of all datasets found in the index."""
        return [x['data']['zbiory.slug'] for x in self.data['Dataobject']]

    def __contains__(self, key):
        """Return True when ``key`` is a known dataset slug.

        Without this method ``key in storage`` fell back to integer
        indexing through ``__getitem__`` and raised ``KeyError``.
        """
        return key in self.keys()

    def __getitem__(self, key):
        """Return a ``QuerySet`` for the dataset ``key``.

        :raises KeyError: when the index does not list ``key``.
        """
        if key in self:
            return QuerySet(dataset=key, s=self.s)
        raise KeyError("Dataset {dataset} doesn't exists".format(dataset=key))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment