Skip to content

Instantly share code, notes, and snippets.

@igotit-anything
Last active September 8, 2024 03:06
Show Gist options
  • Save igotit-anything/7272bd4da03de4fe99b811ed270c7da2 to your computer and use it in GitHub Desktop.
Save igotit-anything/7272bd4da03de4fe99b811ed270c7da2 to your computer and use it in GitHub Desktop.
from pybit.unified_trading import WebSocket
from time import sleep
import queue
import threading
# multi symbol
symbols_to_get_tick = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
category = "linear"
# 모든 심볼의 최신 틱 데이터를 저장할 dictionary. key=symbol name
symbol_tick_last = {}
# Tick 데이터 형식 정의 (symbol 제외)
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'price': str, # 시장가 체결가격
'side': int, # 매수/매도 측 (Buy: +1, Sell: -1로 처리)
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'block_trade': bool # 블록 거래 여부 (True/False로 처리)
}
# 메시지 큐
message_queue = queue.Queue()
def handle_message(message):
"""WebSocket으로부터 수신된 메시지를 큐에 추가하는 함수."""
if 'data' in message:
data = message['data']
message_queue.put(data) # 메시지를 큐에 넣음
def process_messages():
"""큐에 쌓인 메시지를 처리하는 함수."""
while True:
try:
data = message_queue.get(timeout=1) # 큐에서 메시지를 가져옴
if data:
for tick_data in data:
symbol = tick_data.get('s', '')
# side를 정수형으로 변환: Buy는 +1, Sell은 -1
side = tick_data.get('S', '')
side_int = +1 if side == 'Buy' else -1 if side == 'Sell' else 0
# tick_data를 tick_dict 형식으로 변환 (symbol 제외)
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 타임스탬프는 정수형
'price': str(tick_data.get('p', '')),
'side': side_int, # 매수/매도 측은 정수형
'volume_base': str(tick_data.get('v', '')), # 거래량(기본 통화)은 문자열
'block_trade': bool(tick_data.get('BT', False)) # 블록 거래 여부는 bool로 변환
}
# symbol_tick_last에 최신 데이터만 저장 (symbol 없이 저장)
symbol_tick_last[symbol] = tick_info
# 출력 또는 이후 처리
print(f"Updated Tick Data for {symbol}: {tick_info}")
except queue.Empty:
continue
def main():
# 메시지 처리 스레드 시작
processing_thread = threading.Thread(target=process_messages, daemon=True)
processing_thread.start()
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, #
channel_type=category # 리니어 채널 설정
)
ws.trade_stream(
symbol=symbols_to_get_tick, # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt:
print("Interrupted by user")
finally:
ws.close()
if __name__ == "__main__":
main()
@igotit-anything
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment