Last active
March 30, 2022 01:15
-
-
Save lachesis/bb69fca009a9dcebcf83ba815b7dddb5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# usage: python3 dump_cosmos_staking_rewards.py < address_list.txt > staking_rewards.tsv | |
import decimal | |
import fileinput | |
import collections | |
import re | |
import sys | |
import requests | |
import datetime | |
import json | |
import os | |
NETWORKS = [ | |
('cosmos', 'cosmos'), | |
('osmo', 'osmosis'), | |
('akash', 'akash'), | |
('regen', 'regen'), | |
('juno', 'juno'), | |
('kava', 'kava'), | |
('stars', 'stargaze'), | |
('secret', 'secret'), | |
] | |
MILLION = decimal.Decimal(1000000) | |
def try_cache(key, func): | |
# tries to load json from cache from $CACHE/key if $CACHE is defined | |
# if not, runs func() and saves its result to cache | |
try: | |
cache_root = os.getenv('CACHE') | |
os.makedirs(cache_root, exist_ok=True) | |
cache_path = os.path.join(cache_root, key) | |
with open(cache_path, 'r') as inp: | |
js = json.load(inp) | |
# any cache problem? carry on without | |
except Exception: | |
# run the slow net func | |
js = func() | |
# save it maybe | |
try: | |
with open(cache_path, 'w') as out: | |
json.dump(js, out) | |
except Exception: | |
pass | |
return js | |
def unix_to_dt(unixtime): | |
return datetime.datetime(1970, 1, 1, 0, 0, 0) + datetime.timedelta(0, unixtime) | |
def iso_to_dt(isotime): | |
return datetime.datetime.fromisoformat(isotime.replace('Z', '')) | |
def get_cosmos_account_staking_rewards(addr, convert_balances=True): | |
# Find network | |
for prefix, net in NETWORKS: | |
if addr.startswith(prefix): | |
network = net | |
break | |
else: | |
raise ValueError("unknown network: " + addr) | |
# fetch all transaction logs | |
url = 'https://api{}.cosmostation.io/v1/account/new_txs/{}'.format(('-'+network) if network != 'cosmos' else '', addr) | |
def fetch_all_txns(): | |
all_txns = [] | |
PAGE_SIZE = 50 | |
MAX_TXNS = 1000 | |
for i in range(MAX_TXNS // PAGE_SIZE): | |
resp = requests.get(url, params={'from': i * PAGE_SIZE, 'limit': PAGE_SIZE}, timeout=31) | |
resp.raise_for_status() | |
js = resp.json() | |
all_txns.extend(js) | |
if not js: | |
break | |
return all_txns | |
all_txns = try_cache('all-txns-{}.json'.format(network), fetch_all_txns) | |
unique_units = set() | |
out = [] | |
for txn in all_txns: | |
ts = txn['header']['timestamp'] | |
blk = txn['header']['block_id'] | |
events = [e for l in txn['data']['logs'] for e in l['events']] | |
ctr = collections.Counter() | |
intermed = [] | |
for evt in events: | |
if evt['type'] == 'withdraw_rewards': | |
unit = None | |
amount = None | |
valdator = None | |
for attrib in evt['attributes']: | |
if attrib['key'] == 'amount': | |
amount = attrib['value'] | |
if attrib['key'] == 'validator': | |
validator = attrib['value'] | |
if amount and validator: | |
m = re.match(r'(\d+)(\w+)', amount) | |
if m: | |
amount = decimal.Decimal(m.group(1)) | |
unit = m.group(2) | |
if convert_balances and unit and unit.startswith('u'): | |
unit = unit[1:] | |
amount /= MILLION | |
intermed.append({"validator": validator, "amount": amount, "unit": unit}) | |
ctr[unit] += amount | |
unique_units.add(unit) | |
if intermed: | |
out.append({ | |
"address": addr, | |
"timestamp": ts, | |
"block_id": blk, | |
"token_amounts": dict(ctr), | |
"individual": intermed, | |
"total_unit": ctr.most_common()[0][0] if len(ctr) == 1 else None, | |
"total_amount": ctr.most_common()[0][1] if len(ctr) == 1 else None, | |
}) | |
prices = {} | |
for unit in unique_units: | |
try: | |
prices.update(get_historical_prices(unit)) | |
except Exception: | |
pass | |
return out, prices | |
def get_historical_prices(symbol): | |
def get_prices(): | |
resp = requests.get('https://api-osmosis.imperator.co/tokens/v2/historical/{symbol}/chart?tf=1440'.format(symbol=symbol), timeout=61) | |
resp.raise_for_status() | |
return resp.json() | |
#[{"time":1624406400,"close":5.4010989774,"high":5.4141295587,"low":5.0003632977,"open":5.0003632977,"volume":null},...] | |
js = try_cache('prices-{}.json'.format(symbol), get_prices) | |
# { (symbol, date): {open,close,low,high,time,volume} } | |
out = {} | |
for rec in js: | |
date = unix_to_dt(rec['time']).date() | |
out[(symbol, date)] = rec | |
return out | |
def build_cosmos_staking_rewards_tsv(txns, historical_prices, header=False): | |
out = [] | |
if header: | |
out.append("timestamp\tblock_id\tsymbol\tamount\tprice low\tprice high\tprice open\tprice close\tvalidator\treceiving address") # header | |
for txn in txns: | |
for ind in txn['individual']: | |
txn_date = iso_to_dt(txn['timestamp']).date() | |
price = historical_prices.get( (ind['unit'], txn_date) ) | |
out.append("\t".join(str(x) for x in [ | |
txn['timestamp'], | |
txn['block_id'], | |
ind['unit'], | |
ind['amount'], | |
price and price['low'], | |
price and price['high'], | |
price and price['open'], | |
price and price['close'], | |
ind['validator'], | |
txn['address'], | |
])) | |
return "\n".join(out) | |
def get_all_cosmos_network_staking_rewards(): | |
is_first = True | |
csv = [] | |
for line in fileinput.input(): | |
addr = line.strip() | |
if any(addr.startswith(prefix) for prefix,_ in NETWORKS): | |
print("Getting reward transactions for", addr, file=sys.stderr) | |
try: | |
txns, historical_prices = get_cosmos_account_staking_rewards(addr) | |
csv.append(build_cosmos_staking_rewards_tsv(txns, historical_prices, header=is_first)) | |
is_first = False | |
except Exception as e: | |
print("Error:", repr(e), file=sys.stderr) | |
return "\n".join(csv) | |
def main(): | |
print("Supply a list of cosmos addresses on stdin...", file=sys.stderr) | |
print(get_all_cosmos_network_staking_rewards()) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment