Skip to content

Instantly share code, notes, and snippets.

@maxrp
Last active October 19, 2015 18:02
Show Gist options
  • Save maxrp/7ef9d6ef957343c6af8b to your computer and use it in GitHub Desktop.
Save maxrp/7ef9d6ef957343c6af8b to your computer and use it in GitHub Desktop.
notes on google sheets query
#!/usr/bin/python3
# to install dependencies, run: pip install gspread IPy oauth2client
import json
import gspread
import sys
from IPy import IP
from oauth2client.client import SignedJwtAssertionCredentials as jwtassert
def auth(secrets):
    """Build an authorized gspread session from a service-account JSON file.

    ``secrets`` is the path to a JSON file from which the ``client_email``
    and ``private_key`` entries are read.
    """
    # the only permission scope we ask for
    feed_scope = 'https://spreadsheets.google.com/feeds'
    # the with-block closes the file as soon as the JSON has been parsed
    with open(secrets) as handle:
        account = json.load(handle)
    # the key comes out of the JSON file as text; signing needs raw bytes
    key_bytes = bytes(account['private_key'], 'utf-8')
    # build the signed assertion for our account, key and scope
    assertion = jwtassert(account['client_email'], key_bytes, feed_scope)
    # hand back the authorized session object
    return gspread.authorize(assertion)
def filter_sep_row(filter_fn, row, criteria):
    """Split ``row`` on every separator in ``criteria`` and yield filtered chunks.

    This is a generator. The row is split on all of the separators together,
    so a value wrapped in, or followed by, a different separator (for example
    ``"(1.2.3.4)"`` or ``"1.2.3.4, 5.6.7.8"``) is still reduced to a clean
    chunk. ``filter_fn`` is applied to each non-empty chunk and its result is
    yielded only when it is not ``None``.

    Args:
        filter_fn: Callable taking one chunk (str) and returning a value, or
            ``None`` to reject the chunk.
        row: The text to split; an empty or ``None`` row yields nothing.
        criteria: Iterable of separator strings; empty separators are ignored.

    Yields:
        Each non-``None`` result of ``filter_fn``, in left-to-right chunk order.
    """
    if not row:
        return
    chunks = [row]
    for separator in criteria:
        if not separator:
            # str.split('') raises ValueError, so an empty separator is skipped
            continue
        # re-split every chunk produced so far, so separators combine
        chunks = [piece for chunk in chunks for piece in chunk.split(separator)]
    for chunk in chunks:
        if not chunk:
            # adjacent or leading/trailing separators leave empty strings behind
            continue
        result = filter_fn(chunk)
        if result is not None:
            yield result
def filter_ips(chunk):
    """Try to parse ``chunk`` as an IPv4 or IPv6 address.

    Args:
        chunk: Candidate text, typically one piece of a spreadsheet cell.

    Returns:
        The ``IP`` object built from ``chunk``, or ``None`` when the chunk is
        empty, too short to be an address, or rejected by the parser.
    """
    if not chunk:
        return None
    # Cheap pre-filter: a dotted IPv4 address is at least 7 characters
    # ("0.0.0.0"). Compressed IPv6 forms such as "::1" are shorter, so any
    # chunk containing a colon is always handed to the parser.
    if len(chunk) < 7 and ':' not in chunk:
        return None
    try:
        return IP(chunk)
    except Exception:
        # Best effort: anything the parser rejects is simply "not an IP".
        # Unlike a bare ``except``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return None
def main():
    """Print each distinct IP address found in the sheet's 13th column."""
    # characters that may sit between or around addresses inside one cell
    separators = [' ', ',', '(', ')', ';']
    # authorize, open the spreadsheet by name, and keep only its first worksheet
    session = auth('sheetquery.json')
    first_sheet = session.open('Account Compromise Tracking').get_worksheet(0)
    # we happen to know that in this sheet the 13th column holds the IPs;
    # every cell is chunked up and only the chunks that parsed are kept
    found = [
        candidate
        for cell in first_sheet.col_values(13)
        for candidate in filter_sep_row(filter_ips, cell, separators)
        if candidate
    ]
    # going through a set removes duplicates before printing
    for address in set(found):
        print(address)


if __name__ == '__main__':
    main()
#!/usr/bin/python3
import json
import gspread
import sys
from oauth2client.client import SignedJwtAssertionCredentials as jwtassert
def auth(secrets_file):
    """Load a service account private key and return a new session.

    Args:
        secrets_file (str): Path to a JSON file with google service account
            credentials; its ``client_email`` and ``private_key`` entries
            are read.

    Returns:
        The object returned by ``gspread.authorize`` for those credentials.
    """
    # keep the file handle and the parsed credentials under separate names
    with open(secrets_file) as handle:
        secrets = json.load(handle)
    private_key = bytes(secrets['private_key'], 'utf-8')
    # a single scope as a plain string; a trailing comma here would silently
    # turn it into a one-element tuple
    scope = 'https://spreadsheets.google.com/feeds'
    creds = jwtassert(secrets['client_email'], private_key, scope)
    return gspread.authorize(creds)
class Sheet:
    """One worksheet of a spreadsheet with all of its columns preloaded.

    Constructing an instance authenticates, opens the spreadsheet and stores
    every column's values in ``self.columns``, keyed by header name.
    """

    def __init__(self, title, secrets_file, worksheet=0, header_row=1):
        """Authenticate, open the worksheet and load its columns.

        Args:
            title (str): The title of the spreadsheet to load.
            secrets_file (str): A file with google service account credentials.

        Kwargs:
            worksheet (int): Which worksheet to load (default=0)
            header_row (int): Which row contains the headers (default=1)
        """
        self.session = auth(secrets_file)
        self.sheet = self.session.open(title).get_worksheet(worksheet)
        self.columns = self.load_columns(header_row)

    def load_columns(self, header_row):
        """Build a dict mapping each name in ``header_row`` to its column values.

        When a header name occurs more than once, the right-most column wins.
        """
        headers = self.sheet.row_values(header_row)
        # column positions are counted from 1, not 0
        return {
            name: self.sheet.col_values(position)
            for position, name in enumerate(headers, start=1)
        }
def main():
    """Print the non-empty values of the column named by the first CLI argument.

    The argument is validated before the spreadsheet is fetched, so a missing
    argument fails immediately rather than after a network round trip.

    Raises:
        SystemExit: With a usage message when no column name is given, or
            with a list of the available columns when the name is unknown.
    """
    program = sys.argv[0] if sys.argv else 'sheetquery'
    # indexing sys.argv[1] directly raises IndexError when no argument is given
    column_name = sys.argv[1] if len(sys.argv) > 1 else ''
    if not column_name:
        sys.exit('usage: {} COLUMN_NAME'.format(program))

    worksheet = Sheet('Account Compromise Tracking', 'sheetquery.json')
    try:
        values = worksheet.columns[column_name]
    except KeyError:
        known = ', '.join(repr(name) for name in worksheet.columns)
        sys.exit('unknown column {!r}; available columns: {}'.format(
            column_name, known))
    # blank cells are dropped from the output
    print("\n".join(x for x in values if x))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment