Multi horse worker stable test
from multiprocessing import Process, Queue, current_process
from random import randint
from time import sleep
import sys


def something():
    x = randint(1, 20)
    if x == 9:
        # cause some trouble
        raise InterruptedError
    sleep(x)
    return True


class Worker(object):
    def __init__(self):
        # TODO stable size should be settable
        self.stable_size = 4
        self.stable = []
        self.task_queue = Queue()
        self.done_queue = Queue()
        for i in range(self.stable_size):
            self._spawn_horse()

    def _spawn_horse(self):
        # This is just for PyCharm to not crash. Ignore it.
        if not hasattr(sys.stdin, 'close'):
            def dummy_close():
                pass
            sys.stdin.close = dummy_close
        p = Process(target=self.horse, args=(self.task_queue, self.done_queue))
        self.stable.append(p)
        p.start()

    @staticmethod
    def horse(queue_in, queue_out):
        name = current_process().name
        print(name, 'Ready for work at {}'.format(current_process().pid))
        for func in iter(queue_in.get, 'STOP'):
            print(name, 'Starting Job {}'.format(func))
            result = something()
            queue_out.put(result)
            print(name, 'Exiting Job {}'.format(func))
        print(name, 'Stopped')

    def work(self, job):
        # Everyone still alive?
        self.stable_boy()
        # Put the job in the queue
        self.task_queue.put(job)

    def stable_boy(self):
        # Check if all the horses are alive
        for p in list(self.stable):
            if not p.is_alive():
                # Be humane
                p.terminate()
                self.stable.remove(p)
                # Replace it with a fresh one
                self._spawn_horse()

    def exit(self):
        # Send the STOP signal to the stable
        for i in range(self.stable_size):
            self.task_queue.put('STOP')
        # Optional: Delete everything in the queue and then add STOP
        # Wait for all the workers to finish the queue
        for p in self.stable:
            p.join()
        print('All horses done.')

    def test_worker(self):
        for i in range(20):
            print('Queuing Job {}'.format(i))
            self.work(i)
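For reference, a minimal way to drive this class might look like the sketch below (not part of the original gist; it only uses the Worker methods defined above, and the __main__ guard is needed because multiprocessing starts child processes). Note that since something() deliberately raises at random, a run may not always drain the queue cleanly.

if __name__ == '__main__':
    # Spin up the stable, queue some test jobs, then shut everything down.
    w = Worker()
    w.test_worker()  # queues 20 jobs via work(), replacing dead horses as it goes
    w.exit()         # sends STOP to every horse and joins them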
I have kept working on this over the last two weeks and added recycling and timeout capabilities.
You can have a look at it in cluster.py here: https://github.com/Koed00/django-q. Feedback is much appreciated.
Thanks for this! In RQ's built-in worker, the parent process is responsible for fetching jobs from the queue before feeding them to the work horse. Looking at your code above, it seems like each horse would be responsible for doing the fetching and storing the result into Redis, is that correct?
If we were to move to a concurrent worker model, I think this is also the right move. If we turn the parent process into a supervisor that periodically checks whether its horses are alive, RQ will also be more reliable, since it can detect horses that die because of unexpected errors (there's also an issue for this somewhere).
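For illustration, a supervising parent along those lines could be as simple as a loop that re-checks the stable on an interval. This is just a sketch against the Worker class above; the guard name and the interval parameter are made up here and are not part of RQ's or django-q's API.

from time import sleep

def guard(worker, interval=1.0):
    # Hypothetical supervisor loop: periodically replace dead horses
    # instead of only checking when a new job is queued.
    while True:
        worker.stable_boy()  # terminate and respawn any horse that has died
        sleep(interval)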