Last active
September 11, 2025 02:11
-
-
Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Beancount clear different legs of transaction on different dates
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from contextlib import contextmanager | |
| from datetime import date | |
| from functools import reduce | |
| from gettext import dgettext | |
| from traceback import format_exception | |
| from types import SimpleNamespace | |
| import re | |
| import uuid | |
| from beancount.core.account import join as account_join, leaf, root, sans_root | |
| from beancount.core.data import * | |
| from beancount.core.interpolate import AUTOMATIC_META, BalanceError as BalanceError_t | |
# Posting-level flags that override the parent transaction's flag on the
# broken-out transaction built by make_breakout(); any other posting flag
# is ignored there and the transaction's own flag is used instead.
BREAKOUT_ALLOW_FLAGS = {'!'}
# Names of the plugin entry-point functions defined in this module
# (presumably consumed by Beancount's plugin loader -- confirm against
# the Beancount plugin documentation).
__plugins__ = ['temporal_settlement']
def temporal_settlement(entries, options_map):
    """Plugin entry point: rewrite entries so postings clear on their own dates.

    Args:
        entries: the list of directives to process.
        options_map: the options mapping for the ledger being processed.

    Returns:
        A ``(new_entries, errors)`` pair; ``errors`` collects whatever the
        per-posting processing recorded along the way.
    """
    # A single mutable context object is threaded through every helper so
    # that they can all share the options and append to one error list.
    context = SimpleNamespace(entries=entries, options_map=options_map, errors=[])
    rewritten_entries = _main(context)
    return rewritten_entries, context.errors
def _main(bean_ctx):
    """Return the full rewritten entry list for ``bean_ctx.entries``.

    Non-transaction entries are passed through untouched, in order; each
    Transaction is replaced by whatever process_transaction() returns for it
    (the adjusted transaction followed by any broken-out transactions).
    """
    output = []
    for entry in bean_ctx.entries:
        if isinstance(entry, Transaction):
            output.extend(process_transaction(entry, bean_ctx=bean_ctx))
        else:
            output.append(entry)
    return output
def process_transaction(transaction, bean_ctx):
    """Rewrite one transaction, breaking out postings that clear on other dates.

    Args:
        transaction: the Transaction to process.
        bean_ctx: shared context; failures are appended to ``bean_ctx.errors``.

    Returns:
        A list whose first element is the rewritten copy of ``transaction``
        and whose remaining elements are the extra transactions generated for
        individual postings, in posting order.
    """
    kept_postings = []
    breakouts = []
    head = transaction._replace(
        postings=kept_postings,
        meta=_meta_with_nonrel_dates(transaction.meta, transaction.date),
    )

    for posting in transaction.postings:
        # The context manager converts any exception into an entry in
        # bean_ctx.errors and then suppresses it, so one bad posting does
        # not abort processing of the remaining ones.
        with _beancount_catching(transaction, errors_into=bean_ctx.errors, t_bean=BalanceError_t):
            try:
                replacement, extra_txn = process_posting(
                    TxnPosting(transaction, posting), bean_ctx=bean_ctx
                )
            except Exception:
                # Keep the posting exactly as written, then let the error
                # propagate to _beancount_catching for reporting.
                kept_postings.append(posting)
                raise
            kept_postings.append(replacement)
            if extra_txn is not None:
                breakouts.append(extra_txn)

    return [head, *breakouts]
def process_posting(txnp, bean_ctx):
    """Decide whether a posting has to be broken out onto a different date.

    Args:
        txnp: TxnPosting pairing the posting with its parent transaction.
        bean_ctx: shared context; only ``options_map`` is read here (it is
            also forwarded to make_breakout()).

    Returns:
        A ``(posting, new_txn)`` pair.  ``posting`` is what should appear in
        the parent transaction in place of the original posting, and
        ``new_txn`` is the additional transaction produced by
        make_breakout(), or ``None`` when the posting is left untouched.

    Raises:
        Whatever _parse_dateish() or make_breakout() raise for malformed
        metadata; the caller is expected to catch and report these.
    """
    expense_root = bean_ctx.options_map.get('name_expenses', "Expenses")
    txn_date = txnp.txn.date
    # NOTE(review): Beancount types Posting.meta as optional, so treat a
    # missing mapping as "no metadata" instead of failing on ``None.get``.
    posting_meta = txnp.posting.meta or {}
    posting_settlement_date = _parse_dateish(posting_meta.get('settled'), relto=txn_date)
    txn_billing_date = _parse_dateish(txnp.txn.meta.get('billed'), relto=txn_date)
    posting_billing_date = _parse_dateish(posting_meta.get('billed'), relto=txn_date)
    is_expense = root(1, txnp.posting.account) == expense_root

    # Codepath 1: Individual posting with a settlement date
    if posting_settlement_date is not None:
        info_billing_date = None
        if not is_expense:
            # Relative 'billed' values on sibling postings are resolved
            # against the transaction date, like every other date here.
            expense_posting_billing_dates = {
                d for d in (
                    _parse_dateish((p_.meta or {}).get('billed'), relto=txn_date)
                    for p_ in txnp.txn.postings
                    if root(1, p_.account) == expense_root
                )
                if d is not None
            }
            if txn_billing_date is not None:
                # this is not an Expense posting,
                # and the parent transaction has an explicit Billing date set.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_billing_date
            elif len(expense_posting_billing_dates) == 1:
                # this is not an Expense posting,
                # and the Expense postings that carry a Billing date all
                # agree on one single date.
                # clearly this posting is related to a specific billing date.
                info_billing_date = next(iter(expense_posting_billing_dates))
            elif txnp.posting.units.number < 0 and _some_and_all(
                root(1, p_.account) == expense_root
                for p_ in txnp.txn.postings
                if p_.units.number > 0
            ):
                # this is not an Expense posting,
                # and this is a negative posting,
                # and EVERY positive posting on this transaction was an Expense.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_date

        # make_breakout() returns (replacement posting, new transaction).
        clearing_posting, breakout_txn = make_breakout(
            txnp, posting_settlement_date, bean_ctx=bean_ctx
        )
        if info_billing_date is not None and info_billing_date != posting_billing_date:
            # Record the inferred billing date on the broken-out transaction.
            breakout_txn = breakout_txn._replace(
                meta=breakout_txn.meta | {'billed': info_billing_date}
            )
        return clearing_posting, breakout_txn

    # From here on posting_settlement_date is known to be None.

    # Codepath 2: Expense posting on a transaction which has a Billed date
    if txn_billing_date is not None and is_expense:
        # FIXME: should sweep ALL these together as a posting group/batch
        return make_breakout(txnp, txn_billing_date, bean_ctx)

    # Codepath 3: Individual posting tagged with a Billed date
    if posting_billing_date is not None:
        return make_breakout(txnp, posting_billing_date, bean_ctx, force_billed_comment=True)

    # Nothing to do: keep the posting as-is and emit no extra transaction.
    return txnp.posting, None
def make_breakout(txnp, effective_date, bean_ctx, force_billed_comment=False):
    """Split one posting out of its transaction via a clearing account.

    Args:
        txnp: TxnPosting pairing the posting with its parent transaction.
        effective_date: date the broken-out transaction is booked on.
        bean_ctx: shared context; only ``options_map`` is read.
        force_billed_comment: when true, always label the new transaction's
            narration as "billed".

    Returns:
        ``(clearing_posting, new_txn)``: a posting against the clearing
        account carrying the original units (to stand in for the original
        posting), and a copy of the parent transaction dated
        ``effective_date`` that holds the opposite clearing posting plus the
        original posting with its 'settled' metadata removed.
    """
    source_txn = txnp.txn
    source_posting = txnp.posting
    options = bean_ctx.options_map

    clearing_leaf = options.get('account_clearing', "Clearing")
    asset_clearing = account_join(options.get('name_assets', "Assets"), clearing_leaf)
    liability_clearing = account_join(options.get('name_liabilities', "Liabilities"), clearing_leaf)
    expense_root = options.get('name_expenses', "Expenses")

    posting_meta = _meta_with_nonrel_dates(source_posting.meta, source_txn.date)
    new_txn_meta = _meta_with_nonrel_dates(source_txn.meta, source_txn.date)

    units = source_posting.units  # FIXME: need to make sure this is the right way to handle cost/price

    # Exactly one of "negative amount" / "effective before the transaction"
    # selects the liabilities-side clearing account; otherwise assets-side.
    is_negative = units.number < 0
    is_early = effective_date < source_txn.date
    if is_negative != is_early:
        clearing_account = liability_clearing
    else:
        clearing_account = asset_clearing

    is_expense = root(1, source_posting.account) == expense_root
    if is_expense:
        # The broken-out transaction for an expense does not keep 'billed'.
        new_txn_meta.pop('billed', None)

    if force_billed_comment or is_expense:
        narration_comment = dgettext('beancount', "billed")
    elif is_early:
        narration_comment = dgettext('beancount', "early settlement")
    else:
        narration_comment = dgettext('beancount', "settled")

    def clearing_leg(leg_units):
        # Every call builds a fresh meta dict so the two generated postings
        # never share a mutable mapping.
        return Posting(
            account=clearing_account,
            units=leg_units,
            cost=None,
            price=None,
            flag='~',
            meta={
                'filename': posting_meta['filename'],
                'lineno': posting_meta['lineno'],
                AUTOMATIC_META: True,
            },
        )

    replacement_posting = clearing_leg(units)

    if source_posting.flag in BREAKOUT_ALLOW_FLAGS:
        breakout_flag = source_posting.flag
    else:
        breakout_flag = source_txn.flag

    moved_posting = source_posting._replace(
        meta={key: value for key, value in posting_meta.items() if key != 'settled'}
    )

    # at least Beancount v2 requires this to be a separate transaction
    # https://beancount.github.io/docs/settlement_dates_in_beancount.html
    breakout_txn = source_txn._replace(
        meta=new_txn_meta,
        date=effective_date,
        flag=breakout_flag,
        # https://softwareengineering.stackexchange.com/questions/284842/why-isnt-gettext-parameterized
        narration=f"{source_txn.narration} ({narration_comment})",
        postings=[clearing_leg(-(units)), moved_posting],
    )
    return replacement_posting, breakout_txn
| @contextmanager | |
| def _beancount_catching(ctx_entry, errors_into, t_bean, *, t_exc=Exception): | |
| try: | |
| yield | |
| except t_exc as exc: | |
| fmt = format_exception(exc) if sys.version_info >= (3, 10) else format_exception(*sys.exc_info()) | |
| message = "\n".join(re.sub(r"\n$", "", line) for line in fmt) | |
| errors_into.append(t_bean(ctx_entry.meta, message, ctx_entry)) | |
| def _parse_dateish(obj, relto=None): | |
| if isinstance(obj, date): | |
| return obj | |
| elif isinstance(obj, str): | |
| m = re.match(r'([+-]?\d+)d', obj) | |
| if m: | |
| return relto + timedelta(days=int(m.group(1))) | |
| m = re.match(r'T?([+-]\d+)', obj) | |
| if m: | |
| # FIXME should skip weekends | |
| return relto + timedelta(days=int(m.group(1))) | |
| elif isinstance(obj, Decimal): | |
| assert int(obj) == obj, "wtf" | |
| return relto + timedelta(days=int(obj)) | |
| else: | |
| return obj | |
| def _some_and_all(it, /): | |
| some_ = False | |
| for elem in it: | |
| if not elem: | |
| return False | |
| if not some_: | |
| some_ = True | |
| return some_ | |
def _meta_with_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Return a copy of *meta* with the date-like *fields* made absolute.

    The input mapping is left untouched; the copy (made with ``meta.copy()``)
    has each listed field resolved relative to ``d0`` by
    _meta_mut_nonrel_dates().
    """
    absolute_meta = meta.copy()
    _meta_mut_nonrel_dates(absolute_meta, d0, fields=fields)
    return absolute_meta
def _meta_mut_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Resolve the date-like *fields* of *meta* in place, relative to *d0*.

    A field is rewritten only when it is present (not ``None``) and
    _parse_dateish() hands back a different object than the stored one.
    """
    for key in fields:
        raw = meta.get(key)
        if raw is None:
            # Absent or empty field: nothing to resolve.
            continue
        resolved = _parse_dateish(raw, relto=d0)
        if resolved is not raw:
            meta[key] = resolved
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment