Skip to content

Instantly share code, notes, and snippets.

@bboe
Last active August 29, 2015 14:23
Show Gist options
  • Save bboe/939352acee87fa7646eb to your computer and use it in GitHub Desktop.
Save bboe/939352acee87fa7646eb to your computer and use it in GitHub Desktop.
Email image attachment download (POP3 support only for now)
#!/usr/bin/env python
from __future__ import print_function
from functools import wraps
from getpass import getpass
from hashlib import sha512
from poplib import POP3_SSL, error_proto
import email
import os
import socket
import sys
def file_hash(filepath):
    """Return the raw SHA-512 digest of the file stored at ``filepath``.

    The file is opened in binary mode and consumed in 4096-byte pieces, so
    it is never held in memory as a whole.
    """
    digest = sha512()
    with open(filepath, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(4096), b''):
            digest.update(block)
    return digest.digest()
def handle_pop_exception(msg=None):
    """Build a decorator that absorbs POP protocol errors.

    The decorated callable takes a single argument ``obj``.  When it raises
    ``poplib.error_proto`` the error is swallowed: ``msg`` is printed if one
    was given, ``obj.quit()`` is called, and ``None`` is returned.  When it
    succeeds its return value is passed through unchanged.
    """
    def factory(function):
        @wraps(function)
        def wrapped(obj):
            try:
                result = function(obj)
            except error_proto:
                if msg:
                    print(msg)
                obj.quit()
                return None
            return result
        return wrapped
    return factory
def prompt(msg):
    """Write ``msg`` to stdout and return one line read from stdin.

    The returned text has its trailing line terminator removed.  An empty
    string is returned both for an empty line and at end of input.
    """
    sys.stdout.write(msg)
    sys.stdout.flush()
    line = sys.stdin.readline()
    # Strip only line-terminator characters.  Unconditionally dropping the
    # last character would eat real input when the final line has no
    # newline (e.g. piped input), and would leave a stray '\r' on CRLF input.
    return line.rstrip('\r\n')
def username_prompt():
    """Keep asking for a username on the terminal until one is entered.

    Every empty answer prints a short complaint and asks again; the first
    non-empty answer is returned.
    """
    while True:
        answer = prompt('Username: ')
        if answer:
            return answer
        print('Not a username. Try again.')
class POPEmail(object):
    """POP3-over-SSL mailbox usable as a context manager.

    Entering the context connects and authenticates, prompting on the
    terminal for any credential that was not supplied (or that was rejected);
    leaving the context closes the session.
    """

    def __init__(self, host, port=None, username=None, password=None):
        """Remember the connection settings; no network I/O happens here.

        ``port`` falls back to the ``POP3_SSL`` default when falsy.
        ``username`` / ``password`` are optional first-attempt credentials.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.pop = None  # Live POP3_SSL connection; set by connect().

    def __enter__(self):
        self.authenticate()
        return self

    def __exit__(self, _type, _value, _traceback):
        self.quit()

    def connect(self):
        """Open a fresh SSL connection to the configured host."""
        if self.port:
            self.pop = POP3_SSL(self.host, self.port)
        else:
            self.pop = POP3_SSL(self.host)

    def count(self):
        """Return the number of messages reported by the server (STAT)."""
        return self.pop.stat()[0]

    def authenticate(self):
        """Connect and log in, retrying until the server accepts.

        A failed attempt ends with the connection being quit by the
        credential setters' error handler, so every retry reconnects first.
        """
        authenticated = False
        while not authenticated:
            self.connect()
            authenticated = self.set_username() and self.set_password()

    def quit(self):
        """Close the session, ignoring errors from an already-dead link."""
        if self.pop is None:
            # Never connected: nothing to close.
            return
        try:
            self.pop.quit()
        except (socket.error, error_proto):
            # Best-effort shutdown.  poplib reports a connection the server
            # has already dropped as error_proto; since quit() is also
            # invoked from the auth-failure handler, letting that escape
            # would abort the retry loop.
            pass

    def messages(self, debug=False):
        """Yield every message in the mailbox as an ``email`` Message.

        With ``debug`` true each message is fetched with TOP (limited to
        131072 body lines) instead of RETR.
        """
        for i in range(1, self.count() + 1):
            message_raw = b'\n'.join(
                (self.pop.top(i, 131072) if debug else self.pop.retr(i))[1])
            if sys.version_info >= (3,):
                yield email.message_from_bytes(message_raw)
            else:
                yield email.message_from_string(message_raw)

    @handle_pop_exception('Invalid username or password. Try again.')
    def set_password(self):
        """Send the password (prompting if needed); truthy on success."""
        password = self.password or getpass()
        self.password = None  # Prompt after the first attempt.
        try:
            return bool(self.pop.pass_(password))
        except error_proto:
            # The username may be the wrong half of the pair, so ask for it
            # again as well instead of reusing a supplied one forever.
            self.username = None
            raise

    @handle_pop_exception('Invalid username. Try again.')
    def set_username(self):
        """Send the username (prompting if needed); truthy on success."""
        username = self.username or username_prompt()
        try:
            return bool(self.pop.user(username))
        except error_proto:
            # A rejected constructor-supplied username would otherwise be
            # retried endlessly by authenticate(); prompt next time.
            self.username = None
            raise
class ImageDownloader(object):
    """Save the image parts of a mailbox's messages into a directory.

    Progress is written to stdout one character per image: ``+`` saved,
    ``d`` skipped as a duplicate of an existing file, ``r`` tried a renamed
    path because the name was taken by a file with different content.
    """

    @staticmethod
    def filepath(filepath, new_hash):
        """Pick a free path for content whose SHA-512 digest is ``new_hash``.

        Starting from ``filepath``, ``_1``, ``_2``, ... is inserted before
        the extension until an unused path is found.  Returns ``None`` if an
        existing candidate already holds identical content.
        """
        path, ext = os.path.splitext(filepath)
        count = 0
        while os.path.isfile(filepath):
            if new_hash == file_hash(filepath):
                sys.stdout.write('d')
                return None
            count += 1
            sys.stdout.write('r')
            filepath = '{0}_{1}{2}'.format(path, count, ext)
        sys.stdout.write('+')
        return filepath

    @staticmethod
    def _safe_filename(part):
        """Return a bare file name for ``part`` that cannot leave the directory."""
        name = part.get_filename() or ''
        # The name comes from the (untrusted) email: keep only the final
        # component so '../x' or an absolute path cannot redirect the write.
        name = os.path.basename(name.replace('\\', '/'))
        if name in ('', '.', '..'):
            # Inline images frequently carry no file name at all.
            name = 'image.{0}'.format(part.get_content_subtype())
        return name

    def __init__(self, email_handler, directory):
        """Store the mail handler and create ``directory`` if it is missing."""
        self.email_handler = email_handler
        if not os.path.isdir(directory):
            os.mkdir(directory, 0o700)
        self.directory = directory

    def download(self, part):
        """Write one message part to disk.

        Returns 1 if a new file was written, 0 if the part had no decodable
        payload or identical content was already present.
        """
        body = part.get_payload(decode=True)
        if body is None:
            return 0
        filepath = self.filepath(
            os.path.join(self.directory, self._safe_filename(part)),
            sha512(body).digest())
        sys.stdout.flush()
        if not filepath:
            return 0
        with open(filepath, 'wb') as fp:
            fp.write(body)
        return 1

    def download_attachments(self, debug=False):
        """Download the images of every message; return how many were saved.

        ``debug`` is forwarded to the handler's ``messages`` method.
        """
        count = 0
        with self.email_handler as handler:
            print('There are {0} new messages.'.format(handler.count()))
            for message in handler.messages(debug=debug):
                count += self.process(message)
        print('\nDownloaded {0} attachment{1}.'.format(
            count, '' if count == 1 else 's'))
        return count

    def process(self, message):
        """Save every image part of ``message``; return the number saved.

        ``walk()`` yields a non-multipart message itself, so single-part
        mail of any type is handled without special-casing.
        """
        count = 0
        for part in message.walk():
            if part.get_content_maintype() == 'image':
                count += self.download(part)
        return count
def main():
    """Fetch image attachments from pop.gmail.com into ``images``.

    Runs the download in debug mode; returns ``None``.
    """
    mailbox = POPEmail('pop.gmail.com')
    ImageDownloader(mailbox, 'images').download_attachments(debug=True)
# Run the downloader only when executed as a script, not when imported.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment