Created
September 3, 2024 18:40
-
-
Save progeroffline/dabc2ceaafea8c3d194c81f10ab4bf73 to your computer and use it in GitHub Desktop.
Python example for connecting to lis-skins websockets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import requests | |
from centrifuge import ( | |
Client, | |
ClientEventHandler, | |
ConnectedContext, | |
PublicationContext, | |
SubscriptionEventHandler, | |
) | |
class CustomClientEventHandler(ClientEventHandler): | |
async def on_connected(self, ctx: ConnectedContext): | |
print(ctx) | |
class CustomSubscriptionEventHandler(SubscriptionEventHandler): | |
async def on_publication(self, ctx: PublicationContext): | |
print(ctx) | |
async def main(): | |
client = Client( | |
address="wss://ws.lis-skins.ru/connection/websocket", | |
events=CustomClientEventHandler(), | |
token=requests.get( | |
"https://api.lis-skins.ru/v1/user/get-ws-token", | |
headers={ | |
"Authorization": "Bearer YOUR_API_KEY", | |
}, | |
).json()["data"]["token"], | |
) | |
await client.connect() | |
subscription = client.new_subscription( | |
"public:obtained-skins", | |
CustomSubscriptionEventHandler(), | |
) | |
await subscription.subscribe() | |
if __name__ == "__main__": | |
loop = asyncio.new_event_loop() | |
loop.create_task(main()) | |
loop.run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Install by this command