Last active
March 25, 2024 22:27
-
-
Save yucer/1e4f29d868f63ae68310ebef8dc02c89 to your computer and use it in GitHub Desktop.
Python script to Bulk delete google emails with gmail API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import datetime as dt | |
import os.path | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from googleapiclient.errors import HttpError | |
from rich.console import Console | |
from rich.table import Table, box | |
# Shared Rich console used for all table output in this script.
console = Console()

# OAuth scope requested from the user. The cached token file
# ("<script name>.token.json") was issued for this scope list, so delete that
# file whenever the list is changed.
SCOPES = ["https://mail.google.com/"]
def list_labels(gmail):
    """Fetch the labels of the authenticated user and print them to stdout.

    Args:
        gmail: Gmail API service object (see ``get_api_client``).
    """
    response = gmail.users().labels().list(userId="me").execute()
    print(response.get("labels", []))
def get_message(gmail, msg_id):
    """Return one message of the authenticated user, requested in "metadata" format.

    Args:
        gmail: Gmail API service object (see ``get_api_client``).
        msg_id: ID of the message to fetch.

    Returns:
        The decoded API response for the message.
    """
    request = gmail.users().messages().get(
        userId="me",
        id=msg_id,
        format="metadata",
    )
    return request.execute()
def delete_message(gmail, msg_id):
    """Issue the API ``delete`` call for message ``msg_id`` of the current user.

    This is the action behind the ``--delete`` ("delete permanently") option.
    """
    request = gmail.users().messages().delete(userId="me", id=msg_id)
    request.execute()
def trash_message(gmail, msg_id):
    """Issue the API ``trash`` call for message ``msg_id`` of the current user.

    This is the action behind the ``--trash`` ("move to trash") option.
    """
    request = gmail.users().messages().trash(userId="me", id=msg_id)
    request.execute()
def get_msg_table(page_token):
    """Create an empty Rich table with the columns used to list messages.

    Args:
        page_token: Accepted for the caller's convenience; the table layout
            does not depend on it.

    Returns:
        A ``Table`` with the columns idx, MsgID, Date (UTC), From and Subject.
    """
    # (header, add_column keyword options) for each column, in display order.
    column_specs = (
        ("idx", {"justify": "right", "style": "dim", "width": 7}),
        ("MsgID", {"justify": "right"}),
        ("Date (UTC)", {}),
        ("From", {}),
        ("Subject", {}),
    )
    listing = Table(show_header=True, header_style="bold magenta", box=box.MINIMAL)
    for header, options in column_specs:
        listing.add_column(header, **options)
    return listing
def process_messages(gmail, query, trash, delete):
    """List every message matching ``query`` and optionally trash/delete it.

    Messages are fetched page by page. For each page a table (index, message
    ID, UTC date, sender, subject) is printed, then the requested action is
    applied to every message of that page.

    Args:
        gmail: Gmail API service object (see ``get_api_client``).
        query: Gmail search expression, or ``None`` to match all messages.
        trash: When true, move each listed message to the trash.
        delete: When true, delete each listed message; takes precedence
            over ``trash`` when both are set.

    Raises:
        googleapiclient.errors.HttpError: propagated from any API call.
    """
    # Local import: wrapping cell values in Text stops Rich from interpreting
    # square brackets in senders/subjects as console markup.
    from rich.text import Text

    page_token = None
    idx = 0
    while True:
        kwargs = {"userId": "me"}
        if page_token:
            kwargs["pageToken"] = page_token
        if query is not None:
            kwargs["q"] = query
        results = gmail.users().messages().list(**kwargs).execute()

        # One table per page, so output is not re-printed as the list grows.
        table = get_msg_table(page_token)
        page_ids = []
        for msg in results.get("messages", []):
            idx += 1
            msg_id = msg["id"]
            message = get_message(gmail, msg_id)
            # Header names are matched case-insensitively ("From" vs "from").
            headers = {
                header["name"].lower(): header["value"]
                for header in message.get("payload", {}).get("headers", [])
            }
            # internalDate is in epoch milliseconds. Convert explicitly in UTC
            # (the column is labelled "Date (UTC)"), then drop tzinfo so the
            # rendered text keeps the plain "YYYY-MM-DD HH:MM:SS" form.
            ts_milli = int(message["internalDate"])
            msg_ts = dt.datetime.fromtimestamp(
                ts_milli / 1000.0, tz=dt.timezone.utc
            ).replace(tzinfo=None)
            table.add_row(
                f"{idx}",
                msg_id,
                str(msg_ts),
                Text(headers.get("from", "")),
                Text(headers.get("subject", "")),
            )
            page_ids.append(msg_id)

        if page_ids:
            console.print(table)

        # Act only after the page has been shown, so the user sees what is
        # being removed.
        for msg_id in page_ids:
            if delete:
                delete_message(gmail, msg_id)
            elif trash:
                trash_message(gmail, msg_id)

        page_token = results.get("nextPageToken")
        if not page_token:
            break
def get_api_client():
    """Build an authorized Gmail API client, running the OAuth flow if needed.

    Cached credentials are read from ``<script name>.token.json`` in the
    current working directory. If they are missing or invalid they are
    refreshed, or - when refreshing is not possible - the interactive
    browser consent flow is run using ``credentials.json``. The resulting
    credentials are written back to the token file.

    Returns:
        The Gmail API service object created by ``build("gmail", "v1", ...)``.
    """
    # Local import keeps the file's top-level import block untouched.
    from google.auth.exceptions import RefreshError

    token_filename = os.path.basename(__file__) + '.token.json'
    creds = None
    if os.path.exists(token_filename):
        creds = Credentials.from_authorized_user_file(token_filename, SCOPES)

    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError:
                # The refresh token was revoked or has expired: fall back to
                # the interactive flow instead of aborting.
                refreshed = False

        if not refreshed:
            # Prefer credentials.json next to the script; fall back to the
            # working-directory lookup the script used before.
            script_dir = os.path.dirname(os.path.abspath(__file__))
            secrets_path = os.path.join(script_dir, "credentials.json")
            if not os.path.exists(secrets_path):
                secrets_path = "credentials.json"
            flow = InstalledAppFlow.from_client_secrets_file(secrets_path, SCOPES)
            creds = flow.run_local_server(port=0)

        # The token grants access under SCOPES, so create the file readable by
        # the owner only (the mode applies only when the file is created).
        fd = os.open(token_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as token:
            token.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)
def main():
    """Parse the command line and list, trash or delete the matching messages.

    Options:
        --query   Gmail search expression selecting the messages.
        --trash   Move the matching messages to the trash.
        --delete  Delete the matching messages permanently.

    Without ``--trash`` or ``--delete`` the messages are only listed.

    Returns:
        0 on success or when interrupted with Ctrl-C, 1 when the Gmail API
        reported an error.
    """
    parser = argparse.ArgumentParser(
        description='List, trash or permanently delete Gmail messages matching a query.'
    )
    parser.add_argument('--query', help='message query')
    parser.add_argument('--trash', action='store_true', help='move to trash')
    parser.add_argument('--delete', action='store_true', help='delete permanently')
    args = parser.parse_args()

    try:
        gmail = get_api_client()
        process_messages(gmail, args.query, trash=args.trash, delete=args.delete)
    except HttpError as error:
        print(f"An error occurred: {error}")
        return 1
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop a long run; exit quietly.
        pass
    return 0


if __name__ == "__main__":
    # Pass main()'s status to the shell so API failures give a non-zero exit.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Disclaimer of No Responsibility for Software Use
Description:
Inspired by the Python Quickstart for the Gmail API (Google Workspace). The environment must be set up first, as described there. The `credentials.json` file produced by that setup should be placed in the same folder as this script.
The script requires Rich; you can install it via:
python -m pip install rich
The parameters are:
`query`: the Gmail filter used to select just a subset of the emails.
`trash`: if you want to move the messages to the trash.
`delete`: if you want to delete the emails immediately.
If you don't use the `trash` or `delete` options, then the emails are just listed.
These are some usage examples:
python3 gmail_delete.py --query "from:example.com"
just list all the emails coming from the domain example.com.
python3 gmail_delete.py --query "from:example.com" --trash
move all the emails coming from the domain example.com to the trash.
python3 gmail_delete.py --query "from:example.com" --delete
permanently delete all the emails coming from the domain example.com.
python3 gmail_delete.py --query "in:inbox from:[email protected]" --delete
python3 gmail_delete.py --query "in:inbox before:2023/6/1" --trash
move to the trash all the emails from the inbox before 01-06-2023
python3 gmail_delete.py --query "in:inbox has:attachment after:2022/1/1 before:2023/1/1" --trash
move to the trash all the emails that have attachment and arrived in 2022.