Created
April 10, 2011 15:32
-
-
Save guyromm/912442 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from gevent import monkey; monkey.patch_socket() | |
from urllib2 import Request | |
from datetime import datetime# for time measurement | |
import unittest | |
import gevent | |
import urllib2, httplib, cookielib | |
import json | |
from config import HOST_URL, PORT | |
# --- Shared counters, mutated by every GameTester while the test runs ---
REQUEST_COUNT = 0          # incremented before each request is issued
SUCCESS_REQUEST_COUNT = 0  # incremented after a request returns without raising

# --- Load-test parameters ---
GREENLETS = 600      # number of GameTester greenlets spawned by the test
MAX_REQUESTS = 5000  # a tester stops once REQUEST_COUNT has reached this value
TEST_TIMEOUT = 60    # passed as ``timeout`` to gevent.joinall

# --- Timing statistics; None means "nothing recorded yet" ---
LONGEST_REQUEST = None   # largest request duration observed
SHORTEST_REQUEST = None  # smallest request duration observed
BEGIN_TEST = None        # slot for the timestamp at which the run started
END_TEST = None          # set by the first tester that sees the quota reached
class GameTester(object): | |
def __init__(self, uid, host_url, port): | |
self.uid = uid # unique id for Tester class | |
self.jar = cookielib.CookieJar() | |
handler = urllib2.HTTPCookieProcessor(self.jar) | |
self.opener = urllib2.build_opener(handler) | |
def __call__(self): | |
global REQUEST_COUNT | |
global LONGEST_REQUEST,SHORTEST_REQUEST,END_TEST | |
global SUCCESS_REQUEST_COUNT | |
if self.uid and GREENLETS/10 % self.uid ==0: print "Hi, i'm tester #%s" % self.uid | |
request_count = 0 | |
while True: # endless cycle | |
REQUEST_COUNT += 1 | |
index_url = HOST_URL + ":%i" % PORT | |
try: | |
begin = datetime.now() | |
resp = self.opener.open(index_url,timeout=30) #increased timeout! | |
end = datetime.now() | |
delta = end - begin | |
if not LONGEST_REQUEST or delta>LONGEST_REQUEST: LONGEST_REQUEST=delta | |
if not SHORTEST_REQUEST or delta<SHORTEST_REQUEST: SHORTEST_REQUEST=delta | |
#request_count += 1 | |
SUCCESS_REQUEST_COUNT += 1 | |
#print datetime.now(), "#%i: Get response after" % self.uid, delta | |
except Exception, e: | |
raise | |
#print datetime.now(), "#%i: Exception" % self.uid, e | |
#return | |
# Detect here if server is dead | |
# print "#%i: Server is dead" % self.uid | |
# return | |
#resp.close() | |
#print self.jar | |
#return "Tester #%i has done his job" % self.uid | |
remres = REQUEST_COUNT % 1000 #MAX_REQUESTS % int(REQUEST_COUNT/100) | |
#print 'checking if %s rem %s == 0 ? %s'%(MAX_REQUESTS/10,REQUEST_COUNT,remres==0) | |
if remres==0 : print 'run %s; '%REQUEST_COUNT | |
if REQUEST_COUNT >= MAX_REQUESTS: | |
if not END_TEST: END_TEST = datetime.now() | |
print '%s done %s; '%(self.uid,REQUEST_COUNT), | |
return | |
class SmartDDOSTest(unittest.TestCase): | |
def test(self): | |
"Run main test here" | |
BEGIN_TEST = datetime.now() | |
print 'Begin test on', BEGIN_TEST | |
#tester1 = GameTester(1, HOST_URL, PORT) | |
#tester1() | |
#tester2 = GameTester(2, HOST_URL, PORT) | |
jobs = [gevent.spawn(GameTester(uid, HOST_URL, PORT)) for uid in range(0,GREENLETS)] | |
# g1 = gevent.spawn(tester1) | |
# g2 = gevent.spawn(tester2) | |
gevent.joinall(jobs, timeout=TEST_TIMEOUT) | |
print '\nEnd test on %s (in %s for requests proper)'%(END_TEST,END_TEST-BEGIN_TEST) | |
print 'Success requests:', SUCCESS_REQUEST_COUNT | |
print 'All requests:', REQUEST_COUNT | |
print 'Longest request: %s'%LONGEST_REQUEST | |
print 'Shortest request: %s'%SHORTEST_REQUEST | |
return | |
def somefunct(self): | |
print 'i\'m some funct' | |
# Start the unittest runner only when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment