Skip to content

Instantly share code, notes, and snippets.

@Cr4sh
Created May 29, 2017 16:52
Show Gist options
  • Save Cr4sh/b93ea76a936427fbeaf9052cb698fea4 to your computer and use it in GitHub Desktop.
Save Cr4sh/b93ea76a936427fbeaf9052cb698fea4 to your computer and use it in GitHub Desktop.
SOCKS5 server stress test
import sys, os, time
import grequests
CHECK_URL = 'http://www.google.com'
CHECK_TIMEOUT = 120
CHECK_CONNECTS = 200
VERBOSE = False
class AsyncRequest(grequests.AsyncRequest):
def send(self, **kwargs):
t = time.time()
ret = super(AsyncRequest, self).send(**kwargs)
t = time.time() - t
self._stat.total += 1
if self.is_ok():
print('+ #%d %f' % (self._stat.total, t))
self._stat.success += 1
else:
print('! #%d %f' % (self._stat.total, t))
self._stat.fail += 1
return ret
def is_ok(self):
return self.response is not None and self.response.ok
def test(addr, port, num):
class Stat(object):
def __init__(self):
self.total, self.success, self.fail = 0, 0, 0
def _exception_handler(r, e):
if VERBOSE: print('ERROR: ' + str(e))
requests, stat = [], Stat()
# create HTTP requests
for i in range(0, num):
proxy = '%s:%d' % (addr, port)
r = AsyncRequest('HEAD', CHECK_URL,
timeout = CHECK_TIMEOUT,
proxies = { 'http': 'socks5://' + proxy,
'https': 'socks5://' + proxy })
r._stat = stat
requests.append(r)
# run requests
results = grequests.map(requests, size = CHECK_CONNECTS, exception_handler = _exception_handler)
return stat.total, stat.success, stat.fail
def main():
print(test(sys.argv[1], int(sys.argv[2]), int(sys.argv[3])))
return 0
if __name__ == '__main__':
exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment