Skip to content

Instantly share code, notes, and snippets.

@eliheady
Created October 16, 2017 13:25
Show Gist options
  • Save eliheady/83cc595c2c5843d8fe25c231c52ef4a4 to your computer and use it in GitHub Desktop.
Save eliheady/83cc595c2c5843d8fe25c231c52ef4a4 to your computer and use it in GitHub Desktop.
get attachments from gmail messages
#!/usr/bin/env python
# Collect attachments from Gmail messages identified by label and date.
# Adapted from the Gmail API Python Quickstart.
from __future__ import print_function
import httplib2
import os
import sys
import time
from StringIO import StringIO
import base64
import csv
import pprint
from apiclient import discovery
from oauth2client import client
from oauth2client import tools
from oauth2client.file import Storage
# Parse the oauth2client command-line flags once, at import time, so that
# get_credentials() can hand them to tools.run_flow().
try:
    import argparse
    # parse_known_args() returns (namespace, leftovers); leftovers are the
    # arguments this parser does not define (for example --label), which are
    # ignored here instead of aborting with "unrecognized arguments".
    flags = argparse.ArgumentParser(
        parents=[tools.argparser]).parse_known_args()[0]
except ImportError:
    # No argparse available: get_credentials() falls back to tools.run().
    flags = None
# --- OAuth / API configuration ---------------------------------------------
# NOTE: after changing SCOPES, remove the cached credentials stored at
# ~/.credentials/gmail-python-quickstart.json so a new grant is requested.
SCOPES = "https://www.googleapis.com/auth/gmail.readonly"
CLIENT_SECRET_FILE = "secret.json"
APPLICATION_NAME = "Gmail API Python Quickstart"

# User id sent with every Gmail API request made by this script.
USER_ID = "me"

# --- Message selection settings ---------------------------------------------
# NOTE(review): these three names are not referenced elsewhere in the visible
# part of the file; confirm whether they are still needed.
MESSAGE_SELECTOR = "subject"
MESSAGE_SUBJECTS = ["foo", "bar"]
FILE_NAMES = list()
class Message(object):
    """Wrapper around one message dict returned by the Gmail API.

    Holds the raw message, the API client used to fetch its attachments,
    and the fields extracted by parse().
    """

    def __init__(self, message, api):
        """Store the raw message and API client; nothing is parsed yet.

        Args:
            message: message dict; must contain an 'id' key.
            api: Gmail API client used by get_attachments().
        """
        self.api = api
        self.msg = message
        self.headers = []
        # Bodies (dicts) of the parts that carry a fetchable attachment.
        self.attachmentIds = []
        self.msgId = message['id']
        self.timestamp = ""
        self.uid = USER_ID
        self.msgparts = []

    @staticmethod
    def _is_attachment(body):
        """Return True when a part body is non-empty and has an attachmentId."""
        if not body:
            return False
        return body.get('size', 0) > 0 and bool(body.get('attachmentId'))

    def parse(self):
        """Populate msgparts, headers, timestamp and attachmentIds.

        Messages without a payload, or with a payload that has no parts,
        are accepted and simply yield empty lists.
        """
        payload = self.msg.get('payload') or {}
        self.msgparts = payload.get('parts') or []
        if payload:
            self.headers = payload.get('headers') or []
        if self.msg.get('internalDate'):
            # Copied verbatim from the message; no conversion is done here.
            self.timestamp = self.msg['internalDate']
        # Only bodies that name an attachmentId can be fetched by
        # get_attachments(); inline bodies with a size but no id are skipped.
        self.attachmentIds = [
            part['body'] for part in self.msgparts
            if self._is_attachment(part.get('body'))
        ]

    def dump_message(self):
        """Pretty-print the raw message dict."""
        pprint.pprint(self.msg)

    def dump_headers(self):
        """Print every header as a left-aligned 'name: value' line."""
        for h in self.headers:
            if h['name'] == 'MIME-Version':
                # Blank separator before the MIME-Version header.
                print('\n')
            print('{name: <{fill}}: {value}'.format(
                name=h['name'], value=h['value'], fill='16'))

    def get_attachments(self):
        """Yield the API response for each attachment found by parse().

        This is a generator: no request is made until it is iterated.
        """
        for body in self.attachmentIds:
            data = self.api.users().messages().attachments().get(
                userId=self.uid,
                id=body['attachmentId'],
                messageId=self.msgId).execute()
            yield data

    def decode_attachment(self, attachment):
        """Return the decoded bytes of an attachment response.

        Args:
            attachment: dict whose 'data' value is URL-safe base64 text.
        """
        return base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))

    def write_record(self, record_json):
        """Placeholder; currently does nothing."""
        pass
def limit_date(start_date, end_date, iterable):
    """Unimplemented stub: accepts its arguments and returns None.

    NOTE(review): presumably meant to restrict `iterable` to items between
    `start_date` and `end_date`; nothing in this file calls it yet.
    """
    return None
def get_credentials():
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed to obtain the new credentials.

    The credentials are cached in
    ~/.credentials/gmail-python-quickstart.json; the directory is created
    when it does not exist yet.

    Returns:
        Credentials, the obtained credential.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir,
                                   'gmail-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        # Nothing usable is cached: run the OAuth2 flow with `store` as
        # its storage.
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return credentials
def parse_args(argv=None):
    """Parse this script's command-line options.

    Args:
        argv: list of argument strings; None (the default) makes argparse
            read sys.argv[1:].

    Returns:
        argparse.Namespace with `from_date`, `to_date` and `label`, each
        None when the option was not given.
    """
    parser = argparse.ArgumentParser(description='Report importer')
    parser.add_argument('--from-date', dest='from_date', action="store",
                        default=None, metavar='YYYY-MM-DD',
                        help="Only report messages newer than this date")
    parser.add_argument('--to-date', dest='to_date', action="store",
                        default=None, metavar='YYYY-MM-DD',
                        help="Only report messages older than this date")
    parser.add_argument('--label', action="store",
                        help="Label to search for messages")
    return parser.parse_args(argv)


def _date_to_epoch_ms(date_string, date_format):
    """Convert a local-time date string to milliseconds since the epoch."""
    return int(time.mktime(time.strptime(date_string, date_format))) * 1000


def _list_message_ids(api, query):
    """Return the message references matching `query`, across all pages."""
    message_ids = []
    page_token = None
    while True:
        request_args = {'userId': USER_ID, 'q': query}
        if page_token:
            request_args['pageToken'] = page_token
        response = api.users().messages().list(**request_args).execute()
        message_ids.extend(response.get('messages', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return message_ids


def main():
    """Print the headers of every labelled message inside the date range.

    Exits with status 1 when the requested label does not exist.
    """
    DATE_FORMAT = "%Y-%m-%d"
    args = parse_args()

    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    api = discovery.build('gmail', 'v1', http=http)

    # ensure provided label exists; the response wraps the label resources
    # in a 'labels' list, so compare against their names
    label_listing = api.users().labels().list(userId=USER_ID).execute()
    label_names = set(
        label['name'] for label in label_listing.get('labels', []))
    if args.label not in label_names:
        print("The label given was not found")
        sys.exit(1)

    # if from_date or to_date are provided, use those
    # otherwise use epoch start and now.
    # Bounds are kept in milliseconds because internalDate is a
    # millisecond epoch value.
    if not args.to_date:
        to_ms = int(time.time()) * 1000
    else:
        to_ms = _date_to_epoch_ms(args.to_date, DATE_FORMAT)
    if not args.from_date:
        from_ms = 0
    else:
        from_ms = _date_to_epoch_ms(args.from_date, DATE_FORMAT)

    # collect messages
    messages = []
    for ref in _list_message_ids(api, "label:%s" % args.label):
        message = api.users().messages().get(
            userId=USER_ID,
            id=ref['id']).execute()
        # internalDate arrives as a string, so convert before comparing.
        if from_ms < int(message['internalDate']) < to_ms:
            messages.append(message)

    for message in messages:
        r = Message(message, api)
        r.parse()
        r.dump_headers()
# Run the collector only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment