Skip to content

Instantly share code, notes, and snippets.

@chessai
Created June 8, 2023 21:20
Show Gist options
  • Save chessai/a48ca24ce76f954da631423b9e4c3064 to your computer and use it in GitHub Desktop.
Save chessai/a48ca24ce76f954da631423b9e4c3064 to your computer and use it in GitHub Desktop.
import requests
import time
import threading
import json
from queue import Queue
# Ports of the local node: one for the P2P API, one for the service API.
port_p2p = '1789'
port_service = '1848'

# Path prefix shared by both APIs.
_API_ROOT = '/chainweb/0.0/mainnet01'

# The P2P API is addressed over https, the service API over plain http.
base_url_p2p = f'https://localhost:{port_p2p}{_API_ROOT}'
base_url_service = f'http://localhost:{port_service}{_API_ROOT}'

# Concrete endpoints consumed by the worker functions in this script.
cut_endpoint = f'{base_url_p2p}/cut'
header_updates_endpoint = f'{base_url_service}/header/updates'
def get_repeatedly(url, sleep_seconds=5, timeout=30):
    """Poll *url* forever, yielding every response that comes back with HTTP 200.

    Args:
        url: Address to GET on each iteration.
        sleep_seconds: Pause between consecutive attempts, whether the
            previous attempt succeeded or failed.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without one, a peer that accepts the connection but never answers
            would block this generator (and its thread) indefinitely.

    Yields:
        The ``requests.Response`` of each request whose status code is 200.

    Non-200 responses and ``requests`` exceptions (including timeouts) are
    reported on stdout and then retried; the generator never terminates on
    its own.
    """
    while True:
        try:
            # Certificate verification is switched off for this request.
            # NOTE(review): presumably because the local node serves a
            # self-signed certificate -- confirm before pointing this at a
            # remote host.
            response = requests.get(url, verify=False, timeout=timeout)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {str(e)}")
        else:
            if response.status_code == 200:
                yield response
            else:
                print(f"Unable to connect, status code: {response.status_code}")
        # Every outcome waits the same interval before the next attempt.
        time.sleep(sleep_seconds)
def event_stream(url, sleep_seconds=5):
    """Connect to a streaming endpoint and yield its events, reconnecting forever.

    Non-empty lines of the response body are accumulated; a blank line marks
    the end of an event, at which point the accumulated lines are yielded as
    one string (each line terminated by ``"\\n"``). Lines still pending when
    the stream ends are discarded.

    Args:
        url: Address of the streaming endpoint.
        sleep_seconds: Pause before every reconnect attempt -- after a non-200
            response, after a ``requests`` exception, and after the server
            ends the stream (so a server that closes immediately cannot
            drive this loop into a busy spin).

    Yields:
        One ``str`` per event.
    """
    while True:
        try:
            # The context manager closes the streamed response on every exit
            # path (non-200, exception, end of stream, generator closed), so
            # the underlying connection is not leaked.
            with requests.get(url, stream=True) as response:
                if response.status_code == 200:
                    pending = []
                    for line in response.iter_lines():
                        if line:
                            pending.append(line.decode())
                        elif pending:
                            # A blank line terminates the current event.
                            yield "\n".join(pending) + "\n"
                            pending = []
                else:
                    print(f"Unable to connect, status code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {str(e)}")
        time.sleep(sleep_seconds)  # wait before trying to reconnect
def cuts(queue):
    """Poll the cut endpoint once a minute and enqueue each decoded result.

    Every successful response from ``get_repeatedly(cut_endpoint, ...)`` is
    decoded with ``.json()`` and put on *queue* as a ``"Cut: ..."`` string.
    Runs for as long as the underlying generator keeps yielding.
    """
    responses = get_repeatedly(cut_endpoint, sleep_seconds=60)
    for response in responses:
        payload = response.json()
        queue.put("Cut: " + str(payload))
def _sse_data(event):
    """Return the text carried by the ``data:`` lines of one event string."""
    prefix = "data:"
    chunks = []
    for line in event.splitlines():
        if line.startswith(prefix):
            value = line[len(prefix):]
            # A single space after the colon is a separator, not payload.
            chunks.append(value[1:] if value.startswith(" ") else value)
    return "\n".join(chunks)


def header_updates(queue):
    """Follow the header-updates stream and enqueue each event for printing.

    ``event_stream`` yields plain strings, so the payload is taken from the
    event's ``data:`` lines and decoded with ``json.loads`` (calling
    ``.json()`` on the string would raise ``AttributeError``). When an event
    has no ``data:`` lines or they are not valid JSON, the raw event text is
    enqueued instead so nothing is silently dropped.

    Args:
        queue: Queue receiving one ``"Header Update: ..."`` string per event.
    """
    for update in event_stream(header_updates_endpoint):
        try:
            payload = json.loads(_sse_data(update))
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            payload = update.strip()
        queue.put(f"Header Update: {payload}")
def printer(queue):
    """Print every message taken from *queue*, forever.

    Blocks on ``queue.get()`` while the queue is empty and never returns.
    """
    while True:
        print(queue.get())
def just_do_it():
    """Start the printer and the two producer threads, then wait on the producers.

    One thread runs ``cuts``, one runs ``header_updates``; both feed a shared
    queue that the ``printer`` thread drains to stdout. The call blocks in
    ``join()`` until both producers have finished.

    All three threads are daemon threads. The producers loop indefinitely, so
    if they were non-daemon the interpreter would wait for them at shutdown
    and a ``KeyboardInterrupt`` caught by the caller could not end the
    process.
    """
    queue = Queue()
    printer_thread = threading.Thread(
        target=printer, args=(queue,), daemon=True, name='Printer',
    )
    producers = [
        threading.Thread(
            target=cuts, args=(queue,), daemon=True, name='P2P /cut',
        ),
        threading.Thread(
            target=header_updates, args=(queue,), daemon=True,
            name='Service /header/updates',
        ),
    ]

    # Start the consumer first so messages are printed as soon as they arrive.
    printer_thread.start()
    for thread in producers:
        thread.start()
    for thread in producers:
        thread.join()
# Script body: run the monitor and report when the user interrupts it.
try:
    just_do_it()
except KeyboardInterrupt:
    print("Cancelling...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment