Last active
February 4, 2023 21:51
-
-
Save Koed00/3f6de495ea6af455e95e to your computer and use it in GitHub Desktop.
Multi horse worker stable test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process, Queue, current_process | |
from random import randint | |
from time import sleep | |
import sys | |
def something(): | |
x = randint(1, 20) | |
if x == 9: | |
# cause some trouble | |
raise InterruptedError | |
sleep(x) | |
return True | |
class Worker(object): | |
def __init__(self): | |
# TODO stable size should be settable | |
self.stable_size = 4 | |
self.stable = [] | |
self.task_queue = Queue() | |
self.done_queue = Queue() | |
for i in range(self.stable_size): | |
self._spawn_horse() | |
def _spawn_horse(self): | |
# This is just for PyCharm to not crash. Ignore it. | |
if not hasattr(sys.stdin, 'close'): | |
def dummy_close(): | |
pass | |
sys.stdin.close = dummy_close | |
p = Process(target=self.horse, args=(self.task_queue, self.done_queue)) | |
self.stable.append(p) | |
p.start() | |
@staticmethod | |
def horse(queue_in, queue_out): | |
name = current_process().name | |
print(name, 'Ready for work at {}'.format(current_process().pid)) | |
for func in iter(queue_in.get, 'STOP'): | |
print(name, 'Starting Job {}'.format(func)) | |
result = something() | |
queue_out.put(result) | |
print(name, 'Exiting Job {}'.format(func)) | |
print(name, 'Stopped') | |
def work(self, job): | |
# Everyone still alive? | |
self.stable_boy() | |
# Put the job in the queue | |
self.task_queue.put(job) | |
def stable_boy(self): | |
# Check if all the horses are alive | |
for p in list(self.stable): | |
if not p.is_alive(): | |
# Be humane | |
p.terminate() | |
self.stable.remove(p) | |
# Replace it with a fresh one | |
self._spawn_horse() | |
def exit(self): | |
# Send the STOP signal to the stable | |
for i in range(self.stable_size): | |
self.task_queue.put('STOP') | |
# Optional: Delete everything in the queue and then add STOP | |
# Wait for all the workers to finish the queue | |
for p in self.stable: | |
p.join() | |
print('All horses done.') | |
def test_worker(self): | |
for i in range(20): | |
print('Queuing Job {}'.format(i)) | |
self.work(i) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have kept working on this the last two weeks and added recycling and timeout capability.
You can have a look at it here https://github.com/Koed00/django-q in cluster.py. Feedback is much appreciated.