Last active
June 22, 2019 20:14
-
-
Save 8enmann/31f15cc1616baf7c6acc117745871408 to your computer and use it in GitHub Desktop.
Benchmark reading files with threading and multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Read files using threading and multiprocessing. | |
Execute on https://coderpad.io/sandbox | |
""" | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from multiprocessing.dummy import Pool | |
import multiprocessing | |
from collections import Counter | |
import glob | |
import time | |
import os | |
from functools import reduce | |
from typing import List, Iterable | |
import numpy as np | |
class Timer:
    """Context manager that prints elapsed wall-clock time on exit.

    Fix: ``__enter__`` now returns ``self`` so callers can write
    ``with Timer() as t:`` and inspect the timer; previously it
    returned None. The elapsed time is also stored on ``self.elapsed``.
    """
    def __enter__(self):
        # Record the start time.
        self.t = time.time()
        return self
    def __exit__(self, *args):
        # Keep the measurement available after the block finishes.
        self.elapsed = time.time() - self.t
        print(f'{self.elapsed:.4f}s elapsed')
def header(text):
    """Print a banner line that visually separates benchmark sections."""
    bar = '=' * 10
    print(bar, text, bar)
def counts_from_file(filename: str, sleep_seconds=0):
    """Return a Counter of whitespace-separated tokens in *filename*.

    An optional *sleep_seconds* delay simulates slow I/O for the
    benchmarks. Binary files (UnicodeDecodeError) and unreadable
    files (PermissionError) yield an empty Counter.
    """
    time.sleep(sleep_seconds)
    try:
        with open(filename) as fh:
            tokens = fh.read().split()
    except (UnicodeDecodeError, PermissionError):
        # Skip binary files and files we aren't allowed to read.
        return Counter()
    return Counter(tokens)
# Collect every non-directory entry under /etc.
# Fix: '**' only matches recursively when recursive=True is passed;
# without it glob treats '**' like '*', so only one level was scanned.
files = [p for p in glob.glob('/etc/**/*', recursive=True)
         if not os.path.isdir(p)]
print('found', len(files), 'files')
def print_sizes(files: List[str], first: int = 10):
    """Print the *first* largest files with their sizes in bytes.

    Fix: the *first* parameter was declared but ignored — the slice
    was hard-coded to 10. It now controls how many entries print.
    """
    sizes = map(os.path.getsize, files)
    # Sort descending by size, then truncate to the requested count.
    print(sorted(zip(files, sizes), key=lambda pair: -pair[1])[:first])
# Sanity-check the counter on a single file first.
print(counts_from_file(files[0]).most_common(10))

# Baseline: sequential word-count over every file, one at a time.
header('single thread')
with Timer():
    c = Counter()
    for path in files:
        c.update(counts_from_file(path))
    print(c.most_common(10))
# Fan the same work out to a thread pool; results arrive as they finish.
header('thread pool')
with Timer():
    c.clear()
    with ThreadPoolExecutor() as pool:
        pending = [pool.submit(counts_from_file, path) for path in files]
        for done in as_completed(pending):
            c.update(done.result())
    print(c.most_common(10))
# Same benchmark via multiprocessing.dummy (a thread pool with the
# multiprocessing.Pool interface), consuming results in order.
header('map reduce thread pool')
with Timer():
    c.clear()
    with Pool() as thread_pool:
        for partial in thread_pool.imap(counts_from_file, files):
            c.update(partial)
    print(c.most_common(10))
# True multi-process version: each worker counts in its own interpreter.
header('map reduce proc pool')
with Timer():
    c.clear()
    with multiprocessing.Pool() as proc_pool:
        for partial in proc_pool.imap(counts_from_file, files):
            c.update(partial)
    print(c.most_common(10))
print(f'vocab size {len(c)}, total tokens {sum(c.values())}')
# Reserve special markers: end-of-file and unknown-token sentinels.
EOF = '<<EOF>>'
UNK = '<<UNK>>'
c.update([EOF, UNK])
# Build the token -> integer-id lookup table and its inverse.
vocab = {token: idx for idx, token in enumerate(c)}
reverse = {idx: token for token, idx in vocab.items()}
EOF_EMB = vocab[EOF]
UNK_EMB = vocab[UNK]
import json

# Persist the vocab to disk so later runs can reuse it.
with open('vocab.json', 'w') as f:
    json.dump(vocab, f)
def encode(filename: str, sleep_seconds=0) -> Iterable[int]:
    """Translate the tokens of *filename* into vocab ids.

    Tokens missing from the vocab map to UNK_EMB, and EOF_EMB is
    appended after the last token. Binary files (UnicodeDecodeError)
    and unreadable files (PermissionError) yield None.
    """
    time.sleep(sleep_seconds)
    try:
        with open(filename) as fh:
            words = fh.read().split()
    except (UnicodeDecodeError, PermissionError):
        # Skip binary files and files we aren't allowed to read.
        return None
    ids = [vocab.get(word, UNK_EMB) for word in words]
    ids.append(EOF_EMB)
    return ids
def decode(arr: Iterable[int]) -> List[str]:
    """Invert encode(): map ids back to tokens, split on the EOF marker."""
    words = [reverse[token_id] for token_id in arr]
    return ' '.join(words).split(EOF)
print('\n' * 3)

# Encode every file sequentially, dropping unreadable ones (None / []).
header('encode')
with Timer():
    res = [ids for ids in map(encode, files) if ids]
    combined = np.concatenate(res)
    print(len(res), len(combined))
# Save the flattened id stream for later reuse.
np.savez('data.npz', combined)
# Encode again with a thread pool; results keep submission order.
header('thread pool')
with Timer():
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(encode, path) for path in files]
        outputs = (fut.result() for fut in futures)
        res = [ids for ids in outputs if ids]
    combined = np.concatenate(res)
    print(len(res), len(combined))
# Round-trip check: decode the full id stream back into text chunks.
with Timer():
    out = decode(combined)
print(len(out), out[0][:1000])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment