Last active
February 2, 2018 12:46
-
-
Save kanazux/3d7129d70a115bbf82346efc7ee7d641 to your computer and use it in GitHub Desktop.
Teste proxy squid com wget e lib requests em python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python2.7 | |
# -*- coding: utf-8 -*- | |
# | |
# test_proxy.py | |
# Autor: Silvio Giunge a.k.a kanazuchi <[email protected]> | |
# Script to test proxy navigation | |
# Requeriments: lib requests, bs4 and colorama | |
# To use wget needs only install, wgetrc don't need changes | |
# | |
import os | |
import re | |
import time | |
import Queue | |
import signal | |
import requests | |
import threading | |
from colorama import Fore | |
from subprocess import call | |
from scrp_parse import set_parser | |
from bs4 import BeautifulSoup as bs | |
def kill_wget(t): | |
while True: | |
time.sleep(t) | |
call(['pkill -9 -af wget'], shell=True) | |
def control_c(signal, frame): | |
call(['pkill -9 -af wget'], shell=True) | |
os.kill(os.getpid(), 9) | |
def check_url(url): | |
try: | |
if(opts.proxy): | |
get_url = requests.get(url, auth=auth, proxies=proxy, timeout=15) | |
else: | |
get_url = requests.get(opts.url, timeout=3) | |
if(re.match(r'20[0-9]', str(get_url.status_code))): | |
if(opts.scrn): | |
print Fore.GREEN + "Site: {}\nUsuario: {}\n".format(url, user) + Fore.RESET | |
if(opts.wget): | |
_ = call([ | |
'wget --delete-after -bcrqF -l 2 -t 1 {} -o wget.log --proxy-user={} --proxy-password={}'.format( | |
re.sub(r"[\"|\']", '', url), user, passwd)], shell=True, stdout=-1, stderr=-1) | |
links = set(filter(None, [u.attrs['href'] for u in bs(get_url.text, "html.parser").find_all('a') | |
if 'href' in u.attrs if re.match(r'^https?:\/\/', u.attrs['href'], re.IGNORECASE) | |
if u.attrs['href'] not in url_list])) | |
for link in links: | |
if(re.match(r'^https?:\/\/', link, re.IGNORECASE)): | |
if(link not in url_list): | |
url_list.append(link) | |
q_urls.put(link) | |
else: | |
if(opts.scrn): | |
print Fore.RED + "Status code: {}\tSite: {}\nUsuario: {}\n".format(str(get_url.status_code), url, user) + Fore.RESET | |
except Exception, error: | |
if(opts.error): | |
print Fore.YELLOW + "Site: {}\nUsuario: {}\n{}\n".format(url, user, error) + Fore.RESET | |
pass | |
def main(): | |
start_time = time.time() | |
signal.signal(signal.SIGINT, control_c) | |
s_kill = threading.Thread(target=kill_wget, args=[60]) | |
s_kill.start() | |
while True: | |
if(not q_urls.empty()): | |
if(threading.activeCount() > int(opts.th_max)): | |
time.sleep(10) | |
else: | |
s_start_check = threading.Thread(target=check_url, args=[q_urls.get()]) | |
s_start_check.start() | |
else: | |
time.sleep(10) | |
if(time.time() > (start_time + (int(opts.time)*60))): | |
os.kill(os.getpid(), 9) | |
if(__name__ == "__main__"): | |
opts = set_parser() | |
url_list = [] | |
q_urls = Queue.Queue() | |
q_urls.put(opts.url) | |
if(opts.proxy): | |
user = opts.user | |
passwd = opts.passwd | |
auth = requests.auth.HTTPProxyAuth(user, passwd) | |
proxy = {"http": "http://{}:{}".format(opts.ip, opts.port)} | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python2.7 | |
# -*- coding: utf-8 -*- | |
# | |
# strsprox.py | |
# Autor: kanazuchi <[email protected]> | |
# | |
import argparse | |
def set_parser(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"-e", dest="error", action="store_true", default=False, | |
help="Show erros when this script try to get url if exixsts." | |
) | |
parser.add_argument( | |
"-u", dest="url", action="store", | |
help="Define url." | |
) | |
parser.add_argument( | |
"-t", dest="time", action="store", default="1", | |
help="Define the to run this test." | |
) | |
parser.add_argument( | |
"-tm", dest="th_max", action="store", default="5", | |
help="Define the max number of threads to load urls." | |
) | |
parser.add_argument( | |
"-s", dest="scrn", action="store_true", default=False, | |
help="Enable display output." | |
) | |
parser.add_argument( | |
"-p", dest="proxy", action="store_true", default=False, | |
help="Enable proxy connection." | |
) | |
parser.add_argument( | |
"-w", dest="wget", action="store_true", default=False, | |
help="Enable wget method." | |
) | |
parser.add_argument( | |
"-pip", dest="ip", action="store", | |
help="Enable proxy connection." | |
) | |
parser.add_argument( | |
"-pu", dest="user", action="store", | |
help="Enable proxy connection." | |
) | |
parser.add_argument( | |
"-pa", dest="passwd", action="store", | |
help="Enable proxy connection." | |
) | |
parser.add_argument( | |
"-pp", dest="port", action="store", | |
help="Enable proxy connection." | |
) | |
return parser.parse_args() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment