@zapalote
Last active April 2, 2024 11:31
Example of multi-threading and memory mapped file processing.
# extraction pattern: ngram TAB year TAB match_count TAB volume_count NEWLINE
# out: unique_ngram TAB sum(match_count) NEWLINE
import re
import mmap
from pathlib import Path
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

abv = re.compile(r'^(([A-Z]\.){1,})(_|[^\w])')     # A.B.C.
word = re.compile(r'^(([a-zA-Z]){1,})(_|[^\w])')   # ABC | Abc | abc

base = "googlebooks-eng-all-1gram-20120701-"
files = ['a','b','c','d','e','f','g','h','i','j',
         'k','l','m','n','o','p','q','r','s',
         't','u','v','w','x','y','z']

def process_file(file):
    vocab = {}
    fsize = Path(base+file).stat().st_size
    print(f"processing {base+file}")
    # read-only access is enough: the file is only scanned, never modified
    with open(base+file, "rb") as fp:
        # use a progress bar
        with tqdm(total=fsize, desc=file) as pbar:
            # map the entire file into memory, normally much faster than buffered i/o
            mm = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
            # iterate over the mapped file one line at a time
            for line in iter(mm.readline, b""):
                t = ''
                # convert the bytes to a utf-8 string and split the fields
                term = line.decode("utf-8").split("\t")
                # catch patterns such as A.B.C. (old-style abbreviations)
                m_abv = abv.match(term[0])
                if m_abv:
                    # remove punctuation
                    t = re.sub(r'[^\w]', '', m_abv.group(1))
                else:
                    m_word = word.match(term[0])
                    if m_word:
                        t = m_word.group(1)
                # add the match_count to the running total for this term;
                # n-grams matching neither pattern accumulate under the empty key ''
                vocab[t] = vocab.get(t, 0) + int(term[2])
                # update the progress bar by the number of bytes consumed
                pbar.update(len(line))
            # the map is not managed by a "with" block, so release it explicitly
            mm.close()

    # output vocabulary and counts to csv file
    outf = "gbooks-en-" + file + ".csv"
    with open(outf, "a+") as fp:
        for term in vocab:
            fp.write(f"{term}\t{vocab[term]}\n")

# use as many threads as possible, default: min(32, os.cpu_count()+4)
with ThreadPoolExecutor() as executor:
    # consume the results so exceptions raised in worker threads are re-raised here
    results = list(executor.map(process_file, files))
@enricomerckaert

you should do mm.close() or your memory will become full

@zapalote
Author

Yes! Forgot it. Thanks

@zapalote
Author

Thinking about it, and this always confuses me, doesn’t “with” take care of closing things?

@enricomerckaert

I would expect the same thing but I used this for reading through a couple of MB of GPS data and my memory usage went over 8GB. It also became slower and slower but when I added the mm.close(), memory usage was just a few 100MB and performance was much better. Also the progress bar didn't work for me, in my case it got stuck in a loop.

@zapalote
Author

Added mm.close()
Thanks Enrico.

@zapalote
Author

Learnt that the "with" statement will release resources acquired with the "as" clause.
mm is allocated later and therefore needs to be released explicitly.
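
As a side note on the point above: `mmap` objects can also be used as context managers themselves, so the map can be placed in its own "with" block instead of being closed by hand. A minimal sketch, assuming a small temporary sample file (the file contents and the `count_lines` helper are made up for illustration and are not part of the gist):

```python
import mmap
import os
import tempfile

def count_lines(path):
    """Count lines through a memory map that is closed automatically."""
    with open(path, "rb") as fp:
        # the mapping is closed when this inner block exits, even on error
        with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            n = sum(1 for _ in iter(mm.readline, b""))
        return n, mm.closed

# hypothetical sample data in the ngram TAB year TAB match_count TAB volume_count layout
fd, sample_path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"a\t2000\t3\t1\nb\t2001\t5\t2\n")

line_count, was_closed = count_lines(sample_path)
os.remove(sample_path)
print(line_count, was_closed)
```

Either form releases the mapping; the nested "with" just removes the risk of forgetting the `close()` call.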

@skullknight31337

Hi, don't you get multiple tqdm progress bars on your screen when running the threads? I find it hard to get tqdm to keep exactly one bar per thread.

@zapalote
Author

Yes, tqdm does show one bar per thread. See the movie at https://towardsdatascience.com/processing-large-data-files-with-python-multithreading-dbb916f6b58d

It works for me ...
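
If the bars do overwrite each other in a given terminal, one thing that may help is giving each bar a fixed row through tqdm's `position` argument. This is only a sketch, not what the gist does: the `work` function, the fake totals, and the three task names are placeholders for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def work(args):
    # position: row index for this task's bar (an assumption, not in the gist)
    position, name = args
    total = 1000  # stand-in for the file size in bytes
    done = 0
    with tqdm(total=total, desc=name, position=position, leave=True) as pbar:
        for _ in range(10):
            pbar.update(100)  # stand-in for len(line)
            done += 100
    return name, done

names = ["a", "b", "c"]  # placeholder task names
with ThreadPoolExecutor(max_workers=3) as executor:
    # enumerate() pairs each task with its own bar row
    results = list(executor.map(work, enumerate(names)))
```

Whether this is needed seems to depend on the terminal and the tqdm version; the original code without `position` reportedly works as shown in the linked article.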
