Last active
April 2, 2024 11:31
-
-
Save zapalote/30aa2d7b432a08e6a7d95e536e672494 to your computer and use it in GitHub Desktop.
Example of multi-threading and memory-mapped file processing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# extraction pattern: ngram TAB year TAB match_count TAB volume_count NEWLINE | |
# out: unique_ngram TAB sum(match_count) NEWLINE | |
import re | |
import os, sys, mmap | |
from pathlib import Path | |
from tqdm import tqdm | |
from concurrent.futures import ThreadPoolExecutor | |
# Leading dotted abbreviation such as "U.S.A." (old-style abbreviations);
# group(1) is the abbreviation itself, dots included.
abv = re.compile(r'^(([A-Z]\.){1,})(_|[^\w])')  # A.B.C.
# Leading run of ASCII letters; group(1) is that run.
# NOTE(review): both patterns require a "_" or non-word character right
# after the captured part, so a field that is only letters (e.g. "apple")
# matches neither pattern -- confirm this is intended.
word = re.compile(r'^(([a-zA-Z]){1,})(_|[^\w])')  # ABC | Abc | abc
# Shared prefix of the input file names; one lowercase letter completes it.
base = "googlebooks-eng-all-1gram-20120701-"
# Suffixes of the input files: every lowercase letter from 'a' to 'z'.
files = list("abcdefghijklmnopqrstuvwxyz")
def _normalize_term(ngram):
    """Return the term an ngram is counted under, or '' if it should be skipped.

    A leading dotted abbreviation (``A.B.C.``) collapses to its letters
    (``ABC``); otherwise the leading run of ASCII letters is used.  Matching
    uses the module-level ``abv`` and ``word`` patterns.
    """
    # Both patterns are anchored with '^', so match() returns the single
    # Match object the code needs (findall() returns a list with no .group()).
    m = abv.match(ngram)
    if m:
        # group(1) contains only capitals and dots, so removing the dots
        # removes all punctuation.
        return m.group(1).replace(".", "")
    m = word.match(ngram)
    if m:
        return m.group(1)
    return ""


def process_file(file):
    """Sum match counts per normalized term for one 1-gram input file.

    Reads ``base + file``, whose lines are
    ``ngram TAB year TAB match_count TAB volume_count``.  Each ngram is mapped
    to a term by ``_normalize_term`` and its ``match_count`` (third field) is
    added to that term's running total.  Lines with fewer than three fields,
    or whose ngram yields no term, are skipped.  The totals are written as
    ``term TAB total`` lines to ``gbooks-en-<file>.csv``.

    Args:
        file: Suffix appended to the module-level ``base`` prefix to form the
            input path; also used as the progress-bar label and in the output
            file name.

    Raises:
        OSError: if the input cannot be read or the output cannot be written.
        ValueError: if a line is not valid UTF-8 or a match_count field is
            not an integer.
    """
    path = base + file
    vocab = {}
    fsize = Path(path).stat().st_size
    print(f"processing {path}")
    # The progress bar tracks bytes consumed from the input file.
    with tqdm(total=fsize, desc=file) as pbar:
        # mmap refuses to map a zero-length file; an empty input simply
        # produces an empty vocabulary.
        if fsize:
            # The input is only read, so open and map it read-only (it need
            # not be writable).  Both context managers release the file and
            # the mapping even if parsing raises.
            with open(path, "rb") as fp:
                with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                    # readline returns b"" only at end of the mapping.
                    for line in iter(mm.readline, b""):
                        fields = line.decode("utf-8").split("\t")
                        if len(fields) >= 3:
                            key = _normalize_term(fields[0])
                            if key:
                                vocab[key] = vocab.get(key, 0) + int(fields[2])
                        pbar.update(len(line))
    # Mode "a+" appends: re-running for the same input adds duplicate rows.
    # NOTE(review): switch to "w" if each run should produce a fresh file.
    outf = "gbooks-en-" + file + ".csv"
    with open(outf, "a+") as fp:
        fp.writelines(f"{term}\t{count}\n" for term, count in vocab.items())
# Process every input file concurrently, one task per letter.
# ThreadPoolExecutor() defaults to min(32, os.cpu_count() + 4) workers.
# NOTE(review): the per-line parsing is pure Python and holds the GIL, so
# threads mostly overlap I/O; ProcessPoolExecutor would parallelize parsing.
with ThreadPoolExecutor() as executor:
    # executor.map re-raises a worker's exception only when that result is
    # fetched.  Materializing the results surfaces failures instead of
    # silently discarding them; the pool still finishes the other files
    # before the exception leaves the with block.
    result = list(executor.map(process_file, files))
Hi, don't you get multiple tqdm progress bars on your screen when running the threads? I find it hard to get tqdm to keep exactly one bar per thread.
Yes, tqdm does show one bar per thread. See the video at https://towardsdatascience.com/processing-large-data-files-with-python-multithreading-dbb916f6b58d
It works for me ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I learned that the "with" statement releases only the resource bound by its own "as" clause.
mm is created later, inside that block, so it has to be released explicitly, either with mm.close() or by opening it in its own "with" statement.