Skip to content

Instantly share code, notes, and snippets.

@nfl0
Created December 8, 2024 00:38
Show Gist options
  • Save nfl0/0cc816345dbc67d6a0c9120083719fd5 to your computer and use it in GitHub Desktop.
Save nfl0/0cc816345dbc67d6a0c9120083719fd5 to your computer and use it in GitHub Desktop.
monitor zcash price w/ binance websocket
import websocket
import json
import psutil
import os
import curses
import threading
import time
# Shared module state, read by the curses display loop on every redraw.
# The byte counters are running totals maintained by monitor_system_usage().
accumulated_sent = 0
accumulated_received = 0
# Placeholder text shown until on_message() stores the first formatted price.
current_price = "Fetching..."
def on_message(ws, message):
    """Update the displayed price from one WebSocket frame.

    Frames that carry a numeric ``p`` field are formatted as
    ``"1,234.56 USDT"`` and stored in the module-level ``current_price``.
    Anything else (unparseable text, non-object JSON, frames without ``p``,
    or a non-numeric ``p``) is ignored so that the last good price stays
    on screen and the callback never raises.

    Args:
        ws: The WebSocket app that delivered the frame (unused).
        message: Raw frame payload, expected to be a JSON document.
    """
    global current_price

    try:
        data = json.loads(message)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass; TypeError covers a
        # payload that is not str/bytes at all.
        return

    # A bare JSON string or list would pass an ``in`` test but cannot be
    # indexed by key, so insist on an object before looking up the price.
    if not isinstance(data, dict) or "p" not in data:
        return

    try:
        price = float(data["p"])
    except (TypeError, ValueError):
        return

    current_price = f"{price:,.2f} USDT"
def on_error(ws, error):
    """Report a WebSocket error on stdout.

    Args:
        ws: The WebSocket app that raised the error (unused).
        error: The error object or message to display.
    """
    report = f"Error: {error}"
    print(report)
def on_close(ws, close_status_code, close_msg):
    """Announce on stdout that the WebSocket connection has closed.

    Args:
        ws: The WebSocket app that closed (unused).
        close_status_code: Close status passed by the library (unused).
        close_msg: Close message passed by the library (unused).
    """
    notice = "WebSocket closed"
    print(notice)
def on_open(ws):
    """Send the subscription request for the ZEC/USDT trade stream.

    Args:
        ws: The freshly opened WebSocket app; only its ``send`` method is used.
    """
    streams = ["zecusdt@trade"]
    subscribe_request = {"method": "SUBSCRIBE", "params": streams, "id": 1}
    ws.send(json.dumps(subscribe_request))
def _draw_line(screen, row, text):
    """Write ``text`` at column 0 of ``row``, skipping it if it does not fit."""
    try:
        screen.addstr(row, 0, text)
    except curses.error:
        # addstr raises curses.error when the text falls outside the window
        # (for example on a very small terminal). Dropping one line is
        # preferable to crashing the whole dashboard.
        pass


def monitor_system_usage(screen):
    """Redraw the price and resource dashboard once per second, forever.

    Each pass shows this process's resident memory, the network byte totals
    accumulated since the function started, and the latest value of the
    module-level ``current_price``. The loop has no exit condition, so the
    function only ends when an exception (such as ``KeyboardInterrupt``)
    propagates out of it.

    Args:
        screen: The curses window to draw on (as supplied by
            ``curses.wrapper``).
    """
    global accumulated_sent, accumulated_received

    process = psutil.Process(os.getpid())
    previous_network = psutil.net_io_counters()

    while True:
        # Resident Set Size of this process, converted from bytes to MB.
        memory_usage = process.memory_info().rss / (1024 ** 2)

        # NOTE: net_io_counters() is called without arguments, which psutil
        # documents as system-wide totals, so these figures cover all
        # traffic on the machine, not just this script's.
        current_network = psutil.net_io_counters()
        accumulated_sent += current_network.bytes_sent - previous_network.bytes_sent
        accumulated_received += current_network.bytes_recv - previous_network.bytes_recv
        previous_network = current_network

        # clear() (rather than erase()) forces a full repaint on refresh,
        # which also wipes any stray text printed to the terminal by the
        # WebSocket callbacks.
        screen.clear()
        _draw_line(screen, 0, "Zcash Price Monitor (ZEC/USDT)")
        _draw_line(screen, 1, f"Memory Usage: {memory_usage:.2f} MB")
        _draw_line(screen, 2, f"Network Sent: {accumulated_sent / 1024:.2f} KB")
        _draw_line(screen, 3, f"Network Received: {accumulated_received / 1024:.2f} KB")
        _draw_line(screen, 5, f"Zcash Price: {current_price}")
        screen.refresh()

        time.sleep(1)  # Update every second
if __name__ == "__main__":
    url = "wss://stream.binance.com:9443/ws"

    # Run the WebSocket client on a daemon thread so it cannot keep the
    # process alive once the display loop on the main thread has ended.
    ws = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    threading.Thread(target=ws.run_forever, daemon=True).start()

    # curses.wrapper restores the terminal even when the display loop raises.
    try:
        curses.wrapper(monitor_system_usage)
    except KeyboardInterrupt:
        # Ctrl+C is the only way out of the display loop; treat it as a
        # normal exit instead of dumping a traceback.
        pass
    finally:
        # Close the connection explicitly rather than relying on the daemon
        # thread being killed at interpreter shutdown.
        ws.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment