Generate a report about the genesis file size per Cosmos chain
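For context, the short sketch below is kept separate from the full script that follows. It shows the cosmos.directory endpoint the script queries per chain and the field it reads the genesis URL from; 'cosmoshub' is an assumed example chain name, and the response shape mirrors what the script itself expects.

import requests

# Standalone sketch: fetch one chain's registry entry from cosmos.directory
# and print the genesis URL the main script would download.
# 'cosmoshub' is an assumed example; any chain name listed at
# https://chains.cosmos.directory/ should work the same way.
info = requests.get('https://chains.cosmos.directory/cosmoshub/chain').json()
print(info['codebase']['genesis']['genesis_url'])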
from urllib.parse import urljoin, urlparse
import json
import shutil
import zipfile
import gzip
import tarfile
import requests
import os
import glob
from datetime import datetime
from prettytable import PrettyTable

# shared table used by print_table()
x = PrettyTable()


def get_chain_info(chain):
    # fetch the registry entry for a single chain from cosmos.directory
    url = 'https://chains.cosmos.directory/' + chain + '/chain'
    response = requests.get(url)
    return response.json()


def get_chains():
    # fetch the list of all chains known to cosmos.directory
    url = 'https://chains.cosmos.directory/'
    response = requests.get(url)
    return response.json()


def check_if_downloaded(name):
    # a chain counts as downloaded if genesis/<name>/genesis.json already exists
    dir_path = os.path.join(os.getcwd(), 'genesis', name)
    genesis_file_path = os.path.join(dir_path, 'genesis.json')
    return os.path.exists(genesis_file_path)


def fetch_genesis(genesis_location, name):
    genesis_file_path = None
    try:
        # remove any query string or parameters
        genesis_location = urljoin(genesis_location, urlparse(genesis_location).path)
        # if it is a regular GitHub blob URL, switch to the raw file location
        if '/blob/' in genesis_location:
            genesis_location = genesis_location.replace('/blob/', '/raw/')
        response = requests.get(genesis_location)
    except requests.RequestException:
        print('failed to download genesis from', genesis_location, 'for', name, 'chain')
        return None
    else:
        if response.status_code == 200:
            if not genesis_location.endswith('.json'):
                # check the type of compression by extension
                if genesis_location.endswith('.tar.gz'):
                    print('compressed .tar.gz', genesis_location)
                    save_tar_gz_file(name, response.content)
                elif genesis_location.endswith('.gz'):
                    print('compressed .gz', genesis_location)
                    save_gz_file(name, response.content)
                elif genesis_location.endswith('.zip'):
                    print('compressed .zip', genesis_location)
                    save_zip_file(name, response.content)
                else:
                    # unknown extension: check whether the body is plain JSON
                    try:
                        json.loads(response.text)
                        print('other is json', genesis_location)
                    except json.JSONDecodeError:
                        print('failed to parse other as json', genesis_location)
                    genesis_file_path = save_genesis_file(name, response.content)
            else:
                try:
                    json.loads(response.text)
                    print('json', genesis_location)
                except json.JSONDecodeError:
                    print('failed to parse as json', genesis_location)
                genesis_file_path = save_genesis_file(name, response.content)
        else:
            print('error fetching with http status', response.status_code, 'for chain', name)
    return genesis_file_path


def save_genesis_file(name, content):
    # write the raw genesis bytes to genesis/<name>/genesis.json
    dir_path = os.path.join(os.getcwd(), 'genesis', name)
    genesis_file_path = os.path.join(dir_path, 'genesis.json')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(genesis_file_path, 'wb') as f:
        f.write(content)
    print('saved genesis for', name)
    return genesis_file_path


def save_zip_file(name, content):
    # save the .zip archive under compressed/<name>/, extract it and copy out genesis.json
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.zip')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    with zipfile.ZipFile(compressed_genesis_file_path, 'r') as zip_ref:
        zip_ref.extractall(dir_path)
    print('uncompressed zip', compressed_genesis_file_path)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path


def save_tar_gz_file(name, content):
    # save the .tar.gz archive under compressed/<name>/, extract it and copy out genesis.json
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.tar.gz')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    with tarfile.open(compressed_genesis_file_path, 'r') as tar_file:
        tar_file.extractall(dir_path)
    print('uncompressed tar', compressed_genesis_file_path)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    if not os.path.exists(unzipped_genesis_file_path):
        # some archives name the file differently, e.g. <chain>_genesis.json
        for genesis in glob.glob(os.path.join(dir_path, '*genesis.json')):
            unzipped_genesis_file_path = genesis
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path


def save_gz_file(name, content):
    # save the .gz file under compressed/<name>/ and decompress it to genesis.json
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.gz')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    with gzip.open(compressed_genesis_file_path, 'r') as gzip_file, \
            open(unzipped_genesis_file_path, 'wb') as unzipped_file:
        shutil.copyfileobj(gzip_file, unzipped_file)
    print('uncompressed gz', unzipped_genesis_file_path)
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path


def get_file_size(genesis_path):
    # file size in megabytes
    return os.stat(genesis_path).st_size / (1024 * 1024)


def get_local_genesis_info():
    # walk the genesis/ directory and collect one entry per downloaded genesis file
    local_genesis_info = []
    genesis_root = os.path.join(os.getcwd(), 'genesis')
    for root, dirs, files in os.walk(genesis_root):
        for name in files:
            file_path = os.path.join(root, name)
            chain = os.path.basename(root)
            local_genesis_info.append({'name': chain, 'file': file_path})
    return local_genesis_info


def print_table():
    print('\ngenesis info as of', datetime.now().strftime('%d/%b/%Y, %H:%M:%S'))
    total_size = 0
    num_genesis = 0
    local_genesis_info = get_local_genesis_info()
    x.field_names = ['chain', 'genesis size (MB)']
    for gi in local_genesis_info:
        num_genesis += 1
        size = get_file_size(gi['file'])
        total_size = total_size + size
        x.add_row([gi['name'], '{:.3f}'.format(size)])
    x.align['chain'] = 'l'
    x.align['genesis size (MB)'] = 'r'
    # sort numerically on the size column (stored as a formatted string), largest first
    x.sortby = 'genesis size (MB)'
    x.sort_key = lambda row: float(row[0])
    x.reversesort = True
    print(to_markdown_table(x))
    # guard against an empty genesis/ directory
    avg = total_size / num_genesis if num_genesis else 0
    print('\naverage genesis size is', '{:.3f}'.format(avg), 'MB across', num_genesis, 'chains\n')


def to_markdown_table(pt):
    # render a PrettyTable as a markdown table: swap the junction character for '|',
    # then drop the top and bottom border rows and the outer border characters
    _junc = pt.junction_char
    if _junc != '|':
        pt.junction_char = '|'
    markdown = [row[1:-1] for row in pt.get_string().split('\n')[1:-1]]
    pt.junction_char = _junc
    return '\n'.join(markdown)


if __name__ == '__main__':
    chains_response = get_chains()
    for chain in chains_response['chains']:
        chain_name = chain['chain_name']
        if not check_if_downloaded(chain_name):
            chain_info = get_chain_info(chain_name)
            # tolerate registry entries that lack genesis information
            genesis_url = ((chain_info.get('codebase') or {}).get('genesis') or {}).get('genesis_url')
            if not genesis_url:
                print('no genesis information for', chain_name, 'chain')
            else:
                fetch_genesis(genesis_url, chain_name)
        else:
            print('already downloaded genesis for', chain_name)
    print_table()
    print('finished!')
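As a usage note, the conversion done by to_markdown_table can be tried on its own. The snippet below is a standalone illustration with a made-up row, not part of the script above; it reproduces the same junction-character swap and border trimming to turn a PrettyTable into a markdown table.

from prettytable import PrettyTable

# Standalone illustration of the to_markdown_table transformation.
# The chain name and size below are hypothetical example values.
demo = PrettyTable()
demo.field_names = ['chain', 'genesis size (MB)']
demo.add_row(['examplechain', '12.345'])
demo.junction_char = '|'
lines = demo.get_string().split('\n')[1:-1]   # drop the top and bottom borders
print('\n'.join(row[1:-1] for row in lines))  # trim the outer border characters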