Skip to content

Instantly share code, notes, and snippets.

@igotit-anything
Last active September 7, 2024 11:13
Show Gist options
  • Save igotit-anything/566d9a44cbb9369789c3d0a105f7f265 to your computer and use it in GitHub Desktop.
Save igotit-anything/566d9a44cbb9369789c3d0a105f7f265 to your computer and use it in GitHub Desktop.
from pybit.unified_trading import WebSocket
from time import sleep
# Tick 데이터 형식 정의
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'symbol': str, # 거래 심볼
'side': str, # 매수/매도 측 (예: "Buy", "Sell")
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'volume_quote': str, # 기준 통화의 거래량 (문자열로 처리)
'block_trade': str # 블록 거래 여부 (문자열로 처리)
}
# 메시지 핸들러 함수 정의
def handle_message(message):
if 'data' in message:
data = message['data']
for tick_data in data:
# 필요한 항목만 추출하여 dictionary로 정리
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 정수형
'symbol': str(tick_data.get('s', '')), # 문자열
'side': str(tick_data.get('S', '')), # 문자열
'volume_base': str(tick_data.get('v', '')), # 문자열
'volume_quote': str(tick_data.get('p', '')), # 문자열
'block_trade': str(tick_data.get('BT', '')) # 문자열
}
# 출력 또는 이후 처리
print("Processed Tick Data:", tick_info)
else:
print("Received Message without 'data' field:", message)
def main():
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, # 실제 거래가 아닌 테스트넷을 사용할 경우 True로 설정
channel_type="linear" # 리니어 채널 설정
)
# 다수의 거래 데이터 구독
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
ws.trade_stream(
symbol=symbols, # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt: # CTRL C
print("Interrupted by user")
if __name__ == "__main__":
main()
@igotit-anything
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment