Created
June 8, 2023 21:20
-
-
Save chessai/a48ca24ce76f954da631423b9e4c3064 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
import threading | |
import json | |
from queue import Queue | |
port_p2p = '1789' | |
port_service = '1848' | |
base_url_p2p = 'https://localhost:' + port_p2p + '/chainweb/0.0/mainnet01' | |
base_url_service = 'http://localhost:' + port_service + '/chainweb/0.0/mainnet01' | |
cut_endpoint = base_url_p2p + '/cut' | |
header_updates_endpoint = base_url_service + '/header/updates' | |
def get_repeatedly(url, sleep_seconds=5): | |
while True: | |
try: | |
response = requests.get(url, verify=False) | |
if response.status_code == 200: | |
yield response | |
time.sleep(sleep_seconds) | |
else: | |
print(f"Unable to connect, status code: {response.status_code}") | |
time.sleep(sleep_seconds) | |
except requests.exceptions.RequestException as e: | |
print(f"Request failed: {str(e)}") | |
time.sleep(sleep_seconds) | |
def event_stream(url, sleep_seconds=5): | |
while True: | |
try: | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
event = "" | |
for line in response.iter_lines(): | |
# filter out keep-alive new lines | |
if line: | |
event += line.decode() + "\n" | |
elif event: | |
yield event | |
event = "" | |
else: | |
print(f"Unable to connect, status code: {response.status_code}") | |
time.sleep(sleep_seconds) # wait before trying to reconnect | |
except requests.exceptions.RequestException as e: | |
print(f"Request failed: {str(e)}") | |
time.sleep(sleep_seconds) # wait before trying to reconnect | |
def cuts(queue): | |
for cut in get_repeatedly(cut_endpoint, sleep_seconds=60): | |
queue.put(f"Cut: {str(cut.json())}") | |
def header_updates(queue): | |
for update in event_stream(header_updates_endpoint): | |
queue.put(f"Header Update: {str(update.json())}") | |
def printer(queue): | |
while True: | |
message = queue.get() | |
print(message) | |
def just_do_it(): | |
queue = Queue() | |
printer_thread = threading.Thread(target=printer, args=(queue,), daemon=True, name='Printer') | |
cut_thread = threading.Thread(target=cuts, args=(queue,), name='P2P /cut') | |
header_update_thread = threading.Thread(target=header_updates, args=(queue,), name='Service /header/updates') | |
printer_thread.start() | |
cut_thread.start() | |
header_update_thread.start() | |
cut_thread.join() | |
header_update_thread.join() | |
try: | |
just_do_it() | |
except KeyboardInterrupt as e: | |
print("Cancelling...") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment