Last active
September 8, 2024 05:50
-
-
Save igotit-anything/a0cc5d28f87b2635e2416b0dfde594ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pybit.unified_trading import WebSocket | |
from time import sleep | |
# multi symbol | |
symbols_to_get_tick = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"] | |
category = "linear" | |
# 모든 심볼의 최신 틱 데이터를 저장할 dictionary. key=symbol name | |
symbol_tick_last = {} | |
# Tick 데이터 형식 정의 (symbol 제외) | |
tick_dict = { | |
'timestamp_ms': int, # 밀리초 단위의 타임스탬프 | |
'price' : str, # 시장가 체결가격 | |
'side': int, # 매수/매도 측 (Buy: +1, Sell: -1로 처리) | |
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리) | |
'block_trade': bool # 블록 거래 여부 (True/False로 처리) | |
} | |
# 메시지 핸들러 함수 정의 | |
def handle_message(message): | |
# 틱 데이터 처리 | |
if 'data' in message: | |
data = message['data'] | |
for tick_data in data: | |
symbol = tick_data.get('s', '') | |
# side를 정수형으로 변환: Buy는 +1, Sell은 -1 | |
side = tick_data.get('S', '') | |
side_int = +1 if side == 'Buy' else -1 if side == 'Sell' else 0 | |
# tick_data를 tick_dict 형식으로 변환 (symbol 제외) | |
tick_info = { | |
'timestamp_ms': int(tick_data.get('T', 0)), # 타임스탬프는 정수형 | |
'price' : str(tick_data.get('p', '')), | |
'side': side_int, # 매수/매도 측은 정수형 | |
'volume_base': str(tick_data.get('v', '')), # 거래량(기본 통화)은 문자열 | |
'block_trade': bool(tick_data.get('BT', False)) # 블록 거래 여부는 bool로 변환 | |
} | |
# symbol_tick_last에 최신 데이터만 저장 (symbol 없이 저장) | |
symbol_tick_last[symbol] = tick_info | |
# 출력 또는 이후 처리 | |
print(f"Updated Tick Data for {symbol}: {tick_info}") | |
def main(): | |
# WebSocket 객체 생성 | |
ws = WebSocket( | |
testnet=False, # | |
channel_type=category # 리니어 채널 설정 | |
) | |
ws.trade_stream( | |
symbol=symbols_to_get_tick, # 구독할 심볼 설정 | |
callback=handle_message # 데이터 수신 시 호출될 콜백 함수 | |
) | |
# 데이터 수신 대기 | |
try: | |
while True: | |
sleep(1) # 1초마다 반복 | |
except KeyboardInterrupt: | |
print("Interrupted by user") | |
finally: | |
ws.exit() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
bybit : https://partner.bybit.com/b/igotit_home
related posting : https://igotit.tistory.com/5831