Last active
December 30, 2024 21:05
-
-
Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Beancount clear different legs of transaction on different dates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
from datetime import date, timedelta | |
from functools import reduce | |
from gettext import dgettext | |
import re | |
import traceback | |
from types import SimpleNamespace | |
import uuid | |
from beancount.core.account import join as account_join, leaf, root, sans_root | |
from beancount.core.data import * | |
from beancount.core.interpolate import AUTOMATIC_META, BalanceError as BalanceError_t | |
# Beancount `__plugins__` convention: names of the plugin entry points
# defined in this module.
__plugins__ = ['temporal_settlement']
# Posting flags that take precedence over the parent transaction's flag when
# a posting is broken out into its own transaction (see _make_breakout).
BREAKOUT_FLAGS = {'!'}
def temporal_settlement(entries, options_map, account_clearing='Clearing'):
    """Plugin entry point: rewrite entries so dated legs clear separately.

    Args:
        entries: The entries to process.
        options_map: The options mapping; it is not mutated, a merged copy
            carrying ``account_clearing`` is handed to the helpers instead.
        account_clearing: Account name (without root) that is joined under
            the Assets / Liabilities roots to form the clearing accounts.

    Returns:
        A ``(entries, errors)`` tuple: the rewritten entry list and the list
        of errors collected while processing.
    """
    merged_options = options_map | {'account_clearing': account_clearing}
    bean_ctx = SimpleNamespace(
        entries=entries,
        options_map=merged_options,
        errors=[],
    )
    rewritten = list(_main(bean_ctx))
    return rewritten, bean_ctx.errors
def _main(bean_ctx):
    """Yield the output entries for every entry in ``bean_ctx.entries``.

    Transactions are expanded through ``_process_transaction`` (which may
    produce several transactions); every other entry is yielded unchanged.
    """
    for entry in bean_ctx.entries:
        if not isinstance(entry, Transaction):
            # Not a transaction: pass it through untouched.
            yield entry
            continue
        yield from _process_transaction(entry, bean_ctx)
def _process_transaction(transaction, bean_ctx) -> 'List[Transaction]':
    """Split one transaction into itself plus any broken-out transactions.

    Returns a list whose first element is a copy of ``transaction`` (with
    relative 'settled'/'billed' metadata resolved to dates and its postings
    rebuilt), followed by one extra transaction per posting that
    ``_process_posting`` decided to break out.

    A posting whose processing raises is kept unaltered, and the exception is
    recorded in ``bean_ctx.errors`` by ``_beancount_catching``.
    """
    normalized_meta = _meta_with_nonrel_dates(transaction.meta, transaction.date)
    rewritten = transaction._replace(postings=[], meta=normalized_meta)
    breakouts = []

    for posting in transaction.postings:
        with _beancount_catching(transaction, errors_into=bean_ctx.errors, t_bean=BalanceError_t):
            try:
                new_posting, breakout = _process_posting(TxnPosting(transaction, posting), bean_ctx)
            except Exception:
                # Keep the original posting, then let _beancount_catching
                # turn the exception into a reported error.
                rewritten.postings.append(posting)
                raise
            rewritten.postings.append(new_posting)
            if breakout is not None:
                breakouts.append(breakout)

    return [rewritten, *breakouts]
def _process_posting(txnp, bean_ctx) -> 'Tuple[Posting, Optional[Transaction]]':
    """Decide whether one posting must be broken out to another date.

    Args:
        txnp: A ``TxnPosting`` pairing the posting with its parent transaction.
        bean_ctx: Context namespace; only ``options_map`` is read here.

    Returns:
        ``(posting, None)`` when the posting stays as it is, otherwise the
        ``(replacement_posting, new_transaction)`` pair produced by
        ``_make_breakout``.

    The 'settled' and 'billed' metadata values may be dates or offsets
    relative to the transaction date (see ``_parse_dateish``).
    """
    options = bean_ctx.options_map
    clearing_leaf = leaf(options['account_clearing'])
    expense_root = options.get('name_expenses', "Expenses")

    txn, posting = txnp.txn, txnp.posting
    txn_date = txn.date
    # Posting.meta is allowed to be None; treat that as "no metadata"
    # instead of failing on .get().
    posting_meta = posting.meta or {}
    is_expense = root(1, posting.account) == expense_root

    posting_settlement_date = _parse_dateish(posting_meta.get('settled'), relto=txn_date)
    txn_billing_date = _parse_dateish(txn.meta.get('billed'), relto=txn_date)
    posting_billing_date = _parse_dateish(posting_meta.get('billed'), relto=txn_date)

    # Codepath 1: Individual posting with a settlement date
    if posting_settlement_date is not None and leaf(posting.account) != clearing_leaf:
        info_billing_date = None
        if not is_expense:
            # Distinct explicit billing dates found on the Expense postings.
            # Relative values are resolved against the transaction date, the
            # same way as for the posting being processed.
            expense_posting_billing_dates = set()
            for p_ in txn.postings:
                if root(1, p_.account) != expense_root:
                    continue
                billed = _parse_dateish((p_.meta or {}).get('billed'), relto=txn_date)
                if billed is not None:
                    expense_posting_billing_dates.add(billed)

            if txn_billing_date is not None:
                # this is not an Expense posting,
                # and the parent transaction has an explicit Billing date set.
                info_billing_date = txn_billing_date
            elif len(expense_posting_billing_dates) == 1:
                # this is not an Expense posting,
                # and the Expense postings that carry a Billing date all
                # agree on a single one.
                info_billing_date = next(iter(expense_posting_billing_dates))
            elif posting.units.number < 0 and _some_and_all(
                root(1, p_.account) == expense_root
                for p_ in txn.postings
                if p_.units.number > 0
            ):
                # this is not an Expense posting,
                # and this is a negative posting,
                # and EVERY positive posting on this transaction was an Expense.
                info_billing_date = txn_date

        new_posting, breakout = _make_breakout(txnp, posting_settlement_date, bean_ctx)
        if info_billing_date is not None and info_billing_date != posting_billing_date:
            # NOTE(review): the inferred billing date is recorded on the
            # broken-out *transaction's* metadata (unchanged behavior);
            # confirm that this, rather than the posting, is the intended
            # target.
            breakout = breakout._replace(meta=breakout.meta | {'billed': info_billing_date})
        return new_posting, breakout

    # Codepath 2: Expense posting on a transaction which has a Billed date
    if txn_billing_date is not None and is_expense:
        assert posting_settlement_date is None, "wtf"
        # FIXME: should sweep ALL these together as a posting group/batch
        return _make_breakout(txnp, txn_billing_date, bean_ctx)

    # Codepath 3: Individual posting tagged with a Billed date
    if posting_billing_date is not None:
        assert posting_settlement_date is None, "wtf"
        return _make_breakout(txnp, posting_billing_date, bean_ctx, force_billed_comment=True)

    # Nothing to do: pass the posting through.
    return posting, None
def _make_breakout(txnp, effective_date, bean_ctx, force_billed_comment=False) -> 'Tuple[Posting, Transaction]':
    """Move a posting onto its own transaction dated ``effective_date``.

    Args:
        txnp: ``TxnPosting`` pairing the posting with its parent transaction.
        effective_date: Date of the new transaction.
        bean_ctx: Context namespace; only ``options_map`` is read here.
        force_billed_comment: Use the "billed" narration suffix even when the
            posting is not under the Expenses root.

    Returns:
        ``(clearing_posting, new_transaction)``. ``clearing_posting`` takes
        the original posting's place (same units, clearing account, flag
        '~'). ``new_transaction`` is a copy of the parent dated
        ``effective_date`` holding the opposite clearing posting plus the
        original posting with its 'settled' metadata removed.
    """
    options = bean_ctx.options_map
    txn, posting = txnp.txn, txnp.posting

    # Posting.meta may be None; treat it as empty metadata.
    posting_meta = _meta_with_nonrel_dates(posting.meta or {}, txn.date)
    new_txn_meta = _meta_with_nonrel_dates(txn.meta, txn.date)

    nominal_units = posting.units  # FIXME: need to make sure this is the right way to handle cost/price
    is_early = effective_date < txn.date
    is_expense = root(1, posting.account) == options.get('name_expenses', "Expenses")

    # The Liabilities-side clearing account is used when exactly one of
    # "negative amount" / "effective before the transaction date" holds;
    # otherwise the Assets-side one.
    loanish = (nominal_units.number < 0) ^ is_early
    clearing_root = (
        options.get('name_liabilities', "Liabilities") if loanish
        else options.get('name_assets', "Assets")
    )
    clearing_account = account_join(clearing_root, options['account_clearing'])

    if is_expense:
        # this IS the outbound leg of the transaction;
        # we don't need to leave the billing tag on
        # because it IS ALREADY BEING TRANSMUTED / BROKEN OUT TO THAT DATE
        _dict_isub_set(new_txn_meta, {'billed'})

    if force_billed_comment or is_expense:
        narration_comment = dgettext('beancount', "billed")
    elif is_early:
        narration_comment = dgettext('beancount', "early settlement")
    else:
        narration_comment = dgettext('beancount', "settled")

    def clearing_leg(units):
        # A fresh meta dict per leg so the two postings never share one.
        # Source location falls back to the transaction's when the posting
        # carries none.
        return Posting(
            account=clearing_account,
            units=units,
            cost=None,
            price=None,
            flag='~',
            meta={
                'filename': posting_meta.get('filename', new_txn_meta.get('filename')),
                'lineno': posting_meta.get('lineno', new_txn_meta.get('lineno')),
                AUTOMATIC_META: True,
            },
        )

    return (
        clearing_leg(nominal_units),
        # at least Beancount v2 requires this to be a separate transaction
        # https://beancount.github.io/docs/settlement_dates_in_beancount.html
        txn._replace(
            meta=new_txn_meta,
            date=effective_date,
            flag=posting.flag if posting.flag in BREAKOUT_FLAGS else txn.flag,
            narration=f"{txn.narration} ({narration_comment})",  # https://softwareengineering.stackexchange.com/questions/284842/why-isnt-gettext-parameterized
            postings=[
                clearing_leg(-nominal_units),
                posting._replace(meta=_dict_sub_set(posting_meta, {'settled'})),
            ],
        ),
    )
def _dict_sub_set(a, b):
    """Return a copy of mapping *a* without the keys listed in *b*.

    *a* itself is left untouched; keys of *b* absent from *a* are ignored.
    """
    return _dict_isub_set(a.copy(), b)
def _dict_isub_set(a, b): | |
for k in b: | |
if k in a: | |
del a[k] | |
return a | |
def _parse_dateish(obj, relto=None): | |
if isinstance(obj, date): | |
return obj | |
elif isinstance(obj, str): | |
m = re.match(r'([+-]?\d+)d', obj) | |
if m: | |
return relto + timedelta(days=int(m.group(1))) | |
m = re.match(r'T?([+-]\d+)', obj) | |
if m: | |
# FIXME should skip weekends | |
return relto + timedelta(days=int(m.group(1))) | |
elif isinstance(obj, Decimal): | |
assert int(obj) == obj, "wtf" | |
return relto + timedelta(days=int(obj)) | |
else: | |
return obj | |
def _meta_mut_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Resolve the date-like values of *fields* in *meta*, in place.

    Each present (non-None) value is run through ``_parse_dateish`` relative
    to *d0*; the entry is overwritten only when parsing produced a different
    object.
    """
    for field in fields:
        raw = meta.get(field)
        if raw is None:
            continue
        resolved = _parse_dateish(raw, relto=d0)
        if resolved is not raw:
            meta[field] = resolved
def _meta_with_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Return a copy of *meta* whose *fields* are resolved relative to *d0*.

    Non-mutating counterpart of ``_meta_mut_nonrel_dates``: *meta* itself is
    left unchanged.
    """
    normalized = meta.copy()
    _meta_mut_nonrel_dates(normalized, d0, fields)
    return normalized
def _some_and_all(it, /): | |
some_ = False | |
for elem in it: | |
if not elem: | |
return False | |
if not some_: | |
some_ = True | |
return some_ | |
@contextmanager | |
def _beancount_catching(ctx_entry, errors_into, t_bean, *, t_exc=Exception): | |
try: | |
yield | |
except t_exc as exc: | |
fmt = traceback.format_exception(exc) if sys.version_info >= (3, 10) else traceback.format_exception(*sys.exc_info()) | |
message = "\n".join(re.sub(r"\n$", "", line) for line in fmt) | |
errors_into.append(t_bean(ctx_entry.meta, message, ctx_entry)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment