Last active
July 22, 2024 01:21
-
-
Save dvdotsenko/3426c56fbb7c1ac3d79fe01d2ab40f70 to your computer and use it in GitHub Desktop.
Python list-of-lists interface wrapper for Google Sheet API RPC. `spreadsheet['sheet name']['A1':'B2'] = [[1, 2], [3, 2]]`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2021-2024 Daniel Dotsenko <[email protected]> | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and | |
# associated documentation files (the "Software"), to deal in the Software without restriction, | |
# including without limitation the rights to use, copy, modify, merge, publish, distribute, | |
# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all copies or | |
# substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | |
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR | |
# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE | |
# FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
# | |
# (MIT license) | |
""" | |
Google Sheets Python API repackaged as pythonic collections operations that abstract away API calls. | |
Requires modules: | |
google-api-python-client | |
google-auth-httplib2 | |
google-auth-oauthlib | |
Example: | |
from google_sheets import CredentialsManager, Spreadsheet | |
# Create "Desktop" type credentials in Google Cloud console and download the credentials document | |
app_creds = { | |
"installed": { | |
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", | |
"auth_uri": "https://accounts.google.com/o/oauth2/auth", | |
"client_id": "155....com", | |
"client_secret": "G...H", | |
"project_id": "##...##", | |
"redirect_uris": ["urn:ietf:wg:oauth:2.0:oob","http://localhost"], | |
"token_uri": "https://oauth2.googleapis.com/token" | |
} | |
} | |
client_creds = None # none if you are starting | |
spreadsheet_id = "18e1ANo...J9h2M" | |
cm = CredentialsManager( | |
app_creds, | |
client_creds, | |
scopes = Spreadsheet.SCOPES | |
) | |
# you can force refresh of the creds and, thus, front-load the hand-shake | |
# if no user creds data is available, this will start the OAuth handshake process | |
cm.refresh_credentials() | |
# observe user_credentials_data now available. | |
# You can save it for reuse later to avoid having to do OAuth handshake again | |
cm.user_credentials_data | |
spreadsheet = Spreadsheet( | |
spreadsheet_id, | |
cm.user_credentials | |
) | |
["Sheet 1", "Sheet 2"] = spreadsheet.keys() | |
sheet = spreadsheet['Sheet 1'] | |
sheet['A1':'B2'] = [ | |
['a1', NO_CHANGE], | |
['a2', 'b2'] | |
] | |
[['a1', None], ['a2', 'b2']] = sheet['A1':'B2'] | |
del sheet['A2':'B2'] | |
[['a1']] = sheet['A1':'B2'] | |
# single cell value request returns only cell value, not an array-of-arrays structure | |
'a1' = sheet['A1'] | |
del sheet['A1'] | |
# Google collapses sparse arrays when sheet does not have values in cells | |
# To make it easier to have "random access" experience with returned arrays of various lengths | |
# use Grid list-like wrapper for returned range of cells: | |
g = Grid([ | |
['a1'], | |
['a2', 'b2'], | |
[], # no values here | |
['a4', 'b4', 'c4'], | |
]) | |
None = g[2:4] # note zero-indexed address | |
g[2:2] = 'off to side' | |
g | |
> [ | |
['a1'], | |
['a2', 'b2'], | |
[ None, None, 'off to side'], | |
['a4', 'b4', 'c4'], | |
] | |
# A value of None sent to Google Sheets means "don't change this cell's value".
# If you need to overwrite the value in the sheet, use an empty string `''` instead of None.
""" | |
import json | |
import os.path | |
from collections.abc import Iterable, Iterator | |
from functools import cached_property, cache | |
from typing import TypedDict, List, Dict, Optional, Any, Union | |
import google.oauth2.credentials | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from google.oauth2 import service_account | |
# Values passed as `majorDimension` in the Sheets API calls made by this module.
MAJOR_DIMENSION_ROWS = 'ROWS'
MAJOR_DIMENSION_COLUMNS = 'COLUMNS'

# Placeholder for a cell inside an "update" payload: providing None as the
# value effectively means "don't change this cell".
NO_CHANGE = None

# Scalar types a single cell value may take in this module.
CellValue = Union[str, int, bool, None]
class ApplicationCredentials(TypedDict):
    """Keys of the OAuth client ("application") credentials document.

    Mirrors the dictionary found under the ``"installed"`` key of the
    credentials document shown in the module-level example.
    """

    # OAuth endpoints
    auth_provider_x509_cert_url: str
    auth_uri: str

    # Client identity
    client_id: str
    client_secret: str
    project_id: str

    # Redirect targets and the token exchange endpoint
    redirect_uris: List[str]
    token_uri: str
class DesktopApplicationCredentials(TypedDict):
    """Top-level shape of a "Desktop"-type credentials document.

    The application credentials are nested under the single ``installed`` key.
    """

    installed: ApplicationCredentials
class UserCredentials(TypedDict):
    """Keys of the serialized per-user OAuth credentials.

    This is the dictionary form kept in
    ``CredentialsManager.user_credentials_data`` so that it can be saved and
    reused later instead of repeating the OAuth handshake.
    """

    # Client the token was issued to
    client_id: str
    client_secret: str

    # Token material and its lifetime
    expiry: str
    refresh_token: str
    scopes: List[str]
    token: str
    token_uri: str
class CredentialsManager:
    """Turns application + (optional) user credential data into usable OAuth credentials.

    The manager lazily builds a ``Credentials`` object from
    ``user_credentials_data`` when it is available, refreshes it when it has
    expired, and otherwise starts the installed-application OAuth handshake.
    After a refresh or a handshake ``user_credentials_data`` is replaced with
    the JSON form of the current credentials so the caller can persist it.
    """

    application_credentials_data: "DesktopApplicationCredentials"
    user_credentials_data: "Optional[UserCredentials]"
    scopes: "Optional[List[str]]"

    def __init__(
        self,
        application_credentials_data: "DesktopApplicationCredentials",
        user_credentials_data: "Optional[UserCredentials]" = None,
        scopes: "Optional[List[str]]" = None,
    ):
        """
        :param application_credentials_data: "Desktop" application credentials document.
        :param user_credentials_data: previously saved user credentials, or None when starting fresh.
        :param scopes: OAuth scopes to request / validate against.
        """
        self.application_credentials_data = application_credentials_data
        self.user_credentials_data = user_credentials_data
        self.scopes = scopes
        self._creds = None  # built lazily by refresh_credentials()

    def refresh_credentials(self) -> "google.oauth2.credentials.Credentials":
        """Return valid credentials, refreshing or re-authorizing when needed.

        :return: the current credentials object (also cached on the instance).
        """
        if not self._creds and self.user_credentials_data:
            self._creds = Credentials.from_authorized_user_info(
                self.user_credentials_data, self.scopes
            )

        if not self._creds or not self._creds.valid:
            if self._creds and self._creds.expired and self._creds.refresh_token:
                self._creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_config(
                    self.application_credentials_data, self.scopes
                )
                # `run_console` is the flow this module was written against, but
                # it is absent from newer google-auth-oauthlib releases. Keep
                # using it where it exists and fall back to the local-server
                # flow otherwise, instead of failing with AttributeError.
                run_console = getattr(flow, 'run_console', None)
                if run_console is not None:
                    self._creds = run_console()
                else:
                    self._creds = flow.run_local_server(port=0)
            # Keep the serializable copy in sync with whatever we now hold.
            self.user_credentials_data = json.loads(self._creds.to_json())
        return self._creds

    @property
    def user_credentials(self):
        """Valid credentials; accessing this may trigger a refresh or a handshake."""
        return self.refresh_credentials()

    @staticmethod
    def credentials_to_file(creds: "google.oauth2.credentials.Credentials", filename):
        """Write *creds* to *filename* as pretty-printed, key-sorted JSON.

        :raises TypeError: if *creds* is not a ``google.oauth2.credentials.Credentials``.
        """
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(creds, google.oauth2.credentials.Credentials):
            raise TypeError(
                f"expected google.oauth2.credentials.Credentials, got {type(creds).__name__}"
            )
        # Serialize before opening the file so a failure cannot leave it truncated.
        data = json.loads(creds.to_json())
        with open(filename, 'w', encoding='utf-8') as fp:
            json.dump(data, fp, sort_keys=True, indent=2)

    @staticmethod
    def credentials_from_file(filename, required=False) -> "Optional[Dict]":
        """Load credentials data from a JSON file.

        :param filename: path of the JSON document.
        :param required: when true, a missing file raises instead of returning None.
        :return: the parsed document, or None if the file is missing and not required.
        :raises FileNotFoundError: if the file is missing and *required* is true.
        """
        if os.path.isfile(filename):
            with open(filename, 'r', encoding='utf-8') as fp:
                return json.load(fp)
        if required:
            raise FileNotFoundError(filename)
        return None
class SystemAccountCredentialsManager:
    """Builds service-account credentials from a dict or from a JSON file."""

    @classmethod
    def credentials_from_info(cls, data: dict) -> service_account.Credentials:
        """Create credentials from an already-parsed service account document."""
        credentials = service_account.Credentials.from_service_account_info(data)
        return credentials

    @classmethod
    def credentials_from_file(cls, filename) -> service_account.Credentials:
        """Read the JSON document at *filename* and create credentials from it."""
        with open(filename, 'r') as stream:
            info = json.load(stream)
        return cls.credentials_from_info(info)
def _extract_coordinates(item): | |
""" | |
processes the value passed to o[value] notation. | |
Typically it's either a `slice` type of 3 elements, or a single item | |
Meaning of values is similar to Pandas / NumPy multidimensional array get/set notation . | |
'A1', 'B2' = x['A1':'B2'] | |
'A1', None = x['A1':] | |
'A1', 'B2' = x['A1:B2'] | |
'A1', None = x['A1'] | |
""" | |
if isinstance(item, slice): | |
a, b, c = item.start, item.stop, item.step | |
if not(c is None): | |
raise KeyError(item) | |
else: | |
a, b, c = item, None, None | |
if isinstance(a, str): | |
if ':' in a: | |
if b or c: | |
raise KeyError(item) | |
parts = a.split(':') | |
if len(parts) > 2: | |
raise KeyError(item) | |
a, b = parts | |
return a, b | |
def _list_get(l, i, default=None): | |
# safe .get like dict.get(key) | |
try: | |
return l[i] | |
except IndexError: | |
return default | |
NoneConstructor = lambda: None | |
def _list_grow_to_index(l, i, empty_value_contructor=NoneConstructor): | |
indexable_max = len(l) - 1 | |
if i > indexable_max: | |
padding_needed = i - indexable_max | |
while padding_needed: | |
# don't use [O]*padding_needed. | |
# It reuses the O instance, causing referential side effects. | |
# Looping makes constructor to be called separately each time | |
l.append(empty_value_contructor()) | |
padding_needed -= 1 | |
def _list_set(l, i, value, empty_value_contructor=NoneConstructor):
    """
    Store *value* at index *i* of *l*, growing the list when the index is past its end.

    Any gap between the current end of the list and *i* is filled with values
    produced by *empty_value_contructor*.
    """
    size = len(l)
    if i < size:
        # Index already exists: plain assignment.
        l[i] = value
        return
    if i > size:
        # Pad up to, but not including, the target index.
        _list_grow_to_index(l, i - 1, empty_value_contructor)
    l.append(value)
class Grid(list):
    """
    Wraps a *sparse* array of arrays representing spreadsheet data into
    a safe "random access" grid.

    It feels like a mix of an array of arrays and a 2-dimensional Pandas / NumPy array.
    Unlike NumPy/Pandas, rows are the top-level iterable, not columns.
    Slice notation is reinterpreted as ``grid[row:column]`` (both zero-indexed);
    a single index addresses a whole row.

        g = Grid([['a1','b1'],['a2','b2']])
        'a1' = g[0:0]
        del g[0:0]
        g
        > [[None, 'b1'], ['a2', 'b2']]

        # auto-create row 3 (zero-indexed), column 4 (zero-indexed) on value set
        g[3:4] = "row 4 col 5 value"
        g
        > [
            [None, 'b1'],
            ['a2', 'b2'],
            [],
            [None, None, None, None, 'row 4 col 5 value']
        ]

        # deleting a whole row empties it; the row itself stays in place
        del g[1]
        g
        > [[None, 'b1'], [], [], [None, None, None, None, 'row 4 col 5 value']]
    """

    def __getitem__(self, item):
        """Return the row (``g[row]``) or the cell (``g[row:col]``).

        A missing row reads as an empty list and a missing cell as None.
        """
        row_index, col_index = _extract_coordinates(item)
        try:
            row = super().__getitem__(row_index)
        except IndexError:
            row = []
        if col_index is None:
            return row
        return _list_get(row, col_index)

    def _grow_to_index(self, i) -> None:
        """Append empty rows until row index *i* exists."""
        # Delegates to the shared helper that __setitem__ uses as well.
        _list_grow_to_index(self, i, list)

    def __setitem__(self, key, value):
        """Set a whole row (``g[row] = [...]``) or a single cell (``g[row:col] = v``).

        Missing rows are created as empty lists; missing cells to the left of
        the target column are padded with None.
        """
        row_index, col_index = _extract_coordinates(key)
        _list_grow_to_index(self, row_index, list)
        if col_index is None:
            super().__setitem__(row_index, value)
        else:
            _list_set(super().__getitem__(row_index), col_index, value)

    def __delitem__(self, key):
        """Clear a cell (``del g[row:col]``) or a whole row (``del g[row]``).

        Nothing is removed from the grid, so addresses of other cells do not
        shift. Deleting something that is not there is a no-op and never
        grows the grid.
        """
        row_index, col_index = _extract_coordinates(key)
        try:
            row = super().__getitem__(row_index)
        except IndexError:
            return  # the row does not exist, nothing to clear

        if col_index is None:
            # A cleared row must remain a list: `width` and cell access rely
            # on every row having a length.
            if row:
                super().__setitem__(row_index, [])
            return

        if _list_get(row, col_index) is not None:
            row[col_index] = None

    @property
    def height(self):
        """Number of rows in the grid."""
        return len(self)

    @property
    def width(self):
        """Length of the longest row; 0 for an empty grid."""
        return max((len(row) for row in self), default=0)
class Sheet:
    """
    One sheet (tab) of a spreadsheet, addressed with A1-notation keys.

        sheet['A1':'B2'] = [['a1', 'b1'], ['a2', 'b2']]
        sheet['A1'] = 'a1'
        value = sheet['A1']
        grid = sheet['A1':'B2']
        del sheet['A1':'B2']

    Every get / set / delete issues one call through the ``api`` resource
    this object was constructed with.
    """

    def __init__(self, name, spreadsheet_id, api):
        """
        :param name: title of the sheet.
        :param spreadsheet_id: id of the spreadsheet the sheet belongs to.
        :param api: spreadsheets API resource used to issue the calls.
        """
        self.name = name
        self.spreadsheet_id = spreadsheet_id
        self.api = api

    def __repr__(self):
        return super().__repr__()[:-1] + f" '{self.name}'>"

    def _form_range(self, a, b):
        """Build the A1-notation range string for this sheet.

        The sheet title is always wrapped in single quotes, with embedded
        single quotes doubled, so that titles containing spaces, ``!``,
        apostrophes, or titles that look like cell references cannot be
        misread as part of the range.

        :param a: first cell, e.g. ``'A1'``.
        :param b: last cell, or a falsy value for a single-cell range.
        """
        quoted_title = "'" + str(self.name).replace("'", "''") + "'"
        if b:
            return f"{quoted_title}!{a}:{b}"
        return f"{quoted_title}!{a}"

    def __delitem__(self, key):
        """Clear the values of the addressed cell or range."""
        a, b = _extract_coordinates(key)
        self.api.values().clear(
            spreadsheetId=self.spreadsheet_id,
            range=self._form_range(a, b)
        ).execute()

    def __setitem__(self, key, value):
        """
        x['A1':'B2'] = [['a1', 'b1'], ['a2', 'b2']]
        x['A1'] = 'a1' # note no brackets for single set.

        :param key: slice('A1', 'B2') or 'A1'
        :param value: [[1,2],[3,4]] or 5
        """
        a, b = _extract_coordinates(key)
        rr = self._form_range(a, b)
        self.api.values().update(
            spreadsheetId=self.spreadsheet_id,
            range=rr,
            includeValuesInResponse=False,
            valueInputOption='RAW',
            body=dict(
                majorDimension=MAJOR_DIMENSION_ROWS,
                range=rr,
                # a single-cell write is wrapped into a 1x1 array of rows
                values=value if b else [[value]],
            )
        ).execute()

    def __getitem__(self, item) -> "Union[Grid, None, object]":
        """Fetch a single cell value (``x['A1']``) or a Grid of rows (``x['A1':'B2']``).

        :return: for a single-cell key, the cell value or None when the
            response carries no values; otherwise a ``Grid`` of the returned rows.
        """
        a, b = _extract_coordinates(item)
        rows = self.api.values().get(
            spreadsheetId=self.spreadsheet_id,
            range=self._form_range(a, b),
            majorDimension=MAJOR_DIMENSION_ROWS,
        ).execute().get('values', [])
        if b is None:
            # A present value arrives as [[value]]; when nothing is returned
            # we are left with the default empty list. Take the first cell
            # if there is one, otherwise None.
            return next((cell for row in rows for cell in row), None)
        return Grid(rows)
class Spreadsheet(Iterable):
    """
    Works like a Dict, where keys are sheet names and values are Sheet objects,
    and where a Sheet object also works as a Dict supporting

    setting:

        # array of rows
        sheet['A1':'B2'] = [
            ["A1", "B1"],
            [NO_CHANGE, "B2"]
        ]
        sheet['A1'] = 'a1' # note, no brackets for single set

    getting:

        [["A1", "B1"], [None, "B2"]] = sheet['A1':'B2']

    clearing cells:

        del sheet['A1':'B2']

    Note that, unlike a dict, iterating a Spreadsheet yields Sheet objects
    rather than sheet names.

    Sheet names are fetched once per instance and then reused; call
    ``keys(refresh=True)`` to fetch them again.
    """

    API_VERSION = 'v4'
    API_NODE = 'sheets'
    SCOPES = [
        'https://www.googleapis.com/auth/drive',
        'https://www.googleapis.com/auth/drive.readonly',
    ]

    # Per-instance cache of sheet titles; None until the first keys() call.
    # Kept on the instance (not in functools.cache) so that a global cache
    # does not keep every Spreadsheet object alive.
    _sheet_titles: Optional[List[str]] = None

    def __init__(self, spreadsheet_id, credentials: "google.oauth2.credentials.Credentials"):
        """
        :param spreadsheet_id: id of the spreadsheet document.
        :param credentials: credentials used to build the API client.
        """
        self.spreadsheet_id = spreadsheet_id
        self.credentials = credentials

    def __repr__(self):
        return super().__repr__()[:-1] + f" '{self.spreadsheet_id}'>"

    @cached_property
    def api(self):
        """The spreadsheets API resource, built on first access."""
        return build(self.API_NODE, self.API_VERSION, credentials=self.credentials).spreadsheets()

    def keys(self, *, refresh: bool = False) -> List[str]:
        """Return the titles of the sheets in this spreadsheet.

        :param refresh: when true, query the API again instead of using the
            titles remembered from an earlier call.
        :return: a new list on every call, so callers may modify it freely.
        """
        if refresh or self._sheet_titles is None:
            response = self.api.get(
                spreadsheetId=self.spreadsheet_id,
                includeGridData=False,
            ).execute()
            self._sheet_titles = [
                sheet['properties']['title']
                for sheet in response.get('sheets', [])
            ]
        return list(self._sheet_titles)

    def values(self) -> "List[Sheet]":
        """Return one Sheet object per sheet title."""
        return [
            Sheet(name, self.spreadsheet_id, self.api)
            for name in self.keys()
        ]

    def items(self):
        """Return ``(name, Sheet)`` pairs."""
        return [
            (sheet.name, sheet)
            for sheet in self.values()
        ]

    def __getitem__(self, item: str) -> "Sheet":
        """Return the Sheet titled *item*.

        :raises KeyError: if the spreadsheet has no sheet with that title.
        """
        if item in self.keys():
            return Sheet(item, self.spreadsheet_id, self.api)
        raise KeyError(item)

    def __iter__(self) -> "Iterator[Sheet]":
        return iter(self.values())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment