Skip to content

Instantly share code, notes, and snippets.

@nfl0
Created December 8, 2024 00:47
Show Gist options
  • Save nfl0/645ebed2d86b6ac17198fcd0c85b71b2 to your computer and use it in GitHub Desktop.
Save nfl0/645ebed2d86b6ac17198fcd0c85b71b2 to your computer and use it in GitHub Desktop.
import websocket
import json
import os
import threading
import time
import subprocess
# Shared dashboard state. The WebSocket callback writes the price, the
# monitoring loop writes the two traffic tallies, and the loop reads all
# three when it redraws the screen.
current_price = "Fetching..."  # placeholder text until a price arrives
accumulated_sent = accumulated_received = 0  # displayed as KB (value / 1024)
def on_message(ws, message):
    """Update the displayed price from an incoming WebSocket frame.

    The frame is expected to be a JSON object whose ``p`` field holds the
    price as a number or numeric string. On success the module-level
    ``current_price`` is replaced with the price formatted to two decimals
    with thousands separators, followed by `` USDT``.

    Frames that cannot be used are ignored so that the last good price
    stays on screen: text that is not JSON, JSON that is not an object,
    objects without ``p`` (presumably acknowledgements and other non-trade
    messages), and ``p`` values that do not parse as a float.

    Args:
        ws: The WebSocket connection that delivered the frame (unused).
        message: The raw frame payload.
    """
    global current_price
    try:
        data = json.loads(message)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers payloads
        # that are neither str nor bytes.
        return

    # Guard against valid JSON that is not an object: for a JSON string
    # such as "ping", `'p' in data` would be a substring test.
    if not isinstance(data, dict) or "p" not in data:
        return

    try:
        price = float(data["p"])
    except (TypeError, ValueError):
        return

    current_price = f"{price:,.2f} USDT"
def on_error(ws, error):
    """Print *error* to standard output, prefixed with ``Error: ``.

    Args:
        ws: The WebSocket connection reporting the error (unused).
        error: The error object or message to display.
    """
    report = f"Error: {error}"
    print(report)
def on_close(ws, close_status_code, close_msg):
    """Announce on standard output that the WebSocket has closed.

    Args:
        ws: The WebSocket connection that closed (unused).
        close_status_code: Close status supplied by the caller (unused).
        close_msg: Close message supplied by the caller (unused).
    """
    notice = "WebSocket closed"
    print(notice)
def on_open(ws):
    """Send the subscription request for the ZEC/USDT trade stream.

    Args:
        ws: The freshly opened WebSocket connection; its ``send`` method is
            called once with the JSON-encoded request.
    """
    subscribe_request = json.dumps(
        {"method": "SUBSCRIBE", "params": ["zecusdt@trade"], "id": 1}
    )
    ws.send(subscribe_request)
def _command_output(argv):
    """Run *argv* (a list, no shell involved) and return its decoded stdout."""
    return subprocess.check_output(argv).decode()


def _read_memory_mb():
    """Return ``(total, used)`` memory as the strings printed by ``free -m``.

    The values are fields 1 and 2 of the second output line, i.e. the row
    directly below the column header.
    """
    summary_row = _command_output(["free", "-m"]).splitlines()[1].split()
    return summary_row[1], summary_row[2]


def _parse_link_byte_totals(link_stats):
    """Sum the RX and TX byte counters found in ``ip -s link`` output.

    Assumes the iproute2 layout: each interface starts with an unindented
    ``N: name: <FLAGS> ...`` line, and every ``RX:`` / ``TX:`` line is only
    a column header whose numbers appear on the *following* line, bytes
    first. Interfaces whose flags contain ``LOOPBACK`` are skipped so local
    traffic is not counted as network traffic.

    Args:
        link_stats: Text output of ``ip -s link``.

    Returns:
        ``(rx_bytes, tx_bytes)`` summed over the counted interfaces;
        ``(0, 0)`` when nothing could be parsed.
    """
    rx_total = 0
    tx_total = 0
    on_loopback = False
    lines = link_stats.splitlines()
    for line, following in zip(lines, lines[1:]):
        if line[:1].isdigit():
            # Start of a new interface section.
            on_loopback = "LOOPBACK" in line
            continue
        label = line.strip()
        if on_loopback or not label.startswith(("RX:", "TX:")):
            continue
        counters = following.split()
        if not counters or not counters[0].isdigit():
            # Header without a numeric row below it; nothing to add.
            continue
        if label.startswith("RX:"):
            rx_total += int(counters[0])
        else:
            tx_total += int(counters[0])
    return rx_total, tx_total


def monitor_system_usage(interval=1.0):
    """Redraw the terminal dashboard forever, once per *interval* seconds.

    Each cycle reads memory figures from ``free -m`` and interface byte
    counters from ``ip -s link``, clears the screen, and prints memory
    usage, traffic totals and the module-level ``current_price``.

    The interface counters reported by ``ip`` are already running totals,
    so only the change since the previous sample is added to the
    module-level ``accumulated_sent`` / ``accumulated_received``. They
    therefore hold the bytes transferred since monitoring started. A
    counter that goes backwards (e.g. after an interface reset) adds
    nothing for that cycle.

    Args:
        interval: Seconds to sleep between refreshes. Defaults to one
            second.

    Raises:
        subprocess.CalledProcessError: If ``free`` or ``ip`` exits with a
            non-zero status.
        OSError: If either command cannot be started.
    """
    global accumulated_sent, accumulated_received
    previous_sample = None
    while True:
        total_memory, used_memory = _read_memory_mb()

        rx_now, tx_now = _parse_link_byte_totals(
            _command_output(["ip", "-s", "link"])
        )
        if previous_sample is not None:
            rx_before, tx_before = previous_sample
            accumulated_received += max(rx_now - rx_before, 0)
            accumulated_sent += max(tx_now - tx_before, 0)
        previous_sample = (rx_now, tx_now)

        # Display the data
        os.system("clear")
        print("Zcash Price Monitor (ZEC/USDT)")
        print(f"Memory Usage: {used_memory}/{total_memory} MB")
        print(f"Network Sent: {accumulated_sent / 1024:.2f} KB")
        print(f"Network Received: {accumulated_received / 1024:.2f} KB")
        print(f"Zcash Price: {current_price}")
        time.sleep(interval)
if __name__ == "__main__":
    url = "wss://stream.binance.com:9443/ws"

    # The price feed runs on a daemon thread so it does not keep the
    # process alive once the foreground monitoring loop stops.
    ws = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    feed_thread = threading.Thread(target=ws.run_forever, daemon=True)
    feed_thread.start()

    # The dashboard loop occupies the main thread.
    monitor_system_usage()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment