Last active
November 24, 2021 07:48
-
-
Save jabb3rd/8348eb92b7eed4b4273061ae36e1a3e8 to your computer and use it in GitHub Desktop.
A multi-threaded http/socks4/socks5 proxy checker based on urllib3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import urllib3 | |
import urllib3.contrib.socks | |
import threading | |
from queue import Queue | |
import sys | |
import argparse | |
TIMEOUT = 5.0 | |
URL = 'http://icanhazip.com/' | |
number_of_threads = 200 | |
OK = None | |
quiet_mode = False | |
log = False | |
lock = threading.Lock() | |
proxy_handlers = {'socks5': urllib3.contrib.socks.SOCKSProxyManager, 'socks4': urllib3.contrib.socks.SOCKSProxyManager, 'http': urllib3.ProxyManager} | |
results = [] | |
def check_proxy(address, port): | |
result = None | |
for ph in proxy_handlers: | |
try: | |
proxy = proxy_handlers[ph]('%s://%s:%s' % (ph, address, int(port))) | |
if not quiet_mode: | |
with lock: | |
print('[*] Checking the proxy: %s://%s:%s' % (ph, address, int(port))) | |
response = proxy.request('GET', URL, timeout = TIMEOUT, headers = {'User-Agent': 'Mozilla/5.0'}, preload_content = False) | |
read_data = b'' | |
for chunk in response.stream(32): | |
read_data += chunk | |
response.release_conn() | |
data = read_data.decode('utf-8').strip('\n') | |
if (OK is not None and data == OK) or (OK is None and data == address): | |
result = '%s://%s:%s' % (ph, address, port) | |
break | |
except: | |
pass | |
return result | |
def worker(): | |
while True: | |
address, port = q.get() | |
if not quiet_mode: | |
with lock: | |
print('[*] Adding to queue: %s:%s' % (address, port)) | |
result = check_proxy(address, port) | |
if result is not None: | |
results.append(result) | |
if not quiet_mode: | |
with lock: | |
print('\033[32m[+] Check OK: %s\033[39m' % result) | |
else: | |
if not quiet_mode: | |
with lock: | |
print('[-] Check failed: %s:%s' % (address, port)) | |
q.task_done() | |
parser = argparse.ArgumentParser(description='Read <ip>:<port> pairs one per line from stdin then perform the proxy checks with the SOCKS5/SOCKS5/HTTP protocols.') | |
parser.add_argument('--log', help = 'Write the log to a specified file', required = False) | |
parser.add_argument('-u', '--url', help = 'URL to request for through the proxies, default = %s' % URL, required = False) | |
parser.add_argument('-q', '--quiet', action = 'store_true', help = 'Quiet mode', required = False) | |
parser.add_argument('-n', '--threads', type = int, help = 'Number of threads for parallel processing', required = False) | |
parser.add_argument('--timeout', help = 'Set the network I/O timeout (default = 5.0s)', type = float, required = False) | |
parser.add_argument('--ok', help = 'The string returned from the server to be treated as a good result', required = False) | |
parser.add_argument('-5', help = 'Check SOCKS5 proxies', action = 'store_true', required = False) | |
parser.add_argument('-4', help = 'Check SOCKS4 proxies', action = 'store_true', required = False) | |
parser.add_argument('--http', help = 'Check HTTP proxies', action = 'store_true', required = False) | |
args = vars(parser.parse_args()) | |
if args['5'] or args['4'] or args['http']: | |
proxy_handlers = {} | |
if args['5']: | |
proxy_handlers['socks5'] = urllib3.contrib.socks.SOCKSProxyManager | |
if args['4']: | |
proxy_handlers['socks4'] = urllib3.contrib.socks.SOCKSProxyManager | |
if args['http']: | |
proxy_handlers['http'] = urllib3.ProxyManager | |
if args['threads']: | |
number_of_threads = args['threads'] | |
if args['log']: | |
log = True | |
log_filename = args['log'] | |
log_file = open(log_filename, 'a') | |
if args['quiet']: | |
quiet_mode = True | |
if args['url']: | |
URL = args['url'] | |
if args['timeout']: | |
TIMEOUT = float(args['timeout']) | |
if args['ok']: | |
OK = args['ok'] | |
proxies = sys.stdin.readlines() | |
proxies_count = len(proxies) | |
if proxies_count < number_of_threads: | |
number_of_threads = proxies_count | |
q = Queue() | |
if not quiet_mode: | |
print('[*] Starting with %s threads' % number_of_threads) | |
for i in range(number_of_threads): | |
t = threading.Thread(target = worker) | |
t.daemon = True | |
t.start() | |
for proxy in proxies: | |
try: | |
p = proxy.strip('\n').split(':') | |
q.put(p) | |
except: | |
pass | |
if not quiet_mode: | |
print('[*] Starting the queue...') | |
q.join() | |
if not quiet_mode: | |
print('[!] Finished!') | |
if len(results) > 0: | |
if log: | |
log_file = open(log_filename, 'a') | |
print('Good results:') | |
for r in results: | |
if log: | |
log_file.write(r + '\n') | |
print(r) | |
if log: | |
log_file.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment