Last active
December 18, 2015 18:59
-
-
Save jernoble/5829444 to your computer and use it in GitHub Desktop.
This module allows you to ingest CSV data exported by Mint.com and perform queries on the resulting Sqlite DB.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
# This module allows you to ingest CSV data exported by Mint.com and
# perform queries on the resulting SQLite DB.
#
# Usage:
# >>> from MintParser import MintParser
# >>> m = MintParser('transactions.csv.txt')
import csv | |
import datetime | |
import sqlite3 | |
import sys | |
class MintParser:
    """Load a Mint.com CSV export into an in-memory SQLite table and query it.

    The first CSV row is treated as the header; spaces in header names are
    replaced by underscores and the names become the columns of a single
    ``transactions`` table. Amounts of rows whose ``Transaction_Type`` is
    ``'debit'`` are negated when totals are computed; all other rows are
    added as-is.

    The reporting methods print their results to stdout and return ``None``.
    The code is written to run unchanged on both Python 2 and Python 3.
    """

    # Header fields (after the space -> underscore rewrite) that the queries
    # in this class reference by name.
    _MANDATORY_FIELDS = ('Transaction_Type', 'Amount', 'Date', 'Category')

    # SQL expression summing amounts with debits counted as negative.
    _SIGNED_TOTAL = ("total(CASE WHEN Transaction_Type = 'debit' "
                     "THEN -Amount ELSE Amount END)")

    def __init__(self, filename):
        """Read ``filename`` and populate the ``transactions`` table.

        Raises:
            ValueError: if the file is empty or its first row lacks any of
                the mandatory header fields.
            ValueError: (from ``strptime``) if a row's date is not in
                ``mm/dd/yyyy`` form.
        """
        with open(filename) as csvfile:
            reader = csv.reader(csvfile)
            # next() works on both Python 2 and 3; an empty file is reported
            # the same way as a file with a bad header.
            try:
                header = next(reader)
            except StopIteration:
                header = []
            fieldNames = [name.replace(' ', '_') for name in header]
            if not set(self._MANDATORY_FIELDS).issubset(fieldNames):
                raise ValueError('Error: the first row in the input file does not contain the necessary header fields.')
            dateIndex = fieldNames.index('Date')

            # Quote every identifier so header names that are SQL keywords or
            # contain punctuation cannot break (or alter) the statements.
            columns = ','.join('"%s"' % name.replace('"', '""') for name in fieldNames)
            placeholders = ','.join('?' * len(fieldNames))
            insert = 'INSERT INTO transactions(%s) VALUES (%s)' % (columns, placeholders)

            self.con = sqlite3.connect(":memory:")
            # isolation_level None puts the connection in autocommit mode.
            self.con.isolation_level = None
            self.cur = self.con.cursor()
            self.cur.execute('CREATE TABLE transactions(%s)' % columns)
            for row in reader:
                if not row:
                    # csv.reader yields an empty list for blank lines.
                    continue
                # sqlite3 does not understand dates in the format mm/dd/yyyy, so convert them to
                # yyyy-mm-dd, which allows us to use the built in date functions in SELECT statements.
                row[dateIndex] = datetime.datetime.strptime(row[dateIndex], '%m/%d/%Y').strftime('%Y-%m-%d')
                self.cur.execute(insert, row)

    @staticmethod
    def _categoryFilter(categories):
        """Return a parameterized WHERE condition matching any of ``categories``.

        SQLite accepts an empty ``IN ()`` list (it matches nothing), so an
        empty ``categories`` yields a valid query rather than a syntax error.
        """
        return 'Category IN (%s)' % ','.join('?' * len(categories))

    def _monthlyTotals(self, categories):
        """Return ``(yyyy-mm, signed total)`` rows for ``categories``, oldest month first."""
        categories = list(categories)
        query = ("SELECT strftime('%Y-%m', Date) AS month, " + self._SIGNED_TOTAL +
                 " FROM transactions WHERE " + self._categoryFilter(categories) +
                 " GROUP BY month ORDER BY month ASC")
        return self.cur.execute(query, categories).fetchall()

    def categories(self):
        """Return the distinct values of the ``Category`` column as a list."""
        rows = self.cur.execute('SELECT DISTINCT Category FROM transactions').fetchall()
        return [row[0] for row in rows]

    def listCategories(self):
        """Print all distinct categories on one line, comma-separated."""
        print(', '.join(self.categories()))

    def totalCategories(self, categories):
        """Print the signed total over all transactions in ``categories``."""
        categories = list(categories)
        query = 'SELECT %s FROM transactions WHERE %s' % (
            self._SIGNED_TOTAL, self._categoryFilter(categories))
        print(self.cur.execute(query, categories).fetchone()[0])

    def tabulateCategories(self, categories):
        """Print two tab-separated lines: the months, then each month's signed total."""
        sums = self._monthlyTotals(categories)
        print("\t".join([row[0] for row in sums]))
        print("\t".join(["%.2f" % float(row[1]) for row in sums]))

    def tabulateEverything(self):
        """Print each category name followed by its monthly table."""
        for category in self.categories():
            print(category)
            self.tabulateCategories([category])

    def breakdownCategories(self, categories):
        """Print one ``yyyy-mm: total`` line per month for ``categories``."""
        for row in self._monthlyTotals(categories):
            print('%s: %.2f' % (row[0], float(row[1])))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment