-
-
Save TechNickAI/13d931ae379dc34ee00c15e12248738e to your computer and use it in GitHub Desktop.
Get validator duties (find largest gap in which to update for that $0.04 it will save/benefit you)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import math | |
from datetime import datetime, timedelta | |
import requests | |
SLOTS_PER_EPOCH = 32 | |
SECONDS_PER_SLOT = 12 | |
def main(validators_indices, eth2_api_url="http://localhost:5052/eth/v1/"): | |
def api_get(endpoint): | |
return requests.get(f"{eth2_api_url}{endpoint}").json() | |
def api_post(endpoint, data): | |
return requests.post(f"{eth2_api_url}{endpoint}", data).json() | |
head_slot = int(api_get("beacon/headers/head")["data"]["header"]["message"]["slot"]) | |
epoch = head_slot // SLOTS_PER_EPOCH | |
cur_epoch_data = api_post(f"validator/duties/attester/{epoch}", json.dumps(validators_indices))[ | |
"data" | |
] | |
next_epoch_data = api_post( | |
f"validator/duties/attester/{epoch + 1}", json.dumps(validators_indices) | |
)["data"] | |
genesis_timestamp = 1606824023 | |
attestation_duties = {} | |
for d in (*cur_epoch_data, *next_epoch_data): | |
attestation_duties.setdefault(int(d["slot"]), []).append(d["validator_index"]) | |
attestation_duties = {k: v for k, v in sorted(attestation_duties.items()) if k > head_slot} | |
all_proposer_duties = api_get(f"validator/duties/proposer/{epoch}")["data"] | |
validators_indices_set = set(validators_indices) | |
duties = attestation_duties.copy() | |
for s in all_proposer_duties: | |
slot = int(s["slot"]) | |
if slot <= head_slot: | |
continue | |
prop_index = int(s["validator_index"]) | |
if prop_index in validators_indices_set: | |
duties.setdefault(slot, []).append(f"{prop_index} (proposal)") | |
duties = dict(sorted(duties.items())) | |
# Also insert (still unknown) attestation duties at epoch after next, | |
# assuming worst case of having to attest at its first slot | |
first_slot_epoch_p2 = (epoch + 2) * SLOTS_PER_EPOCH | |
attestation_duties[first_slot_epoch_p2] = [] | |
print(f"Calculating attestation/proposal slots and gaps for validators:") | |
print(f" {validators_indices}") | |
print("\nUpcoming voting/proposal slots and gaps") | |
print("(Gap in seconds)") | |
print("(slot/epoch - time range - validators)") | |
print("*" * 80) | |
prev_end_time = datetime.now() | |
# Floor to seconds | |
prev_end_time = datetime(*datetime.utctimetuple(prev_end_time)[:6]) | |
# Current epoch gaps | |
cur_epoch_gap_store = {"longest_gap": timedelta(seconds=0), "gap_time_range": (None, None)} | |
overall_gap_store = cur_epoch_gap_store.copy() | |
next_epoch_start_slot = (epoch + 1) * SLOTS_PER_EPOCH | |
next_epoch_start_time = datetime.fromtimestamp(genesis_timestamp + next_epoch_start_slot * 12.0) | |
in_next_epoch = False | |
def _update_gap(end, start, gap_store): | |
gap = end - start | |
assert gap.total_seconds() >= 0 | |
if gap > gap_store["longest_gap"]: | |
gap_store["longest_gap"] = gap | |
gap_store["gap_time_range"] = (end, start) | |
for slot, validators in duties.items(): | |
slot_start = datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT) | |
slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT) | |
suf = "" | |
if not in_next_epoch and slot >= next_epoch_start_slot: | |
print("- " * 40) | |
print( | |
f"Time until epoch change: {math.floor((next_epoch_start_time - prev_end_time).total_seconds())} seconds" | |
) | |
print( | |
f"Epoch boundary (proposal duties are not yet known for next epoch): {next_epoch_start_time}" | |
) | |
print( | |
f"Time until next duty: {math.floor((slot_start - next_epoch_start_time).total_seconds())} seconds" | |
) | |
print("- " * 40) | |
suf = "(after prev. slot duty or current time)" | |
_update_gap(next_epoch_start_time, prev_end_time, cur_epoch_gap_store) | |
in_next_epoch = True | |
print(f"Gap - {math.floor((slot_start - prev_end_time).total_seconds())} seconds {suf}") | |
if validators: | |
print( | |
f" {slot}/{slot // SLOTS_PER_EPOCH}" | |
f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}" | |
f" - [{', '.join(validators)}]" | |
) | |
else: | |
assert slot % SLOTS_PER_EPOCH == 0 | |
_update_gap(slot_start, prev_end_time, overall_gap_store) | |
if in_next_epoch is False: | |
_update_gap(slot_start, prev_end_time, cur_epoch_gap_store) | |
prev_end_time = slot_end | |
print("\nLongest attestation and proposer duty gap (only current epoch, first):") | |
longest_gap, gap_time_range = cur_epoch_gap_store.values() | |
print("*" * 80) | |
print( | |
f"{longest_gap.total_seconds()} seconds" | |
f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots)," | |
f" from {gap_time_range[1].strftime('%H:%M:%S')}" | |
f" until {gap_time_range[0].strftime('%H:%M:%S')}" | |
) | |
print("\nLongest attestation gap (first):") | |
longest_gap, gap_time_range = overall_gap_store.values() | |
print("*" * 80) | |
print( | |
f"{longest_gap.total_seconds()} seconds" | |
f" ({int(longest_gap.total_seconds()) // SECONDS_PER_SLOT} slots)," | |
f" from {gap_time_range[1].strftime('%H:%M:%S')}" | |
f" until {gap_time_range[0].strftime('%H:%M:%S')}" | |
) | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser( | |
description="Show validator duties of current and next epoch to find largest gap." | |
) | |
parser.add_argument("indices", metavar="index", type=int, nargs="+", help="validator indices") | |
args = parser.parse_args() | |
main(args.indices) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment