Created
September 8, 2017 20:40
-
-
Save eriknelson/05ee80a1e6e0987db9b06c2a2ec48eb5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import email
import imaplib
import os
import re
################################################################################ | |
# RULE DEFINITIONS | |
################################################################################ | |
class ListHeaderRule:
    """Rule that flags messages carrying a ``List-Unsubscribe`` header."""

    def __init__(self):
        self.name = 'List-Header check'

    def hit_msg(self, msg):
        """Print a short report for a message that matched this rule.

        The message is identified by its own ``Message-ID`` header so the
        method depends only on its argument, not on a variable that happens
        to exist in the calling scope.

        Args:
            msg: the parsed ``email`` message that matched.
        """
        print('Found unsub header in msg [ {0} ] from: [ {1} ]'.format(
            msg['Message-ID'], msg['From']))
        print('List-Unsubscribe: {0}'.format(msg['List-Unsubscribe']))

    def is_hit(self, msg):
        """Return True when ``msg`` has a ``List-Unsubscribe`` header."""
        return 'List-Unsubscribe' in msg
class RegexMatchRule:
    """Rule that flags messages whose text body matches a regular expression.

    Only ``text/plain`` and ``text/html`` parts are inspected; the pattern is
    searched for case-insensitively anywhere in each part.
    """

    def __init__(self, regex):
        self.name = 'Regex Match: {0}'.format(regex)
        self.regex = regex
        # Make sure we're not looking at things like attachments or images
        self.valid_payload_types = {
            'text/plain': True,
            'text/html': True,
        }

    def hit_msg(self, msg):
        """Print a short report for a message that matched this rule."""
        print('{0} got a hit in message with subject [ {1} ] from: [{2}]'.format(
            self.name, msg['Subject'], msg['From']))

    def is_hit(self, msg):
        """Return True if any text part of ``msg`` contains the pattern.

        Every text part is examined (not just the first one), and False is
        returned explicitly when the message has no text part at all.
        """
        for part in msg.walk():
            if part.get_content_type() not in self.valid_payload_types:
                continue
            # search, not match: the pattern may appear anywhere in the body.
            if re.search(self.regex, self._decoded_text(part), re.I):
                return True
        return False

    def _decoded_text(self, part):
        """Return the part's body as text with transfer encoding undone."""
        raw = part.get_payload(decode=True)
        if raw is None:
            # get_payload(decode=True) yields None for multipart containers.
            return ''
        if not isinstance(raw, bytes):
            return raw
        charset = part.get_content_charset() or 'utf-8'
        try:
            return raw.decode(charset, 'replace')
        except LookupError:
            # Unknown charset label in the message; fall back to UTF-8.
            return raw.decode('utf-8', 'replace')
################################################################################ | |
# HELPERS | |
################################################################################ | |
def run_rules(rules, msg):
    """Run each rule against ``msg`` in order, stopping at the first hit.

    A banner naming the rule is printed before each rule is evaluated. When a
    rule's ``is_hit`` returns a truthy value, its ``hit_msg`` is called and no
    further rules are tried.

    Args:
        rules: iterable of objects exposing ``name``, ``is_hit(msg)`` and
            ``hit_msg(msg)``.
        msg: the message passed to every rule.

    Returns:
        True if some rule hit, False otherwise (including for no rules).
    """
    for rule in rules:
        print("============================================================")
        print("Executing rule: " + rule.name)
        print("============================================================")
        if rule.is_hit(msg):
            rule.hit_msg(msg)
            return True
    return False
################################################################################ | |
# CONFIG | |
################################################################################ | |
# Credentials default to the placeholders below; set the EMAIL_USER and
# EMAIL_PASS environment variables to supply real ones without committing
# them to this file.
EMAIL_USER = os.environ.get('EMAIL_USER', 'xxx')
EMAIL_PASS = os.environ.get('EMAIL_PASS', 'xxx')
# When True, the raw source of every fetched message is written to a file
# under DUMP_LOCATION.
DUMP_MESSAGES = True
DUMP_LOCATION = '/tmp/unsub_raw_mail'
################################################################################ | |
# MAIN | |
################################################################################ | |
# Build rules
rules = [
    ListHeaderRule(),
    RegexMatchRule(r'unsubscribe'),
    # RegexMatchRule(r'some other string you want to hit'),
]

# imaplib hands back bytes on Python 3, which message_from_string rejects;
# message_from_bytes does not exist on Python 2, hence the fallback.
parse_message = getattr(email, 'message_from_bytes', email.message_from_string)

# Create the dump directory once up front instead of checking per message.
if DUMP_MESSAGES and not os.path.exists(DUMP_LOCATION):
    os.makedirs(DUMP_LOCATION)

hitcount = 0
id_list = []

mail = imaplib.IMAP4_SSL('imap.gmail.com')
try:
    mail.login(EMAIL_USER, EMAIL_PASS)
    # The mailbox name contains a space, so it is quoted explicitly; Python 3's
    # imaplib does not add the quotes itself.
    mail.select('"[Gmail]/All Mail"')
    _, data = mail.search(None, 'All')
    ids = data[0]
    id_list = ids.split()

    for msgid in id_list:
        _, data = mail.fetch(msgid, '(RFC822)')
        # Skip responses that carry no (envelope, body) tuple rather than
        # crashing on the index below.
        if not data or not isinstance(data[0], tuple):
            continue
        raw_msg = data[0][1]
        msg = parse_message(raw_msg)

        # dump all the messages to DUMP_LOCATION
        if DUMP_MESSAGES:
            # The Subject header may be absent, and may contain whitespace or
            # path separators that must not leak into the file name.
            subject = str(msg['Subject'] or 'no_subject')
            safe_subject = re.sub(r'[\s/\\]', '_', subject)
            filename = os.path.join(DUMP_LOCATION, safe_subject + '.txt')
            # Binary mode: the raw message is written exactly as received.
            with open(filename, 'wb') as f:
                f.write(raw_msg)

        if run_rules(rules, msg):
            hitcount += 1
finally:
    # Always end the IMAP session, even if a command above failed.
    mail.logout()

print("============================================================")
print("Status:")
print("============================================================")
print('Found {0} of {1} messages with unsubscribe headers...'.format(hitcount, len(id_list)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment