Created
August 16, 2024 18:44
-
-
Save choutianxius/e9e72df2d0471a6d0d24c6f3156c4020 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
A simple in-memory banking system which supports:
- Creating accounts
- Depositing money
- Transferring money from one account to another
- Making payments, which earn 2% cashback after one day
- Getting the status of a payment at the current timestamp
- Getting an account's balance at a given timestamp
- Merging accounts
- Getting the most active accounts in terms of money spent
(Not thoroughly tested)
'''
class BankingSystem: | |
def __init__(self): | |
self.balance = {} | |
self.balance_history = {} | |
self.spent = {} | |
self.payment = [] | |
self.processed_payment_count = 0 | |
def create_account(self, timestamp: int, account_id: str) -> bool: | |
if account_id in self.balance: | |
return False | |
self.balance[account_id] = 0 | |
self.balance_history[account_id] = [(timestamp, 0)] | |
self.spent[account_id] = 0 | |
return True | |
def _add_balance_history(self, timestamp: int, account_id: str) -> None: | |
self.balance_history[account_id].append( | |
(timestamp, self.balance[account_id])) | |
def _process_cashbacks(self, timestamp: int) -> None: | |
while self.processed_payment_count < len(self.payment): | |
t, account_id, amount = self.payment[self.processed_payment_count] | |
MILLISECONDS_IN_ONE_DAY = 86400000 | |
CASHBACK_RATE = 0.02 | |
if t + MILLISECONDS_IN_ONE_DAY > timestamp: | |
break | |
self.processed_payment_count += 1 | |
self.balance[account_id] += int(amount * CASHBACK_RATE) | |
self._add_balance_history(t + MILLISECONDS_IN_ONE_DAY, account_id) | |
def deposit(self, timestamp: int, account_id: str, amount: int) -> None | int: | |
self._process_cashbacks(timestamp) | |
if account_id not in self.balance: | |
return None | |
self.balance[account_id] += amount | |
self._add_balance_history(timestamp, account_id) | |
return self.balance[account_id] | |
def transfer( | |
self, | |
timestamp: int, | |
source_account_id: str, | |
target_account_id: str, | |
amount: int, | |
) -> None | int: | |
self._process_cashbacks(timestamp) | |
if ( | |
source_account_id not in self.balance | |
or target_account_id not in self.balance | |
): | |
return None | |
if source_account_id == target_account_id: | |
return None | |
if self.balance[source_account_id] < amount: | |
return None | |
self.balance[source_account_id] -= amount | |
self.balance[target_account_id] += amount | |
self._add_balance_history(timestamp, source_account_id) | |
self._add_balance_history(timestamp, target_account_id) | |
self.spent[source_account_id] += amount | |
def pay(self, timestamp: int, account_id: str, amount: int) -> None | str: | |
self._process_cashbacks(timestamp) | |
if account_id not in self.balance: | |
return None | |
if self.balance[account_id] < amount: | |
return None | |
self.balance[account_id] -= amount | |
self._add_balance_history(timestamp, account_id) | |
self.payment.append([timestamp, account_id, amount]) | |
self.spent[account_id] += amount | |
def merge_accounts(self, timestamp: int, account_id_1: str, account_id_2: str) -> bool: | |
self._process_cashbacks(timestamp) | |
if account_id_1 not in self.balance or account_id_2 not in self.balance: | |
return False | |
if account_id_1 == account_id_2: | |
return False | |
# merge balance | |
self.balance[account_id_1] += self.balance[account_id_2] | |
del self.balance[account_id_2] | |
# merge spent | |
self.spent[account_id_1] += self.spent[account_id_2] | |
del self.spent[account_id_2] | |
# update payment | |
for i in range(len(self.payment)): | |
if self.payment[i][1] == account_id_2: | |
self.payment[i][1] = account_id_1 | |
# update balance history | |
new_history = [] | |
curr_balance_1, curr_balance_2, curr_t = 0, 0, 0 | |
while self.balance_history[account_id_1] and self.balance_history[account_id_2]: | |
if self.balance_history[account_id_1][0] < self.balance_history[account_id_2][0]: | |
curr_t = self.balance_history[account_id_1][0][0] | |
curr_balance_1 = self.balance_history[account_id_1][0][1] | |
self.balance_history[account_id_1].pop(0) | |
else: | |
curr_t = self.balance_history[account_id_2][0][0] | |
curr_balance_2 = self.balance_history[account_id_2][0][1] | |
self.balance_history[account_id_2].pop(0) | |
new_history.append((curr_t, curr_balance_1 + curr_balance_2)) | |
if self.balance_history[account_id_1]: | |
for t1, balance1 in self.balance_history[account_id_1]: | |
new_history.append((t1, balance1 + curr_balance_2)) | |
else: | |
for t2, balance2 in self.balance_history[account_id_2]: | |
new_history.append((t2, balance2 + curr_balance_1)) | |
self.balance_history[account_id_1] = new_history | |
del self.balance_history[account_id_2] | |
self._add_balance_history(timestamp, account_id_1) | |
def top_outgoing_accounts(self, timestamp: int, n: int) -> list[str]: | |
self._process_cashbacks(timestamp) | |
data = {} | |
for account_id, spent in self.spent.items(): | |
data[spent] = data.get(spent, []) + [account_id] | |
ans = [] | |
for spent in sorted(data.keys(), reverse=True): | |
for account_id in sorted[data[spent]]: | |
ans.append(f'{account_id}({spent})') | |
if len(ans) == n: | |
return ans | |
return ans | |
def get_payment_status(self, timestamp: int, account_id: str, payment: str) -> None | str: | |
self._process_cashbacks(timestamp) | |
payment_id = int(payment[7:]) - 1 | |
if payment_id >= len(self.payment): | |
return None | |
t, payment_account_id, amount = self.payment[payment_id] | |
if payment_account_id != account_id: | |
return None | |
if payment_id < self.processed_payment_count: | |
return 'PROCESSED' | |
return 'PENDING' | |
def get_balance(self, timestamp: int, account_id: str, time_at: int) -> None | int: | |
self._process_cashbacks(timestamp) | |
if account_id not in self.balance_history: | |
return None | |
# account doesn't exist yet | |
if time_at < self.balance_history[account_id][0][0]: | |
return None | |
for t, balance in reversed(self.balance_history[account_id]): | |
if time_at >= t: | |
return balance | |
# Manual smoke test: run this file directly to print the results of a short
# scenario covering payments, cashback, merging and historical balances.
if __name__ == '__main__':
    DAY_MS = 86400000
    bank = BankingSystem()

    # Two accounts, each funded, then a transfer and a payment by account2.
    for created_at, name in ((1, 'account1'), (2, 'account2')):
        bank.create_account(created_at, name)
    bank.deposit(3, 'account1', 1000)
    bank.deposit(4, 'account2', 2000)
    bank.transfer(5, 'account2', 'account1', 500)
    bank.pay(6, 'account2', 1000)

    # Payment status shortly after paying, and again more than a day later.
    for query_time in (7, DAY_MS + 9):
        print(bank.get_payment_status(query_time, 'account2', 'payment1'))

    # Merge account2 into account1, then query balances at past timestamps.
    bank.merge_accounts(DAY_MS + 10, 'account1', 'account2')
    print(bank.get_balance(DAY_MS + 11, 'account2', 1))
    for offset, time_at in enumerate((3, 4, 5, 6, DAY_MS + 11), start=12):
        print(bank.get_balance(DAY_MS + offset, 'account1', time_at))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment