Skip to content

Instantly share code, notes, and snippets.

@SCP002
Last active April 26, 2021 13:19
Show Gist options
  • Save SCP002/ce8645810152606753df3cd5abd41534 to your computer and use it in GitHub Desktop.
Save SCP002/ce8645810152606753df3cd5abd41534 to your computer and use it in GitHub Desktop.
Python: concurrent Filter funtion example. Using Worker Pool.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import concurrent.futures
import datetime
import os
import sys
import time
import traceback
from concurrent.futures import Future
from typing import Any, Callable, List, Optional
class App:
# Define keep function type alias for convenience.
KeepFn = Callable[[int, Any], bool]
@staticmethod
def concurrent_filter(inp: List[Any], keep_fn: KeepFn, max_pool_size: int) -> List[Any]:
# Define task function.
def task(idx: int, val: Any) -> Optional[Any]:
# TODO: Printing status can be safely removed, used only for demonstration purposes.
# Using sys.stdout.write() here in favour of print() as using print() is not thread safe.
sys.stdout.write('Started ' + val + '\n')
keep: bool = keep_fn(idx, val)
sys.stdout.write('Done ' + val + '\n')
return val if keep else None
futures: List[Future[Any]] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_pool_size) as pool:
for i, v in enumerate(inp):
# Submit tasks.
future: Future[Optional[Any]] = pool.submit(task, i, v)
# Collect futures.
futures.append(future)
# Collect results.
out: List[str] = []
for fut in futures:
result: Any = fut.result()
if result is not None:
out.append(result)
return out
@staticmethod
def main() -> None:
inp: List[str] = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
def keep_fn(idx: int, val: Any) -> bool:
# Sleep for 2 seconds to simulate heavy task.
time.sleep(2)
# Keep every even element.
return idx % 2 != 0
start: datetime = datetime.datetime.now()
out: List[str] = App.concurrent_filter(inp, keep_fn, 5)
end: datetime = datetime.datetime.now()
print('Filtering took', (end - start).total_seconds(), 's')
print('Result is:', out)
input('Press <Enter> to exit...\n')
# Main start point.
if __name__ == '__main__':
# noinspection PyBroadException
try:
os.chdir(sys.path[0])
App.main()
except Exception:
traceback.print_exc()
input('Press <Enter> to exit...\n')
exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment