Skip to content

Instantly share code, notes, and snippets.

@Madhivarman
Created January 26, 2020 18:47
Show Gist options
  • Save Madhivarman/d1960db57fdbf1788a7448ab2937f6ca to your computer and use it in GitHub Desktop.
Sample script in Multiprocessing
import time
import datetime
import os
import pandas as pd
from multiprocessing import Process, Queue, current_process
def do_something(fromprocess):
    """Simulate one unit of work and report completion.

    Args:
        fromprocess: index of the batch/worker this call belongs to
            (accepted but not otherwise used by the body).

    Returns:
        The literal string "msg" on success.
    """
    time.sleep(1)  # stand-in for real per-item work
    worker_name = current_process().name
    worker_pid = os.getpid()
    print("Operation Ended for Process:{}, process Id:{}".format(
        worker_name, worker_pid
    ))
    return "msg"
def submit_job(df, list_items, q, fromprocess):
    """Process one batch of categories and put the collected results on q.

    Args:
        df: DataFrame with at least 'MinorCategory' and 'FilePath' columns.
        list_items: iterable of category values assigned to this worker.
        q: multiprocessing.Queue the result list is pushed onto.
        fromprocess: worker index, forwarded to do_something.
    """
    collected = []
    for category in list_items:
        # File paths for this category, restricted to .png files.
        # NOTE(review): this filtered list is built but never used afterwards
        # — presumably a placeholder for real per-category work; confirm.
        paths = df[df['MinorCategory'] == category]['FilePath'].values.tolist()
        paths = [p for p in paths if p.endswith('.png')]
        outcome = do_something(fromprocess)
        if outcome != 'error_occured':
            collected.append(outcome)
    q.put(collected)
def get_n_process(as_batches, processes, df, q):
    """Spawn and start one worker Process per batch, up to `processes`.

    Args:
        as_batches: list of category batches still waiting to be processed.
        processes: number of workers to launch in this round.
        df: DataFrame handed to every worker.
        q: shared multiprocessing.Queue for worker results.

    Returns:
        (started, remaining): the started Process objects and the batches
        left over after the first `processes` have been handed out.
    """
    started = []
    for idx in range(processes):
        worker = Process(target=submit_job, args=(df, as_batches[idx], q, idx))
        started.append(worker)
        worker.start()
    return started, as_batches[processes:]
if __name__ == '__main__':
    # Program start: read the training CSV, split its categories into
    # batches of 10, and fan the batches out over worker processes.
    start = time.perf_counter()

    filepath = "F:/multiview_CNN/multiviewclassification/data/processed/train.csv"
    df = pd.read_csv(filepath, sep=",")

    # Unique categories, ordered by frequency (value_counts order).
    minor_categories = list(df.MinorCategory.value_counts().to_dict().keys())

    # Queue workers report their result lists on.
    # NOTE(review): nothing ever drains this queue; if workers put large
    # payloads, join() below can block forever — consider calling q.get()
    # once per process before joining.
    q = Queue()

    as_batches = []
    start_, end_ = 0, len(minor_categories)
    print("Started creating batch out of the records...")
    # Assign 10 categories to each batch.  Strict '<' fixes an off-by-one:
    # the original '<=' appended a trailing EMPTY batch whenever the
    # category count was an exact multiple of 10 (or zero), which later
    # spawned a worker with nothing to do.
    while start_ < end_:
        as_batches.append(minor_categories[start_:start_ + 10])
        start_ += 10
    print("Finished creating batches!")
    print("Total number of Batches found:{}".format(len(as_batches)))

    # Run at most 8 worker processes per round until every batch is done.
    while len(as_batches) > 0:
        # Dynamically size this round from the remaining batches.
        n_process = 8 if len(as_batches) > 8 else len(as_batches)
        print("For this it Requries {} Process".format(n_process))
        process_obj_inlist, as_batches = get_n_process(as_batches, n_process, df, q)
        for ind_process in process_obj_inlist:
            ind_process.join()
        # Append a progress line to the run log.  The with-block closes the
        # file; the original's explicit f.close() inside it was redundant.
        with open("logs.txt", "a") as f:
            f.write("\n")
            f.write("Log Recording at: {timestamp}, Remaining N = {remaining} yet to be processed".format(
                timestamp=datetime.datetime.now(),
                remaining=len(as_batches)
            ))

    finish = time.perf_counter()
    print("Time took to execute the program:{}".format(round(finish - start, 2)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment