Skip to content

Instantly share code, notes, and snippets.

@snaga
Last active August 29, 2015 14:10
Show Gist options
  • Select an option

  • Save snaga/1830666bb0be96a7611f to your computer and use it in GitHub Desktop.

Select an option

Save snaga/1830666bb0be96a7611f to your computer and use it in GitHub Desktop.
EzParallel.py - A Simple Library for Parallel Processing
# -*- coding: utf-8 -*-
# EzParallel.py
#
# Copyright(C) 2014 Satoshi Nagayasu
# Copyright(C) 2014 Uptime Technologies, LLC.
import fcntl
from time import strftime, localtime
import os
import sys
import time
class EzParallel:
pidfilename = ".ez.pids"
lockfilename = ".ez.lock"
MaxProcs = 8;
debug = 0
def __init__(self, maxprocs):
self.MaxProcs = maxprocs
def _log(self, msg):
ts = strftime("%H:%M:%S", localtime())
print "[" + ts + "] [" + str(os.getpid()) + "] LOG: " + msg;
def _debug(self, msg):
if self.debug == 1:
ts = strftime("%H:%M:%S", localtime())
print "[" + ts + "] [" + str(os.getpid()) + "] DEBUG: " + msg;
def init(self):
self._log("init")
pidfile = open(self.pidfilename, "w")
pidfile.close()
# sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
def lock_pid(self):
self._debug("lock_pid");
self.lockfile = open(self.lockfilename, "w")
fcntl.flock(self.lockfile.fileno(), 2);
def _get_num_procs(self):
self._debug("_get_num_procs")
pids = [];
pidfile = open(self.pidfilename, "r")
lines = pidfile.readlines()
pidfile.close()
for pid in lines:
pid = pid.rstrip('\n')
pids.append(int(pid))
return len(pids)
def is_slot_available(self):
num_procs = self._get_num_procs()
if num_procs < self.MaxProcs:
return 1
return 0
def add_pid(self, mypid):
self._debug("add_pid " + str(mypid))
self._state()
pidfile = open(self.pidfilename, "a")
pidfile.write(str(mypid) + "\n")
pidfile.close()
self._state()
self._debug("add_pid done.")
def remove_pid(self, mypid):
self._debug("remove_pid " + str(mypid));
self._state()
pidfile = open(self.pidfilename, "r")
lines = pidfile.readlines()
pidfile.close()
pids = [];
for pid in lines:
pid = pid.rstrip('\n')
if int(pid) != mypid:
pids.append(pid)
pidfile = open(self.pidfilename, "w")
for pid in pids:
pidfile.write(str(pid) + "\n")
pidfile.close()
self._state()
self._debug("remove_pid done.")
def unlock_pid(self):
self._debug("unlock_pid")
self.lockfile.close()
def _state(self):
numprocs = self._get_num_procs()
self._debug("num_procs = " + str(numprocs) + ", max_procs = " + str(self.MaxProcs));
def state(self):
numprocs = self._get_num_procs()
if numprocs > 0:
self._log("num_procs = " + str(numprocs) + ", max_procs = " + str(self.MaxProcs));
def wait_all(self):
self._log("wait_all")
while True:
self.lock_pid()
numprocs = self._get_num_procs()
self.unlock_pid()
self._log("num_procs = " + str(numprocs) + ", max_procs = " + str(self.MaxProcs));
if numprocs == 0:
break
time.sleep(1)
self.lockfile.close()
self._log("wait_all done.")
def cleanup(self):
self._log("cleanup")
os.unlink(self.lockfilename)
os.unlink(self.pidfilename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment