Skip to content

Instantly share code, notes, and snippets.

@ice1000
Last active September 11, 2024 03:21
Show Gist options
  • Save ice1000/88df12fd0b9038424887d4cb0097539b to your computer and use it in GitHub Desktop.
Save ice1000/88df12fd0b9038424887d4cb0097539b to your computer and use it in GitHub Desktop.
My take on unsubscription automation of Deadline Hollywood, Variety Breaking News, and Billboard, inspired from the scripts by Alex Chi and Felix Yan
from email.parser import HeaderParser
import imaplib
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
imaplib._MAXLINE = 40000000
ONE_TIME_LIMIT = 10
mailparser = HeaderParser()
SUCCESS_COUNT = 0
def felix_yan(m):
global SUCCESS_COUNT
start_time = time.time()
resp, data = m.uid('search', None, 'OR OR (FROM [email protected]) (FROM [email protected]) (FROM [email protected])')
uids = data[0].split()
collected_data01s = []
for uid in uids[:ONE_TIME_LIMIT]:
resp, data = m.uid('fetch', uid, "(BODY[HEADER])")
m.uid("store", uid, '+FLAGS', '\\Deleted')
collected_data01s.append(data[0][1])
print("Collection finished in", time.time() - start_time, "seconds", flush=True)
def make_request(data01):
msg = mailparser.parsestr(data01.decode())
url = msg['List-Unsubscribe']
url = url.split('<')[1].split('>')[0]
response = requests.post(url, data={"List-Unsubscribe": "One-Click"})
return (response.status_code, response.text, msg['From'], msg['Date'], msg['Subject'])
processes = []
total = len(collected_data01s)
with ThreadPoolExecutor(max_workers=8) as executor:
for data01 in collected_data01s:
processes.append(executor.submit(make_request, data01))
for idx, response in enumerate(as_completed(processes)):
status_code, text, sender, date, subject = response.result()
print("Unsubscribing {}/{}: {} {} {} {} {}"
.format(idx + 1, total, status_code, text, sender, date, subject), flush=True)
SUCCESS_COUNT = SUCCESS_COUNT + 1
print("Unsubscription ", SUCCESS_COUNT, " finished in", time.time() - start_time, "seconds", flush=True)
print("Purging...", m.expunge())
if len(uids) < ONE_TIME_LIMIT:
print("No more emails, slowing down...")
time.sleep(30)
while True:
try:
m = imaplib.IMAP4_SSL("imap.gmail.com")
m.login("EMAIL", "PASSWORD")
m.select("[Gmail]/Spam")
except Exception as e:
print("ERROR:", e)
time.sleep(300)
continue
print("Login successful")
while True:
try:
felix_yan(m)
except imaplib.IMAP4.abort:
print("ABORT, reconnecting...")
break
except Exception as e:
print("ERROR:", e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment