Last active
July 31, 2021 06:46
-
-
Save erh/6029922 to your computer and use it in GitHub Desktop.
This is a very simple python program that reads your email inbox and generates a histogram of how much email is unread in your inbox.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
### | |
### This generates a very simple histogram of unread e-mail in your INBOX. | |
### This is meant as a toy, and you are free to do whatever you want with the code.
### | |
# core python libraries | |
import datetime | |
import getpass | |
import imaplib | |
import re | |
import rfc822 | |
import sys | |
import time | |
# mongo libraries | |
# the pymongo package includes bson | |
import bson | |
import pymongo | |
# optional- keyring | |
# Connection settings read by print_histogram().
# imapUser is filled in from the command line by the __main__ guard.
# When imapPassword stays None, imapclient looks the password up in the
# keyring and falls back to an interactive prompt.
imapHost = "imap.gmail.com"
imapUser = None
imapPassword = None
class imapclient:
    """Small IMAP client wrapper with an optional MongoDB message cache.

    Constructing an instance logs in and selects INBOX read-only.  Messages
    are addressed by IMAP UID and returned by :meth:`fetch` as
    ``{"headers": {...}, "body": ...}`` dictionaries.
    """

    def __init__(self, host, user, secure=True, pwd=None, cache=False):
        """Connect to *host*, log in as *user* and select INBOX.

        The password is taken from ``pwd`` when given; otherwise it is looked
        up in the keyring (if the optional ``keyring`` module can be
        imported) and, failing that, requested with ``getpass``.  A password
        typed at the prompt is stored in the keyring on a best-effort basis.

        secure -- use IMAP4_SSL on port 993 when true, plain IMAP4 otherwise.
        cache  -- when true, cache fetched messages in the ``raw`` collection
                  of the ``mail_cache`` database on the default MongoDB
                  connection.
        """
        self.host = host
        self.user = user
        self.pwd = pwd

        if pwd is None:
            try:
                import keyring
                # Never echo the retrieved password: stdout may be logged.
                pwd = keyring.get_password(host, user)
            except Exception as e:
                print("can't get password from keyring: " + str(e))

        if pwd is None:
            pwd = getpass.getpass()
            try:
                import keyring
                keyring.set_password(host, user, pwd)
            except Exception as e:
                print("can't save password: " + str(e))

        if secure:
            self.mailbox = imaplib.IMAP4_SSL(host, 993)
        else:
            self.mailbox = imaplib.IMAP4(host)

        self.mailbox.login(user, pwd)
        self.select("INBOX")

        self.cache = None
        if cache:
            self.cache = pymongo.Connection().mail_cache.raw

    def _parse(self, res):
        """Return the data part of a ``(status, data)`` response.

        Raises Exception when the status is not "OK".
        """
        if res[0] != "OK":
            raise Exception("error: %s" % str(res[0]))
        return res[1]

    def select(self, name, readonly=True):
        """Select mailbox *name* and remember it as the current folder.

        Raises Exception when the server does not answer "OK"; the current
        folder name is only updated after a successful select.
        """
        self._parse(self.mailbox.select(name, readonly=readonly))
        self.folder = name

    def list(self):
        """Return the UIDs of all messages in the selected folder.

        Raises Exception when the search does not answer "OK".
        """
        data = self._parse(self.mailbox.uid("search", "ALL"))
        # Guard against an empty or None payload so no search hits gives [].
        if not data or not data[0]:
            return []
        return data[0].split()

    def _parse_headered(self, txt):
        """Split raw message text into ``(headers, remaining_text)``.

        Header names are lower-cased.  Continuation lines (those starting
        with whitespace) are joined to the header they follow.  A header seen
        once maps to its cleaned value; a header seen several times maps to
        the list of its values.  Parsing stops at the first empty line, and
        everything after it is returned as the second tuple element.
        """
        headers = {}
        pending = ""
        while True:
            line, _sep, txt = txt.partition("\n")
            line = line.replace("\r", "")
            if not line:
                # Blank line (or end of input): the header section is over.
                break
            if line[0].isspace():
                pending += "\n" + line
                continue
            if pending:
                self._add_header(headers, pending)
            pending = line
        self._add_header(headers, pending)

        for name in headers:
            if len(headers[name]) == 1:
                headers[name] = headers[name][0]
        return (headers, txt)

    def _add_header(self, headers, line):
        """Parse one unfolded ``Name: value`` line and append it to *headers*.

        Empty lines are ignored.  The value is normalised through
        :meth:`_cleanSingleHeader`.
        """
        line = line.rstrip()
        if not line:
            return
        name, _sep, value = line.partition(":")
        name = name.lower()
        value = self._cleanSingleHeader(name, value.strip())
        headers.setdefault(name, []).append(value)

    def _convert_raw(self, txt):
        """Turn raw message text into ``{"headers": ..., "body": ...}``.

        On a parse failure the offending text is printed and the exception
        is re-raised.
        """
        try:
            headers, body = self._parse_headered(txt)
        except Exception:
            print("couldn't parse")
            print(txt)
            raise
        return {"headers": headers, "body": body}

    def _cleanID(self, foo):
        """Normalise a message id: lower-case it and drop the ``<>`` pair.

        A value without exactly one ``<`` and one ``>`` is returned
        lower-cased and stripped, unless it contains a space, in which case
        Exception is raised.
        """
        foo = foo.lower().strip()
        if foo.count("<") != 1 or foo.count(">") != 1:
            if foo.count(" "):
                raise Exception("bad id [%s]" % foo)
            return foo
        return foo.partition("<")[2].partition(">")[0]

    def _cleanSingleHeader(self, name, value):
        """Return *value* converted to the form stored for header *name*.

        message-id  -- cleaned id (may raise, see :meth:`_cleanID`)
        to          -- list of comma-separated entries, each stripped
        references  -- list of cleaned ids, split on whitespace
        in-reply-to -- cleaned id, or the raw value when it is malformed
        date        -- naive local ``datetime``, or the raw value when the
                       date cannot be parsed or converted
        Any other header is returned unchanged.
        """
        if name == "message-id":
            return self._cleanID(value)

        if name == "to":
            return [z.strip() for z in value.split(",")]

        if name == "references":
            # str.split() splits on whitespace runs and gives [] for "".
            return [self._cleanID(x) for x in value.lower().split()]

        if name == "in-reply-to":
            try:
                return self._cleanID(value)
            except Exception:
                print("bad id [%s]" % value)
                return value

        if name == "date":
            # email.utils supersedes the deprecated rfc822 module and is
            # imported here so the block does not depend on a file-level
            # import being added.
            import email.utils
            parsed = email.utils.parsedate_tz(value)
            if parsed is None:
                print("bad date [%s]" % value)
                return value
            try:
                # mktime_tz honours the UTC offset carried by the header.
                stamp = email.utils.mktime_tz(parsed)
                return datetime.datetime.fromtimestamp(stamp)
            except (ValueError, OverflowError, OSError):
                print("bad date [%s]" % value)
                return value

        return value

    def get_cache(self):
        """Return the cache collection, or None when caching is disabled."""
        return self.cache

    def get_id(self, uid):
        """Return the cache key for *uid*: host, user, folder and uid joined by "-"."""
        return self.host + "-" + self.user + "-" + self.folder + "-" + str(uid)

    def fetch(self, uid, headerOnly=False):
        """Return the parsed message for *uid*, using the cache when possible.

        A cached entry is used only when it was stored with the same
        ``headerOnly`` setting; otherwise the message (or just its header
        when ``headerOnly`` is true) is fetched from the server and, if a
        cache is configured, saved there.  A failure to save is reported and
        otherwise ignored.

        Returns None when the server returns no data for the UID.
        Raises Exception when the fetch does not answer "OK".
        """
        key = self.get_id(uid)

        # Compare with None: pymongo collections are not meant to be
        # truth-tested, and newer pymongo versions raise when they are.
        if self.cache is not None:
            cached = self.cache.find_one({"_id": key})
            if cached and cached["headerOnly"] == headerOnly:
                return self._convert_raw(cached["data"])

        what = "(RFC822.HEADER)" if headerOnly else "(RFC822)"
        typ, data = self.mailbox.uid("fetch", uid, what)
        if typ != "OK":
            raise Exception("failed loading uid: %s typ: %s" % (str(uid), str(typ)))

        if not data or data[0] is None:
            return None
        data = data[0][1]

        converted = self._convert_raw(data)
        if self.cache is not None:
            try:
                self.cache.save({"_id": key,
                                 "headerOnly": headerOnly,
                                 "headers": converted["headers"],
                                 "data": bson.binary.Binary(data)})
            except Exception as e:
                print("couldn't save message because of: %s" % e)
        return converted
def print_histogram():
    """Write histogram.html with per-day message counts for INBOX.

    Fetches the header of every UID returned by ``mailbox.list()`` (filling
    the cache), stamps each cache entry with this run's timestamp, then
    aggregates the entries stamped in this run whose ``headers.date`` lies
    within the last 20 days into one count per calendar day.

    Uses the module-level imapHost, imapUser and imapPassword settings.
    """
    mailbox = imapclient(imapHost, imapUser, cache=True, pwd=imapPassword)
    mailbox.select("INBOX", False)
    cache = mailbox.get_cache()

    last_seen = time.time()

    all_mail = mailbox.list()
    total = len(all_mail)
    for done, uid in enumerate(all_mail):
        if done % 10 == 1:
            print("%d / %d" % (done, total))
        # Called for its side effect of populating the cache.
        mailbox.fetch(uid, True)
        cache.update({"_id": mailbox.get_id(uid)},
                     {"$set": {"lastSeen": last_seen}})

    cutoff = datetime.datetime.fromtimestamp(last_seen - (20 * 86400))
    project = {}
    group_id = {}
    for part in ["year", "month", "dayOfMonth"]:
        project[part] = {"$" + part: "$headers.date"}
        group_id[part] = "$" + part

    pipeline = [
        # Only entries touched by this run, keyed under INBOX, recent enough.
        {"$match": {"lastSeen": last_seen,
                    "_id": re.compile("INBOX"),
                    "headers.date": {"$gt": cutoff}}},
        {"$project": project},
        {"$group": {"_id": group_id, "total": {"$sum": 1}}},
        {"$sort": {"_id": -1}},
    ]

    res = cache.aggregate(pipeline)
    if res["ok"] != 1:
        print("aggregation failed: %s" % str(res))
        return

    now = datetime.datetime.now()
    # The with-block closes the file even if a write fails part-way.
    with open("histogram.html", "w") as out:
        out.write("<html><body>")
        for row in res["result"]:
            day = row["_id"]
            when = datetime.datetime(day["year"], day["month"], day["dayOfMonth"])
            delta = now - when
            out.write("%d days ago, %d e-mails<br>" % (delta.days, row["total"]))
        out.write("</body></html>")
# Script entry point: the single required argument is the IMAP user name.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        # Fill in the script name; the original printed a literal "%s".
        print("Usage: python %s <imap username>" % sys.argv[0])
        sys.exit(-1)

    imapUser = sys.argv[1]
    print_histogram()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment