Skip to content

Instantly share code, notes, and snippets.

@mindey
Last active February 23, 2023 18:59
Show Gist options
  • Save mindey/b20bc6623167555b9837c9bc8964f727 to your computer and use it in GitHub Desktop.
Save mindey/b20bc6623167555b9837c9bc8964f727 to your computer and use it in GitHub Desktop.
# Author: Mindey
# Basic Baserow.io API Wrapper.
import os, requests, getpass
import urllib.parse
import tqdm
# Auth:
# Optional local configuration: when no `envars` module providing API_BASE
# and API_TOKEN can be imported, both fall back to None and callers must
# pass `apibase` / `apikey` explicitly.
try:
    from envars import API_BASE, API_TOKEN
except ImportError:
    # Only a missing module / missing names is expected here; any other
    # error raised while importing `envars` is left to propagate.
    API_BASE, API_TOKEN = (None, None)
# Util:
def request(endpoint, data=None, action='get', params=None, apikey=None):
    """
    Send one HTTP request with the `requests` library and return its response.

    endpoint: full URL to call.
    data: JSON body for 'post' / 'patch' (an empty object is sent when omitted).
    action: one of 'get', 'delete', 'post', 'patch'.
    params: query parameters, forwarded for 'get' / 'delete' only.
    apikey: token for the Authorization header; falls back to API_TOKEN.

    Returns the response object, or None when `action` is not supported.
    """
    # An explicit key wins over the module-level token; with neither, the
    # request is sent without an Authorization header.
    token = apikey or API_TOKEN
    headers = {"Authorization": f"Token {token}"} if token else {}

    if action in ('get', 'delete'):
        return getattr(requests, action)(endpoint, headers=headers, params=params)

    if action in ('post', 'patch'):
        headers['Content-Type'] = 'application/json'
        # Keep sending `{}` when no body is given, as the old `data={}` default did.
        body = {} if data is None else data
        return getattr(requests, action)(endpoint, headers=headers, json=body)

    # Unsupported verbs are not sent at all.
    return None
# Main:
def fields(table_id, apibase=None, apikey=None):
    """
    Request the field listing of a table.

    Returns whatever request() returns for a GET on the table's
    `/api/database/fields/table/<table_id>/` endpoint. `apibase` defaults
    to API_BASE.
    """
    base = API_BASE if apibase is None else apibase
    return request(f'{base}/api/database/fields/table/{table_id}/', apikey=apikey)
def create(table_id, data, many=False, apibase=None, apikey=None):
    """
    Create one row, or a batch of rows when many=True.

    In batch mode `data` is wrapped as {'items': data} and posted to the
    `/batch/` endpoint; otherwise `data` is posted as-is. `apibase`
    defaults to API_BASE. Returns whatever request() returns.
    """
    suffix = '/batch' if many else ''
    payload = {'items': data} if many else data
    base = API_BASE if apibase is None else apibase
    # user_field_names is placed in the URL itself because request() does
    # not forward query params for 'post'.
    url = f'{base}/api/database/rows/table/{table_id}{suffix}/?user_field_names=true'
    return request(url, data=payload, action='post', apikey=apikey)
def read(table_id, data=None, fields=None, filters=None, many=False, params=None, apibase=None, apikey=None):
    """
    Read one row, or iterate over the rows of a table.

    This is a generator. With many=False it yields a single item: the
    response object returned by request() for the row whose id is `data`.
    With many=True it follows the `next` links of the paginated listing
    and yields every entry found under each page's `results`.

    fields: list of field names, sent as the quoted, comma-separated
        `include` query parameter.
    filters: [('field_ID', 'operator', 'value')],
        e.g.: [('field_3035', 'equal', 'Hello')]
        A two-item filter ('field_ID', 'operator') is sent with value True.
    params: extra query parameters; the mapping is copied, never modified.
    apibase / apikey: default to API_BASE / the token handling of request().
    """
    # Work on a private copy: the previous shared `params={}` default was
    # mutated below, so filters from one call leaked into every later call.
    query = dict(params) if params else {}
    query['user_field_names'] = True

    if apibase is None:
        apibase = API_BASE
    extra = '' if many else f'/{data}'
    endpoint = f'{apibase}/api/database/rows/table/{table_id}{extra}/'

    if isinstance(fields, list):
        query['include'] = '"' + '","'.join(fields) + '"'

    if isinstance(filters, list):
        for flt in filters:
            if not isinstance(flt, (list, tuple)):
                continue
            if len(flt) == 3:
                query[f'filter__{flt[0]}__{flt[1]}'] = flt[2]
            elif len(flt) == 2:
                query[f'filter__{flt[0]}__{flt[1]}'] = True

    if many:
        with tqdm.tqdm() as bar:
            # Seed the loop with a fake "page" pointing at the first URL.
            page = {'next': endpoint}
            while page.get('next'):
                page = request(page['next'], params=query, apikey=apikey).json()
                for item in page.get('results', []):
                    bar.update(1)
                    yield item
    else:
        yield request(endpoint, params=query, apikey=apikey)
def update(table_id, data, many=False, apibase=None, apikey=None):
    """
    Patch one row, or a batch of rows when many=True.

    For a single row the target is taken from data["id"] and `data` is
    sent unchanged; in batch mode `data` is wrapped as {'items': data} and
    sent to the `/batch/` endpoint. `apibase` defaults to API_BASE.
    Returns whatever request() returns.
    """
    if many:
        suffix, payload = '/batch', {'items': data}
    else:
        suffix, payload = f'/{data["id"]}', data
    base = API_BASE if apibase is None else apibase
    # user_field_names is placed in the URL itself because request() does
    # not forward query params for 'patch'.
    url = f'{base}/api/database/rows/table/{table_id}{suffix}/?user_field_names=true'
    return request(url, data=payload, action='patch', apikey=apikey)
def delete(table_id, data, many=False, apibase=None, apikey=None):
    """
    Delete one row, or a batch of rows when many=True.

    data: for a single delete, a row id or a row dict (its 'id' is used).
        For a batch delete, an iterable of row ids and/or row dicts; dicts
        are reduced to their 'id', the same way the single-row path does.
    apibase: defaults to API_BASE.

    Returns whatever request() returns (a POST to `/batch-delete/` in
    batch mode, a DELETE on the row URL otherwise).
    """
    def _row_id(row):
        # Accept both a bare id and a row dict carrying an 'id' key.
        return row.get('id', '') if isinstance(row, dict) else row

    if apibase is None:
        apibase = API_BASE
    table_url = f'{apibase}/api/database/rows/table/{table_id}'

    if many:
        payload = {'items': [_row_id(row) for row in data]}
        return request(f'{table_url}/batch-delete/', data=payload, action='post', apikey=apikey)

    return request(f'{table_url}/{_row_id(data)}/', action='delete', apikey=apikey)
# Aliases
def insert_one(table_id, data, apibase=None, apikey=None):
    """Alias: create a single row via create()."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return create(table_id, data, **credentials)
def insert_many(table_id, data, apibase=None, apikey=None):
    """Alias: create several rows in one call via create(..., many=True)."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return create(table_id, data, many=True, **credentials)
def insert_batch(table_id, data, apibase=None, apikey=None, batch_size=200):
    """
    Insert any number of rows by sending them in chunks to insert_many().

    data: a single row dict, or an iterable of row dicts.
    batch_size: rows per request; defaults to 200, which the original code
        notes as the baserow limit.

    Returns {'created': <number of rows sent>}. The responses of the
    individual requests are not inspected, so the count reflects rows
    submitted, not rows confirmed by the server.

    Raises ValueError when batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")
    if isinstance(data, dict):
        data = [data]

    batch = []
    total = 0
    for item in tqdm.tqdm(data):
        batch.append(item)
        if len(batch) == batch_size:
            insert_many(table_id, batch, apibase=apibase, apikey=apikey)
            total += len(batch)
            batch = []
    # Flush the final, partially filled chunk.
    if batch:
        insert_many(table_id, batch, apibase=apibase, apikey=apikey)
        total += len(batch)
    return {'created': total}
def get_one(table_id, row_id, fields=None, apibase=None, apikey=None):
    """Fetch a single row via read() and return the decoded JSON of its response."""
    rows = read(table_id, row_id, fields=fields, apibase=apibase, apikey=apikey)
    # In single-row mode read() yields exactly one response object.
    response = next(rows)
    return response.json()
def get_all(table_id, row_id=None, filters=None, fields=None, many=True, apibase=None, apikey=None):
    """
    Return the row generator produced by read(..., many=True).

    `row_id` and `many` are accepted but not used: the listing is always
    requested in many-mode.
    """
    options = {
        'fields': fields,
        'many': True,
        'filters': filters,
        'apibase': apibase,
        'apikey': apikey,
    }
    return read(table_id, **options)
def update_one(table_id, data, apibase=None, apikey=None):
    """Alias: patch a single row via update()."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return update(table_id, data, **credentials)
def update_many(table_id, data, apibase=None, apikey=None):
    """Alias: patch several rows in one call via update(..., many=True)."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return update(table_id, data, many=True, **credentials)
def update_batch(table_id, data, apibase=None, apikey=None, batch_size=200):
    """
    Update any number of rows by sending them in chunks to update_many().

    data: a single row dict, or an iterable of row dicts.
    batch_size: rows per request; defaults to 200, which the original code
        notes as the baserow limit.

    Returns {'updated': <number of rows sent>}. The responses of the
    individual requests are not inspected, so the count reflects rows
    submitted, not rows confirmed by the server.

    Raises ValueError when batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")
    if isinstance(data, dict):
        data = [data]

    batch = []
    total = 0
    for item in tqdm.tqdm(data):
        batch.append(item)
        if len(batch) == batch_size:
            update_many(table_id, batch, apibase=apibase, apikey=apikey)
            total += len(batch)
            batch = []
    # Flush the final, partially filled chunk.
    if batch:
        update_many(table_id, batch, apibase=apibase, apikey=apikey)
        total += len(batch)
    return {'updated': total}
def delete_one(table_id, data, apibase=None, apikey=None):
    """Alias: delete a single row via delete()."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return delete(table_id, data, **credentials)
def delete_many(table_id, data, apibase=None, apikey=None):
    """Alias: delete several rows in one call via delete(..., many=True)."""
    credentials = {'apibase': apibase, 'apikey': apikey}
    return delete(table_id, data, many=True, **credentials)
class Table:
    """
    Object-style wrapper that binds the module-level helpers to one table.

    Attributes:
        api: {'apibase': ..., 'apikey': ...}, forwarded to every helper call.
        table_id: id of the wrapped table.
        fields: mapping of field name -> "field_<id>", loaded on construction.
    """

    def __init__(self, table_id, api='https://api.baserow.io', token=None):
        """Store the connection settings and load the table's field mapping."""
        self.api = {'apibase': api, 'apikey': token}
        self.table_id = table_id
        self._refresh_fields()

    def _refresh_fields(self):
        """
        (Re)build self.fields from the table's field listing.

        Raises RuntimeError (chained to the underlying error) when the
        listing cannot be fetched or does not have the expected shape.
        """
        try:
            self.fields = {
                field['name']: f"field_{field['id']}"
                for field in fields(self.table_id, **self.api).json()
            }
        except Exception as err:
            # The previous `raise("...")` tried to raise a plain string,
            # which itself fails with a TypeError and hides the message.
            raise RuntimeError("Perhaps table doesn't exist.") from err

    def insert(self, data):
        """Insert one row (dict) via insert_one(), anything else via insert_many()."""
        if isinstance(data, dict):
            return insert_one(self.table_id, data, **self.api)
        return insert_many(self.table_id, data, **self.api)

    def get(self, row_id, fields=None):
        """
        Fetch one row as a dict.

        When `fields` is given, the result is reduced to 'id' plus those keys.
        """
        result = get_one(self.table_id, row_id, fields=fields, **self.api)
        if fields:
            return {key: result[key] for key in ['id', *fields]}
        return result

    def update(self, data):
        """Update one row (dict) via update_one(), anything else via update_batch()."""
        if isinstance(data, dict):
            return update_one(self.table_id, data, **self.api)
        return update_batch(self.table_id, data, **self.api)

    def get_all(self, filters=None, fields=None):
        """
        Return all matching rows as a list.

        The first element of each list/tuple filter may be a field name; it
        is translated to its "field_<id>" form through self.fields, and
        unknown names are passed through unchanged. Filter entries that are
        not lists or tuples are dropped.
        """
        if isinstance(filters, list):
            filters = [
                (self.fields.get(flt[0], flt[0]), *flt[1:])
                for flt in filters
                if isinstance(flt, (tuple, list))
            ]
        iterator = get_all(self.table_id, fields=fields, filters=filters, **self.api)
        return list(iterator)

    def update_many(self, data):
        """Update rows in chunks via update_batch()."""
        return update_batch(self.table_id, data, **self.api)

    def delete(self, data):
        """Delete a list of rows via delete_many(), anything else via delete_one()."""
        if isinstance(data, list):
            return delete_many(self.table_id, data, **self.api)
        return delete_one(self.table_id, data, **self.api)
# Extra
def read_table(url):
    """
    Iterate over the rows of a publicly shared grid view.

    url: address of the shared view; only URLs whose path contains
        '/public/grid/' are handled, any other URL yields nothing.

    This is a generator: it follows the `next` links of the view's public
    rows endpoint and yields every entry found under each page's `results`.
    No Authorization header is sent.

    Raises Exception when the URL cannot be parsed. Because this is a
    generator, the error surfaces on first iteration, not on the call.
    """
    try:
        url_obj = urllib.parse.urlparse(url)
        api_base = f'{url_obj.scheme}://{url_obj.netloc}'
    except (ValueError, TypeError, AttributeError) as err:
        raise Exception("Could not parse the url provided.") from err

    if '/public/grid/' in url_obj.path:
        # Everything after the marker is the view slug; strip slashes so a
        # trailing '/' in the URL does not produce a '//' in the endpoint.
        grid_id = url_obj.path.rsplit('/public/grid/', 1)[-1].strip('/')
        with tqdm.tqdm() as bar:
            # Seed the loop with a fake "page" pointing at the first URL.
            page = {'next': f'{api_base}/api/database/views/grid/{grid_id}/public/rows/'}
            while page.get('next'):
                page = requests.get(page['next']).json()
                for item in page.get('results', []):
                    bar.update(1)
                    yield item
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment