Last active
August 29, 2015 14:01
-
-
Save glenbot/e825ce14f06dc198926e to your computer and use it in GitHub Desktop.
Stress test websites
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Stress test a web site | |
Usage: | |
stress_test_replay [options] | |
Options: | |
--host=<host> The host to connect to [default: http://www.example.com]. | |
-n --num_workers=<num> The number of gevent workers to run [default: 2]. | |
-q --quiet Disable mass delisionary screen output | |
--help Show this screen. | |
""" | |
from gevent.monkey import patch_all | |
# patch the standard library | |
patch_all() | |
import sys | |
import time | |
import os | |
import curses | |
import gevent | |
import grequests | |
from docopt import docopt | |
worker_responses = {} | |
# A key/value of the worker id and a True/False | |
# value if it has finished reading data | |
finished_workers = {} | |
# A key/value of the worker id and total time it took to | |
# receive and process each message | |
worker_times = {} | |
# Default number of gevent workers to run | |
num_workers = 10 | |
def print_statuses(_window): | |
"""Print counts of clients receiving data via curses | |
:param _window: a curses window object | |
""" | |
# get the window size | |
_, screen_x = _window.getmaxyx() | |
while True: | |
finished = len(finished_workers.keys()) | |
# break out if all workers have finished | |
if num_workers == finished: | |
break | |
# the first line of the window displays how many workers finished | |
_window.addstr(0, 0, 'Workers finished: {}'.format(finished)) | |
max_value_size = '{{:>{}}}'.format(4) | |
# write out each worker id and the status codes | |
x, y = 0, 1 | |
for k, v in sorted(worker_responses.iteritems()): | |
worker_number = str(k).zfill(len(str(num_workers))) | |
item_length = len(worker_number) + 12 | |
if (x + item_length) > screen_x: | |
y += 1 | |
x = 0 | |
_window.addstr(y, x, '({}) -> {}'.format(worker_number, max_value_size.format(v))) | |
x += item_length | |
_window.refresh() | |
gevent.sleep(0.1) | |
def worker(worker_number, host): | |
"""Worker that connects to a host and checks response | |
:param worker_number: the id of the worker | |
:param host: the host to test | |
""" | |
start_time = time.time() | |
worker_responses[worker_number] = 0 | |
request = grequests.get(host) | |
response = request.send() | |
worker_responses[worker_number] = response.status_code | |
finished_workers[worker_number] = True | |
elapse_time = time.time() - start_time | |
worker_times[worker_number] = elapse_time | |
def run(num_workers, host, use_curses): | |
"""Run X workers to simulate many clients requesting all at once | |
:param num_workers: the number of workers to run | |
:param use_curses: to use or not to use curses | |
""" | |
jobs = [] | |
window = None | |
# create the jobs | |
for i in range(0, num_workers): | |
jobs.append(gevent.spawn(worker, i, host)) | |
# append the screen curses screen print out | |
if use_curses: | |
window = curses.initscr() | |
jobs.append(gevent.spawn(print_statuses, window)) | |
try: | |
start_time = time.time() | |
gevent.joinall(jobs) | |
except KeyboardInterrupt: | |
if use_curses: | |
curses.endwin() | |
if use_curses: | |
curses.endwin() | |
total_time = 0.00 | |
for _, value in worker_times.iteritems(): | |
total_time += value | |
print 'Workers ran in average of {} seconds'.format(total_time / num_workers) | |
print 'Total process time ran in {} seconds'.format(time.time() - start_time) | |
if __name__ == '__main__': | |
options = docopt(__doc__) | |
try: | |
num_workers = int(options['--num_workers']) | |
except ValueError: | |
pass | |
run(num_workers, options['--host'], not options['--quiet']) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment