Created
May 29, 2017 16:52
-
-
Save Cr4sh/b93ea76a936427fbeaf9052cb698fea4 to your computer and use it in GitHub Desktop.
SOCKS5 server stress test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, os, time | |
import grequests | |
CHECK_URL = 'http://www.google.com' | |
CHECK_TIMEOUT = 120 | |
CHECK_CONNECTS = 200 | |
VERBOSE = False | |
class AsyncRequest(grequests.AsyncRequest): | |
def send(self, **kwargs): | |
t = time.time() | |
ret = super(AsyncRequest, self).send(**kwargs) | |
t = time.time() - t | |
self._stat.total += 1 | |
if self.is_ok(): | |
print('+ #%d %f' % (self._stat.total, t)) | |
self._stat.success += 1 | |
else: | |
print('! #%d %f' % (self._stat.total, t)) | |
self._stat.fail += 1 | |
return ret | |
def is_ok(self): | |
return self.response is not None and self.response.ok | |
def test(addr, port, num): | |
class Stat(object): | |
def __init__(self): | |
self.total, self.success, self.fail = 0, 0, 0 | |
def _exception_handler(r, e): | |
if VERBOSE: print('ERROR: ' + str(e)) | |
requests, stat = [], Stat() | |
# create HTTP requests | |
for i in range(0, num): | |
proxy = '%s:%d' % (addr, port) | |
r = AsyncRequest('HEAD', CHECK_URL, | |
timeout = CHECK_TIMEOUT, | |
proxies = { 'http': 'socks5://' + proxy, | |
'https': 'socks5://' + proxy }) | |
r._stat = stat | |
requests.append(r) | |
# run requests | |
results = grequests.map(requests, size = CHECK_CONNECTS, exception_handler = _exception_handler) | |
return stat.total, stat.success, stat.fail | |
def main(): | |
print(test(sys.argv[1], int(sys.argv[2]), int(sys.argv[3]))) | |
return 0 | |
if __name__ == '__main__': | |
exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment