Last active
August 29, 2015 14:10
-
-
Save snaga/1830666bb0be96a7611f to your computer and use it in GitHub Desktop.
EzParallel.py - A Simple Library for Parallel Processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# EzParallel.py
#
# Copyright(C) 2014 Satoshi Nagayasu
# Copyright(C) 2014 Uptime Technologies, LLC.
import errno
import fcntl
import os
import sys
import time
from time import strftime, localtime
class EzParallel:
    """Limit the number of concurrently running worker processes.

    Coordination happens through two files in the current directory:

    * ``pidfilename``  -- one decimal pid per line, one line per worker.
    * ``lockfilename`` -- a file on which an exclusive ``flock`` is taken.

    ``wait_all`` brackets its pid-file read with ``lock_pid`` /
    ``unlock_pid``; the other pid-file methods do not take the lock
    themselves, so callers presumably do the same around ``add_pid``,
    ``remove_pid`` and ``is_slot_available`` (NOTE(review): confirm against
    the calling scripts).

    The code deliberately sticks to syntax that runs on both Python 2 and
    Python 3 (single-argument ``print(...)``, no f-strings).
    """

    pidfilename = ".ez.pids"
    lockfilename = ".ez.lock"
    MaxProcs = 8
    # Set to 1 (on the class or an instance) to enable DEBUG output.
    debug = 0
    # Open file object holding the flock; None until lock_pid() is called.
    lockfile = None

    def __init__(self, maxprocs=MaxProcs):
        """Create a limiter allowing at most *maxprocs* registered pids.

        *maxprocs* defaults to the class-level ``MaxProcs`` (8).
        """
        self.MaxProcs = maxprocs

    def _emit(self, level, msg):
        """Print *msg* prefixed with the wall-clock time, own pid and level."""
        ts = strftime("%H:%M:%S", localtime())
        # A single parenthesised argument prints identically on Python 2
        # (print statement) and Python 3 (print function).
        print("[" + ts + "] [" + str(os.getpid()) + "] " + level + ": " + msg)

    def _log(self, msg):
        """Unconditionally print a LOG line."""
        self._emit("LOG", msg)

    def _debug(self, msg):
        """Print a DEBUG line, but only when ``self.debug == 1``."""
        if self.debug == 1:
            self._emit("DEBUG", msg)

    def _read_pids(self):
        """Return the pids listed in the pid file as a list of ints.

        Blank / whitespace-only lines are skipped so that a stray newline
        in the file does not raise ``ValueError``.  Raises ``IOError`` /
        ``OSError`` if the pid file does not exist.
        """
        with open(self.pidfilename, "r") as pidfile:
            return [int(line) for line in pidfile if line.strip()]

    def init(self):
        """Create the pid file, or truncate it if it already exists."""
        self._log("init")
        with open(self.pidfilename, "w"):
            pass

    def lock_pid(self):
        """Take an exclusive ``flock`` on the lock file (blocks until held)."""
        self._debug("lock_pid")
        # Release a handle left over from an earlier lock_pid() explicitly.
        # flock locks belong to the open file description, so a still-open
        # older handle in this process would make the flock below block
        # forever on interpreters that do not close files on rebind.
        if self.lockfile is not None:
            self.lockfile.close()
        self.lockfile = open(self.lockfilename, "w")
        fcntl.flock(self.lockfile.fileno(), fcntl.LOCK_EX)

    def unlock_pid(self):
        """Release the lock by closing the lock file.

        Does nothing if ``lock_pid`` was never called; closing an already
        closed handle is harmless.
        """
        self._debug("unlock_pid")
        if self.lockfile is not None:
            self.lockfile.close()

    def _get_num_procs(self):
        """Return how many pids are currently recorded in the pid file."""
        self._debug("_get_num_procs")
        return len(self._read_pids())

    def is_slot_available(self):
        """Return 1 if fewer than ``MaxProcs`` pids are recorded, else 0."""
        if self._get_num_procs() < self.MaxProcs:
            return 1
        return 0

    def add_pid(self, mypid):
        """Append *mypid* as a new line at the end of the pid file."""
        self._debug("add_pid " + str(mypid))
        self._state()
        with open(self.pidfilename, "a") as pidfile:
            pidfile.write(str(mypid) + "\n")
        self._state()
        self._debug("add_pid done.")

    def remove_pid(self, mypid):
        """Rewrite the pid file without any entry equal to *mypid*.

        *mypid* may be an int or a decimal string; it is compared
        numerically against the recorded pids.
        """
        self._debug("remove_pid " + str(mypid))
        self._state()
        target = int(mypid)
        remaining = [pid for pid in self._read_pids() if pid != target]
        with open(self.pidfilename, "w") as pidfile:
            pidfile.writelines(str(pid) + "\n" for pid in remaining)
        self._state()
        self._debug("remove_pid done.")

    def _state(self):
        """Emit the current/maximum process counts as a DEBUG line."""
        # Skip the pid-file read entirely when the result would be thrown
        # away by _debug().
        if self.debug != 1:
            return
        numprocs = self._get_num_procs()
        self._debug("num_procs = " + str(numprocs)
                    + ", max_procs = " + str(self.MaxProcs))

    def state(self):
        """Emit the process counts as a LOG line if any pid is recorded."""
        numprocs = self._get_num_procs()
        if numprocs > 0:
            self._log("num_procs = " + str(numprocs)
                      + ", max_procs = " + str(self.MaxProcs))

    def wait_all(self, interval=1):
        """Block until the pid file lists no pids.

        The file is re-read under the lock every *interval* seconds
        (default 1) and the counts are logged on every pass.
        """
        self._log("wait_all")
        while True:
            self.lock_pid()
            try:
                numprocs = self._get_num_procs()
            finally:
                # Never keep the lock held if reading the pid file failed.
                self.unlock_pid()
            self._log("num_procs = " + str(numprocs)
                      + ", max_procs = " + str(self.MaxProcs))
            if numprocs == 0:
                break
            time.sleep(interval)
        self._log("wait_all done.")

    def cleanup(self):
        """Delete the lock file and the pid file.

        A file that is already missing is ignored so that the other one is
        still removed; any other ``OSError`` propagates.
        """
        self._log("cleanup")
        for path in (self.lockfilename, self.pidfilename):
            try:
                os.unlink(path)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment