Last active
June 22, 2021 01:09
-
-
Save vsraptor/6f11bc0b8c489bef43f747142496e4f2 to your computer and use it in GitHub Desktop.
Parallel text processing ++
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Three things:
1. Process a text file via a memory-mapped generator ... two birds with one stone
   - great for large files
   - great if the file is used in multiple processes
2. Lazily process an iterator, range or list in CHUNKS; you can use this to pass the data in chunks
3. Use (1) and Ray to process files asynchronously in parallel
I use this to compare every line of one file with every other line of the same file.
The outer loop runs in the main process; the inner loop is N actors that simultaneously open the same file (N times).
"""
#MMapped file iterator
def file_iter(file, upto=5, display_steps=None):
    """Lazily yield the lines of ``file`` through a read-only memory map.

    Parameters
    ----------
    file : str or path-like
        Path of the text file to read.
    upto : int or None, default 5
        Stop once the 0-based line index exceeds ``upto``, so at most
        ``upto + 1`` lines are yielded. ``None`` reads the whole file.
    display_steps : int or None
        If set, print a progress marker ``> i`` every ``display_steps`` lines.

    Yields
    ------
    str
        Each line decoded as UTF-8 with leading/trailing whitespace stripped.
        An empty file yields nothing.

    Raises
    ------
    UnicodeDecodeError
        If a line is not valid UTF-8.
    """
    # `mmap` and `os` are not imported at module level in this file.
    import mmap
    import os

    # Read-only access is all that is needed: 'rb' + ACCESS_READ works on
    # read-only files and on Windows, whereas 'r+b' + MAP_SHARED needs write
    # permission and the `flags` argument exists only on Unix.
    with open(file, 'rb') as f:
        # mmap refuses to map a zero-length file (ValueError); it has no lines.
        if os.fstat(f.fileno()).st_size == 0:
            return
        # Context-managing the map closes it even if the consumer stops
        # iterating early (the generator then receives GeneratorExit).
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for i, bin_line in enumerate(iter(mm.readline, b'')):
                if display_steps is not None and i % display_steps == 0:
                    print(f'> {i} ')
                if upto is not None and i > upto:
                    break
                yield bin_line.decode("utf-8").strip()
#returns lazily an iterator,range,list in chunks
def chunker(it, size):
    """Lazily split any iterable (iterator, range, list, ...) into lists.

    A chunk is emitted each time the running element count is a multiple
    of ``size``; for a positive integer ``size`` every chunk therefore holds
    ``size`` elements, except possibly the final one, which holds whatever is
    left over. An empty iterable yields nothing.
    """
    pending = []
    seen = 0
    for element in it:
        seen += 1
        pending.append(element)
        # Not a chunk boundary yet: keep accumulating.
        if seen % size:
            continue
        yield pending
        # Rebind (don't clear) so the list already handed out stays intact.
        pending = []
    # Flush the partial tail chunk, if any.
    if pending:
        yield pending
import ray | |
from ray.util import ActorPool | |
# Start Ray only if it is not already running, so re-running this script
# (e.g. in a notebook) does not call ray.init() a second time.
if not ray.is_initialized():
    ray.init()

# NOTE(review): `Actor`, `file`, `outer_upto` and `display_steps` are not
# defined in this snippet and must be provided before this runs. `Actor` is
# used as a Ray actor class (`Actor.remote()`) exposing `actor_method`.

# Create the pool of actors used by map_unordered.
a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])

res = pool.map_unordered(
    # Specify the actor method to call: ActorPool passes (actor, value).
    lambda a, line: a.actor_method.remote(line),
    # Pass the lines of the file one by one to the waiting actors.
    file_iter(file, outer_upto, display_steps),
)

# map_unordered yields results in completion order; print each one.
# (Calling list(res) inside the loop would discard the first result and
# exhaust the iterator after a single pass.)
for x in res:
    print(x)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment