Skip to content

Instantly share code, notes, and snippets.

@ashwoods
Created December 29, 2018 16:05
Show Gist options
  • Save ashwoods/0aecf5812c3109f1ccbbe8e5609214dd to your computer and use it in GitHub Desktop.
Save ashwoods/0aecf5812c3109f1ccbbe8e5609214dd to your computer and use it in GitHub Desktop.
import websockets
import asyncio
import threading
from threading import RLock, Thread
from enum import Enum
from time import sleep as sleep
import ssl
# Module-level re-entrant lock. Nothing in the visible part of this file
# acquires it.
lock = RLock()
class State(Enum):
    """Connection state tracked in ``Test.state``."""

    # Initial state assigned in Test.__init__.
    OFFLINE = 0
    # Set when the server answers with 'HELLO'.
    ONLINE = 1
    # Set when the server answers with 'SESSION_OK'.
    IN_CALL = 2
    # Set when the server sends a message starting with 'ERROR'.
    ERROR = -1
# Numeric id formatted into the 'HELLO %d' and 'BYE %d' messages sent to the
# server.
NUMBER = 1_231_239_041_234_123
class Test:
    """Testing threaded async.

    Worker callables run in daemon threads (``start_worker`` /
    ``stop_worker``).  One of them, ``websockets_worker``, owns an asyncio
    event loop that drives a websocket connection to ``self.server``.  Other
    threads must hand work to that loop with the thread-safe asyncio APIs
    instead of touching the loop or the connection directly.
    """

    def __init__(self):
        self._loop = None           # event loop created by websockets_worker()
        self._loop_thread = None    # thread that runs self._loop
        self._workers = dict()      # worker callable -> Thread running it
        self._terminate = dict()    # Thread -> True once it was asked to stop
        self.conn = None            # websocket connection, set by connect()
        self.state = State.OFFLINE
        self.server = 'wss://webrtc.nirbheek.in:8443'

    def start_worker(self, func):
        """Run ``func()`` in a new daemon thread and remember the thread.

        Raises:
            RuntimeError: if ``func`` is already registered as a worker.
        """
        if func in self._workers:
            raise RuntimeError("Ops")
        thread = Thread(target=func, args=())
        thread.daemon = True
        thread.start()
        self._workers[func] = thread

    def stop_worker(self, func):
        """Ask the worker started for ``func`` to stop and wait for it.

        Raises:
            KeyError: if ``func`` was never started (or was already stopped).
        """
        thread = self._workers[func]
        self._terminate[thread] = True
        print('Waiting to join with %s (%s)' % (func, self._terminate))
        if thread is self._loop_thread:
            # The websocket worker is blocked inside its event loop and does
            # not poll the terminate flag, so joining it without cancelling
            # the loop's tasks would block until the server hangs up.
            self.cancel_loop()
        thread.join()
        print('Done waiting')
        # Drop the flag if the worker exited without consuming it; otherwise
        # the dead Thread object would stay referenced forever.
        self._terminate.pop(thread, None)
        del self._workers[func]

    def release_on_terminate(self):
        """Report whether the calling worker thread was asked to stop.

        This method is supposed to be called in a worker thread.  The flag
        is consumed: it is reported as True only once per stop request.

        Returns:
            True if ``stop_worker`` flagged the current thread, else False.
        """
        return bool(self._terminate.pop(threading.current_thread(), False))

    def setup_call(self):
        """Send 'SESSION 1' over the connection and wait until it was sent.

        The connection belongs to the worker's event loop, so the send is
        submitted to that loop rather than run on a loop of the calling
        thread.

        Raises:
            RuntimeError: if there is no usable loop or connection yet.
        """
        if self._loop is None or self._loop.is_closed() or self.conn is None:
            raise RuntimeError("setup_call() needs a running websocket worker")
        msg = 'SESSION 1'
        future = asyncio.run_coroutine_threadsafe(
            self.conn.send(msg), self._loop)
        # Block like the original run_until_complete() call did, and surface
        # any exception raised by send() in the calling thread.
        future.result()

    async def bye(self):
        """Send the 'BYE <NUMBER>' message over the connection."""
        print("sending bye")
        await self.conn.send('BYE %d' % NUMBER)

    def send_bye(self):
        """Schedule ``bye()`` on the worker's event loop from any thread."""
        asyncio.run_coroutine_threadsafe(self.bye(), self._loop)

    async def connect(self):
        """Open the websocket connection and send 'HELLO <NUMBER>'."""
        print("connecting")
        # SERVER_AUTH is the purpose for client-side sockets (it verifies the
        # server certificate).  CLIENT_AUTH builds a context meant for
        # server-side sockets, which Python 3.10+ refuses to use for an
        # outgoing connection.
        sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
        self.conn = await websockets.connect(self.server, ssl=sslctx)
        await self.conn.send('HELLO %d' % NUMBER)

    async def loop(self):
        """Consume incoming messages and update ``self.state``.

        Returns:
            1 as soon as a message starting with 'ERROR' is received,
            0 when iteration over the connection ends.
        """
        print("loop hearbeat")
        # Invariant: connect() must have completed before loop() is run.
        assert self.conn
        async for message in self.conn:
            print("message heartbeat %s" % message)
            if message == 'HELLO':
                self.state = State.ONLINE
            elif message == 'SESSION_OK':
                self.state = State.IN_CALL
            elif message.startswith('ERROR'):
                self.state = State.ERROR
                print(message)
                return 1
            else:
                # NOTE(review): handle_sdp is not defined in this class, so
                # any other message raises AttributeError unless a subclass
                # provides it -- confirm where it is supposed to come from.
                await self.handle_sdp(message)
        return 0

    async def do_something_async_in_loop(self):
        """Print a marker line, flushing stdout immediately."""
        print("\n did something async", flush=True)

    def make_do_something_async(self):
        """Unfinished stub: has no effect.

        NOTE(review): presumably meant to schedule
        ``do_something_async_in_loop()`` on the worker loop -- confirm the
        intent before implementing.
        """

    def cancel_loop(self):
        """Cancel every task pending on the worker's event loop.

        Safe to call from any thread.  The cancellation is executed inside
        the loop's own thread; the thread that runs the loop (see
        ``websockets_worker``) is responsible for closing it afterwards.
        Does nothing when there is no loop or it is already closed.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            return

        def _cancel_all():
            for task in asyncio.all_tasks(loop):
                task.cancel()

        try:
            loop.call_soon_threadsafe(_cancel_all)
        except RuntimeError:
            # The loop was closed between the check above and this call.
            pass

    def do_something(self):
        """Block for five seconds, then print "something"."""
        sleep(5)
        print("something")

    def eat_monkety(self):
        """Block for five seconds, then print "eat monkey"."""
        sleep(5)
        print("eat monkey")

    def pipeline_worker(self):
        """This method should run in a worker thread.

        Prints a heartbeat every five seconds until ``stop_worker`` flags
        the thread; the flag is only checked between sleeps.
        """
        while not self.release_on_terminate():
            sleep(5)
            print('pipeline heartbeat')

    def websockets_worker(self):
        """This method should run in a worker thread.

        Creates the event loop, connects, then processes messages until the
        connection ends or the loop's tasks are cancelled.  The loop is
        always closed before the thread exits.
        """
        print("staring worker websockets")
        loop = asyncio.new_event_loop()
        self._loop = loop
        self._loop_thread = threading.current_thread()
        try:
            # A stop request may have arrived before _loop_thread was
            # published, in which case stop_worker() could not cancel us.
            if self.release_on_terminate():
                return
            loop.run_until_complete(self.connect())
            loop.run_until_complete(self.loop())
        except asyncio.CancelledError:
            # cancel_loop() cancelled the task we were waiting on.
            pass
        finally:
            self._shutdown_loop(loop)

    @staticmethod
    def _shutdown_loop(loop):
        """Cancel whatever is still pending on ``loop`` and close it."""
        try:
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                # Give the cancelled tasks a chance to unwind before closing.
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True))
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
if __name__ == '__main__':
    # Demo run: start both workers, do some blocking work on the main
    # thread, request a session, then stop the workers again.  The sleeps
    # only pace the output.
    tester = Test()
    for worker in (tester.pipeline_worker, tester.websockets_worker):
        sleep(2)
        print('.')
        tester.start_worker(worker)
    sleep(2)
    print('.')
    tester.do_something()
    sleep(5)
    tester.setup_call()
    sleep(5)
    # Stop in the reverse order of start-up.
    for worker in (tester.websockets_worker, tester.pipeline_worker):
        tester.stop_worker(worker)
    exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment