Little crawler with bs4, requests and wget
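A typical invocation might look like this (a sketch, not part of the gist: the target url, times and proxy credentials are placeholders, and since this is Python 2 code it needs a Python 2 interpreter):

    python strsprox.py -u http://example.com -t 5 -tm 10 -s

    python strsprox.py -u http://example.com -t 5 -p -pip 10.0.0.1 -pp 3128 -pu myuser -pa mypass -w

The first run crawls for 5 minutes with at most 10 threads, printing each url it reaches; the second routes requests through an authenticated proxy and also mirrors each fetched page with wget.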
#!/usr/bin/python
# -*- coding: ISO-8859-15 -*-
#
# strsprox.py
# Author: Silvio Giunge a.k.a kanazuchi <[email protected]>
#
import os
import re
import sys
import time
import Queue
import signal
import argparse
import requests
import threading
from bs4 import BeautifulSoup as bs
def set_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-e", dest="error", action="store_true", default=False,
        help="Show errors raised while fetching urls, if any."
    )
    parser.add_argument(
        "-u", dest="url", action="store",
        help="Define the start url."
    )
    parser.add_argument(
        "-t", dest="time", action="store", default="1",
        help="Define the time (in minutes) to run this test."
    )
    parser.add_argument(
        "-tm", dest="th_max", action="store", default="5",
        help="Define the max number of threads to load urls."
    )
    parser.add_argument(
        "-s", dest="scrn", action="store_true", default=False,
        help="Enable display output."
    )
    parser.add_argument(
        "-p", dest="proxy", action="store_true", default=False,
        help="Enable proxy connection."
    )
    parser.add_argument(
        "-w", dest="wget", action="store_true", default=False,
        help="Download each fetched url with wget."
    )
    parser.add_argument(
        "-pip", dest="ip", action="store",
        help="Proxy IP address."
    )
    parser.add_argument(
        "-pu", dest="user", action="store",
        help="Proxy username."
    )
    parser.add_argument(
        "-pa", dest="passwd", action="store",
        help="Proxy password."
    )
    parser.add_argument(
        "-pp", dest="port", action="store",
        help="Proxy port."
    )
    return parser.parse_args()
def check_url(url):
    # Fetch a url, optionally mirror it with wget, and queue any new links
    # found in the page.
    try:
        if opts.proxy:
            get_url = requests.get(url, auth=auth, proxies=proxy, timeout=5)
        else:
            get_url = requests.get(url, timeout=3)
        if re.match(r'20[0-9]', str(get_url.status_code)):
            if opts.scrn:
                print >> sys.stdout, "{}\nThreads: {}\nOk!\n".format(url, threading.activeCount())
            if opts.wget:
                os.system(
                    'wget -cF -t 1 {} -o log --proxy-user={} --proxy-password={} > /dev/null 2>&1'.format(
                        url, opts.user, opts.passwd))
            # attrs.get avoids a KeyError on anchors without an href;
            # filter(None, ...) drops the resulting empty values.
            links = filter(None,
                [u.attrs.get('href') for u in bs(get_url.text, 'html.parser').find_all('a')
                 if u.attrs.get('href') not in url_list])
            for link in links:
                if not re.match(r'^http://', link, re.IGNORECASE):
                    if "/" not in link and "." not in link and link != "#":
                        ret_url = os.path.join(url, link)
                    else:
                        ret_url = "http://{}".format(link)
                else:
                    # Collapse a doubled scheme like "http://https://..." into "http://".
                    ret_url = re.sub(r"^http://https://", "http://", link)
                # The TLD alternation needs a group, not a character class.
                if (re.match(r'^(http://)(www)?.*\.(com|br|net|org|gov|it|info|co)(/.*)?', ret_url) and
                        ret_url not in url_list and
                        link not in url_tested and
                        "https" not in ret_url):
                    print >> open('urls_list', 'a'), ret_url
                    url_list.append(ret_url)
                    url_tested.append(link)
                    q_urls.put(ret_url)
    except Exception, error:
        if opts.error:
            print error
def control_c(signal, frame):
    finish_job()

def finish_job():
    opts.scrn = False
    opts.error = False
    while threading.activeCount() > 2:
        print(" Waiting for threads to finish the job...\n")
        print(" Threads alive: {}\n".format(threading.activeCount()))
        time.sleep(1)
    print("\n\n")
    print("####################################")
    print("")
    print(" This script ran for {} minutes.".format(opts.time))
    print("")
    print(" {} urls were tested.".format(len(url_tested)))
    print(" {} urls passed the test.".format(len(url_list)))
    print("")
    print("####################################\n\n")
    os.kill(os.getpid(), 9)
def main():
    # Spawn a worker thread per queued url, capped at opts.th_max, until
    # the time limit is reached.
    start_time = time.time()
    while True:
        if not q_urls.empty():
            if threading.activeCount() > int(opts.th_max):
                time.sleep(1)
            else:
                start_check = threading.Thread(target=check_url, args=[q_urls.get()])
                start_check.start()
        else:
            time.sleep(1)
        if time.time() > (start_time + (int(opts.time) * 60)):
            finish_job()
if __name__ == "__main__":
    opts = set_parser()
    url_list = []
    url_tested = []
    q_urls = Queue.Queue()
    q_urls.put(opts.url)
    signal.signal(signal.SIGINT, control_c)
    if opts.proxy:
        proxy = {"http": "http://{}:{}".format(opts.ip, opts.port)}
        auth = requests.auth.HTTPProxyAuth(opts.user, opts.passwd)
    main()
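As the code shows, every accepted link is appended to a file named urls_list in the working directory (and wget, when enabled, writes its log to a file named log). When the time limit expires or Ctrl-C is pressed, finish_job() waits for the worker threads, prints a summary of how many urls were tested and accepted, then hard-kills the process.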