Last active
March 25, 2024 22:27
-
-
Save yucer/1e4f29d868f63ae68310ebef8dc02c89 to your computer and use it in GitHub Desktop.
Python script to Bulk delete google emails with gmail API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import datetime as dt | |
| import os.path | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from rich.console import Console | |
| from rich.table import Table, box | |
# Rich console used to render the message table.
console = Console()

# OAuth scopes requested from Google. After changing them, delete the cached
# token file so the consent flow runs again with the new scopes.
SCOPES = ["https://mail.google.com/"]
def list_labels(gmail):
    """Print the labels of the authenticated user's mailbox.

    Args:
        gmail: Gmail API client (see ``get_api_client``).
    """
    response = gmail.users().labels().list(userId="me").execute()
    print(response.get("labels", []))
def get_message(gmail, msg_id):
    """Fetch a single message in ``metadata`` format.

    Args:
        gmail: Gmail API client.
        msg_id: ID of the message to retrieve.

    Returns:
        The response of the ``users.messages.get`` call.
    """
    request = gmail.users().messages().get(
        userId="me",
        id=msg_id,
        format="metadata",
    )
    return request.execute()
def delete_message(gmail, msg_id):
    """Issue a ``users.messages.delete`` call for the message *msg_id*."""
    request = gmail.users().messages().delete(userId="me", id=msg_id)
    request.execute()
def trash_message(gmail, msg_id):
    """Issue a ``users.messages.trash`` call for the message *msg_id*."""
    request = gmail.users().messages().trash(userId="me", id=msg_id)
    request.execute()
def get_msg_table(page_token):
    """Create an empty Rich table with the message-listing columns.

    Args:
        page_token: Accepted for call compatibility; not used.

    Returns:
        A ``Table`` with the columns idx, MsgID, Date (UTC), From and Subject.
    """
    msg_table = Table(
        box=box.MINIMAL,
        header_style="bold magenta",
        show_header=True,
    )
    # (header, add_column options), in display order.
    column_specs = (
        ("idx", {"justify": "right", "style": "dim", "width": 7}),
        ("MsgID", {"justify": "right"}),
        ("Date (UTC)", {}),
        ("From", {}),
        ("Subject", {}),
    )
    for header, options in column_specs:
        msg_table.add_column(header, **options)
    return msg_table
def _message_row(idx, msg_id, message):
    """Return the table cells (idx, id, UTC date, From, Subject) for a message."""
    # Local import: keeps this block self-contained without touching the
    # file-level import list (rich is already a dependency of this script).
    from rich.markup import escape

    # Header names are case-insensitive, so normalise them before the lookup.
    headers = {
        header["name"].lower(): header["value"]
        for header in message.get("payload", {}).get("headers", [])
    }
    # internalDate is epoch milliseconds; convert explicitly to UTC so the
    # value matches the "Date (UTC)" column header on every machine.
    received = dt.datetime.fromtimestamp(
        int(message["internalDate"]) / 1000, tz=dt.timezone.utc
    )
    return (
        str(idx),
        msg_id,
        received.strftime("%Y-%m-%d %H:%M:%S"),
        # Escape console markup: a subject such as "[list] hello" or
        # "[/b]" would otherwise be swallowed or raise a MarkupError.
        escape(headers.get("from", "")),
        escape(headers.get("subject", "")),
    )


def process_messages(gmail, query, trash, delete):
    """List the messages matching *query* and optionally trash/delete them.

    The result set is walked page by page. For every page a table of the
    messages is printed first, then the requested action is applied to each
    message of that page.

    Args:
        gmail: Gmail API client.
        query: Gmail search query, or ``None`` to match every message.
        trash: If true, move each listed message to the trash.
        delete: If true, delete each listed message; takes precedence over
            ``trash`` when both are set.

    Raises:
        HttpError: propagated from any failing Gmail API call.
    """
    if delete:
        action = delete_message
    elif trash:
        action = trash_message
    else:
        action = None

    idx = 0
    page_token = None
    while True:
        kwargs = {"userId": "me"}
        if page_token:
            kwargs["pageToken"] = page_token
        if query is not None:
            kwargs["q"] = query
        results = gmail.users().messages().list(**kwargs).execute()
        msg_ids = [msg["id"] for msg in results.get("messages", [])]

        if msg_ids:
            # One fresh table per page: each message is printed exactly once
            # instead of re-printing an ever-growing table.
            table = get_msg_table(page_token)
            for msg_id in msg_ids:
                idx += 1
                message = get_message(gmail, msg_id)
                table.add_row(*_message_row(idx, msg_id, message))
            console.print(table)

            if action is not None:
                for msg_id in msg_ids:
                    action(gmail, msg_id)

        page_token = results.get("nextPageToken")
        if not page_token:
            break
def get_api_client():
    """Return an authorised Gmail API client, running the OAuth flow if needed.

    Cached credentials are read from ``<script name>.token.json`` and the
    OAuth client secrets from ``credentials.json``; both paths are relative
    to the current working directory. When no valid cached credentials
    exist they are refreshed, or obtained through the local-server consent
    flow, and then written back to the token file.

    Returns:
        The client built by ``build("gmail", "v1", credentials=...)``.
    """
    token_filename = os.path.basename(__file__) + '.token.json'
    creds = None
    if os.path.exists(token_filename):
        creds = Credentials.from_authorized_user_file(token_filename, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            creds = flow.run_local_server(port=0)

        # The token grants access under SCOPES, so create the file readable
        # and writable by the owner only instead of relying on the umask.
        # (The mode only applies when the file is newly created.)
        fd = os.open(
            token_filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
        )
        with os.fdopen(fd, "w", encoding="utf-8") as token:
            token.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)
def main():
    """Parse the command line and list, trash or delete matching messages.

    Options:
        --query   Gmail search query selecting the messages.
        --trash   Move the selected messages to the trash.
        --delete  Delete the selected messages permanently; wins over
                  --trash when both are given.

    Without --trash or --delete the messages are only listed.

    Raises:
        SystemExit: with status 1 and the error text when a Gmail API call
            fails, so shell callers can detect the failure.
    """
    parser = argparse.ArgumentParser(
        description=(
            'List, trash or permanently delete Gmail messages '
            'matching a query.'
        )
    )
    parser.add_argument('--query', help='message query')
    parser.add_argument('--trash', action='store_true', help='move to trash')
    parser.add_argument(
        '--delete',
        action='store_true',
        help='delete permanently (takes precedence over --trash)',
    )
    args = parser.parse_args()

    try:
        gmail = get_api_client()
        process_messages(
            gmail, args.query, trash=args.trash, delete=args.delete
        )
    except HttpError as error:
        # SystemExit with a string prints it to stderr and exits with 1.
        raise SystemExit(f"An error occurred: {error}")
    except KeyboardInterrupt:
        # Ctrl-C deliberately ends the run quietly, without a traceback.
        pass


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Disclaimer of No Responsibility for Software Use
Description:
Inspired by the Python Quickstart for the Gmail API of Google Workspace. The environment must be set up first, as advised there. The file `credentials.json` resulting from that configuration process should be located in the same folder as this script.
The script requires Rich; you can install it via:
python -m pip install rich
The parameters are:
- query: the Gmail filter used to select just a subset of the emails.
- trash: use it if you want to move the messages to the trash.
- delete: use it if you want to delete the emails immediately.
If you don't use the trash or delete options, then the emails are just listed.
These are some usage examples:
- python3 gmail_delete.py --query "from:example.com": just list all the emails coming from the domain example.com.
- python3 gmail_delete.py --query "from:example.com" --trash: move all the emails coming from the domain example.com to the trash.
- python3 gmail_delete.py --query "from:example.com" --delete: permanently delete all the emails coming from the domain example.com.
- python3 gmail_delete.py --query "in:inbox from:user@example.com" --delete
- python3 gmail_delete.py --query "in:inbox before:2023/6/1" --trash: move to the trash all the emails in the inbox received before 2023-06-01.
- python3 gmail_delete.py --query "in:inbox has:attachment after:2022/1/1 before:2023/1/1" --trash: move to the trash all the emails that have an attachment and arrived in 2022.