Last active
February 23, 2023 18:59
-
-
Save mindey/b20bc6623167555b9837c9bc8964f727 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Mindey | |
# Basic Baserow.io API Wrapper. | |
import os, requests, getpass | |
import urllib.parse | |
import tqdm | |
# Auth: | |
try: | |
from envars import API_BASE, API_TOKEN | |
except: | |
API_BASE, API_TOKEN = (None, None) | |
# Util: | |
def request(endpoint, data={}, action='get', params={}, apikey=None): | |
headers={} | |
if apikey: | |
headers["Authorization"] = f"Token {apikey}" | |
elif API_TOKEN: | |
headers["Authorization"] = f"Token {API_TOKEN}" | |
if action in ['get', 'delete']: | |
response = getattr(requests, action)(endpoint, headers=headers, params=params) | |
elif action in ['post', 'patch']: | |
response = getattr(requests, action)( | |
endpoint, headers=dict(headers, **{ | |
'Content-Type': 'application/json'}), json=data) | |
else: | |
response = None | |
return response | |
# Main: | |
def fields(table_id, apibase=None, apikey=None): | |
if apibase is None: | |
apibase = API_BASE | |
endpoint = f'{apibase}/api/database/fields/table/{table_id}/' | |
return request(endpoint, apikey=apikey) | |
def create(table_id, data, many=False, apibase=None, apikey=None): | |
if many: | |
data = {'items': data} | |
many = '/batch' | |
else: | |
many = '' | |
# params={'user_field_names': True} | |
if apibase is None: | |
apibase = API_BASE | |
endpoint = f'{apibase}/api/database/rows/table/{table_id}{many}/?user_field_names=true' | |
return request(endpoint, data=data, action='post', apikey=apikey) | |
def read(table_id, data=None, fields=None, filters=None, many=False, params={}, apibase=None, apikey=None): | |
""" | |
filters: [('field_ID', 'operator', 'value')], | |
e.g.: [('field_3035', 'equal', 'Hello')] | |
""" | |
params['user_field_names'] = True | |
if many: | |
extra = '' | |
else: | |
extra = f'/{data}' | |
if apibase is None: | |
apibase = API_BASE | |
endpoint = f'{apibase}/api/database/rows/table/{table_id}{extra}/' | |
if isinstance(fields, list): | |
params['include'] = '\"'+'\",\"'.join(fields)+'\"' | |
if isinstance(filters, list): | |
for f in filters: | |
if isinstance(f, list) or isinstance(f, tuple): | |
if len(f) == 3: | |
params[f'filter__{f[0]}__{f[1]}'] = f[2] | |
elif len(f) == 2: | |
params[f'filter__{f[0]}__{f[1]}'] = True | |
if many: | |
with tqdm.tqdm() as bar: | |
response = {'next': endpoint} | |
while response.get('next'): | |
response = request(response['next'], params=params, apikey=apikey).json() | |
for item in response.get('results', []): | |
bar.update(1); yield item | |
else: | |
for result in [request(endpoint, params=params, apikey=apikey)]: | |
yield result | |
def update(table_id, data, many=False, apibase=None, apikey=None): | |
if many: | |
extra = '/batch' | |
data = {'items': data} | |
else: | |
extra = f'/{data["id"]}' | |
#params={'user_field_names': True} | |
if apibase is None: | |
apibase = API_BASE | |
endpoint = f'{apibase}/api/database/rows/table/{table_id}{extra}/?user_field_names=true' | |
return request(endpoint, data=data, action='patch', apikey=apikey) | |
def delete(table_id, data, many=False, apibase=None, apikey=None): | |
if many: | |
extra = '/batch-delete' | |
data = {'items': data} | |
else: | |
if isinstance(data, dict): | |
data = data.get('id', '') | |
extra = f'/{data}' | |
if apibase is None: | |
apibase = API_BASE | |
endpoint = f'{apibase}/api/database/rows/table/{table_id}{extra}/' | |
if many: | |
response = request(endpoint, data=data, action='post', apikey=apikey) | |
else: | |
response = request(endpoint, action='delete', apikey=apikey) | |
return response | |
# Aliases | |
def insert_one(table_id, data, apibase=None, apikey=None): | |
return create(table_id, data, apibase=apibase, apikey=apikey) | |
def insert_many(table_id, data, apibase=None, apikey=None): | |
return create(table_id, data, many=True, apibase=apibase, apikey=apikey) | |
def insert_batch(table_id, data, apibase=None, apikey=None): | |
if isinstance(data, dict): | |
data = [data] | |
batch = []; size = 0; total = 0 | |
for i, item in tqdm.tqdm(enumerate(data)): | |
batch.append(item); size += 1 | |
if size == 200: # baserow limit | |
insert_many(table_id, batch, apibase=apibase, apikey=apikey) | |
batch = []; total += size; size = 0 | |
if batch: | |
insert_many(table_id, batch, apibase=apibase, apikey=apikey) | |
total += size | |
return {'created': total} | |
def get_one(table_id, row_id, fields=None, apibase=None, apikey=None): | |
return next(read(table_id, row_id, fields=fields, apibase=apibase, apikey=apikey)).json() | |
def get_all(table_id, row_id=None, filters=None, fields=None, many=True, apibase=None, apikey=None): | |
return read(table_id, fields=fields, many=True, filters=filters, apibase=apibase, apikey=apikey) | |
def update_one(table_id, data, apibase=None, apikey=None): | |
return update(table_id, data, apibase=apibase, apikey=apikey) | |
def update_many(table_id, data, apibase=None, apikey=None): | |
return update(table_id, data, many=True, apibase=apibase, apikey=apikey) | |
def update_batch(table_id, data, apibase=None, apikey=None): | |
if isinstance(data, dict): | |
data = [data] | |
batch = []; size = 0; total = 0 | |
for i, item in tqdm.tqdm(enumerate(data)): | |
batch.append(item); size += 1 | |
if size == 200: # baserow limit | |
update_many(table_id, batch, apibase=apibase, apikey=apikey) | |
batch = []; total += size; size = 0 | |
if batch: | |
update_many(table_id, batch, apibase=apibase, apikey=apikey) | |
total += size | |
return {'updated': total} | |
def delete_one(table_id, data, apibase=None, apikey=None): | |
return delete(table_id, data, apibase=apibase, apikey=apikey) | |
def delete_many(table_id, data, apibase=None, apikey=None): | |
return delete(table_id, data, many=True, apibase=apibase, apikey=apikey) | |
class Table: | |
def __init__(self, table_id, api='https://api.baserow.io', token=None): | |
self.api = {'apibase': api, 'apikey': token} | |
self.table_id = table_id | |
self._refresh_fields() | |
def _refresh_fields(self): | |
try: | |
self.fields = { | |
field['name']: f"field_{field['id']}" | |
for field in fields(self.table_id, **self.api).json() | |
} | |
except: | |
raise("Perhaps talbe doesn't exist.") | |
def insert(self, data): | |
if isinstance(data, dict): | |
return insert_one(self.table_id, data, **self.api) | |
return insert_many(self.table_id, data, **self.api) | |
def get(self, row_id, fields=None): | |
result = get_one(self.table_id, row_id, fields=fields, **self.api) | |
if fields: | |
return dict((k, result[k]) for k in ['id']+fields) | |
return result | |
def update(self, data): | |
if isinstance(data, dict): | |
return update_one(self.table_id, data, **self.api) | |
return update_batch(self.table_id, data, **self.api) | |
def get_all(self, filters=None, fields=None): | |
if isinstance(filters, list): | |
filters = [(self.fields.get(fi[0], fi[0]), *fi[1:]) for fi in filters | |
if (isinstance(fi, tuple) or isinstance(fi, list))] | |
iterator = get_all(self.table_id, fields=fields, filters=filters, **self.api) | |
return list(iterator) | |
def update_many(self, data): | |
return update_batch(self.table_id, data, **self.api) | |
def delete(self, data): | |
if isinstance(data, list): | |
return delete_many(self.table_id, data, **self.api) | |
else: | |
return delete_one(self.table_id, data, **self.api) | |
# Extra | |
def read_table(url): | |
try: | |
url_obj = urllib.parse.urlparse(url) | |
api_base = f'{url_obj.scheme}://{url_obj.netloc}' | |
except: | |
raise Exception("Could not parse the url provided.") | |
return | |
if '/public/grid/' in url_obj.path: | |
grid_id = url_obj.path.rsplit('/public/grid/', 1)[-1] | |
with tqdm.tqdm() as bar: | |
response = {'next': f'{api_base}/api/database/views/grid/{grid_id}/public/rows/'} | |
while response.get('next'): | |
response = requests.get(response['next']).json() | |
for item in response.get('results', []): | |
bar.update(1); yield item |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment