Skip to content

Instantly share code, notes, and snippets.

@4piu
Last active March 5, 2025 19:52
Show Gist options
  • Save 4piu/908e467f8a67b3c389faa0c8844680bd to your computer and use it in GitHub Desktop.
Save 4piu/908e467f8a67b3c389faa0c8844680bd to your computer and use it in GitHub Desktop.
Automatically move promotion and updates email older than 30 days to trash. Require OAuth app created on GCP.
import logging
import logging.config
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
},
},
"formatters": {
"default": {
"format": "%(asctime)s [%(levelname)s] %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
},
"root": {
"level": "INFO",
"handlers": ["console"],
},
}
)
logger = logging.getLogger(__name__)
# If modifying these scopes, delete the file token.json.
SCOPES = ["https://mail.google.com/"]
creds = None
def login():
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
global creds
if os.path.exists("token.json"):
creds = Credentials.from_authorized_user_file("token.json", SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
# Save the refreshed token
with open("token.json", "w") as token:
token.write(creds.to_json())
else:
flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open("token.json", "w") as token:
token.write(creds.to_json())
def clean_category(service, category, older_than="30d"):
logger.info(f"Cleaning {category} older than {older_than}")
while True:
pageToken = None
results = (
service.users()
.messages()
.list(
userId="me",
q=f"category:{category} older_than:{older_than}",
pageToken=pageToken,
)
.execute()
)
messages = results.get("messages", [])
if not messages:
logger.info("No messages found.")
return
logger.info(f"Deleting {len(messages)} messages")
service.users().messages().batchDelete(
userId="me", body={"ids": [msg["id"] for msg in messages]}
).execute()
if "nextPageToken" not in results:
logger.info("No more messages to delete.")
return
pageToken = results["nextPageToken"]
def mark_as_read(service, older_than="30d"):
logger.info(f"Marking messages older than {older_than} as read")
while True:
pageToken = None
results = (
service.users()
.messages()
.list(
userId="me", q=f"is:unread older_than:{older_than}", pageToken=pageToken
)
.execute()
)
messages = results.get("messages", [])
if not messages:
logger.info("No messages found.")
return
logger.info(f"Marking as read {len(messages)} messages")
service.users().messages().batchModify(
userId="me",
body={"ids": [msg["id"] for msg in messages], "removeLabelIds": ["UNREAD"]},
).execute()
if "nextPageToken" not in results:
logger.info("No more messages to mark as read.")
return
pageToken = results["nextPageToken"]
def main():
login()
try:
service = build("gmail", "v1", credentials=creds)
clean_category(service, "promotions", "30d")
clean_category(service, "updates", "30d")
clean_category(service, "forums", "1y")
mark_as_read(service, "7d")
except HttpError as error:
logger.error(f"Error: {error}")
if __name__ == "__main__":
main()
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment