Skip to content

Instantly share code, notes, and snippets.

@yucer
Created March 25, 2024 21:57
Show Gist options
  • Save yucer/bb86df7a8033dc275ec7e6c6e37d59a1 to your computer and use it in GitHub Desktop.
Save yucer/bb86df7a8033dc275ec7e6c6e37d59a1 to your computer and use it in GitHub Desktop.
Python script to get stats from google emails with gmail API
#!/usr/bin/env python3
import argparse
import os.path
from collections import defaultdict
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from rich.live import Live
from rich.console import Console
from rich.table import Table, box
# Module-level Rich console, created at import time.
console = Console()
# OAuth scopes requested during authorization: read-only Gmail access.
# If modifying these scopes, delete the cached token file. It is not named
# "token.json": get_api_client() stores it as "<script file name>.token.json".
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
]
def convert_bytes(num):  # check: https://stackoverflow.com/a/14822210
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit (TB) is reached, then rendered with one decimal place.

    Args:
        num: Size in bytes (int or float).

    Returns:
        A string such as ``"1.5 KB"``. Values of 1024 TB or more are still
        reported in TB (e.g. ``"2048.0 TB"``) instead of yielding ``None``.
    """
    step_unit = 1024
    units = ('b', 'KB', 'MB', 'GB', 'TB')
    for unit in units[:-1]:
        if num < step_unit:
            return f"{num:3.1f} {unit}"
        num /= step_unit
    # Largest unit: always format here so the caller never receives None.
    return f"{num:3.1f} {units[-1]}"
def get_message(gmail, msg_id):
    """Fetch a single message of the authenticated user in "metadata" format.

    Args:
        gmail: Gmail API client exposing ``users().messages().get(...)``.
        msg_id: Identifier of the message to retrieve.

    Returns:
        Whatever ``execute()`` returns for the request.
    """
    messages_api = gmail.users().messages()
    request = messages_api.get(userId="me", id=msg_id, format="metadata")
    return request.execute()
def get_stats_table(msg_sizes, msg_count, top, query, sort_by_count):
    """Build the Rich table listing the highest-ranked sender addresses.

    Args:
        msg_sizes: Mapping of sender address to summed message size in bytes.
        msg_count: Mapping of sender address to number of messages.
        top: Maximum number of rows to include.
        query: Query string shown in the table title.
        sort_by_count: Rank by message count when true, by total size otherwise.

    Returns:
        A ``Table`` with position, sender, count and formatted size columns,
        ordered from the largest ranking value to the smallest.
    """
    ranking_source, sort_field = (
        (msg_count, 'msg count') if sort_by_count
        else (msg_sizes, 'total msg sizes')
    )

    table = Table(
        title=f'Top {top} address ordered by {sort_field} for query "{query}"',
        show_header=True,
        header_style='bold magenta',
    )
    for heading, column_options in (
        ("Pos", {"justify": "right"}),
        ("From", {}),
        ("Count", {"justify": "right"}),
        ("Size", {"justify": "right"}),
    ):
        table.add_column(heading, **column_options)

    # Senders ordered by the selected metric, largest first.
    ranked = sorted(
        ranking_source,
        key=lambda sender: ranking_source[sender],
        reverse=True,
    )
    for position, sender in enumerate(ranked[:top], start=1):
        table.add_row(
            str(position),
            sender,
            str(msg_count[sender]),
            convert_bytes(msg_sizes[sender]),
        )
    return table
def process_messages(gmail, query, top, sort_by_count):
    """Walk every message matching ``query`` and show live per-sender stats.

    Messages are listed page by page; each one is fetched with
    ``get_message`` and its size estimate is added to the totals of its
    ``From`` header value (messages without that header are grouped under
    the empty string). The table is refreshed after every message.

    Args:
        gmail: Gmail API client.
        query: Gmail search query, or ``None`` to list all messages.
        top: Maximum number of rows shown in the table.
        sort_by_count: Rank senders by message count instead of total size.
    """
    msg_sizes = defaultdict(int)
    msg_count = defaultdict(int)

    def render():
        return get_stats_table(msg_sizes, msg_count, top, query, sort_by_count)

    list_kwargs = dict(userId="me")
    if query is not None:
        list_kwargs.update(q=query)

    # One Live display for the whole run: creating a new one per page would
    # restart the table at every page boundary.
    with Live(render(), refresh_per_second=1) as live:
        while True:
            results = gmail.users().messages().list(**list_kwargs).execute()
            for msg in results.get('messages', []):
                message = get_message(gmail, msg['id'])
                # Header names are case-insensitive, so normalise them
                # before looking up the sender.
                headers = {
                    d['name'].casefold(): d['value']
                    for d in message.get('payload', {}).get('headers', [])
                }
                msg_from = headers.get('from', '')
                msg_sizes[msg_from] += message.get('sizeEstimate', 0)
                msg_count[msg_from] += 1
                live.update(render())
            page_token = results.get('nextPageToken')
            if not page_token:
                # No further pages (missing or empty token).
                break
            list_kwargs.update(pageToken=page_token)
def get_api_client():
    """Return an authorized Gmail API client, caching the OAuth token on disk.

    The token cache is named ``<script file name>.token.json``; both it and
    ``credentials.json`` are resolved relative to the current working
    directory. A cached token is reused when valid and refreshed when it is
    expired and has a refresh token. If there is no usable token, or the
    refresh is rejected, the interactive local-server OAuth flow is run.
    The cache file is rewritten whenever the credentials changed.

    Returns:
        The client object produced by ``build("gmail", "v1", ...)``.
    """
    # Local import: google-auth is already a dependency of this file and
    # only this function needs the exception type.
    from google.auth.exceptions import RefreshError

    token_filename = os.path.basename(__file__) + '.token.json'
    creds = None
    if os.path.exists(token_filename):
        creds = Credentials.from_authorized_user_file(token_filename, SCOPES)

    token_changed = False
    if creds and not creds.valid and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
            token_changed = True
        except RefreshError:
            # The refresh token was revoked or has expired: discard the
            # cached credentials and ask the user to authorize again
            # instead of crashing.
            creds = None

    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file(
            "credentials.json", SCOPES
        )
        creds = flow.run_local_server(port=0)
        token_changed = True

    if token_changed:
        with open(token_filename, "w", encoding="utf-8") as token:
            token.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)
def main():
    """Parse the command line and run the mailbox statistics report.

    Options:
        --query: Gmail search query restricting the messages considered.
        --top: Maximum number of rows to show (default 30, must be >= 1).
        --sort-by-count: Rank senders by message count instead of size.

    Gmail API errors are printed rather than raised, and Ctrl-C stops the
    run quietly.
    """
    parser = argparse.ArgumentParser(
        description='Show per-sender message count and size stats for Gmail.'
    )
    parser.add_argument('--query', help='message query')
    parser.add_argument('--top', type=int, default=30, help='max top rows to show')
    parser.add_argument('--sort-by-count', action='store_true', help='sort by msg count')
    args = parser.parse_args()
    if args.top < 1:
        # A zero or negative value would be used as a slice bound and
        # silently show the wrong rows.
        parser.error('--top must be a positive integer')
    try:
        gmail = get_api_client()
        process_messages(gmail, args.query, args.top, args.sort_by_count)
    except HttpError as error:
        print(f"An error occurred: {error}")
    except KeyboardInterrupt:
        # Deliberate: let the user stop a long run without a traceback.
        pass


if __name__ == "__main__":
    main()
@yucer
Copy link
Author

yucer commented Mar 25, 2024

Inspired by the Python Quickstart for the Gmail API in Google Workspace. You need to set up the environment first, as described there. The `credentials.json` file resulting from that setup should be located in the same folder as this script.

The script requires Rich, you can install it via: python -m pip install rich.

The parameters are:

  • query: the gmail filter in order to select just a subset of the emails.
  • top: the maximum number of rows to show in the stats table (default 30); rows are always sorted in descending order.
  • sort-by-count: if you want to sort the result by email count by origin address (from) otherwise it uses sum(size) by origin.

These are some usage examples:

  1. python3 gmail_stats.py --query "in:inbox" , show stats for emails in inbox.
  2. python3 gmail_stats.py --query "in:inbox is:unread" --top 40 show top 40 addresses that sent emails that are unread in inbox sorted by descending size
  3. python3 gmail_stats.py --query "in:inbox to:someone@example.com has:attachment" --top 20 --sort-by-count shows the top 20 addresses of messages in the inbox sent to someone@example.com that have attachments, sorted by message count in descending order

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment