Skip to content

Instantly share code, notes, and snippets.

@AoJ
Created August 4, 2020 07:30
Show Gist options
  • Save AoJ/cb11d1d71a32a27d82d961a3fde18dc1 to your computer and use it in GitHub Desktop.
Save AoJ/cb11d1d71a32a27d82d961a3fde18dc1 to your computer and use it in GitHub Desktop.
Python parallel threads
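#!/usr/bin/env python3
"""Run a list of system commands in parallel worker processes.

Each command is described by a ``Command`` tuple, executed through
``subprocess`` and returned as a new ``Command`` carrying its exit code,
captured output lines and run time.

Attributes:
    format_logger (str): Format string passed to ``logging.basicConfig``.
    logger (logging.Logger): Module-level logger used by every helper here.
"""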
import os
import sys
from threading import Timer, Thread
from datetime import datetime
from random import randint
from multiprocessing import Process
import subprocess
from time import sleep
import atexit
import logging
import multiprocessing
from multiprocessing import JoinableQueue, SimpleQueue
from typing import NamedTuple, List
# Layout of every log line: timestamp, milliseconds, level, source location
# and message.  The milliseconds are zero-padded to three digits; without the
# padding 5 ms would be rendered as ".5" and read as half a second.
format_logger = '%(asctime)s.%(msecs)03d %(levelname)s [%(filename)s:%(lineno)d] %(message)s'
logging.basicConfig(
    format=format_logger,
    datefmt='%Y-%m-%d:%H:%M:%S',
    level=logging.INFO,
)
# Logger shared by all helpers in this module.
logger = logging.getLogger(__name__)
class Command(NamedTuple):
    """Immutable description of one command and, after it ran, its outcome.

    A fresh instance only needs ``args``; the remaining fields keep their
    placeholder defaults until a runner returns an updated copy made with
    ``_replace``.

    Attributes:
        args: Program and arguments, passed to ``subprocess.Popen`` as a list.
        timeout: Seconds the command may run.
        exit_code: Exit status; ``-1`` until the command has been run.
        stdout: Captured standard output, one entry per line.
        stderr: Captured standard error, one entry per line.
        spend: Seconds the command took; ``-1`` until the command has been run.
    """

    args: List[str]
    timeout: int = 60
    exit_code: int = -1
    # NOTE: the two list defaults are single objects shared by every instance
    # that does not override them.  Never mutate them in place; build a new
    # list and use ``_replace`` instead.
    stdout: List[str] = []
    stderr: List[str] = []
    spend: int = -1
def check_version():
    """Abort early when the interpreter is older than Python 3.6.

    Raises:
        RuntimeError: If ``sys.version_info`` is below ``(3, 6, 0)``.
            ``RuntimeError`` derives from ``Exception``, so callers catching
            ``Exception`` keep working.
    """
    if sys.version_info < (3, 6, 0):
        raise RuntimeError("Must be using Python 3.6 at least.")
#
# HELPER FUNCTIONS TO RUN SYSTEM COMMANDS
# ==========================================================
#
def _command_worker(queue: JoinableQueue, results: SimpleQueue):
    """Consume commands one by one from ``queue`` and run each of them.

    Runs until the process is terminated or interrupted.  Every command taken
    from ``queue`` is acknowledged with ``task_done()`` even if running it
    fails, so a ``queue.join()`` in the parent cannot hang on a crashed
    command.

    Args:
        queue (JoinableQueue): Source of ``Command`` items to execute.
        results (SimpleQueue): Sink receiving one finished ``Command`` per
            item taken from ``queue``.
    """
    try:
        while True:
            command = queue.get()
            try:
                results.put(_os_run(command))
            except Exception as exc:
                # Report the failure as a result instead of dying silently,
                # so the parent still receives one entry for this command.
                logger.exception("Command %r could not be run", command)
                results.put(command._replace(exit_code=-1, stderr=[str(exc)]))
            finally:
                queue.task_done()
            sleep(0.005)
    except KeyboardInterrupt:
        # ^C is handled by the parent process; just leave the loop quietly.
        pass
def _os_run(command: "Command", env=None, timeout=None):
    """Run one command, enforce its timeout and capture its output.

    The child is started without a shell.  Its output is collected with
    ``Popen.communicate`` so a chatty child cannot block on a full pipe.
    When the timeout expires the child receives SIGTERM; if it is still
    alive five seconds later it is killed.

    Args:
        command (Command): Command to execute; ``command.args`` is the argv.
        env (dict, optional): Extra environment variables layered on top of
            the current process environment.
        timeout (float, optional): Seconds the child may run.  Defaults to
            ``command.timeout`` when omitted.

    Returns:
        Command: Copy of ``command`` with ``exit_code``, ``stdout``,
        ``stderr`` and ``spend`` filled in.  ``exit_code`` is ``-1`` when the
        program could not be started, ``251`` when it was terminated after
        the timeout, ``250`` when it had to be killed, otherwise the child's
        own return code.
    """
    # Local import: the module itself only pulls ``sleep`` out of ``time``.
    from time import monotonic

    if timeout is None:
        timeout = command.timeout
    kill_grace = 5  # seconds between SIGTERM and SIGKILL

    proc_env = os.environ.copy()
    if env:
        proc_env.update(env)

    stdout = []
    stderr = []
    # Short random tag to correlate the log lines of one execution.
    uid = "%i" % randint(100000, 999999)
    logger.debug("Executing #{}: {}".format(uid, " ".join(command.args)))

    started = monotonic()
    try:
        proc = subprocess.Popen(command.args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=proc_env,
                                shell=False)
    except OSError as e:
        # The program could not be started at all,
        # e.g. "No such file or directory".
        stderr.append("%s" % e)
        logger.error('Command #{} failed with exit code {}: {}'
                     .format(uid, -1, " ".join(stderr)))
        return command._replace(
            exit_code=-1,
            stdout=stdout,
            stderr=stderr,
            spend=monotonic() - started)

    terminating = False
    killed = False
    try:
        try:
            raw_out, raw_err = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Ask politely first and give the child a grace period.
            proc.terminate()
            terminating = True
            try:
                raw_out, raw_err = proc.communicate(timeout=kill_grace)
            except subprocess.TimeoutExpired:
                logger.debug("Force killing after timeout #{}: {}".format(
                    uid, " ".join(command.args)))
                proc.kill()
                killed = True
                raw_out, raw_err = proc.communicate()
    except KeyboardInterrupt:
        # ^C: stop the child quickly but still collect what it printed.
        # The result is reported like a killed command, as before.
        proc.terminate()
        sleep(0.1)
        proc.kill()
        killed = True
        raw_out, raw_err = proc.communicate()
    spend = monotonic() - started

    # Undecodable bytes are replaced rather than crashing the runner.
    text_out = raw_out.decode(errors="replace").strip()
    if text_out:
        stdout.extend(text_out.split("\n"))
    stderr.extend(
        line for line in raw_err.decode(errors="replace").split("\n") if line)

    exit_code = proc.returncode
    if killed:
        stderr.append("App timeouted and killed")
        exit_code = 250
    elif terminating:
        stderr.append("App timeouted")
        exit_code = 251

    # Any non-zero status is a failure, including negative ones
    # (child ended by a signal).
    if exit_code != 0:
        logger.error('Command #{} failed with exit code {}: {}'
                     .format(uid, exit_code, " ".join(stderr)))
    else:
        logger.info("Command #{} done in {:.1f}s and output {} lines"
                    .format(uid, spend, len(stdout)))
    return command._replace(
        exit_code=exit_code,
        stdout=stdout,
        stderr=stderr,
        spend=spend)
def run_commands(commands: List["Command"], parallel=10):
    """Run multiple commands in parallel worker processes.

    Results are read from the workers while they are still running, so a
    worker can never block on a full result pipe.

    Args:
        commands (List[Command]): Commands to execute.
        parallel (int, optional): Maximum number of worker processes.  At
            least one worker is started whenever ``commands`` is non-empty.

    Returns:
        List[Command]: Finished commands in completion order, which is not
        necessarily the order of ``commands``.  The list may be incomplete
        when the run was interrupted with ^C.
    """
    if not commands:
        return []

    tasks = multiprocessing.JoinableQueue()
    output = multiprocessing.SimpleQueue()
    procs = []
    results = []
    try:
        worker_count = max(1, min(parallel, len(commands)))
        for i in range(worker_count):
            proc = Process(
                target=_command_worker,
                name='Thread-{}'.format(i),
                args=(tasks, output))
            proc.start()
            procs.append(proc)
        for command in commands:
            tasks.put(command)
        # Every command produces exactly one result; collect them as they
        # arrive instead of waiting for the task queue to drain first.
        for _ in commands:
            results.append(output.get())
        # All results are in; this only confirms every task was acknowledged.
        tasks.join()
    except KeyboardInterrupt:
        logger.info('^C received, shutting down server')
        sleep(1)  # wait for proc shutdown gracefully, TODO
    finally:
        for proc in procs:
            proc.terminate()
            proc.join()
            proc.close()
        # Pick up whatever finished before an interruption.
        while not output.empty():
            results.append(output.get())
        tasks.close()
    return results
def main():
    """Run a fixed set of sample commands over ssh against ``server.tld``.

    The collected results are not inspected any further.
    """
    ssh_prefix = ["ssh", "server.tld"]
    remote_calls = (
        ["cat", "/etc/passwd"],
        ["echo -n", ""],
        ["echo", "3"],
        ["echo", "4"],
        ["echo", "5"],
    )
    batch = [Command(ssh_prefix + call) for call in remote_calls]
    results = run_commands(batch)
    #print(results)
# Script entry point: verify the interpreter version, then run the demo.
if __name__ == '__main__':
    check_version()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment