Created
May 14, 2022 23:00
-
-
Save normanlmfung/5db005d41654ed8b26f595d4ca347016 to your computer and use it in GitHub Desktop.
block chain cutoff scanner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import logging

from requests import ReadTimeout
from web3.exceptions import BlockNotFound
# Module-level logger shared by the block search routine below.
logger = logging.getLogger(__name__)
def _fetch_block_with_retries(
    get_block,
    block_number: int,
    label: str,
    cutoff_ts: int,
    search_step_size: int,
    max_attempts=None,
):
    """Fetch one block through ``get_block``, retrying when the call fails.

    A ``BlockNotFound`` (or any error whose text contains
    ``"unrecognized block"``) moves the request one block lower; every other
    error is logged and the same block number is requested again.

    Args:
        get_block: Callable taking a block number and returning an object
            with a ``timestamp`` attribute.
        block_number: First block number to request.
        label: ``"this"`` or ``"mid"``; only used to build the log messages.
        cutoff_ts: Cutoff timestamp, only used for logging.
        search_step_size: Current step size, only used for logging.
        max_attempts: Maximum number of ``get_block`` attempts, or ``None``
            to keep trying indefinitely.

    Returns:
        ``(block, block_number)`` where ``block_number`` is the number that
        was actually fetched (it may be lower than the one requested).

    Raises:
        RuntimeError: If the attempts are exhausted, or if stepping back
            moved the request below block 0.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        if block_number < 0:
            # Stepping back past genesis can never succeed; fail loudly
            # instead of retrying a request that cannot be satisfied.
            raise RuntimeError(
                f"get_block {label}_block_number went below block 0 while skipping missing blocks"
            )
        try:
            block = get_block(block_number)
            # Read the timestamp inside the try so a malformed/None result is
            # logged and retried like any other failed fetch.
            block_ts = block.timestamp
        except BlockNotFound:
            block_number -= 1
            continue
        except ReadTimeout as read_timeout_error:
            logger.error(f"ReadTimeout while get_block {label}_block_number: {block_number}: {read_timeout_error} ... Keep trying...")
            continue
        except Exception as error:
            # The exception object itself is not a container: match on its text.
            if "unrecognized block" in str(error):
                block_number -= 1
            else:
                logger.error(f"Error while get_block {label}_block_number: {block_number}: {error} ... Keep trying ...")
            continue

        block_dt = datetime.datetime.fromtimestamp(block_ts)
        logger.info(f"get_block {label}_block_number: {block_number} {label}_block_ts: {block_ts}, {label}_block_dt: {block_dt}, cutoff: {datetime.datetime.fromtimestamp(cutoff_ts)}, search_step_size: {search_step_size}")
        return block, block_number

    raise RuntimeError(
        f"get_block {label}_block_number: {block_number} still failing after {max_attempts} attempts"
    )


def search_block_with_cutoff_ts(
    end_block: int,
    cutoff_ts: int,
    search_step_size: int = 1000,
    get_block=None,
    precision_sec: int = 180,
) -> int:
    """Find a block whose timestamp lies within ``precision_sec`` of ``cutoff_ts``.

    The search starts at ``end_block - 1`` and walks backwards in strides of
    ``search_step_size`` until it reads a block at or before the cutoff. It
    then probes the midpoint of the last stride. If the midpoint is not close
    enough, the function recurses with half the stride, restarting from the
    upper end of the stride (midpoint too old) or from the midpoint itself
    (midpoint too new).

    Args:
        end_block: Exclusive upper bound; the first block read is
            ``end_block - 1``.
        cutoff_ts: Target timestamp in seconds (coerced with ``int``).
        search_step_size: Initial backward stride in blocks; values below 1
            are treated as 1.
        get_block: Callable ``get_block(number)`` returning an object with
            ``timestamp`` and ``number`` attributes (presumably web3's
            ``eth.get_block`` -- confirm against the caller).
        precision_sec: Accepted absolute distance, in seconds, between the
            returned block's timestamp and ``cutoff_ts``.

    Returns:
        The ``number`` of the selected block. When the requested precision
        cannot be reached (stride already down to one block, or the cutoff
        predates block 0) the best block found is returned and a warning is
        logged.

    Raises:
        ValueError: If ``get_block`` is not supplied.
        RuntimeError: If the starting block or a midpoint block cannot be
            fetched within 10 attempts.

    Note:
        If the block at ``end_block - 1`` is already at or before the cutoff,
        the probed stride extends past ``end_block``.
    """
    if get_block is None:
        raise ValueError("get_block callable is required")

    cutoff_ts = int(cutoff_ts)
    # A stride below one block can never move the search forward.
    search_step_size = max(int(search_step_size), 1)

    this_block, fetched_number = _fetch_block_with_retries(
        get_block, int(end_block - 1), "this", cutoff_ts, search_step_size, max_attempts=10
    )
    this_block_ts = this_block.timestamp
    next_block_number = max(fetched_number - search_step_size, 0)

    # Walk backwards one stride at a time until a block at/before the cutoff
    # is read. Transient failures here are retried without a limit.
    while this_block_ts > cutoff_ts:
        if fetched_number <= 0:
            # Block 0 is still newer than the cutoff: nothing older exists.
            logger.warning(f"cutoff {cutoff_ts} is earlier than block 0 (ts {this_block_ts}); returning block 0")
            return this_block.number
        this_block, fetched_number = _fetch_block_with_retries(
            get_block, next_block_number, "this", cutoff_ts, search_step_size, max_attempts=None
        )
        this_block_ts = this_block.timestamp
        next_block_number = max(fetched_number - search_step_size, 0)

    # Bracket the cutoff between the block just read (at/before the cutoff)
    # and the position one stride above it.
    left_bound = fetched_number
    right_bound = fetched_number + search_step_size
    mid_block, mid_block_number = _fetch_block_with_retries(
        get_block, (left_bound + right_bound) // 2, "mid", cutoff_ts, search_step_size, max_attempts=10
    )
    mid_block_ts = mid_block.timestamp

    # Compare raw second counts: timedelta.seconds drops whole days and wraps
    # for negative differences, so it cannot be used as a distance.
    # The block closest to the cutoff won't have a timestamp precisely equal
    # to the cutoff; if the precision is too tight the search cannot converge.
    if abs(cutoff_ts - mid_block_ts) < precision_sec:
        return mid_block.number

    if search_step_size <= 1:
        # The stride cannot shrink any further, so this is the best candidate.
        logger.warning(f"could not get within {precision_sec}s of cutoff {cutoff_ts}; returning block {mid_block_number} (ts {mid_block_ts})")
        return mid_block.number

    new_search_step_size = search_step_size // 2
    # Midpoint older than the cutoff -> the answer lies above it, restart from
    # the upper bound; otherwise restart from the midpoint.
    next_end_block = right_bound if cutoff_ts > mid_block_ts else mid_block_number
    return search_block_with_cutoff_ts(
        end_block=next_end_block,
        cutoff_ts=cutoff_ts,
        search_step_size=new_search_step_size,
        get_block=get_block,
        precision_sec=precision_sec,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment