Last active
May 28, 2021 21:57
-
-
Save itzmeanjan/65d63637dae5f270f8f66fa040f70602 to your computer and use it in GitHub Desktop.
Polygon ( aka Matic Network ) Mempool Exploration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from python_graphql_client import GraphqlClient | |
from json import dumps | |
from asyncio import run | |
from re import compile as re_compile | |
from pytimeparse import parse | |
reg = re_compile(r'^(\d+(\.\d+)?)') | |
handle = None | |
def prettyPrint(data): | |
tx = data["data"]["newConfirmedTx"] | |
price = reg.search(tx['gasPrice']) | |
if not price: | |
return | |
parsed = parse(tx["pendingFor"]) | |
if not parsed: | |
return | |
handle.write(f'{float(price.group())}; {parsed}\n') | |
print(dumps(data, sort_keys=True, indent=2)) | |
def main(): | |
try: | |
client = GraphqlClient(endpoint="ws://localhost:7000/v1/graphql") | |
query = """ | |
subscription { | |
newConfirmedTx { | |
gasPrice | |
pendingFor | |
} | |
} | |
""" | |
with open('log.csv', 'w+') as fd: | |
global handle | |
handle = fd | |
print('Catching confirmed tx(s) ...') | |
run(client.subscribe(query=query, handle=prettyPrint)) | |
except Exception as e: | |
print(e) | |
except KeyboardInterrupt: | |
print('\nStopping') | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pandas==1.1.2 | |
matplotlib==3.3.2 | |
python_graphql_client==0.4.3 | |
pytimeparse==1.1.8 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from matplotlib import pyplot as plt | |
from pandas.core.frame import DataFrame | |
def plot(df) -> bool: | |
try: | |
with plt.style.context('dark_background'): | |
fig = plt.Figure( | |
figsize=(18, 9), | |
dpi=100) | |
fig.gca().plot('x', 'y', 'wo ', data=df, markersize=1.0) | |
fig.gca().set_xlabel('Gas Price ( in Gwei )', labelpad=12) | |
fig.gca().set_ylabel('Waiting time in Mempool ( in Second )', labelpad=12) | |
fig.suptitle('Polygon Mempool Analysis', fontsize=20, y=1) | |
fig.savefig( | |
'plot.png', | |
bbox_inches='tight', | |
pad_inches=.5) | |
plt.close(fig) | |
return True | |
except Exception as e: | |
print(f'Error : {e}') | |
return False | |
def read() -> DataFrame: | |
fd = open('log.csv', 'r') | |
x = [] | |
y = [] | |
while True: | |
ln = fd.readline() | |
if not ln: | |
break | |
vals = [float(v.strip()) for v in ln.split(';')] | |
x.append(vals[0]) | |
y.append(vals[1]) | |
return DataFrame({'x': x, 'y': y}) | |
def main(): | |
if plot(read()): | |
print('Plotted !') | |
if __name__ == '__main__': | |
main() |
When you're done, consider disabling virtual environment
deactivate
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What's this ?
These scripts are supposed to be used in conjunction with
harmony
- Reduce Chaos in Mempool, for exploring mempool data.harmony
by following respective README.mdpython3 -m venv venv source venv/bin/activate
catch.py
for catching confirmed txs & logging them in./log.csv
python catch.py # let it run for sometime
log.csv
generated in same directoryviz.py
for generating scatter plot of GasPrice vs Tx Waiting Periodplot.png
in same directory