Last active
          May 15, 2021 03:36 
        
      - 
      
- 
        Save geertj/81dbb52e10821101a9f5d9ba774ff90d to your computer and use it in GitHub Desktop. 
    Ethereum tax calculator
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python | |
| # | |
| # Store Ethereum transactions to a certain address in a CSV file, with | |
| # associated pricing data. Can be used to understand mining revenue. | |
| import sys | |
| import csv | |
| import time | |
| import json | |
| import bisect | |
| from datetime import datetime | |
| import requests | |
| import pytz | |
# Output and cache file names, relative to the current directory.
csvname = 'transactions.csv'
txncache = 'transactions.json'
pricecache = 'prices.json'

# Command line: <address> is required, <year> is an optional integer filter.
if len(sys.argv) < 2:
    print('Usage: {} <address> [<year>]'.format(sys.argv[0]))
    sys.exit(1)

address = sys.argv[1]
year = None
if len(sys.argv) > 2:
    year = int(sys.argv[2])
# Get transactions from etherscan.io. Warning, returns up to
# 10,000 results. Need to use the paging API if you have more.
url = 'https://api.etherscan.io/api'
# Let requests build and URL-encode the query string instead of formatting
# the address into the URL by hand.
params = {
    'module': 'account',
    'action': 'txlist',
    'address': address,
    'startblock': 0,
    'endblock': 99999999,
    'sort': 'asc',
}
print('Downloading transactions from etherscan.io...')
# Without a timeout requests may wait forever on a stalled connection.
r = requests.get(url, params=params, timeout=60)
print('=> Status: {}'.format(r.status_code))
if r.status_code != 200:
    print('Error: could not download transactions')
    sys.exit(1)
# NOTE: the API can answer HTTP 200 while putting an error message (a
# string) in 'result', e.g. when rate limited. Only a list is a real
# transaction listing; anything else must not be counted or cached.
transactions = r.json().get('result')
if not isinstance(transactions, list):
    print('Error: could not download transactions: {}'.format(transactions))
    sys.exit(1)
# For offline debugging, load the cached copy instead:
#transactions = json.load(open(txncache))
print('=> Downloaded {:,} transactions.'.format(len(transactions)))
# Cache the raw download so it can be inspected or reused later.
with open(txncache, 'w') as fout:
    json.dump(transactions, fout, indent=2, sort_keys=True)
# Download historical pricing from etherchain.org.
# Pricing seems to have a 4 hour resolution.
url = 'https://etherchain.org/api/statistics/price'
print('Downloading historical prices from etherchain.org...')
# Without a timeout requests may wait forever on a stalled connection.
r = requests.get(url, timeout=60)
print('=> Status: {}'.format(r.status_code))
if r.status_code != 200:
    print('Error: could not download historical prices')
    sys.exit(1)
# The rest of the script needs a list of {'time': ..., 'usd': ...} records
# under 'data'; stop with a readable message if the payload looks different.
prices = r.json().get('data')
if not isinstance(prices, list):
    print('Error: could not download historical prices')
    sys.exit(1)
# For offline debugging, load the cached copy instead:
#prices = json.load(open(pricecache))
print('=> Downloaded {:,} historical prices.'.format(len(prices)))
# Cache the raw download so it can be inspected or reused later.
with open(pricecache, 'w') as fout:
    json.dump(prices, fout, indent=2, sort_keys=True)
| # Store in a sorted list for easier lookup. | |
| # datetime.strptime() is not able to parse single-letter timezones like the 'Z' | |
| # timezone (= UTC) used in our pricing data. And even with supported timezones | |
| # it still creates naive objects. So don't parse the timezone and simply | |
| # replace the timezone with the UTC timezone. | |
# strptime() pattern for the price timestamps; the trailing 'Z' is matched
# literally rather than parsed as a timezone.
isoformat = '%Y-%m-%dT%H:%M:%S.%fZ'


def iso_to_timestamp(iso):
    """Convert an ISO timestamp string ending in 'Z' to epoch seconds.

    The string is parsed as a naive datetime, then tagged as UTC before
    conversion. The result is truncated to an integer.
    """
    parsed = datetime.strptime(iso, isoformat)
    return int(parsed.replace(tzinfo=pytz.utc).timestamp())
# (timestamp, usd) pairs in ascending order, plus two parallel lists so the
# timestamps can be searched with bisect and the price read by position.
price_tuples = sorted(
    (iso_to_timestamp(pr['time']), pr['usd']) for pr in prices
)
price_timestamps = [stamp for stamp, _ in price_tuples]
price_values = [usd for _, usd in price_tuples]
# Now price the transactions.
# stats[0] accumulates the total price delay in seconds and stats[1] holds
# the largest single delay in seconds.
stats = [0.0, 0.0]


def price_txn(txn):
    """Attach an ETH/USD price to *txn* under the 'eth_usd' key.

    The price used is the first sample strictly after the transaction's
    'timeStamp'. A transaction newer than every sample falls back to the
    most recent sample instead of failing. The distance in seconds between
    the transaction and the chosen sample is folded into the module-level
    ``stats`` list.

    Raises IndexError when there is no price data at all.
    """
    ts = int(txn['timeStamp'])
    # Use the price at the timestamp shortest after the transaction.
    pos = bisect.bisect_right(price_timestamps, ts)
    # bisect_right returns len(list) when ts is at or past the last sample;
    # clamp to the last valid index so recent transactions can be priced.
    pos = min(pos, len(price_timestamps) - 1)
    # abs() keeps the delay meaningful when the fallback sample precedes ts.
    delay = abs(price_timestamps[pos] - ts)
    stats[0] += delay
    stats[1] = max(stats[1], delay)
    txn['eth_usd'] = price_values[pos]
print('Pricing transactions...')
for txn in transactions:
    price_txn(txn)
# Turn the accumulated seconds into whole minutes. An address without any
# transactions would otherwise divide by zero when averaging.
if transactions:
    stats[0] = int(stats[0] / (len(transactions) * 60))
else:
    stats[0] = 0
stats[1] = int(stats[1] / 60)
print('=> Average price delay {}m, max price delay {}m.'.format(*stats))
# Convert units.
def timestamp_to_iso(timestamp):
    """Render seconds since the epoch as an ISO formatted UTC string."""
    seconds = int(timestamp)
    return datetime.fromtimestamp(seconds, pytz.utc).isoformat()
def wei_to_ether(value):
    """Turn a Wei amount given as a digit string into Ether as a float.

    One Ether is 1e18 Wei, so the decimal point goes 18 digits from the
    right. The string is left-padded with 18 zeros first so that amounts
    shorter than 18 digits still have a whole part.
    """
    padded = value.rjust(len(value) + 18, '0')
    whole = padded[:-18]
    fraction = padded[-18:]
    return float(whole + '.' + fraction)
# Add the derived columns: ISO date, Ether amount and USD value.
for txn in transactions:
    iso = timestamp_to_iso(txn['timeStamp'])
    ether = wei_to_ether(txn['value'])
    txn.update(isodate=iso, ether=ether, usd=ether * txn['eth_usd'])
# Filter the year if needed.
def in_year(txn):
    """Tell whether *txn* falls in the requested year, judged in UTC."""
    when = time.gmtime(int(txn['timeStamp']))
    return when.tm_year == year
# Only narrow the list when a year was given on the command line.
if year is not None:
    print('Filtering transactions for year {}...'.format(year))
    transactions = list(filter(in_year, transactions))
    print('=> Filtered {} transactions'.format(len(transactions)))
# Output the result in a CSV file, for further spreadsheet processing.
fields = [
    'hash', 'blockNumber', 'isodate', 'from', 'to',
    'ether', 'eth_usd', 'usd',
]


def take(txn, fields):
    """Return a new dict holding only the *fields* that exist in *txn*.

    Keys appear in the order given by *fields*; names missing from *txn*
    are skipped.
    """
    picked = {}
    for column in fields:
        if column in txn:
            picked[column] = txn[column]
    return picked
print('Writing CSV file...')
# Write to csvname so the file matches the name reported below. newline=''
# hands line-ending control to the csv module; without it, platforms that
# translate '\n' on write end up with an empty row after every record.
with open(csvname, 'w', newline='') as fout:
    writer = csv.DictWriter(fout, fields)
    writer.writeheader()
    # Only the columns listed in fields are written for each transaction.
    writer.writerows(take(txn, fields) for txn in transactions)
print('=> Wrote {} transactions to {}.'.format(len(transactions), csvname))
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
Hi,
the download from
url = 'https://etherchain.org/api/statistics/price'
does not work any more.
Is there any solution to fix this?