Created
June 22, 2019 21:03
-
-
Save 8enmann/5ee05981d42b5ef8e1f0abcc2868f8da to your computer and use it in GitHub Desktop.
Calculate ngrams as fast as possible
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Calculate N-gram counts as fast as possible for a large encoded file.""" | |
import numpy as np | |
from collections import deque | |
import multiprocessing as mp | |
from typing import Any, Iterable, Generator | |
import time | |
from collections import Counter | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
from functools import partial | |
# Synthetic benchmark corpus: two million integer token ids in the range 0..9.
data = np.random.randint(low=0, high=10, size=2_000_000)
print(data.shape)
class Timer:
    """Context manager that prints how long its ``with`` body took.

    On entry, a banner line containing ``title`` is printed if ``title`` is
    truthy. On exit, the elapsed time in seconds is printed and also stored
    in ``elapsed`` so callers can read it programmatically.
    """

    def __init__(self, title=None):
        # ``title`` is handed to print() unchanged, so any printable object works.
        self.title = title
        self.start = None
        self.elapsed = None

    def __enter__(self):
        if self.title:
            print('<' * 10, self.title)
        # perf_counter() is monotonic and high resolution, so the measured
        # interval cannot be skewed by system clock adjustments.
        self.start = time.perf_counter()
        # Return the timer so ``with Timer() as t`` binds the instance.
        return self

    def __exit__(self, *args):
        self.elapsed = time.perf_counter() - self.start
        print('>' * 10, f'{self.elapsed:4f}s')
        # Implicitly returns None: exceptions raised in the body propagate.
def window(iterable: Iterable, n: int) -> Generator:
    """Yield every run of ``n`` consecutive items of ``iterable`` as a tuple.

    Windows advance one item at a time, so an input of ``k >= n`` items
    produces ``k - n + 1`` tuples. If the input holds fewer than ``n`` items
    nothing is yielded.

    Args:
        iterable: Any iterable; it is consumed lazily, one item per window.
        n: Window length. A negative value raises ``ValueError`` (from
            ``deque``) when iteration starts.
    """
    d = deque(maxlen=n)
    it = iter(iterable)
    # Fill the deque with up to n items. zip() consults range(n) first, so it
    # never pulls an extra item from ``it`` once the window is full.
    for _, item in zip(range(n), it):
        d.append(item)
    # Input too short for even one window: stop cleanly. Calling next(it)
    # here instead would leak StopIteration, which PEP 479 converts into
    # RuntimeError inside a generator.
    if len(d) < n:
        return
    # Emit the first full window.
    yield tuple(d)
    # Each further item pushes the oldest one out (maxlen=n) and forms the
    # next window.
    for item in it:
        d.append(item)
        yield tuple(d)
# Sanity check: show the length-3 windows over 0..9.
print([*window(range(10), 3)])
def count(data, window_size: int = 5):
    """Tally how often each ``window_size``-gram occurs in ``data``.

    Returns a ``Counter`` keyed by the tuples produced by ``window``.
    """
    tally = Counter()
    tally.update(window(data, window_size))
    return tally
# Baseline measurement: count trigrams over the whole array in this process.
with Timer(title='single threaded'):
    counts = count(data, window_size=3)
    print(counts.most_common(2))
def use_pool(pool, data, window_size: int = 3):
    """Count n-grams of ``data`` by fanning chunks out to ``pool`` and time it.

    ``data`` is cut into roughly ``mp.cpu_count()`` contiguous chunks. Each
    chunk is extended by ``window_size - 1`` trailing items so that n-grams
    straddling a chunk boundary are counted exactly once; without the
    overlap they would be dropped and the result would disagree with a
    single-pass ``count(data, window_size)``.

    Args:
        pool: Object with ``submit(fn, arg)`` returning something that has
            a ``result()`` method.
        data: Sliceable sequence supporting ``len()``.
        window_size: Length of the n-grams to count.

    Returns:
        The merged ``Counter``. Its two most common entries are also printed.
    """
    with Timer(type(pool)):
        n_chunks = mp.cpu_count()
        total = len(data)
        overlap = max(window_size - 1, 0)
        # Evenly spaced cut points; chunk i owns the windows that *start* in
        # [bounds[i], bounds[i + 1]).
        bounds = [total * i // n_chunks for i in range(n_chunks + 1)]
        job = partial(count, window_size=window_size)
        futures = []
        for start, stop in zip(bounds, bounds[1:]):
            if start == stop:
                # Owns no window starts; submitting the overlap alone would
                # double count windows belonging to the next chunk.
                continue
            chunk = data[start:stop + overlap]
            if len(chunk) < window_size:
                # Too short to hold a single window (only possible at the
                # tail of ``data``), so there is nothing to count.
                continue
            futures.append(pool.submit(job, chunk))
        c = Counter()
        for f in futures:
            c.update(f.result())
        print(c.most_common(2))
    return c
class DummyFuture:
    """Minimal stand-in for a future whose value is already known."""

    def __init__(self, result):
        # The value is computed before construction; just hold on to it.
        self._value = result

    def result(self):
        """Return the value supplied at construction."""
        return self._value
class DummyPool:
    """Pool look-alike that runs each submitted call immediately, in-line.

    It offers the same ``submit`` / context-manager surface that
    ``use_pool`` relies on, without any threads or processes.
    """

    def __enter__(self):
        return self

    def __exit__(self, *args):
        """Nothing to release; returns None so exceptions propagate."""

    def submit(self, fn, iterable):
        """Call ``fn(iterable)`` right away and wrap the outcome in a DummyFuture."""
        outcome = fn(iterable)
        return DummyFuture(outcome)
# Compare thread-based, process-based and in-line execution of the same job.
# The guard is required for ProcessPoolExecutor: under the "spawn" start
# method (Windows, macOS default) each worker re-imports this module, and an
# unguarded pool creation at import time would make every worker try to start
# its own pool and fail.
if __name__ == '__main__':
    for pool_class in (ThreadPoolExecutor, ProcessPoolExecutor, DummyPool):
        with pool_class() as p:
            use_pool(p, data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment