Created
May 26, 2022 17:56
-
-
Save lastorset/787a872c841d89ff9c35732759c6236a to your computer and use it in GitHub Desktop.
Convert Firi CSV to Koinly CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requires the third-party packages `typer` and `arrow` (pip install typer arrow).
from dataclasses import dataclass | |
from datetime import datetime | |
from decimal import Decimal | |
import csv | |
import arrow | |
from typing import Sequence, Dict | |
import typer | |
# Typer application object; commands are registered on it with @app.command().
app = typer.Typer()
# Column headers of the generated Koinly CSV, in output order.
# KoinlyRow.to_dict() produces exactly these keys.
KOINLY_FIELD_NAMES = (
    "Date",
    # What left the account.
    "Sent Amount",
    "Sent Currency",
    # What arrived in the account.
    "Received Amount",
    "Received Currency",
    # Fee charged on the transaction.
    "Fee Amount",
    "Fee Currency",
    # Always left empty by this converter.
    "Net Worth Amount",
    "Net Worth Currency",
    # Free-form metadata.
    "Label",
    "Description",
    "TxHash",
)
@dataclass
class Money:
    """An amount paired with the currency it is denominated in.

    Truthiness follows the amount, so a ``Money`` whose amount is zero is
    falsy even though the object itself exists.
    """

    amount: Decimal
    currency: str

    def __neg__(self):
        """Return a new ``Money`` in the same currency with the sign flipped."""
        return Money(amount=-self.amount, currency=self.currency)

    def __str__(self):
        """Render as ``"<amount> <currency>"``."""
        return f"{self.amount!s} {self.currency}"

    def __bool__(self):
        """Report whether the amount is non-zero."""
        return bool(self.amount)
@dataclass
class KoinlyRow:
    """One transaction in Koinly's CSV layout.

    ``sent``, ``received`` and ``fee`` are ``Money`` values, or ``None`` when
    that side of the transaction is absent.
    """

    date: datetime
    # The Money annotations are quoted so the class body does not need the
    # name at definition time; the fields still hold Money instances (or None).
    sent: "Money" = None
    received: "Money" = None
    fee: "Money" = None
    description: str = None
    label: str = None
    tx_hash: str = None

    @staticmethod
    def _split(money):
        """Return ``(amount, currency)`` for *money*, or ``(None, None)``.

        The check is ``is None`` rather than truthiness: a zero-valued Money
        is falsy, and ``money and money.amount`` would then yield the Money
        object itself instead of its amount.
        """
        if money is None:
            return None, None
        return money.amount, money.currency

    def to_dict(self):
        """Return the row as a dict keyed by the Koinly CSV column headers.

        Missing sides map to ``None``. The two "Net Worth" columns are
        always ``None``.
        """
        sent_amount, sent_currency = self._split(self.sent)
        received_amount, received_currency = self._split(self.received)
        fee_amount, fee_currency = self._split(self.fee)
        return {
            'Date': self.date.isoformat(),
            'Sent Amount': sent_amount,
            'Sent Currency': sent_currency,
            'Received Amount': received_amount,
            "Received Currency": received_currency,
            "Fee Amount": fee_amount,
            "Fee Currency": fee_currency,
            "Net Worth Amount": None,
            "Net Worth Currency": None,
            "Label": self.label,
            "Description": self.description,
            "TxHash": self.tx_hash,
        }

    def __str__(self):
        """Return a one-line summary of the row for debugging output."""
        return f'{self.date.isoformat()} "{self.description}" sent: {self.sent}, received: {self.received}, fee: {self.fee}'
@dataclass
class FiriRow:
    """One row of the input CSV, with the amount and timestamp already parsed."""

    tx_id: str
    match_id: str
    withdraw_id: str
    deposit_id: str
    withdraw_address: str
    withdraw_txid: str
    deposit_address: str
    deposit_txid: str
    action: str
    amount: Money
    created_at: datetime

    @classmethod
    def from_dict(cls, i):
        """Build a row from a mapping keyed by the input CSV's column headers.

        ``i`` must provide the keys read below ('Transaction ID', 'Match ID',
        ..., 'Amount', 'Currency', 'Created at'). 'Amount' is parsed as a
        ``Decimal`` and passed through ``normalize_fraction``. 'Created at'
        is parsed with arrow after the literal
        ' (Coordinated Universal Time)' suffix has been removed.
        """
        created_text = i['Created at'].replace(' (Coordinated Universal Time)', '')
        row_datetime = arrow.get(created_text, 'ddd MMM DD YYYY HH:mm:ss [GMT]Z').datetime
        # Construct via cls rather than the hard-coded class name so that a
        # subclass calling from_dict() gets an instance of itself.
        return cls(
            tx_id=i['Transaction ID'],
            match_id=i['Match ID'],
            withdraw_id=i['Withdraw ID'],
            deposit_id=i['Deposit ID'],
            withdraw_address=i['Withdraw address'],
            withdraw_txid=i['Withdraw transaction ID'],
            deposit_address=i['Deposit address'],
            deposit_txid=i['Deposit transaction ID'],
            action=i['Action'],
            amount=Money(normalize_fraction(Decimal(i['Amount'])), i['Currency']),
            created_at=row_datetime,
        )
# https://stackoverflow.com/a/11227743
def normalize_fraction(d):
    """Strip trailing zeros from ``d`` without leaving it in exponent form.

    ``Decimal.normalize()`` alone turns ``100`` into ``1E+2``; this function
    re-expands such values so they print as plain integers.

    :param d: the ``Decimal`` to clean up.
    :return: an equal ``Decimal`` with no trailing fractional zeros and no
        positive exponent. NaN and infinities are returned normalized but
        otherwise untouched.
    """
    normalized = d.normalize()
    if not normalized.is_finite():
        # NaN/Infinity report a non-numeric exponent marker, which cannot be
        # compared with 0; there is nothing to strip from them anyway.
        return normalized
    if normalized.as_tuple().exponent <= 0:
        return normalized
    # A positive exponent means normalize() folded integer zeros into the
    # exponent. Converting through int re-expands them exactly, whereas
    # quantize(1) raises InvalidOperation once the expanded coefficient
    # needs more digits than the context precision allows.
    return Decimal(int(normalized))
def firi2koinly(input_filename: str) -> Sequence[KoinlyRow]:
    """Read the input CSV and merge its rows into Koinly transactions.

    Input rows are grouped by the first non-empty of match ID, withdraw ID,
    deposit ID and transaction ID. Every row of a group fills one slot
    (sent, received or fee) of a single ``KoinlyRow``, chosen by the row's
    'Action' value. Rows with an unrecognised action are reported with a
    warning and skipped.

    :param input_filename: path of the CSV file to read.
    :return: the merged rows as a list, in order of first appearance.
    :raises AssertionError: if two rows of one group target a slot that
        already holds a non-zero amount.
    """
    merged: Dict[str, KoinlyRow] = {}
    # newline='' leaves line-ending handling to the csv module, so newlines
    # embedded in quoted fields are read correctly.
    with open(input_filename, newline='') as input_file:
        for raw_row in csv.DictReader(input_file):
            firi_row = FiriRow.from_dict(raw_row)
            row_id = (
                firi_row.match_id
                or firi_row.withdraw_id
                or firi_row.deposit_id
                or firi_row.tx_id
            )
            koinly_row = merged.get(row_id) or KoinlyRow(date=firi_row.created_at)
            action = firi_row.action

            if action == 'Match':
                # The sign of the amount tells which side of the trade this is.
                if firi_row.amount.amount >= 0:
                    assert not koinly_row.received
                    koinly_row.received = firi_row.amount
                else:
                    assert not koinly_row.sent
                    koinly_row.sent = -firi_row.amount
                koinly_row.description = action
            elif action in ('MatchFee', 'WithdrawFee'):
                assert not koinly_row.fee
                koinly_row.fee = -firi_row.amount
            elif action in ('BankDeposit', 'Bonus', 'FeebackBonus'):
                assert not koinly_row.received
                koinly_row.received = firi_row.amount
                koinly_row.description = action
                koinly_row.tx_hash = firi_row.deposit_txid
            elif action == 'Withdraw':
                # Equality, not `in ('Withdraw')`: the latter is a substring
                # test against a plain string and also matches '' or 'draw'.
                assert not koinly_row.sent
                koinly_row.sent = -firi_row.amount
                koinly_row.description = action
                koinly_row.tx_hash = firi_row.withdraw_txid
            else:
                typer.echo(f'warning: no match for {action} {row_id}')
                continue

            if "Bonus" in action:
                koinly_row.label = 'reward'
            merged[row_id] = koinly_row

    # Materialise the dict view so the result really is a Sequence.
    return list(merged.values())
def make_output_filename(input_filename: str) -> str:
    """
    Derive the default output path from the input path.

    A trailing ``.csv`` is dropped before ``.koinly.csv`` is appended; any
    other name is kept whole.

    >>> make_output_filename('firi.csv')
    'firi.koinly.csv'
    >>> make_output_filename('firi')
    'firi.koinly.csv'
    >>> make_output_filename('firi.xls')
    'firi.xls.koinly.csv'
    """
    csv_suffix = '.csv'
    stem = input_filename
    if stem.endswith(csv_suffix):
        # Cut off only the final suffix; earlier occurrences stay in place.
        stem = stem[:-len(csv_suffix)]
    return stem + '.koinly.csv'
@app.command()
def firi(input_filename: str, output_filename: str = None):
    """Convert a Firi CSV file to a Koinly CSV file.

    When no output filename is given, it is derived from the input filename.
    """
    output_filename = output_filename or make_output_filename(input_filename)
    rows: Sequence[KoinlyRow] = firi2koinly(input_filename)
    # newline='' is required for files handed to csv writers; without it,
    # platforms that translate '\n' on write end up with '\r\r\n' row endings
    # (an empty line after every row).
    with open(output_filename, 'w', newline='') as output_file:
        writer = csv.DictWriter(output_file, KOINLY_FIELD_NAMES)
        writer.writeheader()
        writer.writerows(row.to_dict() for row in rows)
# Run the Typer application only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment