import sys
import ujson as json
from multiprocessing import Process, Queue
from random import randint

# To measure performance:
# pv galaxy.json.gz | pigz -d -p 2 | python3 systems_names.py | pv 2>/dev/pts/2 > /dev/null
# Without measuring performance:
# cat galaxy.json.gz | pigz -d -p 2 | python3 systems_names.py

def eprint(*args, **kwargs):
    print(*args, **kwargs, file=sys.stderr)

def worker(in_q: Queue, out_q: Queue):
    batch = str()  # Using an immutable type to accumulate lines isn't a great choice,
    # but it hasn't caused worker performance problems yet
    limit = randint(15000, 20000)  # Random batch size to spread the Queue locks out in time; performance impact not properly tested though
    while lines := in_q.get():
        if randint(0, 1000) == 5:  # For debug
            eprint('worker', in_q.qsize())
        for line in lines:
            batch += json.loads(line.rstrip(',\n'))['name'] + '\n'  # TODO: handle closing ]
            if len(batch) > limit:  # Flush after appending so the current line is included
                out_q.put(batch)
                batch = str()
    eprint('Worker stopped', lines)  # For debug
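
# Sketch (not in the original gist): one possible way to address the "handle closing ]"
# TODO and the string-accumulation note above. It assumes the dump is one JSON object
# per line between the opening '[' and the closing ']', skips any line that doesn't
# start with '{', and joins names from a list instead of concatenating strings.
# worker_skip_tail is a hypothetical name; the limit now counts names, not characters.
def worker_skip_tail(in_q: Queue, out_q: Queue):
    names = []
    limit = randint(15000, 20000)
    while lines := in_q.get():
        for line in lines:
            line = line.rstrip(',\n ')
            if not line.startswith('{'):  # the trailing ']' or an empty line
                continue
            names.append(json.loads(line)['name'])
            if len(names) > limit:
                out_q.put('\n'.join(names) + '\n')
                names = []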

def printer(in_q: Queue):
    while lines := in_q.get():
        if randint(0, 1000) == 5:  # For debug
            eprint('printer', in_q.qsize())
        sys.stdout.write(lines)  # TODO: measure performance vs print()
    eprint('Printer stopped')  # For debug

def main(to_workers, from_workers, workers_count):
    input()  # Skip the opening [
    workers = []
    for _ in range(workers_count):
        proc = Process(target=worker, args=(to_workers, from_workers))
        proc.start()
        workers.append(proc)

    printer_process = Process(target=printer, args=(from_workers,))
    printer_process.start()

    batch = []
    batch_limit = randint(14, 127)  # Random batch size to spread the Queue locks out in time; performance impact not properly tested though
    for line in sys.stdin:  # Performance TODO: use /dev/stdin and read it as a binary stream
        batch.append(line)
        if len(batch) > batch_limit:
            to_workers.put(batch)
            # batch.clear()
            batch = []  # [] seems to be ~5x faster than list()
            # We can't just clear the old list: it may not have been transferred to a
            # worker process yet, and the worker would then receive an empty list
            batch_limit = randint(14, 127)
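
# Sketch (assumption, not part of the original): a drain-and-stop variant of main() that
# flushes the last partial batch at EOF, then pushes one None sentinel per worker plus one
# for the printer so the `while x := q.get():` loops above can exit instead of blocking
# forever. main_drain, procs and printer_process are hypothetical names, the batch size of
# 100 is arbitrary, and the original worker() may still hold an unflushed partial batch
# when it stops.
def main_drain(to_workers, from_workers, workers_count):
    input()  # Skip the opening [
    procs = [Process(target=worker, args=(to_workers, from_workers)) for _ in range(workers_count)]
    for p in procs:
        p.start()
    printer_process = Process(target=printer, args=(from_workers,))
    printer_process.start()

    batch = []
    for line in sys.stdin:
        batch.append(line)
        if len(batch) > 100:
            to_workers.put(batch)
            batch = []
    if batch:  # flush whatever was left when stdin hit EOF
        to_workers.put(batch)
    for _ in range(workers_count):  # one sentinel per worker stops its get() loop
        to_workers.put(None)
    for p in procs:
        p.join()
    from_workers.put(None)  # then stop the printer
    printer_process.join()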

# def main():  # Just to convert it to jsonl
#     input()
#     for line in sys.stdin:
#         print(json.loads(line.rstrip(',\n'))['name'])
#         # print(line.rstrip(',\n'))

if __name__ == '__main__':
    # import signal
    _to_workers = Queue()
    _from_workers = Queue()
    count = 5

    # def shutdown(_, __):  # TODO: proper SIGINT/SIGTERM handling
    #     for _ in range(count):
    #         _to_workers.put(None)
    #     _from_workers.put(None)

    # signal.signal(signal.SIGINT, shutdown)
    # signal.signal(signal.SIGTERM, shutdown)

    main(_to_workers, _from_workers, count)