"""Subprocess IPC communication test |
Determine the throughput and bandwidth of standard Python |
subprocess interprocess communication. |
""" |
import os |
import sys |
import time |
import pstats |
import logging |
import cProfile |
import argparse |
import subprocess |
logging.basicConfig(format="%(name)s: %(message)s") |
parser = argparse.ArgumentParser() |
parser.add_argument("-o", "--output") |
parser.add_argument("-i", "--iterations", default=100, type=int) |
parser.add_argument("-b", "--bytes", default=1000, type=int) |
parser.add_argument("-s", "--stats", action="store_true", |
help="Print additional stats") |
parser.add_argument("-p", "--plot", action="store_true") |
parser.add_argument("-q", "--quiet", action="store_true") |
opt = parser.parse_args() |
if not os.getenv("CHILD"): |
# Parent Process |
# |
# This block boots up another Python interpreter |
# so as to send messages back and forth. |
# ______________ |
# | | |
# | | |
# | PARENT | |
# | | |
# |______________| |
# |
log = logging.getLogger("<-- PY") |
log.setLevel(logging.INFO) |
log.info("Booting child..") |
popen = subprocess.Popen( |
# Open yourself, but run the below block |
[sys.executable, __file__], |
env=dict(os.environ, **{"CHILD": "1"}), |
stdin=subprocess.PIPE, |
stdout=subprocess.PIPE, |
bufsize=-1, |
universal_newlines=True, |
) |
# Wait for child to get ready.. |
response = popen.stdout.readline() |
assert response == "Ready\n", "Was: '%s'" % response |
# Data the size of opt.bytes |
data = "0".zfill(opt.bytes) |
bytes_ = sys.getsizeof(data) |
log.info("Created data {:,} bytes (len {})".format( |
bytes_, len(data) |
)) |
write_per_iteration = list() |
rtrip_per_iteration = list() # roundtrip |
child_wait_times = list() |
times = list() |
if opt.stats: |
profile = cProfile.Profile() |
profile.enable() |
tot0 = time.clock() |
for i in range(opt.iterations): |
# Write |
write0 = time.clock() |
popen.stdin.write(data) |
popen.stdin.write("%d\n" % i) |
popen.stdin.flush() |
write1 = time.clock() |
# Read |
read0 = time.clock() |
response = popen.stdout.readline() |
read1 = time.clock() |
write_per_iteration.append((write1 - write0) * 1000) |
rtrip_per_iteration.append((read1 - read0) * 1000) |
times.append(read1) |
index, wait = response.rsplit("-", 1) |
response = int(index) |
wait = float(wait) |
child_wait_times.append(wait) |
assert response == i, "Was %d, expected %d" % (response, i) |
if not opt.quiet and (opt.iterations > 1000 and i % 100 == 0): |
sys.stderr.write("\r%d/%d.." % (i, opt.iterations)) |
if not opt.quiet: |
sys.stderr.write("\r%d/%d..\n" % (opt.iterations, opt.iterations)) |
if opt.stats: |
profile.disable() |
stats = pstats.Stats(profile).sort_stats("time") |
stats.print_stats(20) |
tot1 = time.clock() |
totdur = tot1 - tot0 # s |
# Send kill signal |
popen.kill() |
popen.wait() |
def plot(): |
import pygal |
fname = os.path.join(os.getcwd(), "plot.svg") |
log.info("Plotting to %s.." % fname) |
assert rtrip_per_iteration |
assert write_per_iteration |
assert child_wait_times |
plot = pygal.Line(width=2000, height=500) |
plot.title = "Time per iteration" |
plot.add("Roundtrip (ms)", rtrip_per_iteration, show_dots=False) |
plot.add("Write to child (ms)", write_per_iteration, show_dots=False) |
plot.add("Child wait (ms)", child_wait_times, show_dots=False) |
# plot.x_labels = times |
plot.render_to_file(fname) |
if opt.plot: |
try: |
plot() |
except ImportError: |
log.info("Plotting skipped, could not find pygal") |
iterations = opt.iterations |
bpi = bytes_ * 2 |
totb = opt.iterations * bpi # in + out |
bps = totb / (totdur) |
avgtime = (sum(rtrip_per_iteration) / len(rtrip_per_iteration)) |
mintime = min(rtrip_per_iteration) |
maxtime = max(rtrip_per_iteration) |
deltime = maxtime - mintime |
results = ( |
("iterations", opt.iterations), |
("bpi", bytes_ * 2), |
("bps", bps), |
("avgtime", avgtime), |
("mintime", mintime), |
("maxtime", maxtime), |
("deltime", deltime), |
("totb", totb), |
("totdur", totdur), |
) |
if opt.output: |
import json |
with open(opt.output, "w") as f: |
log.info("Writing results to '%s'" % opt.output) |
json.dump(dict(results), f, indent=2, sort_keys=True) |
print("""\ |
Iterations: {iterations:,} |
Bytes/iteration: {bpi:,} b/i |
Bytes/second: {bps:,.0f} b/s |
Avarage roundtrip time: {avgtime:.3f} ms |
Min roundtrip time: {mintime:.3f} ms |
Max roundtrip time: {maxtime:.3f} ms |
Delta roundtrip time: {deltime:.3f} ms |
Total bytes: {totb:,} b |
Total time: {totdur:.3f} s |
""".format(**locals())) |
else: |
# Child Process |
# |
# This block represents the additional Python interpreter which |
# is receiving messages via sys.stdin from the above parent. |
# _____________ |
# | | |
# | | |
# | CHILD | |
# | | |
# |_____________| |
# |
log = logging.getLogger("--> MO") |
log.setLevel(logging.INFO) |
log.info("Ready for action..") |
sys.stdout.write("Ready\n") |
sys.stdout.flush() |
t0 = None |
while True: |
line = sys.stdin.readline().rstrip() |
# Wait time |
t1 = time.clock() |
duration = "-%.6f\n" % ((t1 - (t0 or t1)) * 1000) |
sys.stdout.write(line) |
sys.stdout.write(duration) |
sys.stdout.flush() |
t0 = time.clock() |
log.info("Dying..") |