@maatthc
Last active June 16, 2017 10:25
Read numbers from a big file and present the N largest numbers
#!/usr/bin/python
from sys import argv
import multiprocessing as mp
import time

# Adjust for the number of CPUs / amount of memory available.
# Each process will sort an array of size: ratio_per_process * size_array_largest
ratio_per_process = 30000
# Consolidate the partial results after this many worker processes have been started
num_iterations = 50


def lines_to_list(lines):
    # Convert raw text lines into a list of integers
    new_list = [int(x.rstrip()) for x in lines]
    return new_list


def my_sort(lines, size_array_largest, queue):
    # Worker process: sort one chunk, drop duplicates, and push its largest values to the queue
    start_time = time.time()
    numbers = lines_to_list(lines)
    # Now let's sort and ignore the duplicate numbers
    numbers_sorted = sorted(set(numbers))
    queue.put(numbers_sorted[-size_array_largest:])
    elapsed_time = time.time() - start_time
    print(elapsed_time)


def consolidate(lines_read, jobs, largest):
    # The remaining lines are added to a consolidated list
    results = lines_to_list(lines_read)
    print("Consolidating.. Number of jobs created: %i" % len(jobs))
    # Collect each worker's partial result from the shared queue before joining it
    for job in jobs:
        results.extend(queue.get())
        job.join()
    results.extend(largest)
    # Return only the TOP largest values
    # (queue and size_array_largest are module-level names set in __main__)
    return sorted(set(results))[-size_array_largest:]


if __name__ == '__main__':
    if len(argv) != 3:
        print("Usage: %s FileName Number" % __file__)
        exit()
    bigfile_name = argv[1]
    size_array_largest = int(argv[2])
    print("Reading file %s and identifying the largest %i numbers.." % (bigfile_name, size_array_largest))
    # List holding the current candidates for the largest numbers
    largest = [0] * size_array_largest
    queue = mp.Queue()
    # Each new process goes here
    jobs = []
    iteration = 0
    lines_read = []
    with open(bigfile_name, "r") as bigfile:
        # Read line by line so the entire file is never loaded into memory
        for line in bigfile:
            if iteration == num_iterations:
                # Time to consolidate the work done so far
                largest = consolidate(lines_read, jobs, largest)
                # Resetting variables
                lines_read = []
                iteration = 0
                jobs = []
            if len(lines_read) > (size_array_largest * ratio_per_process):
                # Enough data to start processing in a new worker process
                p = mp.Process(target=my_sort, args=(lines_read, size_array_largest, queue))
                jobs.append(p)
                p.daemon = True
                p.start()
                lines_read = []
                iteration += 1
            lines_read.append(line)
    # Consolidate whatever is left once the file has been fully read
    largest = consolidate(lines_read, jobs, largest)
    print(largest)
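
A quick way to try the script, as a minimal sketch: write a large sample file of random integers, one per line, then invoke the script with that file name and the number of top values wanted. The names numbers.txt and nlargest.py below are placeholder examples, not part of the gist.

# Minimal sketch for generating test input; 'numbers.txt' is a placeholder name.
import random

with open("numbers.txt", "w") as sample:
    for _ in range(1000000):
        sample.write("%d\n" % random.randint(0, 10**9))

# Then run the gist's script (saved here under the placeholder name 'nlargest.py'):
#   python nlargest.py numbers.txt 10
# As a cross-check, heapq.nlargest(10, set(int(l) for l in open("numbers.txt")))
# yields the same ten largest distinct values (in descending order, whereas
# the script prints them in ascending order).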