Last active
October 4, 2023 14:26
-
-
Save obycode/42e7906bc3bd61946564df652cab9574 to your computer and use it in GitHub Desktop.
Check the fullness of a block
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import argparse | |
from datetime import datetime, timedelta | |
from termcolor import colored | |
def time_ago(epoch_timestamp, current_time=None): | |
timestamp = datetime.utcfromtimestamp( | |
epoch_timestamp | |
) # Convert epoch to UTC datetime | |
# Use datetime.fromtimestamp(epoch_timestamp) for local time | |
if current_time is None: | |
current_time = ( | |
datetime.utcnow() | |
) # Use UTC time; use datetime.now() for local time | |
delta = current_time - timestamp | |
if delta.days >= 1: | |
# If it's been more than a day, return the full date and time | |
return timestamp.strftime("%Y-%m-%d %H:%M:%S") | |
elif delta.seconds < 60: | |
# If it's been less than a minute, return the seconds | |
return f"{delta.seconds} seconds ago" | |
elif delta.seconds < 3600: | |
# If it's been less than an hour, return the minutes | |
minutes = delta.seconds // 60 | |
return f"{minutes} minutes ago" | |
else: | |
# If it's been less than a day, return the hours | |
hours = delta.seconds // 3600 | |
return f"{hours} hours ago" | |
def get_block(block_height): | |
url = f"https://api.mainnet.hiro.so/extended/v1/block/by_height/{block_height}" | |
response = requests.get(url) | |
if response.status_code != 200: | |
raise Exception(f"Failed to retrieve data: {response.text}") | |
data = response.json() | |
return data | |
def get_costs(x): | |
# Filter out keys that don't begin with 'execution_cost_' | |
return { | |
k.removeprefix("execution_cost_"): v | |
for k, v in x.items() | |
if k.startswith("execution_cost_") | |
} | |
def get_tip_height(): | |
url = "https://api.mainnet.hiro.so/v2/info" | |
response = requests.get(url) | |
if response.status_code != 200: | |
raise Exception(f"Failed to retrieve data: {response.text}") | |
data = response.json() | |
return data["stacks_tip_height"] | |
block_limits = { | |
"read_count": 15000, | |
"read_length": 100000000, | |
"runtime": 5000000000, | |
"write_count": 15000, | |
"write_length": 15000000, | |
} | |
def get_all_transactions(block_height): | |
url = f"https://api.mainnet.hiro.so/extended/v1/tx/block_height/{block_height}" | |
offset = 0 | |
limit = 20 # The default limit as per the provided object structure | |
all_transactions = [] | |
while True: | |
params = {"offset": offset, "limit": limit} | |
response = requests.get(url, params=params) | |
if response.status_code != 200: | |
raise Exception(f"Failed to retrieve data: {response.text}") | |
data = response.json() | |
all_transactions.extend(data["results"]) | |
# Check if there are more pages | |
if offset + limit < data["total"]: | |
offset += limit | |
else: | |
break | |
return all_transactions | |
def get_total_fees(transactions): | |
return sum(int(transaction["fee_rate"]) for transaction in transactions) | |
def get_total_costs(transactions): | |
block_txs = 0 | |
block_costs = { | |
"read_count": 0, | |
"read_length": 0, | |
"runtime": 0, | |
"write_count": 0, | |
"write_length": 0, | |
} | |
microblock_txs = 0 | |
microblock_costs = { | |
"read_count": 0, | |
"read_length": 0, | |
"runtime": 0, | |
"write_count": 0, | |
"write_length": 0, | |
} | |
for transaction in transactions: | |
# This tx is in the anchor block | |
if transaction["microblock_hash"] == "0x": | |
block_txs += 1 | |
costs = get_costs(transaction) | |
for key, value in costs.items(): | |
block_costs[key] += value | |
else: | |
microblock_txs += 1 | |
costs = get_costs(transaction) | |
for key, value in costs.items(): | |
microblock_costs[key] += value | |
return { | |
"block_txs": block_txs, | |
"block_costs": block_costs, | |
"microblock_txs": microblock_txs, | |
"microblock_costs": microblock_costs, | |
} | |
def print_block_fullness(costs, percentages, color=None): | |
max_label_length = max(len(cost_type) for cost_type in costs.keys()) | |
# Print header | |
print( | |
colored( | |
f"\n{'Cost Type'.ljust(max_label_length)} {'Cost Limit':>12} {'Total Cost':>12} {'%':>7}", | |
color, | |
) | |
) | |
print( | |
colored( | |
f"{'---------'.ljust(max_label_length)} {'----------':>12} {'----------':>12} {'-----':>7}", | |
color, | |
) | |
) | |
# Print data | |
for cost_type in block_limits.keys(): | |
print( | |
colored( | |
f"{cost_type.ljust(max_label_length)} {block_limits[cost_type]:>12} {costs[cost_type]:>12} {percentages[cost_type]:>7.2f}", | |
color, | |
) | |
) | |
def get_cost_percentages(costs): | |
return { | |
cost_type: (value / block_limits[cost_type]) * 100 | |
for cost_type, value in costs.items() | |
} | |
def print_block_fullness_bars(percentages, color=None): | |
max_bar_length = 50 | |
max_label_length = max(len(cost_type) for cost_type in percentages.keys()) | |
for cost_type, percentage in percentages.items(): | |
bar_length = int((percentage / 100) * max_bar_length) | |
bar = "#" * bar_length | |
label = cost_type.replace("execution_cost_", "").ljust(max_label_length) | |
print(colored(f"{label} [{bar:<{max_bar_length}}] {percentage:.2f}%", color)) | |
def print_fee_summary(total_fees, percentages): | |
# There is not a clear way to do this, given the 5 dimensional cost model | |
# Take the highest dimension of the cost model and use that as the total cost. | |
# The fee rate is then the total fees divided by the total cost, so the units | |
# are uSTX per % of the block. | |
highest_cost = max(percentages.items(), key=lambda x: x[1])[1] | |
fee_rate = total_fees / highest_cost | |
print(f"Total fees: {total_fees/1_000_000:<8} STX") | |
print(f"Avg. fee rate: {int(fee_rate):<8} uSTX/%\n") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Calculate and display block costs.") | |
parser.add_argument( | |
"block_height", | |
nargs="?", | |
default=None, | |
help="The block height to analyze (default to tip).", | |
) | |
parser.add_argument( | |
"--details", action="store_true", help="Display detailed costs." | |
) | |
args = parser.parse_args() | |
if args.block_height is None: | |
block_height = get_tip_height() | |
else: | |
block_height = args.block_height | |
block = get_block(block_height) | |
transactions = get_all_transactions(block_height) | |
costs = get_total_costs(transactions) | |
print( | |
colored("x\u0336 ", "magenta") | |
+ str(block_height) | |
+ colored(" \u2192 \u20BF ", "yellow") | |
+ str(block["burn_block_height"]) | |
+ " " | |
+ colored(time_ago(block["burn_block_time"]), "grey") | |
+ "\n" | |
) | |
print(costs) | |
print(f"{colored(str(costs['block_txs']), 'light_blue')} transactions\n") | |
if len(block["microblocks_accepted"]) > 0: | |
print( | |
f"Microblocks confirmed: {len(block['microblocks_accepted'])} ({colored(str(costs['microblock_txs']), 'light_blue')} txs)\n" | |
) | |
block_percentages = get_cost_percentages(costs["block_costs"]) | |
total_fees = get_total_fees(transactions) | |
print_fee_summary(total_fees, block_percentages) | |
print_block_fullness_bars(block_percentages) | |
if args.details: | |
print_block_fullness(costs["block_costs"], block_percentages) | |
if len(block["microblocks_accepted"]) > 0: | |
print(colored("\n\nMicroblocks:", "dark_grey")) | |
microblock_percentages = get_cost_percentages(costs["microblock_costs"]) | |
print_block_fullness_bars(microblock_percentages, "dark_grey") | |
if args.details: | |
print_block_fullness( | |
costs["microblock_costs"], microblock_percentages, "dark_grey" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment