Skip to content

Instantly share code, notes, and snippets.

@ad-m
Last active September 26, 2015 13:29
Show Gist options
  • Select an option

  • Save ad-m/c8e190ea820797be5d6d to your computer and use it in GitHub Desktop.

Select an option

Save ad-m/c8e190ea820797be5d6d to your computer and use it in GitHub Desktop.
Pythonowy klient API MojePanstwo.pl
# pip install cached_property requests requests_cache
from __future__ import division
from math import ceil
import requests
import requests_cache
from cached_property import cached_property
requests_cache.configure('/tmp/aaa')
INDEX_URL = 'https://api-v3.mojepanstwo.pl/dane/zbiory'
DATASET_URL = 'https://api-v3.mojepanstwo.pl/dane/{dataset}'
OBJECT_URL = 'https://api-v3.mojepanstwo.pl/dane/{dataset}/{id}'
class DataObject(object):
def __init__(self, dataset, pk, layers=None, s=None):
self.dataset = dataset
self.pk = pk
self.layers = layers or set()
self.s = s or requests.Session()
def _build_querystring(self):
return {'layers[]': self.layers}
def _add_layer(self, name):
del self['data']
self.layers.add(name)
@cached_property
def data(self):
r = self.s.get(url=OBJECT_URL.format(dataset=self.dataset, id=self.pk),
params=self._build_querystring())
if r.status_code is not 200:
raise Exception(r.json().get('error_description', "Unknown error at %s" % (r.url)))
return r.json()
def __dir__(self):
return (self.data.keys() +
[x.replace('.', '__') for x in self.data['data']] +
self.data['layers'].keys())
def __getattr__(self, name):
if name in self.data:
return self.data[name]
if name.replace('__', '.') in self.data['data']:
return self.data['data'][name.replace('__', '.')]
if name in self.data['layers']:
if self.data['layers'] is None:
self.add_layer(name)
return self.data['layers']['name']
return super(DataObject, self).__getattr__(name)
def __repr__(self):
return "{name} ({dataset}:{pk})".format(name=self.__class__.__name__,
dataset=self.dataset,
pk=self.pk)
class ResultObject(object):
def __init__(self, row, layers=None, s=None):
self.row = row
self.layers = layers or set()
self.s = s or requests.Session()
def evaluate(self):
return DataObject(dataset=self.row['dataset'], pk=self.row['id'])
def __repr__(self):
return "{name} ({dataset}:{pk})".format(name=self.__class__.__name__,
dataset=self.row['dataset'],
pk=self.row['id'])
class Page(object):
def __init__(self, dataset, q='', layers=None, conditions=None, page=1, per_page=50, s=None):
self.dataset = dataset
self.q = q
self.conditions = conditions or {}
self.page = page
self.layers = set()
self.per_page = per_page
self.s = s or requests.Session()
def _build_conditions(self):
return {"conditions[%s]" % name: value for name, value in self.conditions.items()}
def _build_querystring(self):
querystring = self._build_conditions()
querystring['page'] = self.page
querystring['limit'] = self.per_page
if self.q:
querystring['q'] = self.q
return querystring
@cached_property
def data(self):
r = self.s.get(url=DATASET_URL.format(dataset=self.dataset),
params=self._build_querystring())
if r.status_code is not 200:
raise Exception(r.json().get('error_description', "Unknown error at %s" % (r.url)))
return r.json()
def __iter__(self):
for x in self.data['Dataobject']: # or use 'yield from'
yield ResultObject(row=x, layers=self.layers, s=self.s)
def __nonzero__(self):
return bool(self.data['Dataobject'])
class QuerySet(object):
conditions = dict()
q = None
order = None
_per_page = 50
layers = set()
def __init__(self, dataset, s=None):
self.dataset = dataset
self.s = s or requests.Session()
def query(self, q):
self.q = q
return self
def select_layers(self, *names):
for name in names:
self.layers.add(name)
else:
self.layers = {}
return self
def filter(self, **kwargs):
for name, value in kwargs.items():
self.conditions[name.replace('__', '.')] = value
return self
def per_page(self, num=50):
self._per_page = num
return self
def get_page(self, page=1, per_page=None):
return Page(dataset=self.dataset,
q=self.q,
conditions=self.conditions,
per_page=per_page or self._per_page,
page=page,
layers=self.layers,
s=self.s)
def __getitem__(self, key):
return list(self.get_page(key, 1))[0]
def __len__(self):
return ceil(self.get_page(1, 0)['Count'])
def __iter__(self):
r = self.get_page(1)
for x in r: # or use 'yield from'
yield x
for page in range(2, int(ceil(r.data['Count'] // self._per_page))):
for x in self.get_page(page=page): # or use 'yield from'
yield x
class Storage(object):
def __init__(self, s=None):
self.s = s or requests.Session()
@cached_property
def data(self):
return self.s.get(INDEX_URL, params={'limit': 200}).json()
def keys(self):
return [x['data']['zbiory.slug'] for x in self.data['Dataobject']]
def __getitem__(self, key):
if key in self.keys():
return QuerySet(dataset=key, s=self.s)
raise KeyError("Dataset {dataset} doesn't exists".format(dataset=key))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment