Skip to content

Instantly share code, notes, and snippets.

@anomit
Created June 21, 2021 14:50
Show Gist options
  • Save anomit/cc897c492fe349034b22fd78a405edac to your computer and use it in GitHub Desktop.
Save anomit/cc897c492fe349034b22fd78a405edac to your computer and use it in GitHub Desktop.
import websockets
import asyncio
import json
import async_timeout
import sys
async def consumer_contract(retries_before_quitting=10):
counter = 0
async with websockets.connect(sys.argv[1]) as ws:
cmd = """{
"method": "eth_newFilter",
"id": 1,
"jsonrpc": "2.0",
"params": [
{
"fromBlock": "0xdbb5c8",
"toBlock": "latest",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
],
"address": ["0xEC9b657Cc8F506a3B1e56982637034f946a4e266"]
}
]
}
"""
await ws.send(cmd)
print('Sending...', cmd)
new_filter_confirmation = await ws.recv()
print(new_filter_confirmation)
filter_id = json.loads(new_filter_confirmation).get('result')
print('Got filter id: ', filter_id)
print('Sleeping for 15 seconds before we begin polling...\n\n')
await asyncio.sleep(15)
while True:
try:
print('Polling with filter id: ', filter_id)
poll_cmd = f"""
{{
"method": "eth_getFilterChanges",
"id": 1,
"jsonrpc": "2.0",
"params": [
"{filter_id}"
]
}}
"""
await ws.send(poll_cmd)
msg = await ws.recv()
print(msg)
counter += 1
except:
pass
finally:
if counter == retries_before_quitting:
if filter_id:
cmd = '{"method":"eth_uninstallFilter","params":["'\
+ \
filter_id + \
'"],"id":222,"jsonrpc":"2.0"}'
await ws.send(cmd)
print('Unsubscribing...', cmd)
unsub_response = await ws.recv()
print(unsub_response)
await ws.close()
return
print('\n\nSleeping for 5 seconds...\n\n')
await asyncio.sleep(5)
if __name__ == '__main__':
if len(sys.argv) < 2:
print('Usage: python ws_testclient_filters.py <ws-server-url> <tries-before-unsubscribing | OPTIONAL>')
sys.exit(0)
if len(sys.argv) == 3:
retries_before_quitting = int(sys.argv[2])
elif len(sys.argv) == 2:
retries_before_quitting = 5
asyncio.get_event_loop().run_until_complete(consumer_contract(retries_before_quitting))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment