Skip to content

Instantly share code, notes, and snippets.

@doseeing
Last active November 8, 2019 08:54
Show Gist options
  • Save doseeing/9d8a2795570025e06761c41efa23b322 to your computer and use it in GitHub Desktop.
Save doseeing/9d8a2795570025e06761c41efa23b322 to your computer and use it in GitHub Desktop.
Huya Open API example
import jwt
import time
import asyncio
import websockets
from websockets.extensions import permessage_deflate
from config import huya_app_id, huya_app_secret
async def keep_alive(ws, interval=10):
    """Send the text message ``'ping'`` over *ws* in an endless loop.

    Args:
        ws: Connection object exposing an awaitable ``send(message)`` method.
        interval: Seconds to sleep after each ping. Defaults to 10, the
            value that was previously hard-coded.

    This coroutine never returns normally; it ends only when ``ws.send``
    raises or when the task running it is cancelled.
    """
    while True:
        print('sending ping')
        await ws.send('ping')
        await asyncio.sleep(interval)
async def start(room_id):
    """Subscribe to notices for *room_id* and print every message received.

    Builds an HS256-signed JWT from ``huya_app_id`` / ``huya_app_secret``,
    opens the WebSocket, starts a background ``keep_alive`` task, sends a
    ``subscribeNotice`` command for ``getMessageNotice`` and then prints
    each received frame until the connection fails or the task is cancelled.

    Args:
        room_id: Room identifier interpolated into the ``roomId`` query
            parameter of the connection URL.

    Raises:
        Whatever ``websockets`` raises when connecting, sending or receiving
        fails (e.g. when the server closes the connection).
    """
    now = int(time.time())
    expire = now + 600  # "exp" claim is 600 seconds after "iat"
    payload = {
        "iat": now,
        "exp": expire,
        "appId": huya_app_id,
    }
    token = jwt.encode(payload, huya_app_secret, algorithm='HS256')
    # PyJWT < 2.0 returns bytes while PyJWT >= 2.0 returns str; calling
    # .decode() unconditionally raises AttributeError on the newer versions.
    if isinstance(token, bytes):
        token = token.decode()
    url = (
        f"ws://ws-apiext.huya.com/index.html?do=comm&roomId={room_id}"
        f"&appId={huya_app_id}&iat={now}&exp={expire}&sToken={token}"
    )
    # NOTE: the printed URL contains the signed token.
    print(url)
    # ping_interval=None turns off the library's own keepalive pings; the
    # keep_alive task sends the text message 'ping' instead.
    # `async with` guarantees the connection is closed when this coroutine
    # exits, whether normally, by error, or by cancellation.
    async with websockets.connect(url, ping_interval=None) as ws:
        print('connected')
        # Keep a reference so the task cannot be garbage-collected mid-flight
        # and so it can be stopped once the receive loop ends.
        pinger = asyncio.ensure_future(keep_alive(ws))
        try:
            req_id = str(int(time.time()))
            await ws.send('{"command":"subscribeNotice","data":["getMessageNotice"],"reqId":"'+req_id+'"}')
            while True:
                message = await ws.recv()
                print(message)
        finally:
            pinger.cancel()
if __name__ == '__main__':
    # asyncio.run drives start() to completion and propagates its exception.
    # The previous ensure_future + run_forever combination left the process
    # hanging on an idle loop after start() failed, with the error only
    # reported as an unretrieved task exception.
    asyncio.run(start('521000'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment