Last active
September 8, 2024 03:06
-
-
Save igotit-anything/7272bd4da03de4fe99b811ed270c7da2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pybit.unified_trading import WebSocket | |
from time import sleep | |
import queue | |
import threading | |
# multi symbol | |
symbols_to_get_tick = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"] | |
category = "linear" | |
# 모든 심볼의 최신 틱 데이터를 저장할 dictionary. key=symbol name | |
symbol_tick_last = {} | |
# Tick 데이터 형식 정의 (symbol 제외) | |
tick_dict = { | |
'timestamp_ms': int, # 밀리초 단위의 타임스탬프 | |
'price': str, # 시장가 체결가격 | |
'side': int, # 매수/매도 측 (Buy: +1, Sell: -1로 처리) | |
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리) | |
'block_trade': bool # 블록 거래 여부 (True/False로 처리) | |
} | |
# 메시지 큐 | |
message_queue = queue.Queue() | |
def handle_message(message): | |
"""WebSocket으로부터 수신된 메시지를 큐에 추가하는 함수.""" | |
if 'data' in message: | |
data = message['data'] | |
message_queue.put(data) # 메시지를 큐에 넣음 | |
def process_messages(): | |
"""큐에 쌓인 메시지를 처리하는 함수.""" | |
while True: | |
try: | |
data = message_queue.get(timeout=1) # 큐에서 메시지를 가져옴 | |
if data: | |
for tick_data in data: | |
symbol = tick_data.get('s', '') | |
# side를 정수형으로 변환: Buy는 +1, Sell은 -1 | |
side = tick_data.get('S', '') | |
side_int = +1 if side == 'Buy' else -1 if side == 'Sell' else 0 | |
# tick_data를 tick_dict 형식으로 변환 (symbol 제외) | |
tick_info = { | |
'timestamp_ms': int(tick_data.get('T', 0)), # 타임스탬프는 정수형 | |
'price': str(tick_data.get('p', '')), | |
'side': side_int, # 매수/매도 측은 정수형 | |
'volume_base': str(tick_data.get('v', '')), # 거래량(기본 통화)은 문자열 | |
'block_trade': bool(tick_data.get('BT', False)) # 블록 거래 여부는 bool로 변환 | |
} | |
# symbol_tick_last에 최신 데이터만 저장 (symbol 없이 저장) | |
symbol_tick_last[symbol] = tick_info | |
# 출력 또는 이후 처리 | |
print(f"Updated Tick Data for {symbol}: {tick_info}") | |
except queue.Empty: | |
continue | |
def main(): | |
# 메시지 처리 스레드 시작 | |
processing_thread = threading.Thread(target=process_messages, daemon=True) | |
processing_thread.start() | |
# WebSocket 객체 생성 | |
ws = WebSocket( | |
testnet=False, # | |
channel_type=category # 리니어 채널 설정 | |
) | |
ws.trade_stream( | |
symbol=symbols_to_get_tick, # 구독할 심볼 설정 | |
callback=handle_message # 데이터 수신 시 호출될 콜백 함수 | |
) | |
# 데이터 수신 대기 | |
try: | |
while True: | |
sleep(1) # 1초마다 반복 | |
except KeyboardInterrupt: | |
print("Interrupted by user") | |
finally: | |
ws.close() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
bybit : https://partner.bybit.com/b/igotit_home
related posting : https://igotit.tistory.com/5831