Last active
December 25, 2020 20:16
-
-
Save codemation/a3651cc704480dae34e5ba2dccb91b66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio, uuid, time | |
import subprocess | |
from fastapi import FastAPI | |
from easyrpc.server import EasyRpcServer | |
server = FastAPI() | |
work_queue = asyncio.Queue() | |
work_results = {} | |
rpc_server = EasyRpcServer(server, '/ws/jobs', server_secret='abcd1234') | |
@rpc_server.origin(namespace='jobs') | |
async def process_payload(payload, job_type='http'): | |
""" | |
loads payload into work_queue with generated id | |
returns id to requestor for polling | |
job_types: | |
- http | |
- subprocess | |
""" | |
# do the processing | |
request_id = str(uuid.uuid1()) | |
results = f'added payload {payload} to queue with request_id: {request_id}' | |
await work_queue.put((request_id, job_type, payload)) | |
return {"request_id": request_id} | |
@rpc_server.origin(namespace='jobs') | |
async def send_results(request_id, results): | |
""" | |
applies results to work_results with request_id | |
""" | |
work_results[request_id] = results | |
@rpc_server.origin(namespace='jobs') | |
async def get_results(request_id, timeout=5): | |
""" | |
applies results to work_results with request_id | |
""" | |
start = time.time() | |
while time.time() - start < 5: | |
if not request_id in work_results: | |
await asyncio.sleep(0.5) | |
continue | |
return work_results.pop(request_id) | |
return {"error": f"{request_id} not ready"} | |
async def worker(): | |
while True: | |
try: | |
job = await work_queue.get() | |
request_id, job_type, payload = job | |
if job_type == 'http': | |
results = await async_work_http(payload) | |
await send_results(request_id, results) | |
else: | |
await start_subprocess(request_id, payload) | |
except asyncio.CancelledError: | |
break | |
async def start_subprocess(request_id, payload): | |
""" | |
start a subprocess which should use an EasyRpcProxy connection | |
to 'send_results' of work | |
""" | |
p = subprocess.run(['echo', json.dumps(payload), '|', 'python', 'job_subprocess.py', request_id]) | |
async def async_work_http(payload): | |
# insert asynchronus http call here | |
return {'results': 'results'} | |
@server.on_event('startup') | |
async def startup(): | |
# create n mumber workers | |
for _ in range(2): | |
asyncio.create_task(worker()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment