Last active
December 19, 2015 17:39
-
-
Save monokrome/5992936 to your computer and use it in GitHub Desktop.
Search within a mailbox for iTunes receipts, and write a JSON representation of all iTunes receipts as 'receipts.json'.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import mailbox

# Path of the mailbox to scan for iTunes receipts.  Replace the placeholder
# with the location of your own mailbox before running the script.
mailbox_root = 'your_mailbox'

# Class from the standard `mailbox` module used to open `mailbox_root`;
# build_cache() instantiates it as `mailbox_type(mailbox_root)`.
mailbox_type = mailbox.Maildir
########################################### | |
########################################### | |
## Configuration section finished. ## | |
########################################### | |
########################################### | |
import hashlib
import json
import os
import re
import shutil
import sys
def dictize(root, key, value):
    """Store `value` in the nested dict `root` under a whitespace-split key.

    The key is split on runs of whitespace; every word except the last names
    a nested dict (created on demand) and the last word names the entry that
    receives `value`.  For example the key 'Order Number' stores the value at
    root['Order']['Number'].

    An existing entry for the final word is overwritten.  If a word along the
    path currently maps to something that is not a dict, that entry is
    replaced by a new dict so the assignment can proceed.

    `root` is modified in place; nothing is returned.
    """
    # Surrounding whitespace would otherwise produce empty-string path parts.
    parts = re.split(r'\s+', key.strip())

    node = root
    # Walk by position rather than by value so that a word repeated inside
    # the key (e.g. 'A B A') is not mistaken for the final component.
    for part in parts[:-1]:
        child = node.get(part)
        if not isinstance(child, dict):
            child = node[part] = {}
        node = child

    node[parts[-1]] = value
def build_cache():
    """Write every iTunes receipt message in the mailbox to `.cache/`.

    Opens the mailbox with `mailbox_type(mailbox_root)`, keeps the messages
    whose From header contains 'itunes.com' and whose Subject header contains
    'receipt', and writes one JSON file per message, named after the mailbox
    key, holding the message body together with its From and Subject headers.

    The `.cache` directory is created if it does not exist yet.
    """
    box = mailbox_type(mailbox_root)

    def is_itunes_message(message):
        # Messages without a From header can never match.
        return 'from' in message and 'itunes.com' in message['from']

    def is_receipt(message):
        # Guard the lookup: a message without a Subject header is simply
        # not a receipt instead of an error.
        return 'subject' in message and 'receipt' in message['subject']

    def related_messages(box):
        for identifier, message in box.iteritems():
            if is_itunes_message(message) and is_receipt(message):
                yield identifier, message

    if not os.path.isdir('.cache'):
        os.makedirs('.cache')

    for identifier, message in related_messages(box):
        # NOTE(review): this relies on the message objects exposing the
        # underlying file as `.fp` -- confirm this holds for the message
        # class produced by the configured `mailbox_type`.
        # json.dumps() escapes quotes itself, so the body is stored verbatim;
        # escaping it by hand as well would leave stray backslashes in the
        # cached text.
        result = {
            'body': message.fp.read(),
            'from': message['from'],
            'subject': message['subject'],
        }

        # Build the payload before opening the file so a failure above does
        # not leave an empty cache entry behind.
        with open('.cache/{0}.json'.format(identifier), 'w') as cache_file:
            cache_file.write(json.dumps(result))
class Receipt(object):
    """Structured data parsed from the plain-text report of one receipt.

    Instances are created by `factory`, which sets the `billing`, `order`,
    `items` and `totals` attributes that `to_dict` serialises.
    """

    def to_dict(self):
        """Return the receipt's parsed sections as a plain dict."""
        return {
            'billing': self.billing,
            'order': self.order,
            'items': self.items,
            'totals': self.totals,
        }

    @classmethod
    def make_address(cls, data):
        """Extract the billing address from the report lines.

        The street starts at line index 3 and runs to the first blank line;
        the area runs from there to the next blank line.

        Returns a `(remaining_lines, address)` tuple where `remaining_lines`
        starts at the blank line that ends the area and `address` has the
        keys 'street' and 'area'.

        Raises ValueError when the expected blank lines are missing.
        """
        # Index just past the first blank line at or after line 3.
        first_break = data[3:].index('') + 4
        second_break = data[first_break:].index('') + first_break

        return data[second_break:], {
            # The slice includes the blank separator line itself, so the
            # joined street text ends with a single space.
            'street': ' '.join(data[3:first_break]),
            'area': ' '.join(data[first_break:second_break]),
        }

    @classmethod
    def order_information(cls, data):
        """Collect the 'Key: value' lines that precede the item table.

        Scanning stops at the first line containing 'Item'.  Every earlier
        non-blank line is split at its first ': ' and stored through
        `dictize`, so a key such as 'Order Number' becomes nested dicts.
        Anything from the first '=' onwards is dropped from the value.
        Lines without a ': ' separator are skipped.

        Returns a `(remaining_lines, results)` tuple; `remaining_lines`
        starts at the line containing 'Item', or is empty when no such line
        exists.
        """
        results = {}

        for index, datum in enumerate(data):
            if 'Item' in datum:
                return data[index:], results

            datum = datum.strip()
            if not datum:
                continue

            # partition() tolerates values that themselves contain ': '.
            key, separator, value = datum.partition(': ')
            if not separator:
                continue

            value = value.split('=', 1)[0]
            dictize(results, key, value)

        # Keep the (lines, results) shape that callers unpack even when the
        # item table was never reached.
        return [], results

    @classmethod
    def line_items(cls, data):
        """Parse the item table into a list of dicts keyed by column name.

        Columns are separated by three or more whitespace characters.  The
        row whose first column is 'Item' supplies the column names, which
        are lower-cased to form the dict keys.  Rows starting with '----'
        and blank lines are ignored, a line ending in '=' is joined with the
        line that follows it, and the first row that begins with the column
        separator ends the table.  A value in the 'type' column that starts
        with '$' is stored under 'unit price' instead.  Rows seen before any
        header row cannot be labelled and are skipped; cells beyond the last
        header column are ignored.

        Returns a `(remaining_lines, items)` tuple; `remaining_lines` starts
        one line before the line on which scanning stopped.
        """
        results = []
        headers = []
        pending = ''
        # Position at which scanning stopped; stays on the last line when
        # the loop runs to the end of `data`.
        stop = len(data) - 1

        for index, line in enumerate(data):
            if line == '':
                continue

            if line[-1] == '=':
                # Trailing '=' marks a continuation: hold the text back and
                # prepend it to the next line.
                pending += line[:-1]
                continue

            line = pending + line
            pending = ''

            parts = re.split(r'\s\s\s+', line)

            # Remember the header row; it labels the following rows.
            if parts[0] == 'Item':
                headers = parts
                continue

            if parts[0][0:4] == '----':
                continue

            if parts[0] == '':
                stop = index
                break

            if not headers:
                continue

            result = {}
            for header, part in zip(headers, parts):
                category = header.lower()

                if category == 'type' and part.strip().startswith('$'):
                    category = 'unit price'

                result[category] = part

            results.append(result)

        # Clamp at zero: a negative start would wrap around to the end of
        # the list instead of keeping everything.
        return data[max(stop - 1, 0):], results

    @classmethod
    def totals(cls, data):
        """Parse the 'Key: value' total lines that follow the item table.

        Blank lines and lines starting with '-' are ignored, a line ending
        in '=' is joined with the line that follows it, and every other line
        is split at its first colon-plus-whitespace and stored through
        `dictize`.  Lines without such a separator are skipped.

        Returns a `(data, results)` tuple; `data` is returned unchanged.
        """
        results = {}
        pending = ''

        for line in data:
            if line == '' or line[0] == '-':
                continue

            if line[-1] == '=':
                pending += line[:-1]
                continue

            line = (pending + line).lstrip()
            pending = ''

            fields = re.split(r':\s+', line, maxsplit=1)
            if len(fields) != 2:
                continue

            key, value = fields
            dictize(results, key, value)

        return data, results

    @classmethod
    def factory(cls, report):
        """Build a Receipt from the plain-text `report`.

        The report is split on newlines; line 0 is stored as the billing
        email and line 1 as the billing name, and the remaining lines are
        passed in turn through `make_address`, `order_information`,
        `line_items` and `totals`.
        """
        data = report.split('\n')
        receipt = cls()
        receipt.billing = {
            'email': data[0],
            'name': data[1],
        }

        data, address = cls.make_address(data)
        receipt.billing['address'] = address

        # Drop leading blank lines; stop safely if nothing else is left.
        while data and data[0] == '':
            data = data[1:]

        data, receipt.order = cls.order_information(data)
        data, receipt.items = cls.line_items(data)
        # The instance attribute `totals` shadows the classmethod of the
        # same name on this instance only; the class-level parser is
        # unaffected because it is looked up on `cls`.
        data, receipt.totals = cls.totals(data)

        return receipt
class ReceiptParser(object):
    """Cuts the plain-text report out of a cached message and parses it."""

    @staticmethod
    def _offset(text, marker):
        """Return the position of `marker` in `text` or raise ValueError."""
        position = text.find(marker)
        if position < 0:
            # Name the missing marker; a bare "substring not found" gives no
            # hint about which part of the message was unexpected.
            raise ValueError(
                'receipt body does not contain {0!r}'.format(marker)
            )
        return position

    def parse(self, receipt):
        """Return a `Receipt` built from the cached message dict `receipt`.

        The text is taken from `receipt['body']`, starting at
        'Apple Receipt' and ending before the first '<!' and before
        'Please retain for your records.'.  The first five and last three
        lines of that excerpt are discarded and the rest is handed to
        `Receipt.factory`.

        Raises ValueError when any of the three markers is missing.
        """
        body = receipt['body']

        excerpt = body[self._offset(body, 'Apple Receipt'):]
        excerpt = excerpt[:self._offset(excerpt, '<!')]
        excerpt = excerpt[
            :self._offset(excerpt, 'Please retain for your records.')
        ]

        report = '\n'.join(excerpt.split('\n')[5:-3])

        return Receipt.factory(report)


# Shared parser instance used by receipts().
parser = ReceiptParser()
def receipts():
    """Yield `(receipt, subject, from)` for each distinct cached message.

    Every file in `.cache` is read and decoded as JSON.  Files that do not
    contain valid JSON are reported on stdout and skipped.  Files whose
    contents are byte-for-byte identical to an earlier file (compared by MD5
    digest) are skipped as duplicates.  The remaining entries are parsed with
    the module-level `parser`.
    """
    seen = set()

    for filename in os.listdir('.cache'):
        # Read bytes so the same data can be hashed on Python 2 and 3.
        with open('.cache/{0}'.format(filename), 'rb') as cache_file:
            contents = cache_file.read()

        try:
            data = json.loads(contents.decode('utf-8'))
        except ValueError:
            print('Invalid JSON in {0}'.format(filename))
            # Without this the loop would go on with an unbound `data`, or
            # with the entry left over from the previous file.
            continue

        current_hash = hashlib.md5(contents).digest()
        if current_hash in seen:
            continue

        seen.add(current_hash)
        yield parser.parse(data), data['subject'], data['from']
def main():
    """Parse the cached receipts and write them to `receipts.json`.

    When the first command-line argument is 'reparse', the `.cache`
    directory is emptied and rebuilt from the mailbox first.  The output file
    holds a JSON object whose 'receipts' key lists every parsed receipt
    together with the subject and sender of its message.
    """
    if len(sys.argv) > 1 and sys.argv[1] == 'reparse':
        print('Reparsing maildir. This could take quite a while.')
        # `.cache` is a directory, which os.remove() cannot delete; remove
        # the whole tree and start again from an empty directory.
        shutil.rmtree('.cache', ignore_errors=True)
        if not os.path.isdir('.cache'):
            os.makedirs('.cache')
        build_cache()

    data = []
    for receipt, subject, from_address in receipts():
        result = receipt.to_dict()
        result['subject'] = subject
        result['from'] = from_address
        data.append(result)

    with open('receipts.json', 'w') as output:
        output.write(json.dumps({
            "receipts": data
        }, indent=2))


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import json
import csv

# Load the receipt list from 'receipts.json'.  The handle is closed as soon
# as the JSON has been parsed; `receipt_data` stays bound to the closed file.
with open('receipts.json', 'r') as receipt_data:
    receipts = json.load(receipt_data)['receipts']

# Destination for the CSV rows; kept open because the csv writer created
# further down in this script writes through it.
output_file = open('orders.csv', 'w')
def normalize(receipt):
    """Flatten one parsed receipt into a list of per-item row dicts.

    Each row has the keys 'order number', 'item', 'price', 'type', 'artist',
    'ordered date' and 'ordered via'.  The order-level values are read from
    receipt['order'] ('Order'/'Number', 'Receipt'/'Date', 'Billed'/'To') and
    repeated on every row; the item-level values come from each entry of
    receipt['items'] ('item', 'unit price', 'type', 'artist').

    Any field that is missing is left as an empty string, so a single
    incomplete receipt does not abort the whole export.
    """
    order = receipt.get('order') or {}

    def order_field(section, field):
        # Tolerate a missing section as well as a section that is not a dict.
        group = order.get(section)
        if isinstance(group, dict):
            return group.get(field, '')
        return ''

    # The order-level columns are identical for every item: look them up once.
    shared = {
        'order number': order_field('Order', 'Number'),
        'ordered date': order_field('Receipt', 'Date'),
        'ordered via': order_field('Billed', 'To'),
    }

    results = []
    for item in receipt.get('items') or []:
        result = dict(shared)
        result.update({
            'item': item.get('item', ''),
            'price': item.get('unit price', ''),
            'type': item.get('type', ''),
            'artist': item.get('artist', ''),
        })
        results.append(result)

    return results
# Column titles of the CSV file, in output order.  Lower-cased, each title is
# the key under which normalize() stores the matching value.
headers = [
    'Order Number',
    'Item',
    'Price',
    'Type',
    'Artist',
    'Ordered Date',
    'Ordered Via',
]

writer = csv.writer(output_file, dialect='excel')
writer.writerow(headers)

print('')

columns = [header.lower() for header in headers]

# One CSV row per purchased item, across all receipts.
rows = []
for receipt in receipts:
    for entry in normalize(receipt):
        rows.append([entry[column] for column in columns])

writer.writerows(rows)

# Close the files explicitly so the rows are flushed to disk before the
# summary is printed; closing an already closed file is harmless.
output_file.close()
receipt_data.close()

print('{0} created with {1} items.'.format('orders.csv', len(rows)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment