Skip to content

Instantly share code, notes, and snippets.

@lachesis
Last active March 30, 2022 01:15
Show Gist options
  • Save lachesis/bb69fca009a9dcebcf83ba815b7dddb5 to your computer and use it in GitHub Desktop.
Save lachesis/bb69fca009a9dcebcf83ba815b7dddb5 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# usage: python3 dump_cosmos_staking_rewards.py < address_list.txt > staking_rewards.tsv
import decimal
import fileinput
import collections
import re
import sys
import requests
import datetime
import json
import os
# Supported chains as (address prefix, network name) pairs. An address is
# assigned to the first entry whose prefix it starts with.
NETWORKS = [
    ('cosmos', 'cosmos'),
    ('osmo', 'osmosis'),
    ('akash', 'akash'),
    ('regen', 'regen'),
    ('juno', 'juno'),
    ('kava', 'kava'),
    ('stars', 'stargaze'),
    ('secret', 'secret'),
]

# Divisor used to turn "u"-prefixed (micro) denominations into whole units.
MILLION = decimal.Decimal(1_000_000)
def try_cache(key, func):
    """Return a JSON value from the on-disk cache, computing it on a miss.

    The cache directory is taken from the ``CACHE`` environment variable.
    Caching is strictly best-effort: when ``CACHE`` is unset or empty, or the
    cache entry cannot be read or written, ``func()`` is called and its
    result is returned without raising any cache-related error.

    Args:
        key: File name, relative to ``$CACHE``, that stores the value.
        func: Zero-argument callable producing the value; only invoked when
            no usable cache entry exists.

    Returns:
        The value loaded from the cache, or the fresh result of ``func()``.

    Raises:
        Whatever ``func()`` raises. Cache I/O problems are never propagated.
    """
    cache_root = os.getenv('CACHE')
    if not cache_root:
        # Caching is disabled; just run the slow function.
        return func()

    cache_path = os.path.join(cache_root, key)
    try:
        os.makedirs(cache_root, exist_ok=True)
        with open(cache_path, 'r') as inp:
            return json.load(inp)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt entry: treat it as a cache miss.
        # (json.JSONDecodeError and UnicodeDecodeError are ValueErrors.)
        pass

    # run the slow net func
    js = func()

    # save it maybe
    try:
        # Serialize before opening the file so that an unserializable value
        # cannot leave a truncated cache entry behind.
        payload = json.dumps(js)
        with open(cache_path, 'w') as out:
            out.write(payload)
    except (OSError, TypeError, ValueError):
        pass
    return js
def unix_to_dt(unixtime):
    """Convert a count of seconds since 1970-01-01 00:00:00 to a naive datetime."""
    epoch = datetime.datetime(1970, 1, 1)
    elapsed = datetime.timedelta(seconds=unixtime)
    return epoch + elapsed
def iso_to_dt(isotime):
    """Parse an ISO-format timestamp string into a datetime.

    Every ``Z`` character is removed from the string before it is handed to
    ``datetime.fromisoformat``.
    """
    without_z = ''.join(isotime.split('Z'))
    return datetime.datetime.fromisoformat(without_z)
def _parse_withdraw_rewards(evt, convert_balances):
    """Turn one ``withdraw_rewards`` event into a reward record, or None.

    Returns ``{"validator", "amount", "unit"}`` when the event carries both a
    validator and an amount of the form ``<digits><denom>``; otherwise None.
    Only the leading ``<digits><word chars>`` part of the amount is parsed.
    """
    amount = None
    validator = None  # reset per event so a stale value is never reused
    for attrib in evt['attributes']:
        if attrib['key'] == 'amount':
            amount = attrib['value']
        elif attrib['key'] == 'validator':
            validator = attrib['value']
    if not (amount and validator):
        return None

    m = re.match(r'(\d+)(\w+)', amount)
    if not m:
        # Unparseable amount: skip the event instead of failing the address.
        return None
    value = decimal.Decimal(m.group(1))
    unit = m.group(2)
    if convert_balances and unit.startswith('u'):
        # "u"-prefixed denominations are treated as micro-units.
        unit = unit[1:]
        value /= MILLION
    return {"validator": validator, "amount": value, "unit": unit}


def get_cosmos_account_staking_rewards(addr, convert_balances=True):
    """Collect the staking-reward withdrawals recorded for one address.

    The network is chosen by matching ``addr`` against the prefixes in
    ``NETWORKS``. Transactions are fetched from the cosmostation API (at most
    1000, in pages of 50) and cached through ``try_cache``.

    Args:
        addr: Account address; must start with a prefix listed in NETWORKS.
        convert_balances: When true, amounts whose unit starts with ``u`` are
            divided by one million and the ``u`` prefix is dropped.

    Returns:
        A ``(rewards, prices)`` tuple. ``rewards`` has one dict per
        transaction containing at least one reward withdrawal, with keys
        ``address``, ``timestamp``, ``block_id``, ``token_amounts``,
        ``individual``, ``total_unit`` and ``total_amount`` (the last two are
        None unless exactly one unit was withdrawn). ``prices`` merges the
        ``get_historical_prices`` results for every unit seen; units whose
        lookup fails are omitted.

    Raises:
        ValueError: If ``addr`` matches no known network prefix.
        requests.RequestException: If fetching transactions fails.
    """
    # Find network
    for prefix, net in NETWORKS:
        if addr.startswith(prefix):
            network = net
            break
    else:
        raise ValueError("unknown network: " + addr)

    # fetch all transaction logs
    url = 'https://api{}.cosmostation.io/v1/account/new_txs/{}'.format(
        ('-' + network) if network != 'cosmos' else '', addr)

    def fetch_all_txns():
        all_txns = []
        PAGE_SIZE = 50
        MAX_TXNS = 1000
        for i in range(MAX_TXNS // PAGE_SIZE):
            resp = requests.get(
                url,
                params={'from': i * PAGE_SIZE, 'limit': PAGE_SIZE},
                timeout=31,
            )
            resp.raise_for_status()
            js = resp.json()
            if not js:
                break
            all_txns.extend(js)
        return all_txns

    # The cache entry must be per address: keying on the network alone would
    # hand one address's transactions to every other address on that chain.
    # Non-word characters are replaced so the key stays a plain file name.
    safe_addr = re.sub(r'\W', '_', addr)
    all_txns = try_cache(
        'all-txns-{}-{}.json'.format(network, safe_addr), fetch_all_txns)

    unique_units = set()
    out = []
    for txn in all_txns:
        # NOTE(review): logs is tolerated as missing/null here; presumably
        # that happens for failed transactions -- confirm against the API.
        logs = txn['data'].get('logs') or []
        totals = collections.Counter()
        rewards = []
        for log in logs:
            for evt in log['events']:
                if evt['type'] != 'withdraw_rewards':
                    continue
                reward = _parse_withdraw_rewards(evt, convert_balances)
                if reward is None:
                    continue
                rewards.append(reward)
                totals[reward['unit']] += reward['amount']
                unique_units.add(reward['unit'])
        if not rewards:
            continue

        if len(totals) == 1:
            (total_unit, total_amount), = totals.items()
        else:
            total_unit = total_amount = None
        out.append({
            "address": addr,
            "timestamp": txn['header']['timestamp'],
            "block_id": txn['header']['block_id'],
            "token_amounts": dict(totals),
            "individual": rewards,
            "total_unit": total_unit,
            "total_amount": total_amount,
        })

    prices = {}
    for unit in sorted(unique_units):
        try:
            prices.update(get_historical_prices(unit))
        except Exception as e:
            # Prices are optional; report the problem and carry on without.
            print("Warning: no historical prices for {}: {!r}".format(unit, e),
                  file=sys.stderr)
    return out, prices
def get_historical_prices(symbol):
    """Load the historical price records for ``symbol``.

    Records come from the imperator.co Osmosis API and are cached through
    ``try_cache`` under ``prices-<symbol>.json``.

    Returns:
        A dict mapping ``(symbol, date)`` to the raw record, where ``date`` is
        derived from the record's ``time`` field. If several records fall on
        the same date, the last one wins.
    """
    # NOTE(review): tf=1440 presumably selects one-day candles (1440 minutes)
    # -- confirm against the API documentation.
    endpoint = 'https://api-osmosis.imperator.co/tokens/v2/historical/{symbol}/chart?tf=1440'

    def download():
        response = requests.get(endpoint.format(symbol=symbol), timeout=61)
        response.raise_for_status()
        return response.json()

    # Sample payload:
    # [{"time":1624406400,"close":5.4010989774,"high":5.4141295587,"low":5.0003632977,"open":5.0003632977,"volume":null},...]
    records = try_cache('prices-{}.json'.format(symbol), download)

    # { (symbol, date): {open,close,low,high,time,volume} }
    return {
        (symbol, unix_to_dt(record['time']).date()): record
        for record in records
    }
def build_cosmos_staking_rewards_tsv(txns, historical_prices, header=False):
    """Render reward transactions as tab-separated text, one row per reward.

    Args:
        txns: Transaction dicts with ``timestamp``, ``block_id``, ``address``
            and ``individual`` (a list of ``unit``/``amount``/``validator``
            dicts).
        historical_prices: Mapping of ``(unit, date)`` to a price record with
            ``low``, ``high``, ``open`` and ``close`` entries.
        header: When true, the first line is the column header.

    Returns:
        The rows joined with newlines, with no trailing newline. When no price
        record exists for a row, its four price columns hold the string form
        of the lookup result (``None``).
    """
    price_keys = ('low', 'high', 'open', 'close')
    lines = []
    if header:
        lines.append("timestamp\tblock_id\tsymbol\tamount\tprice low\tprice high\tprice open\tprice close\tvalidator\treceiving address")  # header

    for txn in txns:
        for reward in txn['individual']:
            day = iso_to_dt(txn['timestamp']).date()
            price = historical_prices.get((reward['unit'], day))
            fields = [
                txn['timestamp'],
                txn['block_id'],
                reward['unit'],
                reward['amount'],
            ]
            fields.extend(price and price[k] for k in price_keys)
            fields.append(reward['validator'])
            fields.append(txn['address'])
            lines.append("\t".join(map(str, fields)))
    return "\n".join(lines)
def get_all_cosmos_network_staking_rewards():
    """Build one TSV covering every address supplied via ``fileinput``.

    Each input line is stripped and processed if it starts with a prefix
    listed in NETWORKS; other lines are ignored. A failure on one address is
    reported on stderr and does not stop the remaining addresses.

    Returns:
        The combined TSV text. The header row is emitted once, by the first
        address that is processed successfully.
    """
    prefixes = tuple(prefix for prefix, _ in NETWORKS)
    need_header = True
    chunks = []
    for line in fileinput.input():
        addr = line.strip()
        if not addr.startswith(prefixes):
            continue
        print("Getting reward transactions for", addr, file=sys.stderr)
        try:
            txns, historical_prices = get_cosmos_account_staking_rewards(addr)
            chunk = build_cosmos_staking_rewards_tsv(
                txns, historical_prices, header=need_header)
        except Exception as e:
            print("Error:", repr(e), file=sys.stderr)
            continue
        need_header = False
        # An address with no rewards yields an empty string once the header
        # has been written; joining it in would put a blank row in the TSV.
        if chunk:
            chunks.append(chunk)
    return "\n".join(chunks)
def main():
    """Print a usage hint to stderr, then the combined rewards TSV to stdout."""
    print("Supply a list of cosmos addresses on stdin...", file=sys.stderr)
    report = get_all_cosmos_network_staking_rewards()
    print(report)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment