This pattern is useful for I/O-intensive tasks (e.g. HTTP requests and file-system interactions).
import concurrent.futures
import logging
import random
import sys
import time
def do_some_work(int_id, sleep_amount):
    """Simulate an I/O-bound job by sleeping for `sleep_amount` seconds.

    Args:
        int_id: integer identifier of the job (echoed back in the result).
        sleep_amount: seconds to sleep, standing in for blocking I/O.

    Returns:
        The (int_id, sleep_amount) tuple, so the caller can match each
        result back to its input even when jobs complete out of order.
    """
    # Lazy %-style logging args: the message is only formatted when the
    # record is actually emitted.
    logging.info(" >>> Starting to work for ID %d (sleeping amount: %f)", int_id, sleep_amount)
    time.sleep(sleep_amount)
    logging.info(" >>> Finished working for ID %d", int_id)
    return (int_id, sleep_amount)
def parallel_work(worker_function, input_params_tuples_list):
logging.info("Starting the parallel execution of the jobs...")
with concurrent.futures.ThreadPoolExecutor(number_of_jobs) as tpe:
jobs = []
results_done = []
for curr_params_tuple in input_params_tuples_list:
jobs.append(tpe.submit(worker_function, *curr_params_tuple))
logging.info("Done triggering the jobs exections...")
for job in concurrent.futures.as_completed(jobs):
result_done = job.result()
results_done.append(result_done)
logging.info("Done retrieving all the jobs results...")
for idx, result in enumerate(results_done):
logging.info("Iteration {}: the job with ID {} has this sleep amount: {}".format(idx, result[0], result[1]))
logging.info("Done with the parallel execution of the jobs...")
if __name__ == "__main__":
# logging configuration
fmt = '%(asctime)s %(threadName)s %(funcName)s'
fmt += ' %(levelname)s %(message)s'
logging.basicConfig(
# filename='',
format=fmt,
datefmt='%H:%M:%S',
level=logging.DEBUG,
stream=sys.stdout
)
# number of concurrent jobs
number_of_jobs = 10
# prepare the tuples representing the input parameters of each parallel worker
params_tuples_list = []
for curr_int_id in range(number_of_jobs):
# if we don't add the offset here,
# then the first job (ID 0) will sleep 0,
# so it may finish even before all the threads (jobs) are instantiated
# as it can be seen from the logs
offset_sleep = random.random()
total_sleep = random.random() * curr_int_id # + offset_sleep
params_tuples_list.append((curr_int_id, total_sleep))
# ------------------------------------------------------------------------ #
# do some parallel work
parallel_work(do_some_work, params_tuples_list)
# ------------------------------------------------------------------------ #
$ python parallel_threads.py
10:57:01 MainThread parallel_work INFO Starting the parallel execution of the jobs...
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO >>> Starting to work for ID 0 (sleeping amount: 0.000000)
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO >>> Finished working for ID 0
10:57:01 ThreadPoolExecutor-0_1 do_some_work INFO >>> Starting to work for ID 1 (sleeping amount: 0.450596)
10:57:01 ThreadPoolExecutor-0_2 do_some_work INFO >>> Starting to work for ID 2 (sleeping amount: 1.188063)
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO >>> Starting to work for ID 3 (sleeping amount: 1.665251)
10:57:01 ThreadPoolExecutor-0_3 do_some_work INFO >>> Starting to work for ID 4 (sleeping amount: 3.964050)
10:57:01 ThreadPoolExecutor-0_4 do_some_work INFO >>> Starting to work for ID 5 (sleeping amount: 4.651413)
10:57:01 ThreadPoolExecutor-0_5 do_some_work INFO >>> Starting to work for ID 6 (sleeping amount: 3.059467)
10:57:01 ThreadPoolExecutor-0_6 do_some_work INFO >>> Starting to work for ID 7 (sleeping amount: 5.711722)
10:57:01 ThreadPoolExecutor-0_7 do_some_work INFO >>> Starting to work for ID 8 (sleeping amount: 4.451390)
10:57:01 ThreadPoolExecutor-0_8 do_some_work INFO >>> Starting to work for ID 9 (sleeping amount: 2.638906)
10:57:01 MainThread parallel_work INFO Done triggering the jobs executions...
10:57:01 ThreadPoolExecutor-0_1 do_some_work INFO >>> Finished working for ID 1
10:57:02 ThreadPoolExecutor-0_2 do_some_work INFO >>> Finished working for ID 2
10:57:02 ThreadPoolExecutor-0_0 do_some_work INFO >>> Finished working for ID 3
10:57:03 ThreadPoolExecutor-0_8 do_some_work INFO >>> Finished working for ID 9
10:57:04 ThreadPoolExecutor-0_5 do_some_work INFO >>> Finished working for ID 6
10:57:05 ThreadPoolExecutor-0_3 do_some_work INFO >>> Finished working for ID 4
10:57:05 ThreadPoolExecutor-0_7 do_some_work INFO >>> Finished working for ID 8
10:57:05 ThreadPoolExecutor-0_4 do_some_work INFO >>> Finished working for ID 5
10:57:06 ThreadPoolExecutor-0_6 do_some_work INFO >>> Finished working for ID 7
10:57:06 MainThread parallel_work INFO Done retrieving all the jobs results...
10:57:06 MainThread parallel_work INFO Iteration 0: the job with ID 0 has this sleep amount: 0.0
10:57:06 MainThread parallel_work INFO Iteration 1: the job with ID 1 has this sleep amount: 0.45059579771809166
10:57:06 MainThread parallel_work INFO Iteration 2: the job with ID 2 has this sleep amount: 1.1880628170799388
10:57:06 MainThread parallel_work INFO Iteration 3: the job with ID 3 has this sleep amount: 1.6652511555820662
10:57:06 MainThread parallel_work INFO Iteration 4: the job with ID 9 has this sleep amount: 2.6389059370413874
10:57:06 MainThread parallel_work INFO Iteration 5: the job with ID 6 has this sleep amount: 3.0594672438487898
10:57:06 MainThread parallel_work INFO Iteration 6: the job with ID 4 has this sleep amount: 3.9640496717546148
10:57:06 MainThread parallel_work INFO Iteration 7: the job with ID 8 has this sleep amount: 4.451390001020335
10:57:06 MainThread parallel_work INFO Iteration 8: the job with ID 5 has this sleep amount: 4.651413175719882
10:57:06 MainThread parallel_work INFO Iteration 9: the job with ID 7 has this sleep amount: 5.711721789081359
10:57:06 MainThread parallel_work INFO Done with the parallel execution of the jobs...