Last active
September 11, 2024 03:21
-
-
Save ice1000/88df12fd0b9038424887d4cb0097539b to your computer and use it in GitHub Desktop.
My take on automating unsubscription from Deadline Hollywood, Variety Breaking News, and Billboard, inspired by the scripts of Alex Chi and Felix Yan.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imaplib
import ssl
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.parser import HeaderParser

import requests
# Raise imaplib's per-line size cap so very long server responses are not
# rejected by the client library.
imaplib._MAXLINE = 40000000

# Maximum number of messages fetched and processed per batch.
ONE_TIME_LIMIT = 10

# Shared parser for raw header blocks (only headers are fetched).
mailparser = HeaderParser()

# Running counter of processed unsubscriptions, accumulated across batches.
SUCCESS_COUNT = 0
def _first_http_url(header_value):
    """Return the first http(s) URL in a List-Unsubscribe value, or None."""
    if not header_value:
        return None
    # A folded header value may still contain its line breaks; URLs never
    # contain whitespace, so dropping all of it is safe.
    compact = "".join(str(header_value).split())
    for chunk in compact.split("<")[1:]:
        candidate = chunk.split(">")[0]
        # Skip mailto: (and any other non-HTTP) entries: they cannot be POSTed.
        if candidate.lower().startswith(("http://", "https://")):
            return candidate
    return None


def _unsubscribe(raw_header, timeout=30):
    """POST a one-click unsubscribe request for one raw header block.

    Returns a ``(status_code, text, sender, date, subject)`` tuple.
    ``status_code`` is ``None`` when no request could be completed, in which
    case ``text`` holds the reason.
    """
    # errors="replace" keeps a stray non-UTF-8 byte from killing the worker.
    msg = mailparser.parsestr(raw_header.decode(errors="replace"))
    sender, date, subject = msg['From'], msg['Date'], msg['Subject']
    url = _first_http_url(msg['List-Unsubscribe'])
    if url is None:
        return (None, "no http(s) List-Unsubscribe URL", sender, date, subject)
    try:
        # Always pass a timeout: without one a stalled server blocks forever.
        response = requests.post(
            url, data={"List-Unsubscribe": "One-Click"}, timeout=timeout)
    except requests.RequestException as e:
        return (None, "request failed: {}".format(e), sender, date, subject)
    return (response.status_code, response.text, sender, date, subject)


def felix_yan(m):
    """Process one batch of matching messages on the IMAP connection ``m``.

    Searches the selected mailbox, fetches the headers of at most
    ``ONE_TIME_LIMIT`` messages, flags them ``\\Deleted``, sends a one-click
    unsubscribe POST for each in a thread pool, and finally expunges the
    mailbox.  Sleeps 30 seconds when the search returned fewer than
    ``ONE_TIME_LIMIT`` messages.

    ``SUCCESS_COUNT`` is incremented only for requests answered with a
    non-error HTTP status.  Failures of individual messages are reported and
    skipped so one bad message cannot abort the batch; IMAP errors still
    propagate to the caller.
    """
    global SUCCESS_COUNT
    start_time = time.time()
    # NOTE(review): the sender addresses look redacted in this copy
    # ("[email protected]") -- restore the real addresses before running.
    resp, data = m.uid('search', None, 'OR OR (FROM [email protected]) (FROM [email protected]) (FROM [email protected])')
    uids = data[0].split() if data and data[0] else []

    collected = []
    for uid in uids[:ONE_TIME_LIMIT]:
        resp, data = m.uid('fetch', uid, "(BODY[HEADER])")
        # Flag before unsubscribing: the message is purged either way.
        m.uid("store", uid, '+FLAGS', '\\Deleted')
        # A fetch with no payload does not yield a (envelope, bytes) tuple.
        if data and isinstance(data[0], tuple):
            collected.append(data[0][1])
    print("Collection finished in", time.time() - start_time, "seconds", flush=True)

    total = len(collected)
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(_unsubscribe, raw) for raw in collected]
        for idx, future in enumerate(as_completed(futures)):
            try:
                status_code, text, sender, date, subject = future.result()
            except Exception as e:
                # Keep going: the remaining messages and the expunge below
                # must not be skipped because of one malformed message.
                print("Unsubscribing {}/{}: FAILED {}".format(idx + 1, total, e), flush=True)
                continue
            print("Unsubscribing {}/{}: {} {} {} {} {}"
                  .format(idx + 1, total, status_code, text, sender, date, subject), flush=True)
            if status_code is not None and status_code < 400:
                SUCCESS_COUNT += 1
    print("Unsubscription ", SUCCESS_COUNT, " finished in", time.time() - start_time, "seconds", flush=True)
    print("Purging...", m.expunge())
    if len(uids) < ONE_TIME_LIMIT:
        print("No more emails, slowing down...")
        time.sleep(30)
# Supervisor loop: the outer loop (re)establishes the IMAP session, the inner
# loop processes batches until the connection breaks.
while True:
    m = None
    try:
        m = imaplib.IMAP4_SSL("imap.gmail.com")
        m.login("EMAIL", "PASSWORD")
        # select() reports an unavailable mailbox through its status rather
        # than by raising, so check it explicitly.
        status, detail = m.select("[Gmail]/Spam")
        if status != "OK":
            raise imaplib.IMAP4.error(
                "cannot select [Gmail]/Spam: {}".format(detail))
    except Exception as e:
        print("ERROR:", e)
        if m is not None:
            # Do not leak the socket of a half-initialised session.
            try:
                m.shutdown()
            except OSError as close_error:
                print("ERROR:", close_error)
        time.sleep(300)
        continue
    print("Login successful")
    while True:
        try:
            felix_yan(m)
        except (imaplib.IMAP4.abort, ConnectionError, ssl.SSLError):
            # The connection is unusable: leave the inner loop to reconnect.
            print("ABORT, reconnecting...")
            break
        except Exception as e:
            print("ERROR:", e)
            # Brief pause so a persistent error does not become a busy loop.
            time.sleep(5)
    # Close the broken connection before opening a new one.  shutdown() is
    # used instead of logout() because the server may no longer respond.
    try:
        m.shutdown()
    except OSError as close_error:
        print("ERROR:", close_error)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment