Skip to content

Instantly share code, notes, and snippets.

@larytet
Last active September 25, 2017 05:44
Show Gist options
  • Save larytet/e53191b27c06f7a86f8f345d9a9da563 to your computer and use it in GitHub Desktop.
Save larytet/e53191b27c06f7a86f8f345d9a9da563 to your computer and use it in GitHub Desktop.
multiprocessing of a text file
import re
import os
import random
import multiprocessing
import threading
autogen_file = "chrome-04.07-autogen-transfromed-data"
raw_file = "chrome-04.07-raw-transfromed-data"
# 100 lines only
#autogen_file = "autogen.100"
#raw_file = "raw.100"

# Stage 1: turn a random sample of the [AUTOGEN] template lines into regexes.
print("Stage 1: collecting regex")
# Maps regex source -> bookkeeping entry:
#   "line"           - the template line the regex was built from
#   "re"             - the regex source (same string as the key)
#   "matches"        - captured values, appended to by job()
#   "matching_lines" - raw lines that matched, appended to by job()
re_list = {}  # Ohad wants to set missing elements defaultdict(lambda : [])
re_autogen_str = re.escape("[AUTOGEN]")
# "with" guarantees the template file is closed once it has been read.
with open(autogen_file, "r") as autogen:
    for line in autogen:
        line_escaped = re.escape(line)
        # Create a regex with a "([0-9a-z]+)" capture group in place of every [AUTOGEN].
        re_key = line_escaped.replace(re_autogen_str, "([0-9a-z]+)")
        # replace() left the line unchanged -> it has no [AUTOGEN] placeholder; skip it.
        if re_key == line_escaped:
            continue
        # Keep a ~0.3% random sample: uniform(1, 1000) <= 4 with probability 3/999.
        if random.uniform(1, 1000) > 4:
            continue
        re_list[re_key] = {"line": line, "re": re_key, "matches": [], "matching_lines": []}
print("Stage 1: completed, {0} regex".format(len(re_list)))
cpus = multiprocessing.cpu_count()
# Load the whole raw file up front; job() indexes into raw_lines by position.
# "with" guarantees the file is closed once it has been read.
with open(raw_file, "r") as raw:
    raw_lines = raw.readlines()
lines_count = len(raw_lines)
# Split raw_lines into one contiguous slice per CPU.  Floor division keeps the
# slice bounds integral (range() in job() rejects the float that "/" produces
# under Python 3), and the last slice absorbs the remainder so the trailing
# lines_count % cpus lines are not silently skipped.
lines_in_job = lines_count // cpus
tasks = []
for cpu in range(cpus):
    start = cpu * lines_in_job
    count = lines_in_job if cpu < cpus - 1 else lines_count - start
    tasks.append({"cpu": cpu, "first_line": start, "lines": count, "re_list": re_list})
def print_matches(re_list):
    """Print every regex entry that contributes at least one previously unseen match.

    Entries are visited in ``re_list`` iteration order.  An entry is printed
    when at least one of its ``"matches"`` values did not occur in any earlier
    entry.  An entry is skipped when it has no matches, or when all of its
    matches were already seen.

    :param re_list: mapping of regex source -> entry dict holding
        ``"matches"`` and ``"matching_lines"`` lists.  Match values must be
        hashable.
    """
    seen = set()
    for re_entry in re_list.values():
        matches = re_entry["matches"]
        if not matches:
            continue
        # Decide "new?" against matches from earlier entries only, then record these.
        has_new_match = any(m not in seen for m in matches)
        seen.update(matches)
        if has_new_match:
            print("lines={0}, matches={1}".format(re_entry["matching_lines"], re_entry["matches"]))
def job(args):
    """Match one contiguous slice of the module-level ``raw_lines`` against every regex.

    Each matching regex entry in ``args["re_list"]`` gets its ``"matches"`` and
    ``"matching_lines"`` lists appended to.  The per-job results are then
    printed via print_matches().  When run through ``Pool.map`` (as in
    ``__main__``), ``args`` is a pickled copy, so these updates stay inside the
    worker.  Nothing is returned.

    :param args: task dict with keys
        ``"cpu"`` (label shown in progress messages),
        ``"first_line"`` (index of the first line in ``raw_lines``),
        ``"lines"`` (number of lines to scan) and
        ``"re_list"`` (regex source -> entry dict).
    """
    first_line = args["first_line"]
    lines = args["lines"]
    cpu = args["cpu"]
    re_list = args["re_list"]
    # Compile each pattern once per job rather than passing the source string to
    # re.findall() for every line.  re's internal pattern cache is bounded, so
    # with a large regex table the module-level call keeps recompiling.
    compiled_entries = [(re.compile(re_key), re_list[re_key]) for re_key in re_list]
    lines_count = 0
    for line_index in range(first_line, first_line + lines):
        line = raw_lines[line_index]
        for pattern, re_entry in compiled_entries:
            found_matches = pattern.findall(line)
            if found_matches:
                re_entry["matches"].extend(found_matches)
                re_entry["matching_lines"].append(line)
        lines_count += 1
        if lines_count % 100000 == 0:
            print("Stage 2 cpu={0}: processed {1} lines".format(cpu, lines_count))
    print_matches(re_list)
def _run_stage2(task_list):
    """Fan *task_list* out over a process pool and wait for every worker to exit.

    Pool() with no argument starts os.cpu_count() worker processes.  job()
    prints its own results and returns None, so the list map() returns is
    discarded.
    """
    workers = multiprocessing.Pool()
    print("Stage 2 running on {0} cpus".format(cpus))
    workers.map(job, task_list)
    workers.close()  # no further work will be submitted
    workers.join()   # block until all worker processes have exited


if __name__ == '__main__':
    _run_stage2(tasks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment