Skip to content

Instantly share code, notes, and snippets.

@MtkN1
Created September 20, 2022 01:56
Show Gist options
  • Save MtkN1/2feb08d6a50ab7382ebf96f6c12aa54a to your computer and use it in GitHub Desktop.
Save MtkN1/2feb08d6a50ab7382ebf96f6c12aa54a to your computer and use it in GitHub Desktop.
import asyncio
from typing import List
import pybotters
import streamlit as st
async def watch_event(
store: pybotters.BybitUSDTDataStore, name: str, result: List, event: asyncio.Event
):
with store[name].watch() as stream:
async for change in stream:
result.insert(0, change.data)
event.set()
async def main():
async with pybotters.Client() as client:
# WebSocket 接続
store = pybotters.BybitUSDTDataStore()
await client.ws_connect(
"wss://stream.bybit.com/realtime_public",
send_json={
"op": "subscribe",
"args": ["trade.BTCUSDT", "candle.1.BTCUSDT"],
},
hdlr_json=store.onmessage,
)
# データハンドリングタスク
event = asyncio.Event()
data_1 = []
data_2 = []
asyncio.create_task(watch_event(store, "trade", data_1, event))
asyncio.create_task(watch_event(store, "kline", data_2, event))
# 初期描画
st.markdown("# Trade")
element_1 = st.empty()
st.markdown("# Kline")
element_2 = st.empty()
# 再描画
while True:
await event.wait()
element_1.dataframe(data_1)
element_2.dataframe(data_2)
event.clear()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment