Skip to content

Instantly share code, notes, and snippets.

@James-E-A
Last active September 11, 2025 02:11
Show Gist options
  • Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Save James-E-A/98dedad9d856402f7688c47288e53eba to your computer and use it in GitHub Desktop.
Beancount plugin: clear different legs of a transaction on different dates
from contextlib import contextmanager
from datetime import date, timedelta
from decimal import Decimal
from functools import reduce
from gettext import dgettext
from traceback import format_exception
from types import SimpleNamespace
import re
import uuid

from beancount.core.account import join as account_join, leaf, root, sans_root
from beancount.core.data import *
from beancount.core.interpolate import AUTOMATIC_META, BalanceError as BalanceError_t
# Posting-level flags that a broken-out transaction is allowed to take over
# from its posting; with any other posting flag the breakout keeps the flag of
# the parent transaction (see make_breakout).
BREAKOUT_ALLOW_FLAGS = {'!'}
# Names of the plugin entry points defined in this module.
__plugins__ = ['temporal_settlement']
def temporal_settlement(entries, options_map):
    """Plugin entry point: rewrite entries so tagged postings clear on their own dates.

    Args:
        entries: the directives to process.
        options_map: the options dict, used for root-account names and the
            optional ``account_clearing`` name.

    Returns:
        A ``(new_entries, errors)`` pair; ``errors`` holds one record per
        posting whose processing raised.
    """
    collected_errors = []
    bean_ctx = SimpleNamespace(
        entries=entries,
        options_map=options_map,
        errors=collected_errors,
    )
    rewritten_entries = _main(bean_ctx)
    return rewritten_entries, bean_ctx.errors
def _main(bean_ctx):
    """Return the full list of output entries for ``bean_ctx.entries``.

    Non-transaction entries are passed through untouched; each transaction is
    replaced by whatever ``process_transaction`` produces for it (the
    rewritten transaction followed by any broken-out ones).
    """
    output = []
    for entry in bean_ctx.entries:
        if isinstance(entry, Transaction):
            output.extend(process_transaction(entry, bean_ctx=bean_ctx))
        else:
            output.append(entry)
    return output
def process_transaction(transaction, bean_ctx):
    """Rewrite one transaction and collect the transactions split off from it.

    Every posting is handed to ``process_posting``.  When that call raises,
    the posting is kept as-is and the exception is recorded in
    ``bean_ctx.errors`` instead of propagating.

    Returns:
        A list whose first element is the rewritten copy of ``transaction``
        (relative dates in its metadata resolved), followed by one breakout
        transaction per posting that required one.
    """
    kept_postings = []
    rewritten = transaction._replace(
        postings=kept_postings,
        meta=_meta_with_nonrel_dates(transaction.meta, transaction.date),
    )
    breakouts = []

    for posting in transaction.postings:
        with _beancount_catching(transaction, errors_into=bean_ctx.errors, t_bean=BalanceError_t):
            try:
                replacement, extra_txn = process_posting(
                    TxnPosting(transaction, posting), bean_ctx=bean_ctx
                )
            except Exception:
                # Keep the posting untouched, then let the surrounding
                # context manager turn the exception into an error record.
                kept_postings.append(posting)
                raise
            kept_postings.append(replacement)
            if extra_txn is not None:
                breakouts.append(extra_txn)

    return [rewritten, *breakouts]
def process_posting(txnp, bean_ctx):
    """Decide whether one posting has to be moved to a different date.

    The decision is driven by the ``settled`` / ``billed`` metadata of the
    posting and the ``billed`` metadata of its transaction.  Relative values
    are resolved against the transaction date.

    Args:
        txnp: ``TxnPosting`` pairing the posting with its parent transaction.
        bean_ctx: namespace whose ``options_map`` supplies the root-account
            names; it is also forwarded to ``make_breakout``.

    Returns:
        A ``(posting, transaction)`` pair: the posting to keep in the parent
        transaction and the extra transaction to emit.  When no breakout is
        needed this is ``(txnp.posting, None)``.
    """
    expense_root = bean_ctx.options_map.get('name_expenses', "Expenses")
    txn_date = txnp.txn.date
    # Metadata is optional on postings; treat a missing dict as empty.
    posting_meta = txnp.posting.meta or {}
    txn_meta = txnp.txn.meta or {}
    posting_settlement_date = _parse_dateish(posting_meta.get('settled'), relto=txn_date)
    txn_billing_date = _parse_dateish(txn_meta.get('billed'), relto=txn_date)
    posting_billing_date = _parse_dateish(posting_meta.get('billed'), relto=txn_date)
    is_expense = root(1, txnp.posting.account) == expense_root

    # Codepath 1: Individual posting with a settlement date
    if posting_settlement_date is not None:
        info_billing_date = None
        if not is_expense:
            # Relative offsets need the transaction date to resolve against,
            # exactly like the other lookups above.
            expense_posting_billing_dates = {
                d for d in (
                    _parse_dateish((p_.meta or {}).get('billed'), relto=txn_date)
                    for p_ in txnp.txn.postings
                    if root(1, p_.account) == expense_root
                )
                if d is not None
            }
            if txn_billing_date is not None:
                # this is not an Expense posting,
                # and the parent transaction has an explicit Billing date set.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_billing_date
            elif len(expense_posting_billing_dates) == 1:
                # this is not an Expense posting,
                # and EVERY Expense posting on this transaction has the same explicit Billing date set.
                # clearly this posting is related to a specific billing date.
                info_billing_date = next(iter(expense_posting_billing_dates))
            elif txnp.posting.units.number < 0 and _some_and_all(
                root(1, p_.account) == expense_root
                for p_ in txnp.txn.postings
                if p_.units.number > 0
            ):
                # this is not an Expense posting,
                # and this is a negative posting,
                # and EVERY positive posting on this transaction was an Expense.
                # clearly this posting is related to a specific billing date.
                info_billing_date = txn_date

        clearing_posting, settlement_txn = make_breakout(
            txnp, posting_settlement_date, bean_ctx=bean_ctx
        )
        if info_billing_date is not None and info_billing_date != posting_billing_date:
            # Record the inferred billing date on the settlement transaction.
            settlement_txn = settlement_txn._replace(
                meta={**settlement_txn.meta, 'billed': info_billing_date}
            )
        return clearing_posting, settlement_txn

    # From here on the posting has no settlement date.

    # Codepath 2: Expense posting on a transaction which has a Billed date
    if txn_billing_date is not None and is_expense:
        # FIXME: should sweep ALL these together as a posting group/batch
        return make_breakout(txnp, txn_billing_date, bean_ctx)

    # Codepath 3: Individual posting tagged with a Billed date
    if posting_billing_date is not None:
        return make_breakout(txnp, posting_billing_date, bean_ctx, force_billed_comment=True)

    return txnp.posting, None
def make_breakout(txnp, effective_date, bean_ctx, force_billed_comment=False):
    """Move one posting into its own transaction dated ``effective_date``.

    Args:
        txnp: the posting to move, paired with its parent transaction.
        effective_date: date of the new transaction.
        bean_ctx: namespace whose ``options_map`` supplies account names.
        force_billed_comment: label the new transaction "billed" even when
            the posting is not on an expense account.

    Returns:
        ``(clearing_posting, new_transaction)``.  ``clearing_posting`` stands
        in for the original posting inside the parent transaction;
        ``new_transaction`` holds the opposite clearing leg plus the original
        posting (without its ``settled`` metadata).
    """
    source_posting = txnp.posting
    source_txn = txnp.txn
    opts = bean_ctx.options_map

    clearing_leaf = opts.get('account_clearing', "Clearing")
    asset_clearing = account_join(opts.get('name_assets', "Assets"), clearing_leaf)
    liability_clearing = account_join(opts.get('name_liabilities', "Liabilities"), clearing_leaf)
    expense_root = opts.get('name_expenses', "Expenses")

    posting_meta = _meta_with_nonrel_dates(source_posting.meta, source_txn.date)
    new_txn_meta = _meta_with_nonrel_dates(source_txn.meta, source_txn.date)

    nominal_units = source_posting.units  # FIXME: need to make sure this is the right way to handle cost/price
    is_negative = nominal_units.number < 0
    is_early = effective_date < source_txn.date
    # The liabilities-side clearing account is used when exactly one of
    # "negative amount" / "effective before the transaction date" holds.
    clearing_account = liability_clearing if is_negative != is_early else asset_clearing

    is_expense = root(1, source_posting.account) == expense_root
    if is_expense:
        new_txn_meta.pop('billed', None)

    if force_billed_comment or is_expense:
        narration_comment = dgettext('beancount', "billed")
    elif is_early:
        narration_comment = dgettext('beancount', "early settlement")
    else:
        narration_comment = dgettext('beancount', "settled")

    def clearing_leg(units):
        # Each leg gets its own metadata dict pointing at the source posting.
        return Posting(
            account=clearing_account,
            units=units,
            cost=None,
            price=None,
            flag='~',
            meta={
                'filename': posting_meta['filename'],
                'lineno': posting_meta['lineno'],
                AUTOMATIC_META: True,
            },
        )

    stand_in = clearing_leg(nominal_units)

    if source_posting.flag in BREAKOUT_ALLOW_FLAGS:
        breakout_flag = source_posting.flag
    else:
        breakout_flag = source_txn.flag

    moved_meta = dict(posting_meta)
    moved_meta.pop('settled', None)

    # at least Beancount v2 requires this to be a separate transaction
    # https://beancount.github.io/docs/settlement_dates_in_beancount.html
    breakout_txn = source_txn._replace(
        meta=new_txn_meta,
        date=effective_date,
        flag=breakout_flag,
        # https://softwareengineering.stackexchange.com/questions/284842/why-isnt-gettext-parameterized
        narration=f"{source_txn.narration} ({narration_comment})",
        postings=[
            clearing_leg(-(nominal_units)),
            source_posting._replace(meta=moved_meta),
        ],
    )
    return stand_in, breakout_txn
@contextmanager
def _beancount_catching(ctx_entry, errors_into, t_bean, *, t_exc=Exception):
try:
yield
except t_exc as exc:
fmt = format_exception(exc) if sys.version_info >= (3, 10) else format_exception(*sys.exc_info())
message = "\n".join(re.sub(r"\n$", "", line) for line in fmt)
errors_into.append(t_bean(ctx_entry.meta, message, ctx_entry))
def _parse_dateish(obj, relto=None):
if isinstance(obj, date):
return obj
elif isinstance(obj, str):
m = re.match(r'([+-]?\d+)d', obj)
if m:
return relto + timedelta(days=int(m.group(1)))
m = re.match(r'T?([+-]\d+)', obj)
if m:
# FIXME should skip weekends
return relto + timedelta(days=int(m.group(1)))
elif isinstance(obj, Decimal):
assert int(obj) == obj, "wtf"
return relto + timedelta(days=int(obj))
else:
return obj
def _some_and_all(it, /):
some_ = False
for elem in it:
if not elem:
return False
if not some_:
some_ = True
return some_
def _meta_with_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Return a copy of ``meta`` with the date fields resolved against ``d0``.

    ``meta`` itself is left untouched; the conversion is delegated to
    ``_meta_mut_nonrel_dates`` on the copy.
    """
    resolved_meta = meta.copy()
    _meta_mut_nonrel_dates(resolved_meta, d0, fields)
    return resolved_meta
def _meta_mut_nonrel_dates(meta, d0, fields=frozenset({'settled', 'billed'})):
    """Resolve the date fields of ``meta`` in place, relative to ``d0``.

    For each key in ``fields`` with a non-None value, the value is passed
    through ``_parse_dateish``; the result is stored back whenever it is not
    the very same object (values that are already dates are left alone).
    """
    for key in fields:
        raw = meta.get(key)
        if raw is None:
            continue
        parsed = _parse_dateish(raw, relto=d0)
        if parsed is not raw:
            meta[key] = parsed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment