Skip to content

Instantly share code, notes, and snippets.

@James-E-A
Last active December 30, 2024 21:05
Show Gist options
  • Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Beancount clear different legs of transaction on different dates
import re
import traceback
import uuid
from contextlib import contextmanager
from datetime import date, timedelta
from decimal import Decimal
from functools import reduce
from gettext import dgettext
from types import SimpleNamespace

from beancount.core.account import join as account_join, leaf, root, sans_root
from beancount.core.data import *
from beancount.core.interpolate import AUTOMATIC_META, BalanceError as BalanceError_t
# Names of the functions in this module that act as plugin entry points.
# NOTE(review): this follows Beancount's `__plugins__` convention -- confirm against the loader docs.
__plugins__ = ['temporal_settlement']
# Posting flags that take precedence over the parent transaction's flag:
# when a broken-out posting carries one of these, the generated breakout
# transaction is given the posting's flag instead of the transaction's.
BREAKOUT_FLAGS = {'!'}
def temporal_settlement(entries, options_map, account_clearing='Clearing'):
    """Plugin entry point: split postings onto their settlement/billing dates.

    Args:
        entries: the directives to process.
        options_map: options mapping; it is not mutated, a merged copy carrying
            the extra ``account_clearing`` key is used internally.
        account_clearing: account name component appended to the Assets /
            Liabilities root to form the clearing accounts.

    Returns:
        A ``(new_entries, errors)`` tuple.
    """
    merged_options = options_map | {'account_clearing': account_clearing}
    bean_ctx = SimpleNamespace(
        entries=entries,
        options_map=merged_options,
        errors=[],
    )
    # Drain the generator first so that bean_ctx.errors is fully populated
    # by the time it is returned.
    processed = list(_main(bean_ctx))
    return processed, bean_ctx.errors
def _main(bean_ctx):
    """Yield the rewritten entry stream for ``bean_ctx.entries``.

    Transactions are expanded through ``_process_transaction`` (which may
    produce several transactions); every other directive is yielded unchanged.
    """
    for entry in bean_ctx.entries:
        if not isinstance(entry, Transaction):
            # Not a transaction: pass it through as-is.
            yield entry
            continue
        yield from _process_transaction(entry, bean_ctx)
def _process_transaction(transaction, bean_ctx) -> 'List[Transaction]':
    """Rewrite one transaction, returning it followed by any breakout transactions.

    The first element of the result is a copy of ``transaction`` whose
    ``settled``/``billed`` metadata has been resolved via
    ``_meta_with_nonrel_dates`` and whose postings are rebuilt one by one.
    A posting whose processing raises is kept unaltered, and the exception is
    recorded in ``bean_ctx.errors`` instead of propagating.
    """
    head = transaction._replace(
        postings=[],
        meta=_meta_with_nonrel_dates(transaction.meta, transaction.date),
    )
    breakouts = []
    for posting in transaction.postings:
        with _beancount_catching(transaction, errors_into=bean_ctx.errors, t_bean=BalanceError_t):
            try:
                replacement, spawned = _process_posting(TxnPosting(transaction, posting), bean_ctx)
            except Exception:
                # Keep the posting as it was, then let _beancount_catching
                # turn the exception into an error entry.
                head.postings.append(posting)
                raise
            head.postings.append(replacement)
            if spawned is not None:
                breakouts.append(spawned)
    return [head, *breakouts]
def _process_posting(txnp, bean_ctx) -> 'Tuple[Posting, Optional[Transaction]]':
    """Decide whether a posting must be rerouted through a clearing account.

    Args:
        txnp: TxnPosting pairing the posting with its parent transaction.
        bean_ctx: namespace whose ``options_map`` carries the account root
            names and the ``account_clearing`` key.

    Returns:
        ``(posting, None)`` when the posting is left untouched; otherwise the
        pair produced by ``_make_breakout``: the clearing posting that takes
        the original's place in the parent transaction, and the new transaction
        that books the original posting on its effective date.

    Raises:
        AssertionError: if a posting that reaches the "billed" code paths also
            carries a settlement date.
        Exception: whatever ``_parse_dateish`` / ``_make_breakout`` raise for
            malformed metadata.
    """
    options_map = bean_ctx.options_map
    clearing_leaf = leaf(options_map['account_clearing'])
    expense_root = options_map.get('name_expenses', "Expenses")

    txn = txnp.txn
    posting = txnp.posting
    txn_date = txn.date
    # NOTE(review): Beancount documents Posting.meta as possibly None (e.g. for
    # postings synthesized by other plugins) -- treat that as "no metadata"
    # rather than failing on `.get`.
    posting_meta = posting.meta or {}

    posting_settlement_date = _parse_dateish(posting_meta.get('settled'), relto=txn_date)
    txn_billing_date = _parse_dateish(txn.meta.get('billed'), relto=txn_date)
    posting_billing_date = _parse_dateish(posting_meta.get('billed'), relto=txn_date)
    posting_is_expense = root(1, posting.account) == expense_root

    # Codepath 1: Individual posting with a settlement date
    if posting_settlement_date is not None and leaf(posting.account) != clearing_leaf:
        info_billing_date = None
        if not posting_is_expense:
            # Distinct explicit billing dates found on the Expense postings.
            # Relative values are resolved against the transaction date, the
            # same way every other 'billed' value in this function is.
            expense_posting_billing_dates = {
                billed
                for billed in (
                    _parse_dateish((p_.meta or {}).get('billed'), relto=txn_date)
                    for p_ in txn.postings
                    if root(1, p_.account) == expense_root
                )
                if billed is not None
            }
            if txn_billing_date is not None:
                # this is not an Expense posting,
                # and the parent transaction has an explicit Billing date set.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_billing_date
            elif len(expense_posting_billing_dates) == 1:
                # this is not an Expense posting,
                # and the Expense postings that carry an explicit Billing date
                # all agree on a single one.
                # clearly this posting is related to a specific billing date.
                info_billing_date = next(iter(expense_posting_billing_dates))
            elif posting.units.number < 0 and _some_and_all(
                root(1, p_.account) == expense_root
                for p_ in txn.postings
                if p_.units.number > 0
            ):
                # this is not an Expense posting,
                # and this is a negative posting,
                # and EVERY positive posting on this transaction was an Expense.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_date

        clearing_posting, breakout_txn = _make_breakout(txnp, posting_settlement_date, bean_ctx)
        if info_billing_date is not None and info_billing_date != posting_billing_date:
            # Record the inferred billing date on the breakout transaction.
            breakout_txn = breakout_txn._replace(
                meta=breakout_txn.meta | {'billed': info_billing_date}
            )
        return clearing_posting, breakout_txn

    # Codepath 2: Expense posting on a transaction which has a Billed date
    if txn_billing_date is not None and posting_is_expense:
        assert posting_settlement_date is None, "wtf"
        # FIXME: should sweep ALL these together as a posting group/batch
        return _make_breakout(txnp, txn_billing_date, bean_ctx)

    # Codepath 3: Individual posting tagged with a Billed date
    if posting_billing_date is not None:
        assert posting_settlement_date is None, "wtf"
        return _make_breakout(txnp, posting_billing_date, bean_ctx, force_billed_comment=True)

    # Nothing to do: pass the posting through unchanged.
    return posting, None
def _make_breakout(txnp, effective_date, bean_ctx, force_billed_comment=False) -> 'Tuple[Posting, Transaction]':
    """Move a posting to ``effective_date`` via a clearing account.

    Args:
        txnp: TxnPosting pairing the posting to move with its parent transaction.
        effective_date: date on which the posting should actually be booked.
        bean_ctx: namespace whose ``options_map`` carries the account root
            names and the ``account_clearing`` key.
        force_billed_comment: when true, the generated narration is suffixed
            with "billed" even if the posting is not an Expense posting.

    Returns:
        A ``(clearing_posting, breakout_txn)`` tuple. ``clearing_posting``
        carries the posting's units on the clearing account and is meant to
        replace the posting in the parent transaction. ``breakout_txn`` is a
        copy of the parent transaction dated ``effective_date`` that contains
        the offsetting clearing posting plus the original posting (without its
        ``settled`` metadata).

    Raises:
        KeyError: if the posting metadata lacks ``filename`` or ``lineno``.
    """
    options_map = bean_ctx.options_map
    txn = txnp.txn
    posting = txnp.posting

    posting_meta = _meta_with_nonrel_dates(posting.meta, txn.date)
    new_txn_meta = _meta_with_nonrel_dates(txn.meta, txn.date)
    nominal_units = posting.units  # FIXME: need to make sure this is the right way to handle cost/price

    is_expense = root(1, posting.account) == options_map.get('name_expenses', "Expenses")
    is_early = effective_date < txn.date
    # A negative amount XOR an effective date before the transaction date
    # selects the Liabilities-side clearing account; otherwise the Assets side.
    loanish = (nominal_units.number < 0) ^ is_early
    clearing_root = (
        options_map.get('name_liabilities', "Liabilities") if loanish
        else options_map.get('name_assets', "Assets")
    )
    clearing_account = account_join(clearing_root, options_map['account_clearing'])

    if is_expense:
        # this IS the outbound leg of the transaction;
        # we don't need to leave the billing tag on
        # because it IS ALREADY BEING TRANSMUTED / BROKEN OUT TO THAT DATE
        _dict_isub_set(new_txn_meta, {'billed'})

    if force_billed_comment or is_expense:
        narration_comment = dgettext('beancount', "billed")
    elif is_early:
        narration_comment = dgettext('beancount', "early settlement")
    else:
        narration_comment = dgettext('beancount', "settled")

    def clearing_posting(units):
        # Each generated posting gets its own metadata dict, pointing back at
        # the source location of the posting it was derived from.
        return Posting(
            account=clearing_account,
            units=units,
            cost=None,
            price=None,
            flag='~',
            meta={
                'filename': posting_meta['filename'],
                'lineno': posting_meta['lineno'],
                AUTOMATIC_META: True,
            },
        )

    # at least Beancount v2 requires this to be a separate transaction
    # https://beancount.github.io/docs/settlement_dates_in_beancount.html
    breakout_txn = txn._replace(
        meta=new_txn_meta,
        date=effective_date,
        flag=posting.flag if posting.flag in BREAKOUT_FLAGS else txn.flag,
        narration=f"{txn.narration} ({narration_comment})",  # https://softwareengineering.stackexchange.com/questions/284842/why-isnt-gettext-parameterized
        postings=[
            clearing_posting(-nominal_units),
            posting._replace(meta=_dict_sub_set(posting_meta, {'settled'})),
        ],
    )
    return clearing_posting(nominal_units), breakout_txn
def _dict_sub_set(a, b):
    """Return a shallow copy of mapping ``a`` without the keys listed in ``b``.

    ``a`` itself is left unmodified.
    """
    return _dict_isub_set(a.copy(), b)
def _dict_isub_set(a, b):
for k in b:
if k in a:
del a[k]
return a
def _parse_dateish(obj, relto=None):
if isinstance(obj, date):
return obj
elif isinstance(obj, str):
m = re.match(r'([+-]?\d+)d', obj)
if m:
return relto + timedelta(days=int(m.group(1)))
m = re.match(r'T?([+-]\d+)', obj)
if m:
# FIXME should skip weekends
return relto + timedelta(days=int(m.group(1)))
elif isinstance(obj, Decimal):
assert int(obj) == obj, "wtf"
return relto + timedelta(days=int(obj))
else:
return obj
def _meta_mut_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Resolve the date-like ``fields`` of ``meta`` in place, relative to ``d0``.

    A field is rewritten only when it is present (not ``None``) and
    ``_parse_dateish`` hands back a different object than the stored one.
    """
    for key in fields:
        raw = meta.get(key)
        if raw is None:
            # Missing field: nothing to resolve.
            continue
        resolved = _parse_dateish(raw, relto=d0)
        if resolved is not raw:
            meta[key] = resolved
def _meta_with_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Return a shallow copy of ``meta`` with its date-like ``fields`` resolved against ``d0``.

    The input mapping is not modified.
    """
    clone = meta.copy()
    _meta_mut_nonrel_dates(clone, d0, fields=fields)
    return clone
def _some_and_all(it, /):
some_ = False
for elem in it:
if not elem:
return False
if not some_:
some_ = True
return some_
@contextmanager
def _beancount_catching(ctx_entry, errors_into, t_bean, *, t_exc=Exception):
try:
yield
except t_exc as exc:
fmt = traceback.format_exception(exc) if sys.version_info >= (3, 10) else traceback.format_exception(*sys.exc_info())
message = "\n".join(re.sub(r"\n$", "", line) for line in fmt)
errors_into.append(t_bean(ctx_entry.meta, message, ctx_entry))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment