|
"""Subprocess IPC communication test |
|
|
|
Determine the throughput and bandwidth of standard Python |
|
subprocess interprocess communication. |
|
|
|
""" |
|
|
|
import os |
|
import sys |
|
import time |
|
import pstats |
|
import logging |
|
import cProfile |
|
import argparse |
|
import subprocess |
|
|
|
logging.basicConfig(format="%(name)s: %(message)s") |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("-o", "--output") |
|
parser.add_argument("-i", "--iterations", default=100, type=int) |
|
parser.add_argument("-b", "--bytes", default=1000, type=int) |
|
parser.add_argument("-s", "--stats", action="store_true", |
|
help="Print additional stats") |
|
parser.add_argument("-p", "--plot", action="store_true") |
|
parser.add_argument("-q", "--quiet", action="store_true") |
|
opt = parser.parse_args() |
|
|
|
if not os.getenv("CHILD"): |
|
# Parent Process |
|
# |
|
# This block boots up another Python interpreter |
|
# so as to send messages back and forth. |
|
# ______________ |
|
# | | |
|
# | | |
|
# | PARENT | |
|
# | | |
|
# |______________| |
|
# |
|
log = logging.getLogger("<-- PY") |
|
log.setLevel(logging.INFO) |
|
|
|
log.info("Booting child..") |
|
popen = subprocess.Popen( |
|
|
|
# Open yourself, but run the below block |
|
[sys.executable, __file__], |
|
|
|
env=dict(os.environ, **{"CHILD": "1"}), |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
bufsize=-1, |
|
universal_newlines=True, |
|
) |
|
|
|
# Wait for child to get ready.. |
|
response = popen.stdout.readline() |
|
assert response == "Ready\n", "Was: '%s'" % response |
|
|
|
# Data the size of opt.bytes |
|
data = "0".zfill(opt.bytes) |
|
bytes_ = sys.getsizeof(data) |
|
log.info("Created data {:,} bytes (len {})".format( |
|
bytes_, len(data) |
|
)) |
|
|
|
write_per_iteration = list() |
|
rtrip_per_iteration = list() # roundtrip |
|
child_wait_times = list() |
|
times = list() |
|
|
|
if opt.stats: |
|
profile = cProfile.Profile() |
|
profile.enable() |
|
|
|
tot0 = time.clock() |
|
for i in range(opt.iterations): |
|
|
|
# Write |
|
write0 = time.clock() |
|
popen.stdin.write(data) |
|
popen.stdin.write("%d\n" % i) |
|
popen.stdin.flush() |
|
write1 = time.clock() |
|
|
|
# Read |
|
read0 = time.clock() |
|
response = popen.stdout.readline() |
|
read1 = time.clock() |
|
|
|
write_per_iteration.append((write1 - write0) * 1000) |
|
rtrip_per_iteration.append((read1 - read0) * 1000) |
|
times.append(read1) |
|
|
|
index, wait = response.rsplit("-", 1) |
|
response = int(index) |
|
wait = float(wait) |
|
child_wait_times.append(wait) |
|
assert response == i, "Was %d, expected %d" % (response, i) |
|
|
|
if not opt.quiet and (opt.iterations > 1000 and i % 100 == 0): |
|
sys.stderr.write("\r%d/%d.." % (i, opt.iterations)) |
|
if not opt.quiet: |
|
sys.stderr.write("\r%d/%d..\n" % (opt.iterations, opt.iterations)) |
|
|
|
if opt.stats: |
|
profile.disable() |
|
stats = pstats.Stats(profile).sort_stats("time") |
|
stats.print_stats(20) |
|
|
|
tot1 = time.clock() |
|
totdur = tot1 - tot0 # s |
|
|
|
# Send kill signal |
|
popen.kill() |
|
popen.wait() |
|
|
|
def plot(): |
|
import pygal |
|
fname = os.path.join(os.getcwd(), "plot.svg") |
|
log.info("Plotting to %s.." % fname) |
|
assert rtrip_per_iteration |
|
assert write_per_iteration |
|
assert child_wait_times |
|
|
|
plot = pygal.Line(width=2000, height=500) |
|
plot.title = "Time per iteration" |
|
plot.add("Roundtrip (ms)", rtrip_per_iteration, show_dots=False) |
|
plot.add("Write to child (ms)", write_per_iteration, show_dots=False) |
|
plot.add("Child wait (ms)", child_wait_times, show_dots=False) |
|
# plot.x_labels = times |
|
plot.render_to_file(fname) |
|
|
|
if opt.plot: |
|
try: |
|
plot() |
|
except ImportError: |
|
log.info("Plotting skipped, could not find pygal") |
|
|
|
iterations = opt.iterations |
|
bpi = bytes_ * 2 |
|
totb = opt.iterations * bpi # in + out |
|
bps = totb / (totdur) |
|
avgtime = (sum(rtrip_per_iteration) / len(rtrip_per_iteration)) |
|
mintime = min(rtrip_per_iteration) |
|
maxtime = max(rtrip_per_iteration) |
|
deltime = maxtime - mintime |
|
|
|
results = ( |
|
("iterations", opt.iterations), |
|
("bpi", bytes_ * 2), |
|
("bps", bps), |
|
("avgtime", avgtime), |
|
("mintime", mintime), |
|
("maxtime", maxtime), |
|
("deltime", deltime), |
|
("totb", totb), |
|
("totdur", totdur), |
|
) |
|
|
|
if opt.output: |
|
import json |
|
with open(opt.output, "w") as f: |
|
log.info("Writing results to '%s'" % opt.output) |
|
json.dump(dict(results), f, indent=2, sort_keys=True) |
|
|
|
print("""\ |
|
Iterations: {iterations:,} |
|
Bytes/iteration: {bpi:,} b/i |
|
Bytes/second: {bps:,.0f} b/s |
|
Avarage roundtrip time: {avgtime:.3f} ms |
|
Min roundtrip time: {mintime:.3f} ms |
|
Max roundtrip time: {maxtime:.3f} ms |
|
Delta roundtrip time: {deltime:.3f} ms |
|
Total bytes: {totb:,} b |
|
Total time: {totdur:.3f} s |
|
""".format(**locals())) |
|
|
|
else: |
|
# Child Process |
|
# |
|
# This block represents the additional Python interpreter which |
|
# is receiving messages via sys.stdin from the above parent. |
|
# _____________ |
|
# | | |
|
# | | |
|
# | CHILD | |
|
# | | |
|
# |_____________| |
|
# |
|
log = logging.getLogger("--> MO") |
|
log.setLevel(logging.INFO) |
|
|
|
log.info("Ready for action..") |
|
sys.stdout.write("Ready\n") |
|
sys.stdout.flush() |
|
|
|
t0 = None |
|
while True: |
|
line = sys.stdin.readline().rstrip() |
|
|
|
# Wait time |
|
t1 = time.clock() |
|
duration = "-%.6f\n" % ((t1 - (t0 or t1)) * 1000) |
|
sys.stdout.write(line) |
|
sys.stdout.write(duration) |
|
sys.stdout.flush() |
|
t0 = time.clock() |
|
|
|
log.info("Dying..") |