Skip to content

Instantly share code, notes, and snippets.

@aptgetupgrade
Last active January 1, 2024 11:18
Show Gist options
  • Save aptgetupgrade/3a5eb63f40496122ee547be670a1d8cc to your computer and use it in GitHub Desktop.
Save aptgetupgrade/3a5eb63f40496122ee547be670a1d8cc to your computer and use it in GitHub Desktop.
code to poll emails from an IMAP email server and post these to wallabag
# Read each email in the inbox, look for any URLs and upload them to wallabag; delete the email if URLs were found in it.
# Built from various internet sources. Final code donated under GPL v3 by Github user aptgetupgrade
# Tested using python 2. Could be run as part of a crontab, e.g. every 5 mins.
#
import datetime
import email
import email.errors
import email.header
import email.utils
import getpass
import imaplib
import re
import sys
try:
    import HTMLParser  # module name under Python 2
except ImportError:
    import html.parser as HTMLParser  # same module under its Python 3 name

import requests
# ---------------------------------------------------------------------------
# Settings: replace every placeholder below with your own values.
# ---------------------------------------------------------------------------
EMAIL_ACCOUNT = "your@account"
EMAIL_FOLDER = "INBOX"
# NOTE: the SSL connection to the IMAP server is opened right here, as soon as
# the script starts; the login itself happens further down.
M = imaplib.IMAP4_SSL('your.imap.server')
P = "The_password_of_your_imap_account"
W_HOST = 'https://yourwallabag.install/wallabag_root_directory_if_applicable'
W_USERNAME = 'wallabag_user'
W_PASSWORD = 'password_of_wallabag_user'
W_CLIENTID = 'secret_client_id'
W_SECRET = 'secret_secret'
script_name = 'email_parse_url.py'
print('#################')
print('Start of execution of script ' + script_name + ' : ' + str(datetime.datetime.now()))
###################
######## Initialisation of this script's variables
###################
#Initialisation of Wallabag connection
print('Request wallabag token')
gettoken = {'username': W_USERNAME, 'password': W_PASSWORD, 'client_id': W_CLIENTID, 'client_secret': W_SECRET, 'grant_type': 'password'}
# The credentials are sent in the body of a POST request.  A GET request would
# put the password and the client secret into the query string of the URL.
# The timeout keeps an unattended (cron) run from hanging on a dead server.
r = requests.post('{}/oauth/v2/token'.format(W_HOST), data=gettoken, timeout=30)
try:
    access = r.json().get('access_token')
except (ValueError, AttributeError):
    # The answer was not JSON, or not a JSON object: there is no token in it.
    access = None
if not r.ok or not access:
    # Stop before the mailbox is touched: without a token no URL can be saved,
    # so no email may be processed (and deleted) in this run.
    print('ERROR: could not get a wallabag token (HTTP status ' + str(r.status_code) + ')')
    sys.exit(1)
#Mailbox scan procedure
# Matches http(s) URLs.  Whitespace is deliberately not part of any character
# class, so a match stops at the first blank instead of swallowing the words
# that follow the URL.
_URL_PATTERN = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]'
    r'|(?:%[0-9a-fA-F][0-9a-fA-F]))+')


def _as_text(value, charset='ascii'):
    """Return *value* as text, decoding it first when it is a byte string."""
    if isinstance(value, bytes):
        try:
            return value.decode(charset or 'ascii', 'replace')
        except LookupError:
            # The sender declared a charset name Python does not know.
            return value.decode('ascii', 'replace')
    return value


def _printable(text):
    """Return *text* with every non-ASCII character backslash-escaped.

    Keeps the log lines printable whatever the encoding of stdout is.
    """
    return text.encode('ascii', 'backslashreplace').decode('ascii')


def _parse_message(raw):
    """Build an email message object from the raw RFC822 data of a fetch."""
    if isinstance(raw, bytes) and hasattr(email, 'message_from_bytes'):
        return email.message_from_bytes(raw)
    return email.message_from_string(raw)


def _subject_text(msg):
    """Return the decoded Subject of *msg* ('' when the header is missing)."""
    raw_subject = msg['Subject']
    if raw_subject is None:
        return u''
    try:
        chunks = email.header.decode_header(raw_subject)
    except email.errors.HeaderParseError:
        # Malformed encoded word: fall back to the header as it was sent.
        return _as_text(str(raw_subject))
    pieces = []
    for chunk, charset in chunks:
        piece = _as_text(chunk, charset).strip()
        if piece:
            pieces.append(piece)
    return u' '.join(pieces)


def _local_date(raw_date):
    """Convert a Date header to a local datetime, or None if not possible."""
    if raw_date is None:
        return None
    date_tuple = email.utils.parsedate_tz(str(raw_date))
    if not date_tuple:
        return None
    try:
        return datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
    except (ValueError, OverflowError, OSError):
        # Date outside the range the platform can represent.
        return None


def _payload_text(part):
    """Return the decoded payload of *part* as text ('' when it has none)."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return u''
    return _as_text(payload, part.get_content_charset() or 'utf-8')


def _plain_text_body(msg):
    """Return the first text/plain part of *msg* that is not an attachment."""
    if not msg.is_multipart():
        # not multipart - i.e. plain text, no attachments, keeping fingers crossed
        return _payload_text(msg)
    for part in msg.walk():
        disposition = str(part.get('Content-Disposition'))
        # skip any text/plain (txt) attachments
        if part.get_content_type() == 'text/plain' and 'attachment' not in disposition:
            return _payload_text(part)
    return u''


def _save_url(url):
    """Post *url* to wallabag; return True only if the server accepted it."""
    archive = 0  # should the article be already read? 0 or 1
    starred = 0  # should the article be added as favorited? 0 or 1
    article = {'url': url, 'archive': archive, 'starred': starred, 'access_token': access}
    try:
        r = requests.post('{}/api/entries.json'.format(W_HOST), data=article, timeout=30)
    except requests.exceptions.RequestException as err:
        print('ERROR sending URL to wallabag: ' + str(err))
        return False
    if not r.ok:
        print('ERROR wallabag refused the URL (HTTP status ' + str(r.status_code) + ')')
        return False
    return True


def process_mailbox(M):
    """Send the URLs found in every message of the selected mailbox to wallabag.

    Each message is fetched, its subject and date are logged, and the URLs of
    its plain-text body are posted to wallabag.  A message is flagged
    ``\\Deleted`` only when it contained at least one URL and every one of its
    URLs was accepted by wallabag, so a failed upload never loses an email.
    The flagged messages are not expunged here.

    Args:
        M: IMAP connection object providing ``search``, ``fetch`` and
            ``store``; a mailbox is expected to be selected already.

    Returns:
        None.

    Uses the module-level names ``requests``, ``W_HOST`` and ``access``.
    """
    rv, found = M.search(None, "ALL")
    if rv != 'OK':
        print("No messages found!")
        return
    for num in found[0].split():
        label = _as_text(num)
        print('Processing email #' + label)
        rv, fetched = M.fetch(num, '(RFC822)')
        if rv != 'OK' or not fetched or not isinstance(fetched[0], tuple):
            # One unreadable message must not stop the remaining ones.
            print('ERROR getting message ' + label)
            continue
        msg = _parse_message(fetched[0][1])
        print('Message %s: %s' % (label, _printable(_subject_text(msg))))
        raw_date = msg['Date']
        print('Raw Date: %s' % (raw_date,))
        # Now convert to local date-time
        local_date = _local_date(raw_date)
        if local_date is not None:
            print('Local Date: ' + local_date.strftime("%a, %d %b %Y %H:%M:%S"))
        print('Extracting the body from the message')
        body = _plain_text_body(msg)
        print('Extracting any URLs from the body')
        url_list = _URL_PATTERN.findall(body)
        print(url_list)
        if not url_list:
            print('Nothing to do, the message does not contain any URL')
        else:
            print('Send URLs found to wallabag')
            all_saved = True
            for position, url in enumerate(url_list, 1):
                print('Sending URL #' + str(position) + ' - ' + url)
                if not _save_url(url):
                    all_saved = False
            if all_saved:
                M.store(num, '+FLAGS', '\\Deleted')
            else:
                print('Keeping the message, not every URL could be saved')
            print('Finish sending URLs')
        print('Next message\n')
#Mailbox connection
print('Connect to email server')
try:
    # To be asked for the password instead of storing it in this file:
    # rv, data = M.login(EMAIL_ACCOUNT, getpass.getpass())
    rv, data = M.login(EMAIL_ACCOUNT, P)
except imaplib.IMAP4.error:
    print("LOGIN FAILED!!! ")
    sys.exit(1)
print('%s %s' % (rv, data))
#List mailboxes - use for debug purposes
#print('List mailboxes')
#rv, mailboxes = M.list()
#if rv == 'OK':
#    print("Mailboxes:")
#    print(mailboxes)
try:
    #Move to email folder and process
    print('Move to email folder to process')
    rv, data = M.select(EMAIL_FOLDER)
    if rv == 'OK':
        print("Processing mailbox...\n")
        process_mailbox(M)
        print("Processing finished")
        # Remove the messages that process_mailbox flagged as deleted, then
        # leave the selected mailbox.
        M.expunge()
        M.close()
    else:
        print("ERROR: Unable to open mailbox " + str(rv))
finally:
    # Log out even when selecting or processing raised, so that a failed run
    # does not leave the session open on the server.
    #Log out of email server
    print('Log out of email server')
    M.logout()
print('#################')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment