Last active
October 9, 2017 12:11
-
-
Save wandersoncferreira/430153393c999f254acff363c8c94b31 to your computer and use it in GitHub Desktop.
Python script that downloads Gmail messages into a notmuch mail database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# (C) Copyright 2012 Jason A. Donenfeld <[email protected]>. All Rights Reserved. | |
# We need to use Python 2.7 in order to make this work! - Bartuka | |
# This program is free software; you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation; either version 2 of the License, or | |
# (at your option) any later version. | |
# ============================== | |
# == Gmail-->Notmuch Importer == | |
# == == | |
# == Work in progress. == | |
# == by zx2c4 == | |
# == == | |
# ============================== | |
# $ ./gmail-notmuch.py -u jason.donenfeld -p Shien8Boh2vah | |
# Logging in... | |
# Selecting all mail... | |
# Receiving message list: 135126 of 135126|##################################################|100% Time: 0:00:52 2.56 kemails/s | |
# Parsing message list and labels... | |
# Searching for local messages... | |
# Retagging local messages: 135124 of 135124|################################################|100% Time: 0:00:13 10.39 kemails/s | |
# Downloading messages: 2 of 2|##############################################################|100% Time: 0:00:00 5.12 emails/s | |
# Interrupted imports will automatically resume from where they left off. | |
import os | |
from imaplib import IMAP4_SSL | |
from optparse import OptionParser | |
import sys | |
import os.path | |
import os | |
import shlex | |
import re | |
import notmuch | |
from progressbar import * | |
def main():
    """Import a Gmail account into the local notmuch maildir.

    Parses the command line, opens the existing notmuch database, makes sure
    the ``cur``/``new``/``tmp`` directories exist below the database path,
    fetches the message list from Gmail, retags the messages that are already
    on disk and downloads the remaining ones.

    Exits through ``sys.exit`` when the database cannot be opened and when
    there are no messages, or no new messages, to import.
    """
    parser = OptionParser(usage="%prog --username/-u USERNAME --password/-p PASSWORD --silent/-s --debug/-d", description="Slurps gmail messages with labels into a notmuch maildir.")
    parser.add_option("-u", "--username", action="store", type="string", metavar="USERNAME", help="Gmail username")
    parser.add_option("-p", "--password", action="store", type="string", metavar="PASSWORD", help="Gmail password")
    parser.add_option("-d", "--debug", action="store_true", dest="debug", default=False, help="Imap debugging output")
    parser.add_option("-s", "--silent", action="store_true", dest="silent", default=False, help="Do not show progress information")
    (options, args) = parser.parse_args()
    if options.username is None or options.password is None:
        parser.error("Username and password are required.")
    if "@" not in options.username:
        options.username += "@gmail.com"
    if options.silent:
        # Point descriptor 1 at the null device instead of closing it: a
        # closed descriptor number is handed out again by the next open() or
        # socket(), and later print() output would then land in that file
        # or connection.
        sys.stdout.flush()
        devnull = os.open(os.devnull, os.O_WRONLY)
        if devnull != 1:
            os.dup2(devnull, 1)
            os.close(devnull)

    try:
        # Create should be True, but there's a bug at the moment.
        database = notmuch.Database(None, False, notmuch.Database.MODE.READ_WRITE)
    except notmuch.NotmuchError as e:
        print(str(e))
        sys.exit("You must create the notmuch database before running this program.")

    try:
        if database.needs_upgrade():
            database.upgrade()
        destination = database.get_path()
        for directory in ["cur", "new", "tmp"]:
            path = destination + "/" + directory
            try:
                os.mkdir(path)
            except OSError:
                # An already existing directory is the expected case; any
                # other failure (permissions, missing parent) must surface.
                if not os.path.isdir(path):
                    raise

        imap, total = login(options)

        messages = discover_messages(imap, total)
        if not messages:
            print("Discovered no messages!")
            logout(imap)
            sys.exit(0)

        new_messages = retag_old_messages(database, messages, destination)
        if not new_messages:
            print("Discovered no new messages!")
            logout(imap)
            sys.exit(0)

        # Retagging can take a while; reconnect if the server dropped us.
        try:
            imap.noop()
        except IMAP4_SSL.abort:
            print("Server disconnected us.")
            imap, total = login(options)

        download_new_messages(imap, database, new_messages, destination)
        logout(imap)
    finally:
        # Runs for the sys.exit() paths above as well, so the database is
        # closed however main() ends.
        database.close()
def login(options):
    """Log in to Gmail over IMAP and select the "All Mail" folder.

    ``options`` must provide ``username``, ``password`` and ``debug``.
    The folder name is taken from the XLIST response line that carries the
    ``\\AllMail`` attribute, and the folder is selected with the read-only
    argument set.

    Returns ``(imap, total)``: the connection and the integer taken from
    the first item of the SELECT response data. Exits through ``sys.exit``
    when the folder cannot be discovered, parsed or selected.
    """
    print("Logging in...")
    imap = IMAP4_SSL("imap.gmail.com")
    if options.debug:
        imap.debug = 10
    imap.login(options.username, options.password)

    print("Selecting all mail...")
    typ, data = imap.xatom("XLIST", "", "*")
    if typ != "OK":
        sys.exit("Could not discover all mail.")

    allmail = None
    # .get(): no XLIST lines at all should end in the "could not parse"
    # exit below rather than in a KeyError.
    for label in imap.untagged_responses.get("XLIST", []):
        if b"\\AllMail" in label:
            # Use bytes literals throughout: on Python 2 they are plain str,
            # on Python 3 the response lines are bytes and mixing in str
            # arguments raises TypeError.
            last_quote = label.rfind(b"\"")
            penultimate_quote = label.rfind(b"\"", 0, last_quote) + 1
            allmail = label[penultimate_quote:last_quote]
    if allmail is None:
        sys.exit("Could not parse all mail.")

    typ, data = imap.select(b"\"" + allmail + b"\"", True)
    if typ != "OK":
        sys.exit("Could not select all mail.")
    return imap, int(data[0])
def discover_messages(imap, total):
    """Fetch sequence number, Gmail id, labels and flags of every message.

    While the FETCH is running, ``imap.readline`` is temporarily replaced by
    a wrapper that advances a progress bar (sized by ``total``) for each
    response line containing ``FETCH (X-GM-MSGID``.

    Returns a list of ``(gmail_id, imap_seq, labels)`` tuples. Both ids are
    the strings captured from the response; ``labels`` is the result of
    ``filter_labels`` applied to the Gmail labels plus the IMAP flags.
    Exits through ``sys.exit`` when the FETCH fails or an entry cannot be
    parsed.
    """
    parser = re.compile(r'([0-9]+) [(]X-GM-MSGID ([0-9]+) X-GM-LABELS [(](.*)[)] FLAGS [(](.*)[)][)]')
    progressbar = create_progressbar("Receiving message list", total).start()
    # One-element list as a counter cell: a nested function cannot rebind
    # an outer local on Python 2.
    received = [1]
    old_readline = imap.readline

    def counting_readline():
        ret = old_readline()
        if b"FETCH (X-GM-MSGID " in ret:
            progressbar.update(received[0])
            received[0] += 1
        return ret

    imap.readline = counting_readline
    try:
        typ, data = imap.fetch("1:*", "(FLAGS X-GM-LABELS X-GM-MSGID)")
    finally:
        # Always undo the hook so a failed FETCH does not leave the
        # connection reporting progress for later commands.
        imap.readline = old_readline
    progressbar.finish()

    if typ != "OK":
        sys.exit("Failed to discover new messages: %s" % typ)

    print("Parsing message list and labels...")
    new_messages = []
    for response in data:
        if response is None:
            # NOTE(review): imaplib appears to return [None] when there were
            # no FETCH responses (e.g. an empty mailbox) - nothing to parse.
            continue
        if isinstance(response, str):
            text = response
        elif isinstance(response, bytes):
            # Python 3: decode instead of str(), whose b'...' repr doubles
            # the backslashes inside the labels.
            text = response.decode("utf-8", "replace")
        else:
            text = str(response)
        match = parser.search(text)
        if match is None:
            sys.exit("Could not parse message list entry: %r" % (response,))
        imap_seq, gmail_id, labels, flags = match.groups()
        labels = filter_labels(shlex.split(labels, False, True) + flags.split(" "))
        new_messages.append((gmail_id, imap_seq, labels))
    return new_messages
def tag_message(database, filename, labels):
    """Make the notmuch tags of the message stored at *filename* equal *labels*.

    The message is looked up by filename and added to the database when it
    is not known yet. If its tags already equal ``labels`` only the maildir
    flags are synchronised. Otherwise all tags are replaced inside an atomic
    section and the maildir flags are synchronised afterwards.

    On any failure the message is removed from the database again, an open
    atomic section is ended, and the original exception is re-raised.
    """
    message = None
    in_atomic = False
    try:
        message = database.find_message_by_filename(filename)
        if message is None:
            database.begin_atomic()
            in_atomic = True
            message = database.add_message(filename, False)[0]
        else:
            if set(labels) == set(message.get_tags()):
                message.tags_to_maildir_flags()
                return
            database.begin_atomic()
            in_atomic = True
        message.freeze()
        message.remove_all_tags(False)
        for tag in labels:
            message.add_tag(tag, False)
        message.thaw()
        database.end_atomic()
        in_atomic = False
        message.tags_to_maildir_flags()
    except Exception:
        if message is not None:
            # remove_message() is keyed by filename, not by Message object.
            database.remove_message(filename)
        if in_atomic:
            # Only end a section that was actually begun and not yet ended;
            # an unbalanced end_atomic() would mask the original error.
            database.end_atomic()
        # Bare raise keeps the original traceback (also on Python 2).
        raise
def create_progressbar(text, total):
    """Build (without starting) a progress bar labelled *text* with maximum *total*.

    The bar shows the label, a simple counter, the bar itself, a percentage,
    an ETA and a transfer speed measured in "emails".
    """
    widgets = [
        text + ": ",
        SimpleProgress(),
        Bar(),
        Percentage(),
        " ",
        ETA(),
        " ",
        FileTransferSpeed(unit="emails"),
    ]
    return ProgressBar(maxval=total, widgets=widgets)
def retag_old_messages(database, messages, destination):
    """Retag messages already present in ``<destination>/cur/``.

    Every file in that directory whose name contains ``.gmail`` is indexed
    by the part of its name before the last ``.gmail``. Each entry of
    ``messages`` (``(gmail_id, imap_seq, labels)``) whose id is in that index
    is passed to ``tag_message``; all other entries are collected.

    Returns ``messages`` itself when no local files were found, otherwise
    the list of entries that have no local file yet.
    """
    print("Searching for local messages...")
    cur_dir = destination + "/cur/"
    old_messages = {}
    for filename in os.listdir(cur_dir):
        if ".gmail" not in filename:
            continue
        local_id = os.path.basename(filename[:filename.rfind(".gmail")])
        old_messages[local_id] = cur_dir + filename
    if not old_messages:
        return messages

    progressbar = create_progressbar("Retagging local messages", len(old_messages))
    progressbar.start()
    new_messages = []
    retagged = 0
    for gmail_id, imap_seq, labels in messages:
        if gmail_id not in old_messages:
            new_messages.append((gmail_id, imap_seq, labels))
            continue
        tag_message(database, old_messages[gmail_id], labels)
        retagged += 1
        progressbar.update(retagged)
    progressbar.finish()
    return new_messages
def download_new_messages(imap, database, messages, destination):
    """Download every message in *messages* that is not yet in ``new/`` and tag it.

    For each ``(gmail_id, imap_seq, labels)`` entry the message body is
    fetched by sequence number, written to ``tmp/<gmail_id>.gmail``, synced
    to disk and then hard-linked into ``new/`` before the temporary name is
    removed. The file in ``new/`` is then passed to ``tag_message`` - also
    when it already existed and nothing had to be downloaded.

    Exits through ``sys.exit`` when a message cannot be fetched.
    """
    progressbar = create_progressbar("Downloading messages", len(messages))
    progressbar.start()
    for i, (gmail_id, imap_seq, labels) in enumerate(messages, 1):
        temp = destination + "/tmp/" + str(gmail_id) + ".gmail"
        dest = destination + "/new/" + str(gmail_id) + ".gmail"
        if not os.path.exists(dest):
            typ, data = imap.fetch(str(imap_seq), "RFC822")
            # data[0] must be a (header, body) pair; anything else means the
            # server did not return the message.
            if typ != "OK" or not data or not isinstance(data[0], tuple):
                # %s, not %d: the ids arrive as strings (regex captures in
                # discover_messages), so %d would raise TypeError here.
                sys.exit("Failed to download message gmail-%s/imap-%s" % (gmail_id, imap_seq))
            # Binary mode: the body is raw message data and must be written
            # unmodified (and is bytes on Python 3).
            with open(temp, "wb") as f:
                f.write(data[0][1])
                f.flush()
                os.fsync(f.fileno())
            os.link(temp, dest)  # Because DJB says so...
            os.unlink(temp)
        tag_message(database, dest, labels)
        progressbar.update(i)
    progressbar.finish()
def filter_labels(labels):
    """Translate Gmail labels and IMAP flags into a set of notmuch tags.

    Known system labels/flags are renamed through a fixed table; entries
    mapped to ``None`` are dropped; unknown labels pass through unchanged.
    ``\\Seen`` is not kept as a tag: its absence adds ``unread`` instead.
    The empty string is never part of the result.
    """
    translation = {
        "\\Inbox": "inbox",
        "\\Drafts": "draft",
        "\\Sent": "sent",
        "\\Spam": "spam",
        "\\Starred": "flagged",
        "\\Trash": "deleted",
        "\\Answered": "replied",
        "\\Flagged": "flagged",
        "\\Draft": "draft",
        "\\Deleted": "deleted",
        "\\Seen": "!read!",  # internal marker, resolved below
        "\\Important": None,  # priority inbox is deliberately ignored
        "\\Muted": None,  # muting is deliberately ignored
        "Junk": "spam",
        "NonJunk": None,
    }

    tags = set()
    for label in labels:
        try:
            mapped = translation[label]
        except KeyError:
            # Not a known system label: keep it verbatim.
            tags.add(label)
            continue
        if mapped is not None:
            tags.add(mapped)

    # Invert the "seen" marker into notmuch's "unread" tag.
    was_read = "!read!" in tags
    tags.discard("!read!")
    if not was_read:
        tags.add("unread")

    tags.discard("")
    return tags
def logout(imap):
    """Close the selected mailbox and log out of the IMAP session.

    ``logout()`` is issued even when ``close()`` raises, so a failing close
    does not leave the connection logged in.
    """
    try:
        imap.close()
    finally:
        imap.logout()
if __name__ == "__main__":
    # Script entry point: run the importer; on Ctrl-C print an empty line
    # and leave with exit status 1.
    try:
        main()
    except KeyboardInterrupt:
        print("")
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment