Last active
January 1, 2024 11:18
-
-
Save aptgetupgrade/3a5eb63f40496122ee547be670a1d8cc to your computer and use it in GitHub Desktop.
code to poll emails from an IMAP email server and post these to wallabag
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Read the emails in the inbox, look for any URLs and upload them to wallabag; delete an email if URLs were found in it.
# Built from various internet sources. Final code donated under GPL v3 by GitHub user aptgetupgrade.
# Tested using Python 2. Could be run from a crontab, e.g. every 5 minutes.
#
import sys | |
import imaplib | |
import getpass | |
import email | |
import email.header | |
import datetime | |
import HTMLParser | |
import re | |
import requests | |
# --- Configuration: replace the placeholder values below with your own ---
EMAIL_ACCOUNT = "your@account"
EMAIL_FOLDER = "INBOX"
# NOTE: constructing IMAP4_SSL opens the connection to the server right here.
M = imaplib.IMAP4_SSL('your.imap.server')
P = "The_password_of_your_imap_account"
W_HOST = 'https://yourwallabag.install/wallabag_root_directory_if_applicable'
W_USERNAME = 'wallabag_user'
W_PASSWORD = 'password_of_wallabag_user'
W_CLIENTID = 'secret_client_id'
W_SECRET = 'secret_secret'
script_name = 'email_parse_url.py'

print('#################')
print('Start of execution of script ' + script_name + ' : ' + str(datetime.datetime.now()))

###################
######## Initialisation of this script's variables
###################

# Initialisation of Wallabag connection
print('Request wallabag token')
gettoken = {
    'username': W_USERNAME,
    'password': W_PASSWORD,
    'client_id': W_CLIENTID,
    'client_secret': W_SECRET,
    'grant_type': 'password',
}
try:
    # POST the credentials as form data so that the password and the client
    # secret do not end up in the request URL (and thus in access logs).
    # The timeout keeps an unattended (cron) run from hanging forever.
    r = requests.post('{}/oauth/v2/token'.format(W_HOST), data=gettoken, timeout=30)
    access = r.json().get('access_token')
except (requests.RequestException, ValueError) as err:
    # RequestException: network/HTTP-level failure.
    # ValueError: the response body was not valid JSON.
    print('ERROR: wallabag token request failed: %s' % err)
    sys.exit(1)

if not access:
    # Without a token every later upload would be rejected, so stop here
    # before any email gets touched.
    print('ERROR: wallabag did not return an access token (HTTP status %s)' % r.status_code)
    sys.exit(1)
# Mailbox scan procedure
def _extract_text_body(msg):
    """Return the decoded text/plain body of *msg*, or "" if there is none."""
    if not msg.is_multipart():
        # Not multipart - i.e. plain text, no attachments.
        return msg.get_payload(decode=True) or ""
    for part in msg.walk():
        ctype = part.get_content_type()
        cdispo = str(part.get('Content-Disposition'))
        # Skip any text/plain (txt) attachments; take the first inline part.
        if ctype == 'text/plain' and 'attachment' not in cdispo:
            return part.get_payload(decode=True) or ""
    return ""


def _post_urls_to_wallabag(url_list):
    """Post every URL in *url_list* to wallabag; return True only if all succeeded."""
    archive = 0  # should the article be already read? 0 or 1
    starred = 0  # should the article be added as favorited? 0 or 1
    all_sent = True
    for urlno, url in enumerate(url_list, 1):
        print('Sending URL #' + str(urlno) + ' - ' + url)
        article = {'url': url, 'archive': archive, 'starred': starred, 'access_token': access}
        try:
            r = requests.post('{}/api/entries.json'.format(W_HOST), article, timeout=30)
        except requests.RequestException as err:
            print('ERROR: could not send URL to wallabag: %s' % err)
            all_sent = False
            continue
        if not r.ok:
            print('ERROR: wallabag rejected the URL (HTTP status %s)' % r.status_code)
            all_sent = False
    return all_sent


def process_mailbox(M):
    """Scan the currently selected mailbox and forward found URLs to wallabag.

    For every message returned by ``M.search(None, "ALL")`` the text/plain
    body is searched for http(s) URLs and each URL is posted to the wallabag
    ``/api/entries.json`` endpoint.  A message is flagged ``\\Deleted`` only
    when it contained at least one URL and every URL was accepted by
    wallabag; otherwise it is left untouched so it can be retried later.

    A message that cannot be fetched is reported and skipped; the remaining
    messages are still processed.

    Args:
        M: the IMAP connection object; the function calls its ``search``,
           ``fetch`` and ``store`` methods.  Expunging the flagged messages
           is left to the caller.
    """
    # Imported here so that email.utils is guaranteed to be loaded; the file
    # header only imports ``email`` and ``email.header``.
    import email.utils

    rv, data = M.search(None, "ALL")
    if rv != 'OK':
        print("No messages found!")
        return

    # Compiled once for all messages.  The character class deliberately does
    # NOT contain a space, so a match stops at the end of the URL instead of
    # swallowing the words that follow it on the same line.
    url_pattern = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')

    for num in data[0].split():
        print('Processing email #' + str(num))
        rv, msg_data = M.fetch(num, '(RFC822)')
        if rv != 'OK' or not msg_data or msg_data[0] is None:
            print('ERROR getting message %s' % num)
            continue

        msg = email.message_from_string(msg_data[0][1])

        # Decode the subject with the charset announced in the header rather
        # than the implicit ASCII codec, which fails on non-ASCII subjects.
        raw_subject, charset = email.header.decode_header(msg['Subject'] or '')[0]
        try:
            subject = raw_subject.decode(charset or 'ascii', 'replace')
        except LookupError:
            # Unknown charset name in the header.
            subject = raw_subject.decode('ascii', 'replace')
        # Encode explicitly: stdout may have no encoding when run from cron.
        print('Message %s: %s' % (num, subject.encode('utf-8')))

        print('Raw Date: %s' % msg['Date'])
        # Now convert to local date-time (skipped when there is no Date header).
        date_tuple = email.utils.parsedate_tz(msg['Date']) if msg['Date'] else None
        if date_tuple:
            local_date = datetime.datetime.fromtimestamp(
                email.utils.mktime_tz(date_tuple))
            print('Local Date: ' + local_date.strftime("%a, %d %b %Y %H:%M:%S"))

        # Print the whole message - use for debug purposes
        # print("Message: " + str(msg_data[0][1]))

        print('Extracting the body from the message')
        body = _extract_text_body(msg)

        print('Extracting any URLs from the body')
        url_list = url_pattern.findall(body)
        print(url_list)

        if not url_list:
            print('Nothing to do, the message does not contain any URL')
        else:
            print('Send URLs found to wallabag')
            if _post_urls_to_wallabag(url_list):
                M.store(num, '+FLAGS', '\\Deleted')
            else:
                print('Message kept in mailbox because not all URLs could be sent')
            print('Finish sending URLs')
        print('Next message\n')
# Mailbox connection
print('Connect to email server')
try:
    # To be prompted for the password instead of storing it in this file:
    # rv, data = M.login(EMAIL_ACCOUNT, getpass.getpass())
    rv, data = M.login(EMAIL_ACCOUNT, P)
except imaplib.IMAP4.error:
    print("LOGIN FAILED!!! ")
    sys.exit(1)
print('%s %s' % (rv, data))

# List mailboxes - use for debug purposes
# print('List mailboxes')
# rv, mailboxes = M.list()
# if rv == 'OK':
#     print("Mailboxes:")
#     print(mailboxes)

try:
    # Move to email folder and process
    print('Move to email folder to process')
    rv, data = M.select(EMAIL_FOLDER)
    if rv == 'OK':
        print("Processing mailbox...\n")
        process_mailbox(M)
        print("Processing finished")
        # Permanently remove the messages flagged \Deleted during processing.
        M.expunge()
        M.close()
    else:
        print("ERROR: Unable to open mailbox  %s" % rv)
finally:
    # Log out of email server - also when processing raised, so the session
    # is not left open on the server.
    print('Log out of email server')
    M.logout()
print('#################')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment