Last active
September 25, 2017 05:44
-
-
Save larytet/e53191b27c06f7a86f8f345d9a9da563 to your computer and use it in GitHub Desktop.
multiprocessing of a text file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import random | |
import multiprocessing | |
import threading | |
autogen_file = "chrome-04.07-autogen-transfromed-data"
raw_file = "chrome-04.07-raw-transfromed-data"
# 100 lines only
#autogen_file = "autogen.100"
#raw_file = "raw.100"

print("Stage 1: collecting regex")
re_list = {}  # regex -> result entry (a defaultdict(list) would also work)
re_autogen_str = re.escape("[AUTOGEN]")
with open(autogen_file, "r") as autogen_f:  # with-block so the handle is closed
    for line in autogen_f:
        line_escaped = re.escape(line)
        # build a regex with a ([0-9a-z]+) capture group in place of every [AUTOGEN]
        re_key = line_escaped.replace(re_autogen_str, "([0-9a-z]+)")
        # keep only lines that actually contained [AUTOGEN] ...
        if re_key != line_escaped:
            # ... and sample roughly 0.3% of them to keep the regex set small
            if random.uniform(1, 1000) > 4:
                continue
            re_list[re_key] = {"line": line, "re": re_key, "matches": [], "matching_lines": []}
print("Stage 1: completed, {0} regex".format(len(re_list)))

cpus = multiprocessing.cpu_count()
with open(raw_file, "r") as raw_f:
    raw_lines = list(raw_f)
lines_count = len(raw_lines)

# Partition the raw lines into one contiguous slice per CPU.
# NOTE: the original used lines_count/cpus, which is a float in Python 3
# (breaking range() in the workers) and, even truncated, silently drops up
# to cpus-1 trailing lines. Use divmod and give the first `remainder`
# workers one extra line so every line is covered.
tasks = []
base, remainder = divmod(lines_count, cpus)
next_first = 0
for cpu in range(cpus):
    slice_len = base + (1 if cpu < remainder else 0)
    tasks.append({"cpu": cpu, "first_line": next_first, "lines": slice_len, "re_list": re_list})
    next_first += slice_len
def print_matches(re_list):
    """Print each regex entry that contributed at least one not-yet-seen match.

    Entries whose every match was already reported by an earlier entry are
    suppressed, so each distinct matched value is shown only once.
    """
    seen = set()
    for entry in re_list.values():
        found = entry["matches"]
        if not found:
            continue
        # New information only if some match has not appeared before.
        has_new = any(m not in seen for m in found)
        seen.update(found)
        if has_new:
            print("lines={0}, matches={1}".format(entry["matching_lines"], entry["matches"]))
def job(args):
    """Worker: scan one slice of raw_lines against every regex, then report.

    args keys: "cpu" (worker id, for progress logging), "first_line" (slice
    start index into raw_lines), "lines" (slice length), "re_list" (dict of
    regex entries; "matches"/"matching_lines" are appended to in place).

    NOTE(review): each pool worker mutates its own copy of re_list, so results
    are reported per-worker via print_matches() rather than returned; relies
    on the module-level raw_lines global being inherited by the worker
    process (fork start method) — verify on platforms that default to spawn.
    """
    cpu = args["cpu"]
    # int() guards against float arithmetic in the task-partitioning code.
    first_line = int(args["first_line"])
    lines = int(args["lines"])
    re_list = args["re_list"]
    processed = 0
    for line_index in range(first_line, first_line + lines):
        line = raw_lines[line_index]
        # (dropped the original's unused line_escaped = re.escape(line) —
        # dead work on every raw line)
        for re_key, re_entry in re_list.items():
            found_matches = re.findall(re_key, line)
            if found_matches:
                re_entry["matches"].extend(found_matches)
                # one matching_lines entry per line, however many matches it had
                re_entry["matching_lines"].append(line)
        processed += 1
        if processed % 100000 == 0:
            print("Stage 2 cpu={0}: processed {1} lines".format(cpu, processed))
    print_matches(re_list)
if __name__ == '__main__':
    # Fan the per-CPU line-range tasks out to a pool of worker processes;
    # each worker scans its slice and prints its own matches.
    print("Stage 2 running on {0} cpus".format(cpus))
    worker_pool = multiprocessing.Pool()
    worker_pool.map(job, tasks)
    # No more work to submit: shut the pool down and wait for the workers.
    worker_pool.close()
    worker_pool.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment