-
-
Save oeway/7f58937eb9a7a86f43533b779c2a798d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import janus | |
import asyncio | |
import time | |
import threading | |
from bson.objectid import ObjectId | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.wamp import ApplicationRunner | |
import uuid | |
task_q = janus.Queue() | |
result_q = janus.Queue() | |
futureDict = {} | |
# This is y2 | |
async def process_result(): | |
while True: | |
result = result_q.async_q.get() | |
if result['task_id'] in futureDict: | |
future = futureDict[result['task_id']] | |
if result['success']: | |
future.set_result(result['result']) | |
else: | |
future.set_exception(result['exception']) | |
del futureDict[result['task_id']] | |
def heavy_calc(*args, **kwargs): | |
time.sleep(2) | |
return [1, 2, 4] | |
# this is t3 | |
def process_tasks(): | |
while True: | |
task = task_q.sync_q.get() | |
if task['name'] == 'heavy_calc': | |
try: | |
ret = heavy_calc(**task['kwargs']) | |
result_q.sync_q.put({'task_id': task['id'], 'success': True, 'result': ret}) | |
except Exception as e: | |
result_q.sync_q.put({'task_id': task['id'], 'success': False, 'exception': e}) | |
else: | |
result_q.sync_q.put({'task_id': task['id'], 'success': False, 'exception': NotImplemented }) | |
class FrontendSession(ApplicationSession): | |
def onConnect(self): | |
self.join(self.config.realm, [u"ticket"], u'backend') | |
def onChallenge(self, challenge): | |
if challenge.method == u"ticket": | |
return u'sg-ai.com' | |
else: | |
raise Exception("Invalid authmethod {}".format(challenge.method)) | |
async def onJoin(self, details): | |
print(self._session_id) | |
asyncio.ensure_future(process_result()) | |
@wamp.register('com.example.heavy_calculation') | |
async def heavy_calculation(self, fname, reg): | |
new_id = str(uuid.uuid4()) | |
task_q.async_q.put({'id': new_id, 'name': 'heavy_calc', 'kwargs': {'a': 1, 'b': 2}}) | |
future = asyncio.Future() | |
futureDict[new_id] = future | |
return await future | |
def onLeave(self, details): | |
print("Client session left: {}".format(details)) | |
self.disconnect() | |
def onDisconnect(self): | |
print("Client session disconnected.") | |
if __name__ == '__main__': | |
t = threading.Thread(target=process_tasks) | |
t.setDaemon(True) | |
t.start() | |
# this is y1 | |
runner = ApplicationRunner(url='wss://dcrossbar.sg-ai.com/ws', realm='realm1') | |
runner.run(FrontendSession, log_level='debug') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment