Skip to content

Instantly share code, notes, and snippets.

@tappoz
Created December 10, 2019 10:59
Show Gist options
  • Save tappoz/6e1c691f0c1224cc0256222b99ba60d8 to your computer and use it in GitHub Desktop.
Save tappoz/6e1c691f0c1224cc0256222b99ba60d8 to your computer and use it in GitHub Desktop.
A python script useful for I/O intensive scenarios

Python parallel threads

This pattern is useful for I/O intensive tasks (e.g. HTTP requests and file system interactions).

import concurrent.futures
import logging
import random
import sys
import time

def do_some_work(int_id, sleep_amount):
    """Simulate an I/O-bound job: sleep for a while, then echo the inputs back.

    Args:
        int_id: numeric identifier of the job (appears in the log lines).
        sleep_amount: seconds to block in time.sleep().

    Returns:
        The (int_id, sleep_amount) pair, so callers can match results to jobs.
    """
    # lazy %-style args: the string is only interpolated if the record is emitted
    logging.info(" >>> Starting to work for ID %d (sleeping amount: %f)", int_id, sleep_amount)
    time.sleep(sleep_amount)
    logging.info(" >>> Finished working for ID %d", int_id)
    return (int_id, sleep_amount)

def parallel_work(worker_function, input_params_tuples_list, max_workers=None):
    """Run worker_function once per parameter tuple on a thread pool.

    Suited to I/O-bound workers (e.g. HTTP requests, file system access):
    blocking I/O releases the GIL, so the jobs genuinely overlap.

    Args:
        worker_function: callable invoked as worker_function(*params_tuple);
            expected to return a (job_id, sleep_amount)-style 2-tuple, since
            the summary log below indexes result[0] and result[1].
        input_params_tuples_list: list of argument tuples, one per job.
        max_workers: thread pool size; defaults to one thread per job.

    Returns:
        List of the workers' return values, in completion order (not
        submission order — see as_completed below).
    """
    logging.info("Starting the parallel execution of the jobs...")
    # Bug fix: the original read the global `number_of_jobs`, which only
    # exists when the script runs as __main__ (NameError when imported).
    # Derive the pool size from the input instead; `or 1` avoids
    # ThreadPoolExecutor(0), which raises ValueError on an empty job list.
    if max_workers is None:
        max_workers = len(input_params_tuples_list) or 1
    with concurrent.futures.ThreadPoolExecutor(max_workers) as tpe:
        jobs = [tpe.submit(worker_function, *curr_params_tuple)
                for curr_params_tuple in input_params_tuples_list]
        logging.info("Done triggering the jobs executions...")
        # as_completed yields each future as soon as it finishes, so the
        # results list is ordered by completion time.
        results_done = [job.result() for job in concurrent.futures.as_completed(jobs)]
        logging.info("Done retrieving all the jobs results...")
        for idx, result in enumerate(results_done):
            logging.info("Iteration {}: the job with ID {} has this sleep amount: {}".format(idx, result[0], result[1]))
    logging.info("Done with the parallel execution of the jobs...")
    # Bug fix: the original computed results_done but never returned it.
    return results_done


if __name__ == "__main__":
    # logging configuration: timestamp + thread + function name, so the
    # interleaving of the worker threads is visible in the output
    fmt = '%(asctime)s %(threadName)s %(funcName)s'
    fmt += ' %(levelname)s %(message)s'
    logging.basicConfig(
        format=fmt,
        datefmt='%H:%M:%S',
        level=logging.DEBUG,
        stream=sys.stdout
    )
    # number of concurrent jobs
    number_of_jobs = 10
    # prepare the tuples representing the input parameters of each parallel worker
    params_tuples_list = []
    for curr_int_id in range(number_of_jobs):
        # NOTE: job ID 0 sleeps 0 seconds, so it may finish even before all
        # the threads (jobs) are instantiated; add a positive offset to
        # total_sleep if that matters for your use case.  (The original kept
        # an unused `offset_sleep` draw for this purpose — removed as dead
        # code.)
        total_sleep = random.random() * curr_int_id
        params_tuples_list.append((curr_int_id, total_sleep))
    # ------------------------------------------------------------------------ #
    # do some parallel work
    parallel_work(do_some_work, params_tuples_list)
    # ------------------------------------------------------------------------ #

Sample execution

$ python parallel_threads.py 
10:57:01 MainThread parallel_work INFO Starting the parallel execution of the jobs...
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO  >>> Starting to work for ID 0 (sleeping amount: 0.000000)
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO  >>> Finished working for ID 0
10:57:01 ThreadPoolExecutor-0_1 do_some_work INFO  >>> Starting to work for ID 1 (sleeping amount: 0.450596)
10:57:01 ThreadPoolExecutor-0_2 do_some_work INFO  >>> Starting to work for ID 2 (sleeping amount: 1.188063)
10:57:01 ThreadPoolExecutor-0_0 do_some_work INFO  >>> Starting to work for ID 3 (sleeping amount: 1.665251)
10:57:01 ThreadPoolExecutor-0_3 do_some_work INFO  >>> Starting to work for ID 4 (sleeping amount: 3.964050)
10:57:01 ThreadPoolExecutor-0_4 do_some_work INFO  >>> Starting to work for ID 5 (sleeping amount: 4.651413)
10:57:01 ThreadPoolExecutor-0_5 do_some_work INFO  >>> Starting to work for ID 6 (sleeping amount: 3.059467)
10:57:01 ThreadPoolExecutor-0_6 do_some_work INFO  >>> Starting to work for ID 7 (sleeping amount: 5.711722)
10:57:01 ThreadPoolExecutor-0_7 do_some_work INFO  >>> Starting to work for ID 8 (sleeping amount: 4.451390)
10:57:01 ThreadPoolExecutor-0_8 do_some_work INFO  >>> Starting to work for ID 9 (sleeping amount: 2.638906)
10:57:01 MainThread parallel_work INFO Done triggering the jobs exections...
10:57:01 ThreadPoolExecutor-0_1 do_some_work INFO  >>> Finished working for ID 1
10:57:02 ThreadPoolExecutor-0_2 do_some_work INFO  >>> Finished working for ID 2
10:57:02 ThreadPoolExecutor-0_0 do_some_work INFO  >>> Finished working for ID 3
10:57:03 ThreadPoolExecutor-0_8 do_some_work INFO  >>> Finished working for ID 9
10:57:04 ThreadPoolExecutor-0_5 do_some_work INFO  >>> Finished working for ID 6
10:57:05 ThreadPoolExecutor-0_3 do_some_work INFO  >>> Finished working for ID 4
10:57:05 ThreadPoolExecutor-0_7 do_some_work INFO  >>> Finished working for ID 8
10:57:05 ThreadPoolExecutor-0_4 do_some_work INFO  >>> Finished working for ID 5
10:57:06 ThreadPoolExecutor-0_6 do_some_work INFO  >>> Finished working for ID 7
10:57:06 MainThread parallel_work INFO Done retrieving all the jobs results...
10:57:06 MainThread parallel_work INFO Iteration 0: the job with ID 0 has this sleep amount: 0.0
10:57:06 MainThread parallel_work INFO Iteration 1: the job with ID 1 has this sleep amount: 0.45059579771809166
10:57:06 MainThread parallel_work INFO Iteration 2: the job with ID 2 has this sleep amount: 1.1880628170799388
10:57:06 MainThread parallel_work INFO Iteration 3: the job with ID 3 has this sleep amount: 1.6652511555820662
10:57:06 MainThread parallel_work INFO Iteration 4: the job with ID 9 has this sleep amount: 2.6389059370413874
10:57:06 MainThread parallel_work INFO Iteration 5: the job with ID 6 has this sleep amount: 3.0594672438487898
10:57:06 MainThread parallel_work INFO Iteration 6: the job with ID 4 has this sleep amount: 3.9640496717546148
10:57:06 MainThread parallel_work INFO Iteration 7: the job with ID 8 has this sleep amount: 4.451390001020335
10:57:06 MainThread parallel_work INFO Iteration 8: the job with ID 5 has this sleep amount: 4.651413175719882
10:57:06 MainThread parallel_work INFO Iteration 9: the job with ID 7 has this sleep amount: 5.711721789081359
10:57:06 MainThread parallel_work INFO Done with the parallel execution of the jobs...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment