Skip to content

Instantly share code, notes, and snippets.

@sawara7
Last active May 1, 2019 08:44
Show Gist options
  • Save sawara7/4a0ce23622264074cf0e5449476591bc to your computer and use it in GitHub Desktop.
Save sawara7/4a0ce23622264074cf0e5449476591bc to your computer and use it in GitHub Desktop.
bitFlyerのRealtime API(JSON-RPC 2.0 over WebSocket)をaiohttpのWebsokectで利用する
# -*- coding: utf-8 -*-
"""
+++ bitFlyerのRealtime API(JSON-RPC 2.0 over WebSocket)をaiohttpのWebsokectを用いて取得する
"""
import aiohttp
import asyncio
import json
class bfDataReceiver(object):
def __init__(self,channel):
self.channel = channel
self.collect_proc_list = []
self.parse_message_proc = None
async def run(self):
session = aiohttp.ClientSession()
async with session.ws_connect('wss://ws.lightstream.bitflyer.com/json-rpc') as ws:
await ws.send_str(json.dumps(
{"method": "subscribe",
"params": {"channel" : self.channel}}))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
m = json.loads(msg[1])
self.collect_message(m)
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
def collect_message(self, msg):
if self.parse_message_proc:
data = self.parse_message_proc(msg)
if type(data) == list:
[proc(d) for proc in self.collect_proc_list for d in data]
else:
[proc(data) for proc in self.collect_proc_list]
if __name__ == '__main__':
def collect_proc(data):
print(data)
def parse_message_proc(msg):
return msg['params']['message']
ticker = bfDataReceiver('lightning_ticker_BTCJPY29JUN2018')
execution = bfDataReceiver('lightning_executions_BTCJPY29JUN2018')
board = bfDataReceiver('lightning_board_BTCJPY29JUN2018')
for ws in (ticker,execution,board):
ws.parse_message_proc = parse_message_proc
ws.collect_proc_list.append(collect_proc)
asyncio.ensure_future(ws.run())
async def wait():
await asyncio.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(wait())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment