Skip to content

Instantly share code, notes, and snippets.

@jg75
Created November 26, 2018 16:23
Show Gist options
  • Save jg75/b062ee54b9a59395504f978cf4763e8e to your computer and use it in GitHub Desktop.
Save jg75/b062ee54b9a59395504f978cf4763e8e to your computer and use it in GitHub Desktop.
"""Multiprocessing and threading."""
import logging
import multiprocessing.pool
import random
import threading
import time
# Root logging setup: emit INFO and above, and tag every record with the
# timestamp, logger name and thread name so interleaved output from the
# pool threads can be told apart.
logging.basicConfig(
    format="[%(asctime)s] %(name)s:%(threadName)s:%(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by the heartbeat, the workers and the pool driver.
logger = logging.getLogger("ThreadPool")
class Heartbeat(threading.Timer):
    """Call a function repeatedly, once per interval, until canceled.

    A plain ``threading.Timer`` waits for the interval and then fires once.
    This subclass overrides ``run`` so the function fires as soon as the
    thread starts and then again after every ``interval`` seconds, until
    ``cancel()`` is called.

    The class is also a context manager: entering the ``with`` block starts
    the timer thread, and leaving it cancels the timer.
    """

    def beat():
        """Log a heart symbol; this is the default heartbeat action."""
        # Deliberately takes no ``self``: it is captured below as a plain
        # function (the default value of ``function``), not called as a
        # bound method.
        logger.info("♡")

    def __init__(self, interval=10, function=beat, *args, **kwargs):
        """Create the timer, supplying defaults for interval and function.

        interval: seconds to wait between successive calls.
        function: callable invoked on every beat.
        *args, **kwargs: forwarded unchanged to ``threading.Timer.__init__``.
        """
        super().__init__(interval, function, *args, **kwargs)

    def __enter__(self):
        """Start the timer thread and return this timer."""
        self.start()
        # Return the instance so ``with Heartbeat() as hb:`` binds the timer
        # rather than ``None``.
        return self

    def __exit__(self, *args):
        """Cancel the timer; exceptions from the body are not suppressed."""
        self.cancel()

    def run(self):
        """Call the timer function every interval until ``cancel()``."""
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            # Waiting on the event (instead of sleeping) lets ``cancel()``
            # interrupt the pause and end the loop promptly.
            self.finished.wait(self.interval)
def worker(amount):
    """Sleep for ``amount`` seconds while a heartbeat runs.

    Logs "Started" on entry and ``amount`` once the sleep is over, then
    returns ``amount`` unchanged.
    """
    logger.info("Started")
    heartbeat = Heartbeat()
    # The heartbeat is active only for the duration of the sleep.
    with heartbeat:
        time.sleep(amount)
    logger.info(amount)
    return amount
def start_thread_pool(count):
    """Run ``count`` workers on a thread pool and return their results.

    Each worker is handed a random integer between 2 and 10 (inclusive).
    The results list, in the same order as the generated inputs, is logged
    and returned.

    count: number of work items to generate and dispatch.
    """
    logger.info("Started")
    data = [random.randint(2, 10) for _ in range(count)]
    pool = multiprocessing.pool.ThreadPool()
    try:
        results = pool.map(worker, data)
    finally:
        # Shut the pool down even when a worker raises; otherwise the
        # exception propagating out of map() would skip close()/join() and
        # leave the pool's threads behind.
        pool.close()
        pool.join()
    logger.info(results)
    return results
if __name__ == "__main__":
    # Script entry point: run a demo with ten work items.
    start_thread_pool(count=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment