Read numbers from a big file and present the N largest numbers
#!/usr/bin/python
from sys import argv
import multiprocessing as mp
import time

# Adjust for the number of CPUs and the available memory.
# Each worker process sorts a list of roughly ratio_per_process * size_array_largest numbers.
ratio_per_process = 30000
# Consolidate the partial results after this many worker processes have been spawned.
num_iterations = 50
def lines_to_list(lines):
    # Convert raw text lines into a list of integers.
    return [int(x.rstrip()) for x in lines]

def my_sort(lines, size_array_largest, queue):
    # Worker: sort one chunk, drop duplicates and push only its top N values back to the parent.
    start_time = time.time()
    numbers = lines_to_list(lines)
    numbers_sorted = sorted(set(numbers))
    queue.put(numbers_sorted[-size_array_largest:])
    elapsed_time = time.time() - start_time
    print("Worker finished in %.2f seconds" % elapsed_time)
def consolidate(lines_read, jobs, largest, size_array_largest, queue):
    # Merge the leftover lines, the workers' partial results and the previous
    # running "largest" list, then keep only the top N distinct values.
    results = lines_to_list(lines_read)
    print("Consolidating.. number of jobs created: %i" % len(jobs))
    for job in jobs:
        results.extend(queue.get())
        job.join()
    results.extend(largest)
    return sorted(set(results))[-size_array_largest:]
if __name__ == '__main__':
    if len(argv) != 3:
        print("Usage: %s FileName Number" % __file__)
        exit(1)
    bigfile_name = argv[1]
    size_array_largest = int(argv[2])
    print("Reading file %s and identifying the largest %i numbers.." % (bigfile_name, size_array_largest))
    # Running list of the largest numbers found so far.
    largest = []
    queue = mp.Queue()
    # Worker processes spawned since the last consolidation.
    jobs = []
    iteration = 0
    lines_read = []
    with open(bigfile_name, "r") as bigfile:
        # Read line by line so the whole file is never loaded into memory.
        for line in bigfile:
            if iteration == num_iterations:
                # Time to consolidate the partial results.
                largest = consolidate(lines_read, jobs, largest, size_array_largest, queue)
                # Reset the per-batch state.
                lines_read = []
                iteration = 0
                jobs = []
            if len(lines_read) > (size_array_largest * ratio_per_process):
                # Enough data to start a new worker process.
                p = mp.Process(target=my_sort, args=(lines_read, size_array_largest, queue))
                jobs.append(p)
                p.daemon = True
                p.start()
                lines_read = []
                iteration += 1
            lines_read.append(line)
    # Consolidate whatever is left after the file has been fully read.
    largest = consolidate(lines_read, jobs, largest, size_array_largest, queue)
    print(largest)
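To try it out, assuming the script above is saved as largest.py (the file name is only an example), a small snippet can generate an input file of newline-separated integers and the script can then be run against it:

# Generate a sample input file of newline-separated integers (the name
# numbers.txt is only an example) for testing the script above.
import random

with open("numbers.txt", "w") as f:
    for _ in range(1000000):
        f.write("%d\n" % random.randint(0, 10 ** 9))

# Then run, e.g.: python largest.py numbers.txt 10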
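For files small enough to deduplicate in memory, a single-process reference using the standard library's heapq.nlargest should return the same values and is a convenient cross-check; a minimal sketch:

# Single-process cross-check: top-n distinct values via heapq.nlargest.
import heapq

def n_largest(path, n):
    with open(path) as f:
        distinct = set(int(line) for line in f)
    # Ascending order, matching the script's sorted(set(...))[-n:] output.
    return sorted(heapq.nlargest(n, distinct))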