Created
December 29, 2018 16:05
-
-
Save ashwoods/0aecf5812c3109f1ccbbe8e5609214dd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import ssl
import threading
from enum import Enum
from threading import RLock, Thread
from time import sleep as sleep

import websockets
# Module-level re-entrant lock. Nothing in the visible code acquires it.
lock = RLock()
class State(Enum):
    """Connection state tracked in ``Test.state``."""

    # Value assigned by Test.__init__.
    OFFLINE = 0
    # Set when the server sends 'HELLO'.
    ONLINE = 1
    # Set when the server sends 'SESSION_OK'.
    IN_CALL = 2
    # Set when the server sends a message starting with 'ERROR'.
    ERROR = -1
# Numeric id interpolated into the 'HELLO %d' and 'BYE %d' messages that
# Test sends to the server.
NUMBER = 1231239041234123
class Test:
    """Testing threaded async.

    Plain worker callables run in daemon threads.  One of them,
    ``websockets_worker``, owns an asyncio event loop and a websocket
    connection; every other thread reaches that connection only by
    scheduling coroutines on the worker's loop.
    """

    def __init__(self):
        self._loop = None         # event loop created by websockets_worker()
        self._workers = dict()    # worker callable -> Thread running it
        self._terminate = dict()  # Thread -> True once stop_worker() asked it to stop
        self.conn = None          # websocket connection, assigned by connect()
        self.state = State.OFFLINE
        self.server = 'wss://webrtc.nirbheek.in:8443'

    def start_worker(self, func):
        """Run *func* (called without arguments) in a new daemon thread.

        Raises:
            RuntimeError: if *func* is already registered as a worker.
        """
        if func in self._workers:
            raise RuntimeError("Ops")
        thread = Thread(target=func, daemon=True)
        thread.start()
        self._workers[func] = thread

    def stop_worker(self, func):
        """Ask the worker started for *func* to stop and wait for its thread.

        Raises:
            KeyError: if *func* was never started with start_worker().
        """
        thread = self._workers[func]
        self._terminate[thread] = True
        print('Waiting to join with %s (%s)' % (func, self._terminate))
        if func == self.websockets_worker:
            # This worker blocks inside its event loop and never polls the
            # terminate flag, so join() would wait for the server to hang
            # up.  Cancel its tasks instead.
            self.cancel_loop()
        thread.join()
        print('Done waiting')
        # Drop the flag in case the worker exited without consuming it.
        self._terminate.pop(thread, None)
        del self._workers[func]

    def release_on_terminate(self):
        """Report whether the calling worker thread was asked to stop.

        This method is supposed to be called in a worker thread.  A pending
        stop request for the calling thread is consumed by the call.

        Returns:
            True if stop_worker() flagged the current thread, else False.
        """
        return bool(self._terminate.pop(threading.current_thread(), False))

    def _require_loop(self):
        """Return the worker's event loop, or raise if it cannot take work.

        Raises:
            RuntimeError: if the loop was never created or is already closed.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            raise RuntimeError("websockets worker event loop is not available")
        return loop

    def setup_call(self):
        """Send 'SESSION 1' and block until the send has completed.

        The send is scheduled on the loop that owns the connection; running
        it on a separate loop in the calling thread is not thread-safe.
        Must not be called from the event-loop thread itself, because it
        waits for the loop to make progress.

        Raises:
            RuntimeError: if the loop is unavailable or connect() has not
                completed yet.
        """
        loop = self._require_loop()
        if self.conn is None:
            raise RuntimeError("not connected")
        msg = 'SESSION 1'
        asyncio.run_coroutine_threadsafe(self.conn.send(msg), loop).result()

    async def bye(self):
        """Send the 'BYE <NUMBER>' message over the connection."""
        print("sending bye")
        await self.conn.send('BYE %d' % NUMBER)

    def send_bye(self):
        """Schedule bye() on the worker's loop from any thread.

        Returns:
            The concurrent.futures.Future tracking the scheduled coroutine.

        Raises:
            RuntimeError: if the worker's loop is unavailable.
        """
        # Look the loop up first so no coroutine object is created (and left
        # un-awaited) when there is nowhere to run it.
        loop = self._require_loop()
        return asyncio.run_coroutine_threadsafe(self.bye(), loop)

    async def connect(self):
        """Open the websocket to ``self.server`` and send 'HELLO <NUMBER>'."""
        print("connecting")
        # SERVER_AUTH is the purpose for client-side sockets; CLIENT_AUTH
        # builds a context meant for server-side sockets.
        sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
        self.conn = await websockets.connect(self.server, ssl=sslctx)
        await self.conn.send('HELLO %d' % NUMBER)

    async def loop(self):
        """Consume server messages and update ``self.state`` accordingly.

        Returns:
            1 if the server sent a message starting with 'ERROR', 0 once the
            message stream ends.

        Raises:
            RuntimeError: if called before connect() has set ``self.conn``.
        """
        print("loop hearbeat")
        if self.conn is None:
            raise RuntimeError("loop() called before connect()")
        async for message in self.conn:
            print("message heartbeat %s" % message)
            if message == 'HELLO':
                self.state = State.ONLINE
            elif message == 'SESSION_OK':
                self.state = State.IN_CALL
            elif message.startswith('ERROR'):
                self.state = State.ERROR
                print(message)
                return 1
            else:
                await self.handle_sdp(message)
        return 0

    async def handle_sdp(self, message):
        """Hook for messages that are not HELLO / SESSION_OK / ERROR.

        Placeholder: the message is ignored.  It exists so loop() does not
        fail with AttributeError on the first such message; override it to
        process the payload.
        """

    async def do_something_async_in_loop(self):
        """Print a marker line; meant to run on the worker's loop."""
        print("\n did something async", flush=True)

    def make_do_something_async(self):
        """Schedule do_something_async_in_loop() on the worker's loop.

        Returns:
            The concurrent.futures.Future tracking the scheduled coroutine.

        Raises:
            RuntimeError: if the worker's loop is unavailable.
        """
        loop = self._require_loop()
        return asyncio.run_coroutine_threadsafe(
            self.do_something_async_in_loop(), loop)

    def cancel_loop(self):
        """Request cancellation of every task on the worker's loop.

        Callable from any thread.  The cancellation runs inside the loop
        thread; websockets_worker() closes the loop once the cancelled
        coroutine unwinds.  Does nothing when there is no open loop.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            return

        def _cancel_all():
            for task in asyncio.all_tasks(loop):
                task.cancel()

        try:
            loop.call_soon_threadsafe(_cancel_all)
        except RuntimeError:
            # The worker closed the loop between the check above and the
            # call; there is nothing left to cancel.
            pass

    def do_something(self):
        """Block the calling thread for five seconds, then print a marker."""
        sleep(5)
        print("something")

    def eat_monkety(self):
        """Block the calling thread for five seconds, then print a marker."""
        sleep(5)
        print("eat monkey")

    def pipeline_worker(self):
        """Print a heartbeat every five seconds until asked to stop.

        This method should run in a worker thread.
        """
        while not self.release_on_terminate():
            sleep(5)
            print('pipeline heartbeat')

    def websockets_worker(self):
        """Connect and process messages on a loop private to this thread.

        This method should run in a worker thread.  It returns when the
        message stream ends, the server reports an error, or cancel_loop()
        cancels the running coroutine.
        """
        print("staring worker websockets")
        loop = asyncio.new_event_loop()
        self._loop = loop
        try:
            loop.run_until_complete(self.connect())
            loop.run_until_complete(self.loop())
        except asyncio.CancelledError:
            # Shutdown requested through cancel_loop(); not an error.
            pass
        finally:
            loop.close()
if __name__ == '__main__':
    # Demo run: start both workers, do some blocking work on the main
    # thread, request a session, then stop the workers in reverse order.
    t = Test()
    for worker in (t.pipeline_worker, t.websockets_worker):
        sleep(2)
        print('.')
        t.start_worker(worker)
    sleep(2)
    print('.')
    t.do_something()
    sleep(5)
    t.setup_call()
    sleep(5)
    for worker in (t.websockets_worker, t.pipeline_worker):
        t.stop_worker(worker)
    exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment