|
from typing import Any, Dict, List, OrderedDict |
|
import re |
|
from beancount.core.data import Posting, Transaction, Balance |
|
from beancount_import.journal_editor import JournalEditor |
|
from beancount_import.source import LogFunction, Source, SourceResults, SourceSpec |
|
import importlib |
|
import numbers |
|
from beancount.core.number import MISSING, D, ZERO |
|
|
|
def instr(needle): |
|
return lambda hay: needle in hay |
|
|
|
RULES=[ |
|
{ |
|
'criteria': [ |
|
("name", lambda n: re.match(r'^(amazon|AMZNGrcy|AMZN Mktp)', n, re.I) is not None), |
|
], |
|
'attributes': { |
|
'payee': "Amazon", |
|
'account': "Expenses:FIXME", |
|
} |
|
}, |
|
{ |
|
'criteria': [ |
|
("name" , lambda n: re.search(r'uber eats', n, re.I)), |
|
("amount", lambda a: abs(a.number) == D('5') ), |
|
], |
|
'attributes': { |
|
'payee': 'Uber Eats', |
|
'tags': ['tips'], |
|
} |
|
}, |
|
{ # WITHDRAWAL TRANSFER TO LOAN 0001 |
|
'criteria': [ |
|
("name" , lambda n: re.search(r'TRANSFER TO LOAN', n, re.I) is not None), |
|
("account_id", lambda i: i == "pzZp6rN8J3XSJq1Jxq" ), |
|
], |
|
'attributes': { |
|
'payee': 'Auto Loan Payment', |
|
'account': 'Assets:Payments:Auto:ThatCar', |
|
} |
|
}, |
|
|
|
{ 'criteria': [ ("name", instr("WHOLEFDS") ) ], 'attributes': { 'payee': 'Whole Foods' }}, |
|
{ 'criteria': [ ("name", instr("CHIPOTLE") ) ], 'attributes': { 'payee': 'Chipotle', 'narration': '', 'account': "Expenses:Food:Restaurants" }}, |
|
{ 'criteria': [ ("name", instr(" CAVA ") ) ], 'attributes': { 'payee': 'Cava', 'narration': '', 'account': "Expenses:Food:Restaurants" }}, |
|
{ 'criteria': [ ("name", "AUTOPAY PAYMENT - THANK YOU" ) ], 'attributes': { 'payee': "Credit Card Payment" } }, |
|
{ 'criteria': [ ("name", "AUTOMATIC PAYMENT - THANK" ) ], 'attributes': { 'payee': 'Credit Card Payment' } }, |
|
{ 'criteria': [ ("name", "CHASE CREDIT CRD AUTOPAY" ) ], 'attributes': { 'payee': 'Credit Card Payment' } }, |
|
{ 'criteria': [ ("name", instr("YouTubePremium") ) ], 'attributes': { 'payee': "YouTube Premium" , 'narration': None, 'account': 'Expenses:Entertainment:Music' } }, |
|
] |
|
|
|
|
|
def matches_all_criteria( plaid_entry: dict, rule: dict ): |
|
for criteria in rule['criteria']: |
|
attr, logic = criteria |
|
if attr == "plaid": |
|
value = plaid_entry |
|
elif attr == "name": |
|
value = plaid_entry.get( 'plaid_name', plaid_entry.get('plaid_pending_name', plaid_entry.get('ofx_name') ) ) |
|
else: |
|
value = plaid_entry.get( attr ) |
|
|
|
# can't match the criteria if there's no value for the specified attribute |
|
# can't match ALL criteria if one doesn't match |
|
if not value: |
|
return False |
|
|
|
if callable(logic): |
|
match = logic( value ) |
|
elif isinstance(logic, str): |
|
match = ( value.lower() == logic.lower() ) |
|
elif isinstance(logic, numbers.Number): |
|
match = ( value == D(str(value)) ) |
|
else: |
|
raise RuntimeError("Unsupported match criteria type: %s" % type(logic)) |
|
|
|
# can't match ALL criteria if one doesn't match |
|
if not match: |
|
return False |
|
|
|
return True |
|
|
|
def match_rules( posting: Posting ) -> List[Dict[str, Any]]: |
|
entry_dict:OrderedDict[str, Any] = posting.meta |
|
|
|
apply = [] |
|
|
|
for rule in RULES: |
|
match = matches_all_criteria( entry_dict, rule ) |
|
if not match: continue |
|
|
|
apply.append( rule['attributes'] ) |
|
|
|
return apply |
|
|
|
def merge_rule_attributes( to_apply: list ) -> Dict[str, Any]: |
|
merged: Dict[str, Any] = { |
|
'payee': None, |
|
'narration': None, |
|
'account': None, |
|
'tags': set(), |
|
'flag': None, |
|
|
|
} |
|
for apply in to_apply: |
|
for attr, value in apply.items(): |
|
if not attr in merged: |
|
raise RuntimeError("Plaid module: Rule updates unknown attribute [%s]" % attr) |
|
|
|
if isinstance( merged[attr], set ): |
|
if isinstance( value, str ): value = [ value ] |
|
for v in value: |
|
merged[attr].add( v ) |
|
else: |
|
merged[attr] = value |
|
|
|
# remove unset entries so the logic below can use .get(attr, DEFAULT) |
|
return dict([(k,v) for k,v in merged.items() if v is not None]) |
|
|
|
|
|
def load(spec: SourceSpec, log_status: LogFunction): |
|
if not 'wrapped_module' in spec: |
|
raise Exception("ruleswrapper source requires wrapped_module specifying the module being wrapped") |
|
|
|
original_module = importlib.import_module(spec.pop('wrapped_module')) |
|
loaded: Source = original_module.load(spec, log_status) |
|
|
|
old_prepare = loaded.prepare |
|
def new_prepare(journal: JournalEditor, results: SourceResults): |
|
old_prepare(journal, results) |
|
|
|
for result in results.pending: # Type: ImportResult |
|
for i, entry in enumerate(result.entries): # Type: Directive |
|
if type(entry) != Transaction: |
|
# only supports Transactions, although it might make sense |
|
# to extend to other types of Directives |
|
continue |
|
|
|
# current logic assumes a single Posting with an account |
|
# owned by this source, and a single Posting with an account |
|
# not owned by this source - any more than that, and not sure |
|
# how to proceed |
|
owned_postings = list(filter(lambda p: p.account in results.accounts, entry.postings)) |
|
other_postings = list(filter(lambda p: p.account not in results.accounts, entry.postings)) |
|
|
|
if len(owned_postings) != 1 or len(other_postings) != 1: |
|
log_status("Unable to process rules - needed 1 owned posting and 1 other posting, got: %d owned, %d other" % (len(owned_postings), len(other_postings))) |
|
continue |
|
|
|
matches = match_rules( owned_postings[0] ) |
|
if matches: |
|
rule_overrides = merge_rule_attributes( matches ) |
|
|
|
other_postings = [ |
|
Posting( |
|
account = rule_overrides.get('account', other_postings[0].account), |
|
units = other_postings[0].units, |
|
cost = other_postings[0].cost, |
|
price = other_postings[0].price, |
|
flag = other_postings[0].flag, |
|
meta = other_postings[0].meta, |
|
) |
|
] |
|
|
|
result.entries[i] = Transaction( |
|
meta = entry.meta, |
|
date = entry.date, |
|
flag = entry.flag, |
|
payee = rule_overrides.get('payee', entry.payee), |
|
narration = rule_overrides.get('narration', entry.narration), |
|
tags = rule_overrides.get('tags', entry.tags), |
|
links = entry.links, |
|
postings = [ |
|
owned_postings[0], |
|
other_postings[0] |
|
] |
|
) |
|
# result.entries = list(filter(lambda e: e is not None, result.entries)) |
|
|
|
loaded.prepare = new_prepare |
|
|
|
return loaded |
|
|
|
|