Last active
April 14, 2021 14:23
-
-
Save MosheStauber/5cf83310f5b33acd8ac352f9d3f37843 to your computer and use it in GitHub Desktop.
ercxxx_distribution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
from collections import defaultdict | |
from json import JSONDecodeError | |
from pprint import pprint | |
from typing import Dict, Union, List | |
import requests | |
from bs4 import BeautifulSoup | |
def get_top_tokens(num_pages: int = 10) -> List[Dict[str, Union[int, str]]]:
    """
    Gets the tokens listed on the first `num_pages` pages of "https://etherscan.io/tokens?p=page_number"

    Pages are requested one at a time (page numbers start at 1). Every row of
    the page's first `<tbody>` contributes one token, taken from the first
    `<a>` tag of the row. The rank is derived from the page index and the row
    position, assuming 50 rows per page.

    Pages that fail to load or contain no table are reported with "No data"
    and skipped; rows without a usable link are skipped silently.

    :param num_pages: the number of pages of tokens
    :return: list of objects in the following format:
        tokens = [
            {"name": <token name>, "rank": <rank on etherscan>, "address": <token address>}
        ]
    """
    url = "https://etherscan.io/tokens?p={}"
    tokens = []
    for i in range(num_pages):
        print(f"Getting tokens {(i*50)} - {(i*50) + 50}")
        # Without a timeout a stalled connection would hang the scrape forever.
        response = requests.get(url.format(i + 1), timeout=30)
        if not response.ok:
            print("No data")
            continue

        soup = BeautifulSoup(response.text, 'html.parser')
        rows = soup.find('tbody')
        if not rows:
            print("No data")
            continue

        for j, tr in enumerate(rows.find_all('tr')):
            # all token info is in the a tag
            token = tr.find('a')
            if token is None or not token.get('href'):
                # Header/ad/placeholder rows carry no token link; skipping
                # them keeps one odd row from aborting the whole scrape.
                continue
            tokens.append({
                'name': token.text,
                'rank': i*50 + j + 1,
                # the address is the last path segment of the token link
                'address': token['href'].split('/')[-1],
            })
    print(f"Got {len(tokens)} tokens")
    return tokens
def get_abi(address: str) -> str:
    """
    Gets the abi for the `address` using etherscan.io api.
    Assumes environment variable `API_KEY` exists.

    The API key is sent as a query parameter but is deliberately left out of
    the progress message so the secret does not end up in stdout or logs.

    :param address: the contract address for abi
    :return: abi object json string or empty string
    :raises KeyError: if the `API_KEY` environment variable is not set
    """
    # Read the key first so a missing `API_KEY` fails before any network traffic.
    api_key = os.environ["API_KEY"]
    url = "https://api.etherscan.io/api"
    print(f'Getting {url}?module=contract&action=getabi&address={address}')
    # `params=` lets requests URL-encode the values instead of trusting the
    # caller-supplied address to be query-string safe.
    r = requests.get(
        url,
        params={
            "module": "contract",
            "action": "getabi",
            "address": address,
            "apikey": api_key,
        },
        timeout=30,
    )
    if not r.ok:
        print(f'Bad request: {r}')
        return ""

    try:
        body = r.json()
    except ValueError as e:
        # e.g. an HTML error/challenge page served with a 200 status
        print(f'Bad response: {r}', e)
        return ""

    if body['message'] == "OK":
        return body['result']
    print(body['message'])
    return ""
def download_abis(download_dir, num_top_tokens=500):
    """
    Downloads the abis of the top tokens into `download_dir`, one file per token.

    The token list itself is saved to `top<N>_tokens.json` in the current
    working directory. Each abi is written to
    `<download_dir>/<token name with spaces as underscores>__<rank>`; files
    that already exist are treated as cached and are not downloaded again.
    Tokens for which no abi could be fetched produce no file.

    :param download_dir: folder used as the abi cache (created if missing)
    :param num_top_tokens: how many of the top tokens to process
    """
    # download the top tokens
    # The listing is paged by 50: round up so that e.g. 120 fetches 3 pages,
    # then trim back to exactly the requested number of tokens.
    num_pages = (num_top_tokens + 49) // 50
    top_tokens = get_top_tokens(num_pages=num_pages)[:max(num_top_tokens, 0)]

    # save the top tokens locally
    with open(f'top{len(top_tokens)}_tokens.json', 'w', encoding='utf-8') as f:
        json.dump(top_tokens, f)

    try:
        # makedirs (unlike mkdir) also copes with a nested download folder
        os.makedirs(download_dir)
    except FileExistsError:
        print("Not creating download folder")

    for token_info in top_tokens:
        token_address = token_info['address']

        # Token names are scraped text: neutralise path separators so a name
        # can never point outside of (or into a missing subfolder of) the cache.
        safe_name = token_info['name'].replace(' ', '_')
        for separator in ('/', '\\'):
            safe_name = safe_name.replace(separator, '_')
        file_name = os.path.join(download_dir, f"{safe_name}__{token_info['rank']}")

        if os.path.exists(file_name):
            print("abi already downloaded")
            continue

        print(f"Getting {token_info['name']}@{token_address}")
        downloaded_abi = get_abi(token_address)
        if downloaded_abi:
            with open(file_name, 'w', encoding='utf-8') as f:
                f.write(downloaded_abi)
def get_distributions(abi_files: List, abi_dir: Union[str, None] = None):
    """
    Classifies cached abi files by the token-standard functions they expose.

    Every non-event entry with a name is counted (case-insensitively) and the
    counts are written to `function_list.json` in the current working
    directory. Each file is listed at most once per category:

      * 'erc223 - transferwithbytes'    -> `transfer` with more than 2 inputs
      * 'erc677 - transferandcall'      -> `transferAndCall`
      * 'non-standard - approveandcall' -> `approveAndCall`
      * 'erc777 - send'                 -> `send` with more than 2 inputs
      * 'erc20compatible'               -> `is_erc20(abi)` is true

    Files that are not valid JSON, or whose content is not a list, are
    reported and skipped.

    :param abi_files: names of the abi files, relative to `abi_dir`
    :param abi_dir: folder holding the abi files; when omitted, the
        module-level `abi_folder` is used if defined, else "contract_abis"
    :return: mapping of category name to the list of matching abi files
    """
    if abi_dir is None:
        # Backward compatibility: the script entry point publishes the cache
        # folder as the module-level name `abi_folder`.
        abi_dir = globals().get('abi_folder', 'contract_abis')

    func_counter = defaultdict(int)
    erc_distributions = defaultdict(list)
    for abi_file in abi_files:
        with open(os.path.join(abi_dir, abi_file), 'r', encoding='utf-8') as f:
            try:
                abi = json.load(f)
            except JSONDecodeError as e:
                print(f'Failed reading abi: {abi_file}', e)
                # Without this, `abi` would be unbound (first file) or still
                # hold the previous file's content and be counted again.
                continue
        if not isinstance(abi, list):
            print(f'Failed reading abi: {abi_file}', 'content is not a list')
            continue

        # dict used as an ordered set: overloaded functions (e.g. several
        # `transfer` variants with extra inputs) must not list a file twice.
        matched = {}
        for func in abi:
            if not isinstance(func, dict) or 'name' not in func:
                continue
            # a missing "type" is treated as "function"
            if func.get('type', 'function') == 'event':
                continue

            name = func['name'].lower()
            func_counter[name] += 1
            num_inputs = len(func.get('inputs') or [])

            if name == 'transfer':
                # erc223 has function transfer(address _to, uint _value, bytes _data) returns (bool success)
                if num_inputs > 2:
                    matched['erc223 - transferwithbytes'] = None
            elif name == 'transferandcall':
                # erc677 has function transferAndCall(address receiver, uint amount, bytes data) returns (bool success)
                matched['erc677 - transferandcall'] = None
            elif name == 'approveandcall':
                # non standard but a bunch implement function approveAndCall(address _recipient, uint256 _value, bytes _extraData)
                matched['non-standard - approveandcall'] = None
            elif name == 'send':
                # erc777 has function send(address recipient, uint256 amount, bytes data)
                if num_inputs > 2:
                    matched['erc777 - send'] = None

        if is_erc20(abi):
            matched['erc20compatible'] = None

        for category in matched:
            erc_distributions[category].append(abi_file)

    with open('function_list.json', 'w', encoding='utf-8') as f:
        json.dump(func_counter, f)

    pprint(erc_distributions)
    return erc_distributions
def is_erc20(abi: List) -> bool:
    """
    Tells whether the abi exposes both `approve` and `transferFrom`.

    Names are compared case-insensitively. Entries without a name and event
    entries are ignored. An entry without a "type" key is treated as a
    function instead of raising `KeyError`.

    :param abi: the parsed abi, a list of entry dicts
    :return: True if both functions are present, False otherwise
    """
    required = {'approve', 'transferfrom'}
    callable_names = {
        func['name'].lower()
        for func in abi
        if 'name' in func and func.get('type', 'function') != 'event'
    }
    return required <= callable_names
if __name__ == "__main__":
    # folder to download the abis for cache
    # (kept as a module-level name: get_distributions falls back to it)
    abi_folder = "contract_abis"

    # uncomment to (re)download the abis; leave commented to use the cached ones
    # download_abis(abi_folder, num_top_tokens=1000)

    if not os.path.isdir(abi_folder):
        # Fail with a hint instead of a bare FileNotFoundError traceback.
        raise SystemExit(
            f"No cached abis found in '{abi_folder}': "
            "uncomment the download_abis call to fetch them first"
        )

    contract_abis = os.listdir(abi_folder)
    distributions = get_distributions(contract_abis)
    for erc, tokens in distributions.items():
        print(f'{erc}: {len(tokens)}')

    # Build the lookup set once rather than scanning the list for every token.
    erc20_compatible = set(distributions["erc20compatible"])
    approveandcall_in_erc20 = [
        o for o in distributions["non-standard - approveandcall"]
        if o in erc20_compatible
    ]
    print(f'{len(approveandcall_in_erc20)} approveandcall tokens are erc20compatible')
    print(f'Scanned {len(contract_abis)} contracts')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Still working like a charm, thank you for the script!
For anyone else trying, here's my command-line session for running it:
You also need to change line 8,
import requests
, to use cloudscraper instead. This gets around the Cloudflare block on
requests
. Also uncomment line 152, then just run the script and cloudscraper goes brrrrrrr