Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save normanlmfung/5db005d41654ed8b26f595d4ca347016 to your computer and use it in GitHub Desktop.
Save normanlmfung/5db005d41654ed8b26f595d4ca347016 to your computer and use it in GitHub Desktop.
Blockchain cutoff scanner
import logging
import datetime
from requests import ReadTimeout
from web3.exceptions import BlockNotFound
logger = logging.getLogger(__name__)


def _fetch_block(get_block, block_number, cutoff_ts, search_step_size, label, max_attempts=10):
    """Fetch one block through ``get_block``, retrying a bounded number of times.

    A missing block (``BlockNotFound``, or any error whose message contains
    "unrecognized block") moves the request one block lower, never below 0.
    Any other error is logged and the same block number is requested again.

    Args:
        get_block: Callable taking a block number and returning an object
            with a ``timestamp`` attribute.
        block_number: First block number to request.
        cutoff_ts: Target timestamp; only used in the log line.
        search_step_size: Current step size; only used in the log line.
        label: Name used in log messages ("this_block" or "mid_block").
        max_attempts: Maximum number of ``get_block`` calls before giving up.

    Returns:
        A ``(block, block_number)`` tuple: the fetched block and the number
        that was actually requested to obtain it.

    Raises:
        RuntimeError: If no block could be fetched within ``max_attempts``.
    """
    block_number = int(block_number)
    for _ in range(max_attempts):
        try:
            block = get_block(block_number)
        except BlockNotFound:
            block_number = max(block_number - 1, 0)
            continue
        except ReadTimeout as read_timeout_error:
            logger.error(f"ReadTimeout while get_block {label}_number: {block_number}: {read_timeout_error} ... Keep trying...")
            continue
        except Exception as error:
            # str() is required here: exceptions are not containers, so
            # `"..." in error` would itself raise TypeError.
            if "unrecognized block" in str(error):
                block_number = max(block_number - 1, 0)
            else:
                logger.error(f"Error while get_block {label}_number: {block_number}: {error} ... Keep trying ...")
            continue

        if not block:
            # An empty result counts as a failed attempt and is retried.
            continue

        block_ts = block.timestamp
        block_dt = datetime.datetime.fromtimestamp(block_ts)
        logger.info(f"get_block {label}_number: {block_number} {label}_ts: {block_ts}, {label}_dt: {block_dt}, cutoff: {datetime.datetime.fromtimestamp(cutoff_ts)}, search_step_size: {search_step_size}")
        return block, block_number

    raise RuntimeError(
        f"Unable to fetch a block at or below {label}_number: {block_number} after {max_attempts} attempts"
    )


def search_block_with_cutoff_ts(
    end_block: int,
    cutoff_ts: int,
    search_step_size: int = 1000,
    get_block=None,
    precision_sec: int = 180
) -> int:
    """Find a block whose timestamp is within ``precision_sec`` of ``cutoff_ts``.

    The search has two phases:

    1. Starting at ``end_block - 1``, step backwards ``search_step_size``
       blocks at a time until a block at or before the cutoff is found.
       This brackets the cutoff between two fetched blocks.
    2. Bisect that bracket. The first midpoint whose timestamp differs from
       the cutoff by less than ``precision_sec`` seconds is returned.

    If no probed block is within the requested precision (blocks are too
    sparse, or the cutoff lies outside the searched range), the closest block
    found is returned and a warning is logged instead of searching forever.

    Args:
        end_block: Exclusive upper bound of the search; the first block
            inspected is ``end_block - 1``.
        cutoff_ts: Target timestamp, in the same unit as ``block.timestamp``
            (compared as seconds against ``precision_sec``).
        search_step_size: Stride of the backward scan; values below 1 are
            treated as 1.
        get_block: Callable taking a block number and returning an object
            with ``timestamp`` and ``number`` attributes. Required.
        precision_sec: Maximum accepted distance, in seconds, between the
            returned block's timestamp and ``cutoff_ts``.

    Returns:
        The ``number`` attribute of the selected block.

    Raises:
        ValueError: If ``get_block`` is not supplied.
        RuntimeError: If a block cannot be fetched after repeated attempts.
    """
    if get_block is None:
        raise ValueError("get_block must be a callable that returns a block for a block number")

    cutoff_ts = int(cutoff_ts)
    step = max(int(search_step_size), 1)

    upper_block, upper_number = _fetch_block(
        get_block, max(int(end_block) - 1, 0), cutoff_ts, search_step_size, "this_block"
    )
    if upper_block.timestamp <= cutoff_ts:
        # The cutoff is at or after the newest block in range, so there is
        # nothing later to search.
        return upper_block.number

    # Phase 1: walk backwards until the cutoff is bracketed, i.e.
    # lower_block.timestamp <= cutoff_ts < upper_block.timestamp.
    lower_block, lower_number = upper_block, upper_number
    while lower_block.timestamp > cutoff_ts:
        if lower_number == 0:
            # Even block 0 is newer than the cutoff; it is the best answer.
            return lower_block.number
        upper_block, upper_number = lower_block, lower_number
        lower_block, lower_number = _fetch_block(
            get_block, max(lower_number - step, 0), cutoff_ts, search_step_size, "this_block"
        )

    # Phase 2: bisect. The bracket strictly shrinks on every pass, so the
    # loop always terminates.
    while upper_number - lower_number > 1:
        mid_block, mid_number = _fetch_block(
            get_block, (lower_number + upper_number) // 2, cutoff_ts, search_step_size, "mid_block"
        )
        if mid_number <= lower_number:
            # Missing blocks pushed the probe down to the lower bound, so
            # the bracket cannot be narrowed any further.
            break
        # Compare raw timestamps: timedelta.seconds is always 0..86399 and
        # ignores whole days, so it cannot express a signed distance.
        if abs(cutoff_ts - mid_block.timestamp) < precision_sec:
            return mid_block.number
        if mid_block.timestamp > cutoff_ts:
            upper_block, upper_number = mid_block, mid_number
        else:
            lower_block, lower_number = mid_block, mid_number

    # No midpoint was within precision: fall back to the nearer bracket end.
    lower_gap = cutoff_ts - lower_block.timestamp
    upper_gap = upper_block.timestamp - cutoff_ts
    closest_block = lower_block if lower_gap <= upper_gap else upper_block
    closest_gap = min(lower_gap, upper_gap)
    if closest_gap >= precision_sec:
        logger.warning(
            f"No block within {precision_sec}s of cutoff {cutoff_ts}; "
            f"returning closest block {closest_block.number} ({closest_gap}s away)"
        )
    return closest_block.number
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment