Last active
April 26, 2021 13:19
-
-
Save SCP002/ce8645810152606753df3cd5abd41534 to your computer and use it in GitHub Desktop.
Python: concurrent filter function example, using a worker pool.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import concurrent.futures | |
import datetime | |
import os | |
import sys | |
import time | |
import traceback | |
from concurrent.futures import Future | |
from typing import Any, Callable, List, Optional | |
class App:
    """Demonstrates an order-preserving, concurrent ``filter`` built on a thread pool."""

    # Define keep function type alias for convenience.
    # A keep function receives (index, value) and returns a truthy result
    # when the value should stay in the output.
    KeepFn = Callable[[int, Any], bool]

    @staticmethod
    def concurrent_filter(inp: List[Any], keep_fn: KeepFn, max_pool_size: int) -> List[Any]:
        """Return the elements of ``inp`` for which ``keep_fn`` is truthy.

        ``keep_fn`` is invoked once per element on a ``ThreadPoolExecutor``
        with at most ``max_pool_size`` workers. The output keeps the order of
        ``inp`` regardless of the order in which the calls finish.

        Args:
            inp: Values to filter. Elements may be of any type, including ``None``.
            keep_fn: Predicate called as ``keep_fn(index, value)``.
            max_pool_size: Passed to the executor as ``max_workers``.

        Returns:
            A new list with the kept elements, in input order.

        Raises:
            Any exception raised by ``keep_fn`` is re-raised when the
            corresponding result is collected.
        """

        # Define task function. It returns only the keep decision; the value is
        # paired with its future at submission time, so no sentinel is needed
        # and ``None`` elements can be kept like any other value.
        def task(idx: int, val: Any) -> bool:
            # TODO: Printing status can be safely removed, used only for demonstration purposes.
            # Using sys.stdout.write() here in favour of print() as using print() is not thread safe.
            # The f-string formats non-string values instead of failing on str + other.
            sys.stdout.write(f'Started {val}\n')
            keep: bool = keep_fn(idx, val)
            sys.stdout.write(f'Done {val}\n')
            return keep

        # Submit every task; leaving the ``with`` block waits for all of them.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_pool_size) as pool:
            pending = [(v, pool.submit(task, i, v)) for i, v in enumerate(inp)]

        # Collect results in submission order.
        return [val for val, fut in pending if fut.result()]

    @staticmethod
    def main() -> None:
        """Run the demo: filter ten strings with a slow predicate and print the timing."""
        inp: List[str] = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

        def keep_fn(idx: int, val: Any) -> bool:
            # Sleep for 2 seconds to simulate heavy task.
            time.sleep(2)
            # Keep elements at odd indices, i.e. every second element ('2', '4', ...).
            return idx % 2 != 0

        start: datetime.datetime = datetime.datetime.now()
        out: List[str] = App.concurrent_filter(inp, keep_fn, 5)
        end: datetime.datetime = datetime.datetime.now()

        print('Filtering took', (end - start).total_seconds(), 's')
        print('Result is:', out)

        input('Press <Enter> to exit...\n')
# Main start point.
if __name__ == '__main__':
    # noinspection PyBroadException
    try:
        # Switch the working directory to sys.path[0] before running the app
        # (normally the script's own directory when started as a script).
        os.chdir(sys.path[0])
        App.main()
    except Exception:
        # Top-level boundary: show the traceback and keep the console open
        # until the user acknowledges, then report failure to the caller.
        traceback.print_exc()
        input('Press <Enter> to exit...\n')
        # sys.exit() instead of the site-provided exit() helper, which is not
        # guaranteed to exist (e.g. under ``python -S`` or in frozen builds).
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment