Last active
April 1, 2020 18:24
-
-
Save pinhopro/56f1b740e8447782c9e02ee9dacfbf84 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import websockets | |
import json | |
import time | |
import pprint | |
# Connection settings used by the ``__main__`` entry point of this script.
# WebSocket URL of the Blinktrade trade API.
BLINKTRADE_API_ENDPOINT = 'wss://bitcambio_api.blinktrade.com/trade/'
# Broker identifier sent in the login and new-order messages.
BLINKTRADE_BROKER_ID = 11
# API credentials; left empty here and must be filled in before logging in.
BLINKTRADE_API_KEY = ''
BLINKTRADE_API_PASSWORD = ''
class BlinktradeApiClient(object):
    """Asynchronous helper for the Blinktrade JSON-over-WebSocket API.

    Each request is a JSON object whose ``MsgType`` field holds a FIX-style
    message type. Methods that expect an immediate answer read the next
    message from the socket and verify its ``MsgType``.
    """

    def __init__(self, ws):
        """Wrap an already-connected websocket.

        Args:
            ws: object exposing awaitable ``send(text)`` and ``recv()``.
        """
        self._ws = ws

    @staticmethod
    def _new_request_id():
        """Return a request id built from the current time in milliseconds."""
        return int(time.time() * 1000)

    async def _send(self, msg):
        """Serialize ``msg`` to JSON and send it over the websocket."""
        await self._ws.send(json.dumps(msg))

    async def _recv_expected(self, msg_type):
        """Receive the next message and check that it has the given type.

        Raises:
            AssertionError: if the ``MsgType`` of the received message is not
                ``msg_type``. The exception type is kept for compatibility
                with the previous ``assert``-based checks, but it is raised
                explicitly so the check survives ``python -O``.
        """
        response = json.loads(await self._ws.recv())
        if response.get("MsgType") != msg_type:
            raise AssertionError(
                'Expected MsgType %r, got %r' % (msg_type, response.get("MsgType")))
        return response

    async def testRequestMessage(self, request_id=None):
        """Send a Test Request and return the next message received.

        Args:
            request_id: value for ``TestReqID``; a fresh timestamp-based id
                is generated on every call when omitted.

        Returns:
            The decoded JSON message read right after sending the request.
        """
        if request_id is None:
            request_id = self._new_request_id()
        # TestRequest message. # http://www.onixs.biz/fix-dictionary/4.4/msgType_1_1.html
        await self._send({'MsgType': '1', 'TestReqID': request_id})
        # the next message is taken to be the heartbeat response
        return json.loads(await self._ws.recv())

    async def sendLimitedBuyOrder(self, symbol, qty, price, clientOrderId, brokerID):
        """Send a new-order message with Side '1' and OrdType '2' (limit buy).

        ``price`` and ``qty`` are converted with ``int()`` before sending.
        No response is read here.

        Raises:
            ValueError: if a required argument is missing/falsy or if
                ``qty`` or ``price`` is not positive.
        """
        if not symbol or not qty or not price or not clientOrderId:
            raise ValueError('Invalid parameters')
        if qty <= 0 or price <= 0:
            raise ValueError('Invalid qty or price')

        await self._send({
            "MsgType": 'D',
            "ClOrdID": str(clientOrderId),
            "Symbol": symbol,
            "Side": '1',
            "OrdType": '2',
            "Price": int(price),
            "OrderQty": int(qty),
            "BrokerID": brokerID,
        })

    async def sendCancelOrder(self, clientOrderId=None, orderId=None):
        """Send an order-cancel request identified by client or exchange id.

        ``clientOrderId`` takes precedence when both identifiers are given.
        No response is read here.

        Raises:
            ValueError: if neither identifier is supplied.
        """
        if clientOrderId is None and orderId is None:
            raise ValueError('Invalid parameters')

        msg = {"MsgType": 'F'}
        if clientOrderId is not None:
            msg["ClOrdID"] = str(clientOrderId)
        else:
            # Only reachable when orderId is set (checked above).
            msg["OrderID"] = str(orderId)
        await self._send(msg)

    async def login(self, broker_id, api_key, api_password, request_id=None):
        """Send the login request and return the login response message.

        Args:
            broker_id: value for ``BrokerID``.
            api_key: sent as ``Username``.
            api_password: sent as ``Password``.
            request_id: value for ``UserReqID``; generated per call when
                omitted.

        Raises:
            AssertionError: if the next message is not of type ``BF``.
        """
        if request_id is None:
            request_id = self._new_request_id()
        await self._send({
            "MsgType": "BE",
            "UserReqID": request_id,
            "BrokerID": broker_id,
            "Username": api_key,
            "Password": api_password,
            "UserReqTyp": '1',
            "UserAgent": "BlinktradeApi",
            "UserAgentLanguage": "en",
            "UserAgentTimezoneOffset": 0,
            "UserAgentPlatform": "Python3",
            "FingerPrint": 8888,
        })
        return await self._recv_expected("BF")

    async def requestMarketData(self, symbol_list, entry_types, subscription_type='1',
                                market_depth=0, update_type='1', request_id=None):
        """Request market data and return the full-refresh ('W') response.

        Args:
            symbol_list: instruments to request.
            entry_types: ``MDEntryTypes`` values (bid, offer, trade).
            subscription_type: value for ``SubscriptionRequestType``.
            market_depth: value for ``MarketDepth``.
            update_type: value for ``MDUpdateType``.
            request_id: value for ``MDReqID``; generated per call when
                omitted.

        Raises:
            ValueError: if ``symbol_list`` or ``entry_types`` is empty.
            AssertionError: if the next message is not of type ``W``.
        """
        # Market Data request. # http://www.onixs.biz/fix-dictionary/4.4/msgType_V_86.html
        if not symbol_list or not entry_types:
            raise ValueError('Invalid parameters')
        if request_id is None:
            request_id = self._new_request_id()

        await self._send({
            'MsgType': 'V',
            'MDReqID': request_id,
            'SubscriptionRequestType': subscription_type,
            'MarketDepth': market_depth,
            'MDUpdateType': update_type,
            'MDEntryTypes': entry_types,  # bid , offer, trade
            'Instruments': symbol_list,
        })
        return await self._recv_expected('W')

    async def subscribeSecurityStatus(self, symbol_list, requestId=None):
        """Subscribe to security status and collect one reply per symbol.

        Args:
            symbol_list: instruments to subscribe to.
            requestId: value for ``SecurityStatusReqID``; generated per call
                when omitted.

        Returns:
            A list with ``len(symbol_list)`` security status ('f') messages.

        Raises:
            AssertionError: if any of the messages read is not of type ``f``.
        """
        if requestId is None:
            requestId = self._new_request_id()
        await self._send({
            "MsgType": 'e',
            "SecurityStatusReqID": requestId,
            "SubscriptionRequestType": '1',
            "Instruments": symbol_list,
        })

        # Exactly one status message is read for each requested symbol.
        security_status_list_response = []
        for _ in symbol_list:
            security_status_list_response.append(await self._recv_expected('f'))
        return security_status_list_response

    async def requestSecurityList(self, market="BLINK", requestId=None):
        """Request the security list of ``market`` and return the response.

        Args:
            market: value for the ``Market`` field.
            requestId: value for ``SecurityReqID``; generated per call when
                omitted.

        Raises:
            AssertionError: if the next message is not of type ``y``.
        """
        if requestId is None:
            requestId = self._new_request_id()
        await self._send({
            'MsgType': 'x',
            'SecurityReqID': requestId,
            'SecurityListRequestType': 0,
            'Market': market,
            'SecurityRequestResult': 0,
        })
        # Security List Response - http://www.onixs.biz/fix-dictionary/4.4/msgType_y_121.html
        return await self._recv_expected('y')
# this class contains the order book, trades, and ticker information
class MarketData(object):
    """In-memory view of order books, trades and security status messages.

    ``market_data`` maps a symbol to ``{'bid': [...], 'ask': [...],
    'trades': [...]}``; ``security_status`` maps a market to a dict of
    symbol -> latest security status message.
    """

    # MDEntryType value -> key of the matching order-book side.
    _SIDE_KEYS = {'0': 'bid', '1': 'ask'}

    def __init__(self):
        """Start with no order books and no security status."""
        self.market_data = {}
        self.security_status = {}

    def get_order_book(self, symbol):
        """Return the stored book/trades dict for ``symbol``, or ``None``."""
        return self.market_data.get(symbol)

    def get_ticker(self, symbol, market="BLINK"):
        """Return the last security status message for ``symbol``.

        Returns ``None`` when nothing is stored for ``market`` or ``symbol``.
        """
        tickers = self.security_status.get(market)
        if not tickers:
            return None
        return tickers.get(symbol)

    def process_market_data_full_refresh(self, msg):
        """Replace the book and trades of ``msg['Symbol']`` with a snapshot."""
        # Market Data - Snapshot/Full Refresh - http://www.onixs.biz/fix-dictionary/4.4/msgType_W_87.html
        symbol = msg.get("Symbol")
        # clear the order book dictionaries
        self.market_data[symbol] = {'bid': [], 'ask': [], 'trades': []}

        # A snapshot without the entry group leaves the book empty.
        for entry in msg.get('MDFullGrp') or []:
            entry_type = entry.get('MDEntryType')
            if entry_type in self._SIDE_KEYS:
                self._handle_market_data_on_book_new_order(symbol, entry)
            elif entry_type == '2':
                self._handle_market_data_on_trade(symbol, entry)

    def process_market_data_incremental_refresh(self, msg):
        """Apply an incremental refresh; only ``MDBkTyp == '3'`` is handled."""
        # Market Data - Incremental Refresh <X> message = http://www.onixs.biz/fix-dictionary/4.4/msgType_X_88.html
        if msg.get("MDBkTyp") != '3':  # Order Depth
            return

        book_handlers = {
            '0': self._handle_market_data_on_book_new_order,
            '1': self._handle_market_data_on_book_update_order,
            '2': self._handle_market_data_on_book_delete_order,
            '3': self._handle_market_data_on_book_delete_orders_thru,
        }
        for entry in msg.get("MDIncGrp") or []:
            entry_type = entry.get("MDEntryType")
            symbol = entry.get("Symbol")
            if entry_type in self._SIDE_KEYS:
                handler = book_handlers.get(entry.get("MDUpdateAction"))
                if handler is not None:
                    handler(symbol, entry)
            elif entry_type == '2':
                self._handle_market_data_on_trade(symbol, entry)

    def process_security_status_response(self, msg):
        """Store ``msg`` as the latest status for its market and symbol."""
        # Security Status - http://www.onixs.biz/fix-dictionary/4.4/msgType_f_102.html
        self.security_status.setdefault(msg.get('Market'), {})[msg.get('Symbol')] = msg

    @staticmethod
    def _order_from_entry(msg):
        """Build the stored order dict from a market data entry."""
        return {
            'price': msg.get("MDEntryPx"),
            'qty': msg.get("MDEntrySize"),
            'user_id': msg.get("UserID"),
            'broker': msg.get("Broker"),
            'order_id': msg.get("OrderID"),
            'side': msg.get("MDEntryType"),
            'order_time': msg.get("MDEntryTime"),
            'order_date': msg.get("MDEntryDate"),
        }

    def _book_side(self, symbol, entry_type):
        """Return the bid or ask list of ``symbol``; ``None`` for other types."""
        side_key = self._SIDE_KEYS.get(entry_type)
        if side_key is None:
            return None
        return self.market_data[symbol][side_key]

    def _handle_market_data_on_book_new_order(self, symbol, msg):
        """Insert an order at its 1-based ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        orders = self._book_side(symbol, msg.get('MDEntryType'))
        if orders is not None:
            orders.insert(index, self._order_from_entry(msg))

    def _handle_market_data_on_book_update_order(self, symbol, msg):
        """Overwrite the order at the 1-based ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        orders = self._book_side(symbol, msg.get('MDEntryType'))
        if orders is not None:
            orders[index] = self._order_from_entry(msg)

    def _handle_market_data_on_book_delete_order(self, symbol, msg):
        """Remove the order at the 1-based ``MDEntryPositionNo``."""
        index = msg.get('MDEntryPositionNo') - 1
        orders = self._book_side(symbol, msg.get('MDEntryType'))
        if orders is not None:
            del orders[index]

    def _handle_market_data_on_book_delete_orders_thru(self, symbol, msg):
        """Remove the top of the book through ``MDEntryPositionNo``.

        "Delete Thru" drops every entry from the first position up to and
        including the given 1-based position; entries behind it are kept.
        """
        count = msg.get('MDEntryPositionNo')
        orders = self._book_side(symbol, msg.get('MDEntryType'))
        if orders is not None:
            del orders[:count]

    def _handle_market_data_on_trade(self, symbol, msg):
        """Append a trade built from ``msg`` to the trades of ``symbol``."""
        trade = {
            "id": msg.get("TradeID"),
            "price": msg.get("MDEntryPx"),
            "symbol": msg.get("Symbol"),
            "size": msg.get("MDEntrySize"),
            "trade_date": msg.get("MDEntryDate"),
            "trade_time": msg.get("MDEntryTime"),
            "order_id": msg.get("OrderID"),
            "side": msg.get("Side"),
            "counter_order_id": msg.get("SecondaryOrderID"),
            "buyer_id": msg.get("MDEntryBuyerID"),
            "seller_id": msg.get("MDEntrySellerID"),
        }
        self.market_data[symbol]['trades'].append(trade)
async def main(api_endpoint, broker_id, api_key, api_password):
    """Run the example session against a Blinktrade websocket endpoint.

    Connects, logs in, subscribes to the security status of every listed
    instrument, loads the BTCBRL order book, places and cancels one limit buy
    order, and then processes incoming messages forever.

    Args:
        api_endpoint: websocket URL to connect to.
        broker_id: broker id used for login and for the order.
        api_key: API key sent as the login user name.
        api_password: API password.

    Raises:
        AssertionError: if the first message received is not a heartbeat
            (``MsgType == '0'``).
    """
    async with websockets.connect(api_endpoint) as ws:
        market_data = MarketData()
        client = BlinktradeApiClient(ws)

        # The first message read after connecting must be a heartbeat.
        initial_heart_beat_message = json.loads(await ws.recv())
        if initial_heart_beat_message["MsgType"] != '0':
            raise AssertionError('Expected an initial heartbeat message')

        # login
        login_response = await client.login(broker_id, api_key, api_password)
        if login_response["UserStatus"] == 1:
            print("success")

        # subscribe to security status (aka ticker) for every security we have available.
        security_list_response = await client.requestSecurityList()
        symbol_list = [
            instrument["Symbol"] for instrument in security_list_response['Instruments']
        ]
        for security_status in await client.subscribeSecurityStatus(symbol_list):
            market_data.process_security_status_response(security_status)

        # request market data for BTCBRL. We will receive the full order book and all trades.
        market_data.process_market_data_full_refresh(
            await client.requestMarketData(["BTCBRL"], ['0', '1', '2']))

        # keep sending a test request message every 30 seconds
        async def blinktrade_send_test_request_task():
            # Only SEND here: the receive loop below is the single reader of
            # the socket, because websockets does not allow two coroutines to
            # wait in recv() at the same time.
            while True:
                await ws.send(json.dumps(
                    {'MsgType': '1', 'TestReqID': int(time.time() * 1000)}))
                await asyncio.sleep(30)

        # Keep a reference so the task is not garbage collected and can be
        # cancelled when this coroutine exits.
        heartbeat_task = asyncio.ensure_future(blinktrade_send_test_request_task())
        try:
            # send an order, wait 3 seconds and then cancel the order
            await client.sendLimitedBuyOrder(
                "BTCBRL", 0.001 * 1e8, 20000 * 1e8, "MyUniqueID2", broker_id)
            await asyncio.sleep(3)
            await client.sendCancelOrder("MyUniqueID2")

            while True:
                raw_message = await ws.recv()
                try:
                    msg = json.loads(raw_message)
                except ValueError as e:  # includes json.JSONDecodeError
                    print("error", str(e), "parsing message.")
                    continue

                msg_type = msg.get("MsgType") if isinstance(msg, dict) else None
                if msg_type == '0':
                    # Heartbeat answering one of our test requests.
                    continue
                if msg_type == 'f':
                    market_data.process_security_status_response(msg)
                    continue
                if msg_type == 'X':
                    market_data.process_market_data_incremental_refresh(msg)
                    continue
                print(raw_message)
        finally:
            heartbeat_task.cancel()
if __name__ == '__main__':
    # asyncio.run creates a fresh event loop, runs main() in it, and on exit
    # cancels any remaining tasks and closes the loop.
    asyncio.run(main(
        BLINKTRADE_API_ENDPOINT,
        BLINKTRADE_BROKER_ID,
        BLINKTRADE_API_KEY,
        BLINKTRADE_API_PASSWORD,
    ))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment