Last active
February 23, 2021 13:31
-
-
Save freol35241/704d3126cbbbf89bd2ab9eca8e7d8f32 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import enum
import logging
import time
import uuid
from concurrent.futures import Executor, Future, ProcessPoolExecutor
from typing import Any, Callable, Dict, Optional
@enum.unique
class Status(enum.Enum):
    """Lifecycle state of a submitted job.

    The member values are plain lowercase strings.
    """

    ONGOING = "ongoing"
    QUEUED = "queued"
    FAILED = "failed"
    SUCCESSFUL = "successful"
### RESULTS "DB" ###
# In-memory, module-level mapping from job id to the value the job returned.
RESULTS: Dict[uuid.UUID, Any] = {}


def set_result(job_id: uuid.UUID, result: Any) -> None:
    """Store ``result`` under ``job_id``, replacing any previous entry."""
    RESULTS[job_id] = result


def get_result(job_id: uuid.UUID) -> Any:
    """Return the value stored for ``job_id``.

    Raises:
        KeyError: if nothing has been stored for ``job_id``.
    """
    return RESULTS[job_id]


###
class Dispatcher:
    """Submit callables to an executor and track each one by a generated id.

    Every submitted callable gets a fresh ``uuid.UUID`` job id. When the job
    finishes successfully its return value is stored through ``set_result``;
    when it fails the exception is logged. ``status`` reports the current
    state of a job as a ``Status`` member.
    """

    # Registry of submitted jobs, keyed by job id. This is a class attribute,
    # so the same mapping is shared by every Dispatcher instance.
    jobs: Dict[uuid.UUID, Future] = {}

    # Executor shared by all dispatchers constructed without an explicit
    # pool; created on first use.
    _default_pool: Optional[Executor] = None

    def __init__(self, pool: Optional[Executor] = None):
        """Create a dispatcher that runs jobs on ``pool``.

        Args:
            pool: Executor to submit jobs to. When omitted, a single
                ``ProcessPoolExecutor`` shared by all default-constructed
                dispatchers is used.
        """
        if pool is None:
            # A default argument is evaluated once, when the class body runs,
            # which would build a process pool as a side effect of importing
            # this module. Build the shared default lazily instead.
            if Dispatcher._default_pool is None:
                Dispatcher._default_pool = ProcessPoolExecutor()
            pool = Dispatcher._default_pool
        self.pool = pool

    def submit(self, fn: Callable, *args, **kwargs) -> uuid.UUID:
        """Schedule ``fn(*args, **kwargs)`` on the pool and return its job id.

        Returns:
            The newly generated id under which the job's future is registered
            in ``jobs`` and under which its result will be stored.
        """
        job_id = uuid.uuid4()
        future = self.pool.submit(fn, *args, **kwargs)

        def _callback(fut: Future) -> None:
            # Keep only the call that reflects the job's own outcome inside
            # the try, so an error while storing the result is not reported
            # as a job failure.
            try:
                outcome = fut.result()
            except Exception:
                logging.exception("Job with job_id %s failed!", job_id)
            else:
                set_result(job_id, outcome)

        future.add_done_callback(_callback)
        self.jobs[job_id] = future
        return job_id

    def status(self, job_id: uuid.UUID) -> Status:
        """Return the current ``Status`` of the job identified by ``job_id``.

        A cancelled job is reported as ``Status.FAILED``.

        Raises:
            KeyError: if ``job_id`` was never returned by ``submit``.
        """
        future = self.jobs[job_id]  # Will raise KeyError if job_id is invalid
        if not future.done():
            return Status.ONGOING if future.running() else Status.QUEUED
        # done() is also True for a cancelled future, and exception() raises
        # CancelledError on one, so cancellation has to be checked first.
        if future.cancelled() or future.exception() is not None:
            return Status.FAILED
        return Status.SUCCESSFUL
# Blocking dummy task. Defined at module level, not inside the __main__ guard,
# so that worker processes which re-import this module (the "spawn" and
# "forkserver" start methods) can look it up by name.
def task(sleep):
    """Sleep for ``sleep`` seconds, then return a message.

    Raises:
        ValueError: if ``sleep`` is greater than 3 (after sleeping).
    """
    time.sleep(sleep)
    if sleep > 3:
        raise ValueError
    return f"Slept for {sleep} seconds!"


def main() -> None:
    """Run the demo: submit five jobs, wait for them, print their outcome."""
    # Dispatcher with default ProcessPoolExecutor
    d = Dispatcher()

    # Submit 5 jobs
    job_ids = [d.submit(task, sleep=i) for i in range(5)]

    # Check if the last task has been started yet
    print(d.status(job_ids[-1]))

    # Wait until finished. Shutting the pool down blocks until every pending
    # job is done; a fixed sleep is too short when there are fewer workers
    # than jobs and the tasks have to queue.
    d.pool.shutdown(wait=True)

    # Print job statuses
    for job_id in job_ids:
        status = d.status(job_id)
        print(f"{job_id}: {status}")
        if status == Status.SUCCESSFUL:
            print(get_result(job_id))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment