Skip to content

Instantly share code, notes, and snippets.

@eggman
Created December 7, 2017 04:30
Show Gist options
  • Save eggman/2b71e7294f02a879b604f7c383d26f74 to your computer and use it in GitHub Desktop.
Save eggman/2b71e7294f02a879b604f7c383d26f74 to your computer and use it in GitHub Desktop.
#
# https://www.bitmex.com/app/wsAPI
#
import websocket
import sys
from datetime import datetime, timedelta, timezone
import sched, time
import json
# Fixed UTC+09:00 offset labelled 'JST'; used to timestamp incoming messages.
JST = timezone(timedelta(hours=9), name='JST')
class BitmexStream:
    """Print every message received from the BitMEX realtime trade feed.

    Instantiating the class builds a ``websocket.WebSocketApp`` for
    ``endpoint`` and immediately calls :meth:`run`, so the constructor does
    not return until ``run_forever()`` returns or Ctrl-C is pressed.
    """

    # Realtime endpoint with the XBTUSD trade subscription in the query string.
    endpoint = "wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD"

    def __init__(self):
        """Create the WebSocket app, register the callbacks and start it."""
        # websocket.enableTrace(True)  # uncomment for wire-level debugging
        self.ws = websocket.WebSocketApp(
            BitmexStream.endpoint,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        try:
            self.run()
        except KeyboardInterrupt:
            # Ctrl-C: close the socket instead of dying with a traceback.
            self.ws.close()

    def run(self):
        """Announce start-up and hand control to the WebSocket event loop."""
        print("### run ###")
        self.ws.run_forever()

    def on_message(self, ws, message):
        """Print the received payload prefixed with the current JST time."""
        now = datetime.now(JST)
        print(str(now), message)

    def on_error(self, ws, error):
        """Print the error and terminate the program via ``sys.exit()``."""
        print(error)
        sys.exit()

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """Report that the connection was closed.

        websocket-client 1.0+ invokes this callback with the close status
        code and close message in addition to ``ws``; older releases pass
        only ``ws``. Both extra parameters default to ``None`` so either
        calling convention is accepted.
        """
        print("### closed ###")

    def on_open(self, ws):
        """Report that the connection was opened."""
        print("### open ###")
# Script entry point: constructing the stream connects and starts printing.
if __name__ == "__main__":
    BitmexStream()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment