@MohamedKari
Created August 21, 2020 18:29
Sequence-Aware Threading

import time
import logging
import importlib
from typing import List, Any
from enum import Enum
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod

import numpy as np

__author__ = "[email protected]"

# importlib.reload(logging)
# logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)

class SchedulingException(Exception):
    pass

class Constant():
    def __init__(self, constant):
        self.constant = constant

    def get(self):
        return self.constant

    def update(self, _):
        pass

class MovingAverage():
    def __init__(self, window_size):
        self.history = deque(maxlen=window_size)

    def update(self, value):
        self.history.appendleft(value)

    def get(self):
        if len(self.history) == 0:
            return 0
        return np.sum(np.array(self.history)) / len(self.history)
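
# Constant and MovingAverage are interchangeable estimators (same update()/get()
# interface) for the mean per-job processing time; LaggingOrderedScheduler below
# picks one of the two depending on whether a fixed mean_processing_time is given.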

class Resource(ABC):
    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.last_job_id = -1

    def _work(self, job_id, task_arg):
        if job_id < self.last_job_id:
            raise SchedulingException()
        # remember the newest job seen so that out-of-order dispatch is detected
        self.last_job_id = job_id
        return self.work(task_arg)

    @abstractmethod
    def work(self, task_arg: Any) -> Any:
        raise NotImplementedError()

class SleepResource(Resource):
    def work(self, task_arg):
        # simulate a worker whose per-job latency is drawn from N(0.5 s, 0.1 s)
        time.sleep(np.abs(np.random.normal(.5, .1)))
        return task_arg
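
# A minimal sketch of a custom Resource, assuming the same work() contract as
# SleepResource above (SquareResource is hypothetical, not part of the original gist):
class SquareResource(Resource):
    def work(self, task_arg):
        # any thread-safe computation works here; ordering is enforced by _work()
        return task_arg ** 2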

class CongestionHandlingMode(Enum):
    BLOCK = 1
    DROP = 2   # no need for support right now
    QUEUE = 3  # no need for support right now

class OrderedScheduler():
    def __init__(self, resources: List[Resource]):
        self.next_result_id = None
        self.resources = resources
        self.resource_count = len(self.resources)
        self.futures_by_job_id = dict()
        self.thread_pool_executor = ThreadPoolExecutor(self.resource_count)

    @staticmethod
    def execute(resource: Resource, job_id, task_arg):
        return resource._work(job_id, task_arg)  # pylint: disable=protected-access

    def submit(self, job_id, task_arg):
        next_resource = self.resources[job_id % self.resource_count]
        future = self.thread_pool_executor.submit(OrderedScheduler.execute, next_resource, job_id, task_arg)
        self.futures_by_job_id[job_id] = future

    def fetch(self, job_id):
        return self.futures_by_job_id[job_id].result()
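
# Usage sketch for OrderedScheduler (hypothetical; AsynchronousClient below does
# the same thing in batches): jobs are assigned round-robin via
# job_id % resource_count, and fetch(job_id) blocks on that job's future, so
# results come back in submission order regardless of which thread finishes first.
#
#     scheduler = OrderedScheduler([SleepResource(i) for i in range(4)])
#     for job_id, arg in enumerate(["a", "b", "c"]):
#         scheduler.submit(job_id, arg)
#     results = [scheduler.fetch(job_id) for job_id in range(3)]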

class LaggingOrderedScheduler():
    def __init__(self,
                 resources: List[Resource],
                 mean_processing_time: float = None,
                 waiting_risk_aversion_factor: float = 1.1):
        """
        args:
            mean_processing_time: Used for calculating a throttle time to avoid a batch-like processing pattern.
                Set to None to use a moving average; set to a float to throttle at a constant rate.
                The throttle time is calculated as (mean_processing_time / len(resources)).
                If you have a reliable estimate of how long workers need for processing,
                setting mean_processing_time to that estimate can yield more predictable cycle times.
                However, setting it lower than the actual processing time will probably result in a batched processing pattern,
                while setting it too high means spending time idling without exploiting resource capacities.
            waiting_risk_aversion_factor (float): The WRAF smoothes out the variance of worker durations. Defaults to 1.1.
                It is especially worth tuning if a moving average is used for the mean_processing_time estimate.
                If you observe varying worker durations, increase this factor.
                This will increase the average response time,
                but decrease the occurrence of peaks due to waiting for not-yet-buffered results.
        """
        self.resources = resources
        self.resource_count = len(self.resources)
        self.prebuffer_size = self.resource_count
        self.futures_by_job_id = dict()
        self.start_timestamp_by_future = dict()
        self.thread_pool_executor = ThreadPoolExecutor(self.resource_count)

        if mean_processing_time:
            self.mean_processing_time = Constant(mean_processing_time)
        else:
            # deque(maxlen=None) is unbounded, so this averages over all observed durations
            self.mean_processing_time = MovingAverage(None)

        self.waiting_risk_aversion_factor = waiting_risk_aversion_factor
    @staticmethod
    def execute(resource: Resource, job_id, task_arg):
        return resource._work(job_id, task_arg)  # pylint: disable=protected-access

    def submit_and_fetch_latest(self, job_id, task_arg):
        logging.getLogger(__name__).info("[%s] Running submit_and_fetch_latest ...", job_id)

        if task_arg is not None:
            next_resource = self.resources[job_id % self.resource_count]
            logging.getLogger(__name__).info("[%s] Submitting to resource %s ...", job_id, next_resource)
            future = self.thread_pool_executor.submit(LaggingOrderedScheduler.execute, next_resource, job_id, task_arg)
            logging.getLogger(__name__).info("[%s] Obtained future on resource job ...", job_id)
            self.start_timestamp_by_future[future] = time.time()
            future.add_done_callback(lambda fut: self.mean_processing_time.update(time.time() - self.start_timestamp_by_future[fut]))
            self.futures_by_job_id[job_id] = future

        latest_id = job_id - self.prebuffer_size
        latest_result = None
        logging.getLogger(__name__).info("[%s] latest_id is %s ...", job_id, latest_id)

        if latest_id >= 0:
            logging.getLogger(__name__).info("[%s] Awaiting buffered job for latest_id %s ...", job_id, latest_id)
            latest_result = self.futures_by_job_id[latest_id].result()
            logging.getLogger(__name__).info("[%s] Retrieved buffered job result for latest_id %s ...", job_id, latest_id)

        throttle_duration = self.mean_processing_time.get() * self.waiting_risk_aversion_factor / self.resource_count
        # TODO: subtract this from the actual elapsed time to account for delays in this call
        logging.getLogger(__name__).info("[%s] Throttling by %s seconds ...", job_id, throttle_duration)
        # time.sleep(throttle_duration)

        return latest_id, latest_result
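
# Worked example of the throttle formula above (numbers are illustrative):
# with resource_count = 4, an estimated mean_processing_time of 0.5 s, and
# waiting_risk_aversion_factor = 1.1, the throttle duration would be
#     0.5 * 1.1 / 4 = 0.1375 s,
# i.e. submissions are spaced so that, on average, one worker frees up per
# submission instead of all workers finishing (and then idling) in lockstep.
# Note that the time.sleep(throttle_duration) call itself is commented out above.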

class LaggingClient():
    def __init__(self, n_resources):
        self.scheduler = LaggingOrderedScheduler(
            [SleepResource(i) for i in range(n_resources)]
        )
        self.n_resources = n_resources
        print(self)

    def __str__(self):
        return f"LaggingClient(n_resources={self.n_resources})"

    def run(self, job_id, task_arg):
        return self.scheduler.submit_and_fetch_latest(job_id, task_arg)

    def run_all(self, tasks):
        start_all = time.time()

        job_counter = 0
        while len(tasks) > 0:
            task = tasks.pop()
            start = time.time()
            job_id, result = self.run(job_counter, task)
            duration = time.time() - start
            print(f"[job_id:{job_counter}/{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        # drain the prebuffer: submit empty jobs until all real results have been fetched
        for _ in range(self.n_resources):
            start = time.time()
            job_id, result = self.run(job_counter, None)
            duration = time.time() - start
            print(f"[job_id:{job_counter}/{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

class AsynchronousClient():
    def __init__(self, n_resources, batch_size):
        self.ordered_scheduler = OrderedScheduler(
            [SleepResource(i) for i in range(n_resources)]
        )
        self.batch_size = batch_size
        self.n_resources = n_resources
        print(self)

    def __str__(self):
        return (
            f"AsynchronousClient("
            f"n_resources={self.n_resources}, "
            f"batch_size={self.batch_size}"
            f")"
        )

    def run(self, job_id, task_arg):
        self.ordered_scheduler.submit(job_id, task_arg)

    def run_all(self, tasks):
        start_all = time.time()

        job_counter = 0
        while len(tasks) > 0:
            # submit a batch of jobs, then fetch their results in order
            job_ids_to_fetch_later = list()
            for _ in range(min(self.batch_size, len(tasks))):
                task = tasks.pop()
                self.run(job_counter, task)
                job_ids_to_fetch_later.append(job_counter)
                job_counter += 1

            for job_id in job_ids_to_fetch_later:
                start = time.time()
                result = self.ordered_scheduler.fetch(job_id)
                duration = time.time() - start
                print(f"[job_id:{job_id}]", f"result: {result}", f"({duration*1000:.3f}ms)")

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

class SynchronousClient():
    def __init__(self):
        self.resource = SleepResource(0)
        print("SynchronousClient")

    def run(self, job_id, task_arg):
        result = self.resource.work(task_arg)
        return result

    def run_all(self, tasks):
        start_all = time.time()

        job_counter = 0
        while len(tasks) > 0:
            task = tasks.pop()
            start = time.time()
            result = self.run(job_counter, task)
            duration = time.time() - start
            print(f"[job_id:{job_counter}]", f"result: {result}", f"({duration*1000:.3f}ms)")
            job_counter += 1

        duration_all = time.time() - start_all
        print("op/s", job_counter/duration_all)

def showcase():
    n_tasks = 30
    # tasks are built in reverse so that pop() yields them in ascending order
    tasks = list(range(n_tasks-1, -1, -1))

    # Synchronous
    SynchronousClient().run_all(list(tasks))

    # Identical to synchronous
    AsynchronousClient(n_resources=1, batch_size=1).run_all(list(tasks))

    # Distributed but effectively synchronous
    AsynchronousClient(n_resources=5, batch_size=1).run_all(list(tasks))

    # Batched
    AsynchronousClient(n_resources=5, batch_size=5).run_all(list(tasks))

    # Full batch
    AsynchronousClient(n_resources=len(tasks), batch_size=len(tasks)).run_all(list(tasks))

    # Lagging Ordered
    LaggingClient(n_resources=3).run_all(list(tasks))


if __name__ == "__main__":
    showcase()
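
# Note: the schedulers log via the logging module; to see that output, uncomment
# the logging.basicConfig(...) line near the top of the file before running.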