Created
November 14, 2019 21:51
-
-
Save chillpop/5be09dec00de8b0b03e3c75d815a7319 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from json import load | |
from collections import defaultdict, deque, namedtuple | |
# handle transaction cancellations | |
# disallow payments with insufficient balance | |
# (- allow deferred up to ... time) | |
# handle payments to (but not from) unknown accounts | |
# TRY: | |
# Print out some accounts for each account: | |
# last payment | |
# change since T-1 | |
# (Stretch) Identify account takeover: | |
# flag accounts with three consecutive outgoing payments | |
# use deque for last-N transactions | |
def currency_code(currency):
    """Return the display symbol for a currency code.

    Args:
        currency: Currency code such as ``'USD'``.

    Returns:
        The symbol for ``'USD'``, ``'EUR'`` or ``'JPY'``. Any other value is
        returned unchanged so it can still be printed as a label.
    """
    symbols = {
        'USD': '$',
        'EUR': '€',
        'JPY': '¥',
    }
    # Fall back to the code itself. The previous default was the
    # ``currency_code`` function object, which printed as "<function ...>".
    return symbols.get(currency, currency)
class FraudChecker:
    """Flag accounts that send several payments in a row.

    A streak of outgoing payments is kept per sender id. The streak is
    reset as soon as that account *receives* a payment, so the count that
    is reported really is a run of consecutive outgoing payments rather
    than the total number of payments the account has ever sent.
    """

    def __init__(self, max_consecutive_payments=3):
        """Create a checker.

        Args:
            max_consecutive_payments: Streak length at which (and above
                which) a sender is reported.
        """
        # sender id -> payments sent since the account last received one
        self.payments_by_account = defaultdict(list)
        self.max_consecutive_payments = max_consecutive_payments

    def add_payment(self, payment, sender):
        """Record ``payment`` as sent by ``sender`` and report long streaks.

        Args:
            payment: The payment that was just applied. Its ``recipient``
                attribute, when present, identifies the account whose
                streak is interrupted.
            sender: The sending account; ``ident`` keys the streak and
                ``name`` is used in the printed warning.
        """
        # An incoming payment breaks the recipient's run of outgoing ones.
        # ``pop`` (rather than indexing) avoids creating empty entries.
        self.payments_by_account.pop(getattr(payment, 'recipient', None), None)

        streak = self.payments_by_account[sender.ident]
        streak.append(payment)
        count = len(streak)
        if count >= self.max_consecutive_payments:
            print(
                f'{sender.name} is potentially fraudulent with {count} consecutive payments'
            )
class Account:
    """An account holding one balance per currency.

    Calling ``Account()`` with no arguments yields a placeholder account
    (id ``0``, name ``"<Unknown>"``, no balances).
    """

    def __init__(self, ident=0, name="<Unknown>", country=None, balances=None):
        """Create an account.

        Args:
            ident: Account identifier.
            name: Display name.
            country: Country value as given by the caller; stored as is.
            balances: Mapping of currency code to amount. A ``defaultdict``
                is stored as passed; any other mapping is copied into a
                ``defaultdict(int)`` so currencies that are not listed read
                as ``0`` instead of raising ``KeyError``.
        """
        self.ident = ident
        self.name = name
        self.country = country
        if balances is None:
            balances = defaultdict(int)
        elif not isinstance(balances, defaultdict):
            # e.g. the plain dict produced by json.load
            balances = defaultdict(int, balances)
        self.balances = balances
        # Only the three most recent payments sent by this account are kept.
        self.history = deque(maxlen=3)

    @classmethod
    def from_json(cls, json):
        """Build an account from a dict with ``id``, ``name``, ``country``
        and ``balances`` keys.

        Raises:
            KeyError: If one of those keys is missing.
        """
        return cls(
            ident=json["id"],
            name=json["name"],
            country=json["country"],
            balances=json["balances"],
        )

    def append_payment(self, payment):
        """Remember ``payment`` if this account is its sender.

        Payments sent by other accounts are ignored.
        """
        if payment.sender == self.ident:
            self.history.append(payment)

    def last_payment(self):
        """Return the most recent remembered payment, or ``None``."""
        if self.history:
            return self.history[-1]
        return None

    def can_pay(self, payment):
        """Return whether the balance covers what the sender owes.

        The comparison uses ``payment.total_amount(who=SENDER)``, where
        ``SENDER`` is the module-level constant, against the balance held
        in ``payment.currency``.
        """
        current_balance = self.balances[payment.currency]
        return current_balance >= payment.total_amount(who=SENDER)
class Payment(namedtuple('PaymentBase', 'ident type datetime sender recipient amount currency')):
    """A transfer of ``amount`` in ``currency`` from ``sender`` to ``recipient``.

    The tuple fields are immutable, but ``action`` and ``fee`` are ordinary
    attributes: they default to the class-level values below and may be
    overridden per instance by assignment.
    """

    # Verb used when the payment is rendered as text.
    action = 'paid'
    # Fee object applied to this payment, or None when no fee applies.
    fee = None

    def __repr__(self):
        """Render the payment as a one-line human-readable summary."""
        symbol = currency_code(self.currency)
        return f'{self.ident} at {self.datetime}: {self.sender} {self.action} {self.recipient} {symbol}{self.amount}'

    @classmethod
    def from_json(cls, json):
        """Build a payment from a dict record.

        The record must provide the keys ``id``, ``type``, ``datetime``,
        ``sender``, ``recipient``, ``amount`` and ``currency``.

        Raises:
            KeyError: If any of those keys is missing.
        """
        keys = ('id', 'type', 'datetime', 'sender', 'recipient', 'amount', 'currency')
        return cls(*(json[key] for key in keys))

    def reverse(self, ident=0, datetime=None):
        """Return a new payment that sends the same amount back.

        Sender and recipient are swapped; type, amount, currency and fee
        are carried over, and ``action`` is set to ``'reversed'``.

        Args:
            ident: Identifier for the reversing payment.
            datetime: Timestamp for the reversing payment.
        """
        undo = Payment(
            ident,
            self.type,
            datetime,
            self.recipient,
            self.sender,
            self.amount,
            self.currency,
        )
        undo.fee = self.fee
        undo.action = 'reversed'
        return undo

    @property
    def fee_amount(self):
        """Fee charged on this payment: flat part plus ``amount * percent``.

        ``0`` when no fee is attached.
        """
        fee = self.fee
        if fee is not None:
            return fee.flat_value + (self.amount * fee.percent)
        return 0

    def total_amount(self, who):
        """Return ``amount`` plus the fee when ``who`` is the fee payer.

        Args:
            who: Party being asked about; compared against ``fee.who``.

        Returns:
            ``amount + fee_amount`` if a fee is attached and ``fee.who``
            equals ``who``; otherwise just ``amount``.
        """
        fee = self.fee
        bears_fee = fee is not None and fee.who == who
        return self.amount + self.fee_amount if bears_fee else self.amount
class Reversal(namedtuple('ReversalBase', 'id type datetime payment')):
    """A cancellation record; ``payment`` holds the id of the payment to undo."""

    @classmethod
    def from_json(cls, json):
        """Build a reversal from a dict record.

        The record must provide the keys ``id``, ``type``, ``datetime``
        and ``payment``.

        Raises:
            KeyError: If any of those keys is missing.
        """
        values = [json[key] for key in ('id', 'type', 'datetime', 'payment')]
        return cls(*values)
# class FeeType() | |
class Fee:
    """Fee schedule for one currency: a flat part, a proportional part and
    the party who pays it.
    """

    def __init__(self, value, percent, who):
        """Create a fee.

        Args:
            value: Flat amount, stored as ``flat_value``.
            percent: Factor multiplied by the payment amount.
                NOTE(review): presumably a fraction (0.02 for 2%) since it
                is used without dividing by 100 - confirm against fees.json.
            who: Which party pays the fee.
        """
        self.flat_value = value
        self.percent = percent
        self.who = who

    @classmethod
    def from_json(cls, json):
        """Build a fee from a dict with optional ``flat`` and ``pct`` keys
        (both default to 0) and a required ``who`` key.

        Raises:
            KeyError: If ``who`` is missing.
        """
        return cls(
            value=json.get('flat', 0),
            percent=json.get('pct', 0),
            who=json['who'],
        )

    @classmethod
    def load_fees(cls, file='fees.json'):
        """Load a ``{currency: fee}`` dict from a JSON file.

        The file must contain one object whose keys are currency codes and
        whose values are records accepted by :meth:`from_json`.

        Args:
            file: Path of the JSON file to read.

        Returns:
            Dict mapping each currency code to an instance of ``cls``.
        """
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(file, encoding='utf-8') as f:
            data = load(f)
        # Use ``cls`` rather than ``Fee`` so subclasses get their own type.
        return {currency: cls.from_json(spec) for currency, spec in data.items()}
# Values of a record's "type" field, used to tell payments from cancellations.
PAYMENT_TYPE = 'payment'
REVERSAL_TYPE = 'cancel'

# Values compared against Fee.who to decide which party bears a fee.
SENDER = 'sender'
RECIPIENT = 'recipient'

# Fee schedule keyed by currency code; read from fees.json at import time.
fees_by_currency = Fee.load_fees()
def load_balances(file='balances.json', accounts=None):
    """Load accounts from a JSON file into a mapping keyed by account id.

    The file must contain a list of records accepted by
    ``Account.from_json``; each record's ``balances`` object is converted
    to a ``defaultdict(int)`` so unlisted currencies read as ``0``.

    Args:
        file: Path of the JSON file to read.
        accounts: Mapping to fill. When omitted, a new
            ``defaultdict(Account)`` is created (the same kind of mapping
            the script passes in) instead of failing on ``None``.

    Returns:
        The ``accounts`` mapping, with every loaded account stored under
        its ``ident``.
    """
    if accounts is None:
        accounts = defaultdict(Account)
    with open(file, encoding='utf-8') as f:
        records = load(f)
    for record in records:
        # Pass the mapping positionally; ``**balances`` would only accept
        # keys that are valid keyword names (i.e. strings).
        record['balances'] = defaultdict(int, record['balances'])
        account = Account.from_json(record)
        accounts[account.ident] = account
    return accounts
def load_payments(file='payments.json'):
    """Load the transaction log from a JSON file.

    The file must contain a list of records. Records whose ``type`` is
    ``PAYMENT_TYPE`` become ``Payment`` objects, with the matching entry
    of ``fees_by_currency`` attached as ``fee`` when one exists for the
    payment's currency. Records whose ``type`` is ``REVERSAL_TYPE`` become
    ``Reversal`` objects.

    Args:
        file: Path of the JSON file to read.

    Returns:
        List of ``Payment`` and ``Reversal`` objects in file order.

    Raises:
        ValueError: If a record has any other ``type``.
    """
    with open(file, encoding='utf-8') as f:
        records = load(f)

    transactions = []
    for record in records:
        kind = record['type']
        if kind == PAYMENT_TYPE:
            payment = Payment.from_json(record)
            if payment.currency in fees_by_currency:
                payment.fee = fees_by_currency[payment.currency]
            transactions.append(payment)
        elif kind == REVERSAL_TYPE:
            transactions.append(Reversal.from_json(record))
        else:
            # ValueError is still a BaseException, so existing handlers
            # keep working, but the message now names the offending type.
            raise ValueError(f'Unknown transaction type: {kind!r}')
    return transactions
def process(accounts, payments):
    """Apply every transaction in ``payments`` to ``accounts``, in order.

    ``Payment`` entries are applied directly. For a ``Reversal`` entry the
    referenced payment is looked up in ``payments`` and its reverse is
    applied, carrying the reversal record's own id and datetime.

    Args:
        accounts: Mapping of account id to account.
        payments: Sequence of ``Payment`` and ``Reversal`` objects.

    Raises:
        TypeError: If an entry is neither a ``Payment`` nor a ``Reversal``.
    """
    for entry in payments:
        if isinstance(entry, Payment):
            process_payment(accounts, entry)
        elif isinstance(entry, Reversal):
            original = find_transaction(entry.payment, payments)
            if original is None:
                # Report and carry on, like a rejected payment, instead of
                # crashing on ``None.reverse()``.
                print(f'Cannot reverse unknown payment {entry.payment}')
                continue
            # NOTE(review): the reversal is applied even if the original
            # payment was rejected; process_payment does not report that.
            process_payment(
                accounts,
                original.reverse(ident=entry.id, datetime=entry.datetime),
            )
        else:
            raise TypeError(f'Unsupported transaction: {entry!r}')
def find_transaction(ident, transactions):
    """Return the first transaction whose ``ident`` equals ``ident``.

    Entries without an ``ident`` attribute (``Reversal`` records name
    their identifier ``id``) are skipped rather than raising
    ``AttributeError``.

    Args:
        ident: Identifier to look for.
        transactions: Iterable of transaction objects.

    Returns:
        The matching transaction, or ``None`` if there is none.
    """
    # A unique sentinel so that a lookup for ``None`` cannot match entries
    # that simply lack the attribute.
    missing = object()
    for candidate in transactions:
        if getattr(candidate, 'ident', missing) == ident:
            return candidate
    return None
def process_payment(accounts, payment, fraud_checker=FraudChecker()):
    """Move money for one payment and print what happened.

    If the sender cannot cover the payment, a message is printed and
    nothing else changes. Otherwise the sender is debited, the recipient
    is credited, both accounts are offered the payment for their history
    and the fraud checker is notified.

    Args:
        accounts: Mapping of account id to account; looked up by
            ``payment.sender`` and ``payment.recipient``.
        payment: The payment to apply.
        fraud_checker: Checker notified of each applied payment. The
            default instance is created once, when the function is
            defined, and is therefore shared by every call that omits
            this argument - which is how ``process`` accumulates history.
    """
    sender = accounts[payment.sender]
    recipient = accounts[payment.recipient]

    debit = payment.total_amount(who=SENDER)
    if not sender.can_pay(payment):
        # Call total_amount; interpolating the bare attribute printed the
        # bound-method repr instead of a number.
        print(
            f'{sender.name} does not have at least {debit} {payment.currency}'
        )
        return

    # A fee borne by the recipient is taken out of what they receive.
    # (total_amount(who=RECIPIENT) would *add* it to the credit.)
    credit = payment.amount
    if payment.fee is not None and payment.fee.who == RECIPIENT:
        credit -= payment.fee_amount

    sender.balances[payment.currency] -= debit
    recipient.balances[payment.currency] += credit

    print(f'{sender.name} {payment.action} {recipient.name} {payment.amount} {payment.currency}')
    if payment.fee is not None:
        fee_payer = recipient if payment.fee.who == RECIPIENT else sender
        print(
            f'Fee of {payment.fee_amount} paid by {payment.fee.who}, {fee_payer.name}')

    sender.append_payment(payment)
    recipient.append_payment(payment)
    fraud_checker.add_payment(payment, sender)
def pretty_print(accounts):
    """Print each account's name, balances and most recent sent payment.

    Args:
        accounts: Mapping whose values are accounts. Balances are printed
            one per line, tab-indented and prefixed by the currency
            symbol; a ten-dash rule follows every account.
    """
    rule = '-' * 10
    for account in accounts.values():
        print(f'{account.name}')
        for currency, value in account.balances.items():
            print(f'\t{currency_code(currency)} {value}')
        latest = account.last_payment()
        if latest is not None:
            print('Last payment: ')
            print(latest)
        print(rule)
# Script body. No FraudChecker is created here: process_payment falls back
# to its own default instance.

# Ids that are not in balances.json get a placeholder Account() on first
# lookup, courtesy of the defaultdict.
accounts = load_balances(accounts=defaultdict(Account))
# (The starting balances are not printed; only the final state is.)
print('-' * 10)

payments = load_payments()
print('-' * 10)

process(accounts, payments)
print('-' * 10)

pretty_print(accounts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment