Sequence-Aware Threading
import time
import logging
import importlib
from typing import List, Any
from enum import Enum
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod

import numpy as np

__author__ = "[email protected]"

# importlib.reload(logging)
# logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)

class SchedulingException(Exception):
    pass

class Constant():
    def __init__(self, constant):
        self.constant = constant

    def get(self):
        return self.constant

    def update(self, _):
        pass

class MovingAverage():
    def __init__(self, window_size):
        self.history = deque(maxlen=window_size)

    def update(self, value):
        self.history.appendleft(value)

    def get(self):
        if len(self.history) == 0:
            return 0
        return np.sum(np.array(self.history)) / len(self.history)

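
# Both estimators share the same duck-typed update()/get() interface, so the
# schedulers below can swap between them freely. Illustrative example (not
# part of the original gist):
#
#   avg = MovingAverage(window_size=3)
#   for value in (1.0, 2.0, 3.0, 4.0):
#       avg.update(value)
#   avg.get()  # -> 3.0, the mean of the 3 most recent values (2.0, 3.0, 4.0)
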
class Resource(ABC):
    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.last_job_id = -1

    def _work(self, job_id, task_arg):
        # Reject out-of-order jobs and remember the newest job_id seen, so a
        # stale job can never run after a more recent one on this resource.
        if job_id < self.last_job_id:
            raise SchedulingException()
        self.last_job_id = job_id
        return self.work(task_arg)

    @abstractmethod
    def work(self, task_arg: Any) -> Any:
        raise NotImplementedError()

class SleepResource(Resource):
    def work(self, task_arg):
        # Simulate work taking roughly 0.5 s (normally distributed, sigma 0.1).
        time.sleep(np.abs(np.random.normal(.5, .1)))
        return task_arg

class CongestionHandlingMode(Enum):
    BLOCK = 1
    DROP = 2  # no need for support right now
    QUEUE = 3  # no need for support right now

class OrderedScheduler():
    def __init__(self, resources: List[Resource]):
        self.next_result_id = None
        self.resources = resources
        self.resource_count = len(self.resources)
        self.futures_by_job_id = dict()
        self.thread_pool_executor = ThreadPoolExecutor(self.resource_count)

    @staticmethod
    def execute(resource: Resource, job_id, task_arg):
        return resource._work(job_id, task_arg)  # pylint: disable=protected-access

    def submit(self, job_id, task_arg):
        # Round-robin dispatch: job i goes to resource i % resource_count.
        next_resource = self.resources[job_id % self.resource_count]
        future = self.thread_pool_executor.submit(OrderedScheduler.execute, next_resource, job_id, task_arg)
        self.futures_by_job_id[job_id] = future

    def fetch(self, job_id):
        return self.futures_by_job_id[job_id].result()

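
# Usage sketch (illustrative, not part of the original gist): even though the
# two SleepResources finish concurrently and in arbitrary order, fetch()
# returns results strictly in submission order.
def _ordered_scheduler_example():
    scheduler = OrderedScheduler([SleepResource(0), SleepResource(1)])
    for job_id, arg in enumerate(["a", "b", "c", "d"]):
        scheduler.submit(job_id, arg)
    for job_id in range(4):
        print(scheduler.fetch(job_id))  # prints "a", "b", "c", "d"
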
class LaggingOrderedScheduler():
    def __init__(self,
                 resources: List[Resource],
                 mean_processing_time: float = None,
                 waiting_risk_aversion_factor: float = 1.1):
        """
        args:
            mean_processing_time: Used for calculating a throttle time to avoid a batch-like processing pattern.
                Set to None to use a moving average; set to a float to throttle at a constant rate.
                The throttle time is calculated as (mean_processing_time / len(resources)).
                If you have a reliable estimate of how long workers need for processing,
                setting mean_processing_time explicitly can yield more predictable cycle times.
                However, setting it lower than the actual processing time will probably result in a batched processing pattern,
                while setting it too high means idling without exploiting resource capacities.
            waiting_risk_aversion_factor (float): The WRAF smoothes out variance in worker durations. Defaults to 1.1.
                Especially worth tuning if a moving average is used for the mean_processing_time estimate.
                If you observe varying worker durations, increase this factor.
                This will increase the average response time,
                but decrease the occurrence of peaks caused by waiting for not-yet-buffered results.
        """
        self.resources = resources
        self.resource_count = len(self.resources)
        self.prebuffer_size = self.resource_count
        self.futures_by_job_id = dict()
        self.start_timestamp_by_future = dict()
        self.thread_pool_executor = ThreadPoolExecutor(self.resource_count)

        if mean_processing_time:
            self.mean_processing_time = Constant(mean_processing_time)
        else:
            # maxlen=None makes the history unbounded, i.e. a cumulative average.
            self.mean_processing_time = MovingAverage(None)

        self.waiting_risk_aversion_factor = waiting_risk_aversion_factor

    @staticmethod
    def execute(resource: Resource, job_id, task_arg):
        return resource._work(job_id, task_arg)  # pylint: disable=protected-access

    def submit_and_fetch_latest(self, job_id, task_arg):
        logging.getLogger(__name__).info("[%s] Running submit_and_fetch_latest ...", job_id)

        if task_arg is not None:
            next_resource = self.resources[job_id % self.resource_count]
            logging.getLogger(__name__).info("[%s] Submitting to resource %s ...", job_id, next_resource)
            future = self.thread_pool_executor.submit(LaggingOrderedScheduler.execute, next_resource, job_id, task_arg)
            logging.getLogger(__name__).info("[%s] Obtained future on resource job ...", job_id)

            # Record the submission time so the done-callback can feed the
            # observed duration into the mean_processing_time estimator.
            self.start_timestamp_by_future[future] = time.time()
            future.add_done_callback(lambda fut: self.mean_processing_time.update(time.time() - self.start_timestamp_by_future[fut]))
            self.futures_by_job_id[job_id] = future

        latest_id = job_id - self.prebuffer_size
        latest_result = None
        logging.getLogger(__name__).info("[%s] latest_id is %s ...", job_id, latest_id)

        if latest_id >= 0:
            logging.getLogger(__name__).info("[%s] Awaiting buffered job for latest_id %s ...", job_id, latest_id)
            latest_result = self.futures_by_job_id[latest_id].result()
            logging.getLogger(__name__).info("[%s] Retrieved buffered job result for latest_id %s ...", job_id, latest_id)

        throttle_duration = self.mean_processing_time.get() * self.waiting_risk_aversion_factor / self.resource_count
        # TODO: subtract this from the actual elapsed time to account for delays in this call
        logging.getLogger(__name__).info("[%s] Throttling by %s seconds ...", job_id, throttle_duration)
        # Throttling is currently disabled; uncomment to apply the computed delay.
        # time.sleep(throttle_duration)

        return latest_id, latest_result

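
# Worked example for the throttle formula above (illustrative numbers, not
# from the gist): with 4 resources, an estimated mean processing time of
# 0.8 s, and the default waiting_risk_aversion_factor of 1.1, the throttle
# duration would be 0.8 * 1.1 / 4 = 0.22 s per call, so roughly one buffered
# result becomes available per interval once the prebuffer of 4 in-flight
# jobs is filled.
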
class LaggingClient():
    def __init__(self, n_resources):
        self.scheduler = LaggingOrderedScheduler(
            [SleepResource(i) for i in range(n_resources)]
        )
        self.n_resources = n_resources
        print(self)

    def __str__(self):
        return f"LaggingClient(n_resources={self.n_resources})"

    def run(self, job_id, task_arg):
        return self.scheduler.submit_and_fetch_latest(job_id, task_arg)

    def run_all(self, tasks):
        start_all = time.time()
        job_counter = 0

        while len(tasks) > 0:
            task = tasks.pop()
            start = time.time()
            job_id, result = self.run(job_counter, task)
            duration = time.time() - start
            print(f"[job_id:{job_counter}/{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        # Drain the prebuffer: submit empty jobs so the last buffered results get fetched.
        for _ in range(self.n_resources):
            start = time.time()
            job_id, result = self.run(job_counter, None)
            duration = time.time() - start
            print(f"[job_id:{job_counter}/{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

class AsynchronousClient():
    def __init__(self, n_resources, batch_size):
        self.ordered_scheduler = OrderedScheduler(
            [SleepResource(i) for i in range(n_resources)]
        )
        self.batch_size = batch_size
        self.n_resources = n_resources
        print(self)

    def __str__(self):
        return (
            f"AsynchronousClient("
            f"n_resources={self.n_resources}, "
            f"batch_size={self.batch_size}"
            f")"
        )

    def run(self, job_id, task_arg):
        self.ordered_scheduler.submit(job_id, task_arg)

    def run_all(self, tasks):
        start_all = time.time()
        job_counter = 0

        while len(tasks) > 0:
            # Submit a full batch first, then fetch the results in order.
            job_ids_to_fetch_later = list()
            for _ in range(min(self.batch_size, len(tasks))):
                task = tasks.pop()
                self.run(job_counter, task)
                job_ids_to_fetch_later.append(job_counter)
                job_counter += 1

            for job_id in job_ids_to_fetch_later:
                start = time.time()
                result = self.ordered_scheduler.fetch(job_id)
                duration = time.time() - start
                print(f"[job_id:{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

class SynchronousClient():
    def __init__(self):
        self.resource = SleepResource(0)
        print("SynchronousClient")

    def run(self, job_id, task_arg):
        result = self.resource.work(task_arg)
        return result

    def run_all(self, tasks):
        start_all = time.time()
        job_counter = 0

        while len(tasks) > 0:
            task = tasks.pop()
            start = time.time()
            result = self.run(job_counter, task)
            duration = time.time() - start
            print(f"[job_id:{job_counter}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

def showcase():
    n_tasks = 30
    tasks = list(range(n_tasks-1, -1, -1))

    # Synchronous
    SynchronousClient().run_all(list(tasks))

    # Identical to synchronous
    AsynchronousClient(n_resources=1, batch_size=1).run_all(list(tasks))

    # Distributed but effectively synchronous
    AsynchronousClient(n_resources=5, batch_size=1).run_all(list(tasks))

    # Batched
    AsynchronousClient(n_resources=5, batch_size=5).run_all(list(tasks))

    # Full batch
    AsynchronousClient(n_resources=len(tasks), batch_size=len(tasks)).run_all(list(tasks))

    # Lagging Ordered
    LaggingClient(n_resources=3).run_all(list(tasks))

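
# A minimal sketch of plugging custom work into the schedulers (illustrative,
# not part of the original gist): any Resource subclass only needs to
# implement work(). UppercaseResource and _lagging_usage_example are
# hypothetical names.
class UppercaseResource(Resource):
    def work(self, task_arg):
        # Hypothetical example workload: uppercase the incoming argument.
        return str(task_arg).upper()


def _lagging_usage_example():
    # Streaming usage: each call submits one argument (or None to drain) and
    # returns the result that lags prebuffer_size jobs behind.
    scheduler = LaggingOrderedScheduler([UppercaseResource(i) for i in range(3)])
    for job_id, word in enumerate(["sequence", "aware", "threading", None, None, None]):
        latest_id, latest_result = scheduler.submit_and_fetch_latest(job_id, word)
        print(latest_id, latest_result)  # results appear from job_id 3 onwards
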
if __name__ == "__main__":
    showcase()