Skip to content

Instantly share code, notes, and snippets.

@kanazux
Last active August 29, 2015 14:16
Show Gist options
  • Save kanazux/f39a7d36faa443275dbb to your computer and use it in GitHub Desktop.
Save kanazux/f39a7d36faa443275dbb to your computer and use it in GitHub Desktop.
Crawler(zinho) com bs4 request e wget
#!/usr/bin/python
# -*- coding: ISO-8859-15 -*-
#
# strsprox.py
# Autor: Silvio Giunge a.k.a kanazuchi <[email protected]>
#
import os
import re
import sys
import time
import Queue
import signal
import argparse
import requests
import threading
from bs4 import BeautifulSoup as bs
def set_parser(argv=None):
    """Build the command-line parser and return the parsed options.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read the process command line, which
            is what the original zero-argument call did.

    Returns:
        argparse.Namespace with the attributes ``error``, ``url``,
        ``time``, ``th_max``, ``scrn``, ``proxy``, ``wget``, ``ip``,
        ``user``, ``passwd`` and ``port``. ``time`` and ``th_max`` stay
        strings (defaults "1" and "5"); the callers convert them with
        int().
    """
    parser = argparse.ArgumentParser()

    # Boolean switches.
    parser.add_argument(
        "-e", dest="error", action="store_true", default=False,
        help="Show the errors raised while trying to fetch a url."
    )
    parser.add_argument(
        "-s", dest="scrn", action="store_true", default=False,
        help="Enable display output."
    )
    parser.add_argument(
        "-p", dest="proxy", action="store_true", default=False,
        help="Enable proxy connection."
    )
    parser.add_argument(
        "-w", dest="wget", action="store_true", default=False,
        help="Enable wget method."
    )

    # Crawl parameters.
    parser.add_argument(
        "-u", dest="url", action="store",
        help="Define url."
    )
    parser.add_argument(
        "-t", dest="time", action="store", default="1",
        help="Define the time, in minutes, to run this test."
    )
    parser.add_argument(
        "-tm", dest="th_max", action="store", default="5",
        help="Define the max number of threads to load urls."
    )

    # Proxy settings; each one previously carried the same copy-pasted
    # "Enable proxy connection." help text.
    parser.add_argument(
        "-pip", dest="ip", action="store",
        help="Define the proxy ip address."
    )
    parser.add_argument(
        "-pu", dest="user", action="store",
        help="Define the proxy user name."
    )
    parser.add_argument(
        "-pa", dest="passwd", action="store",
        help="Define the proxy password."
    )
    parser.add_argument(
        "-pp", dest="port", action="store",
        help="Define the proxy port."
    )
    return parser.parse_args(argv)
# Plain-http urls that contain one of the accepted suffixes, optionally
# followed by a port, path, query or fragment. The original pattern used a
# character class ("[com|br|...]"), which matched any single one of those
# letters instead of a whole suffix.
_ALLOWED_URL = re.compile(
    r'^http://.*\.(?:com|br|net|org|gov|it|info|co)(?:[/:?#].*)?$')


def check_url(url):
    """Fetch *url*, optionally mirror it with wget, and queue its new links.

    Runs as a worker thread target. Reads the module-level ``opts``,
    ``proxy`` and ``auth``; appends to ``url_list`` and ``url_tested``;
    appends accepted urls to the file ``urls_list``; and puts accepted
    urls on ``q_urls`` for later workers.

    Args:
        url: The address this worker has to fetch.

    Returns:
        None. Every exception is caught (best-effort crawl) and printed
        only when the ``-e`` option is set.

    NOTE(review): the source indentation was lost; the link-harvesting
    step is assumed to run only for 2xx responses - confirm.
    """
    import subprocess  # local import: only needed for the optional wget step

    try:
        if opts.proxy:
            get_url = requests.get(url, auth=auth, proxies=proxy, timeout=5)
        else:
            # Fetch the url given to this worker; the previous code always
            # re-fetched the seed url (opts.url) here.
            get_url = requests.get(url, timeout=3)

        # Only 200-209 responses are processed (same range as r'20[0-9]').
        if not 200 <= get_url.status_code <= 209:
            return

        if opts.scrn:
            sys.stdout.write("{}\nThreads: {}\nOk!\n\n".format(
                url, threading.active_count()))

        if opts.wget:
            # Argument list instead of a shell string: crawled urls are
            # untrusted and must not be interpreted by a shell.
            wget_cmd = ['wget', '-cF', '-t', '1', '-o', 'log']
            if opts.user:
                wget_cmd.append('--proxy-user={}'.format(opts.user))
            if opts.passwd:
                wget_cmd.append('--proxy-password={}'.format(opts.passwd))
            wget_cmd.append(url)
            with open(os.devnull, 'w') as devnull:
                subprocess.call(wget_cmd, stdout=devnull)

        # Tag.get() returns None for anchors without an href instead of
        # raising KeyError, which used to abort the whole page.
        anchors = bs(get_url.text, "html.parser").find_all('a')
        hrefs = [anchor.get('href') for anchor in anchors]
        links = [href for href in hrefs if href and href not in url_list]

        for link in links:
            if not re.match(r'^http://', link, re.IGNORECASE):
                if "/" not in link and "." not in link and link != "#":
                    # Bare word: treat it as a page below the current url.
                    ret_url = "{}/{}".format(url.rstrip("/"), link)
                else:
                    # Anything else is treated as a scheme-less address.
                    ret_url = "http://{}".format(link)
            else:
                # Collapse a doubled scheme. The old re.sub replacement
                # "http:\/\/" put literal backslashes into the url.
                ret_url = link.replace("http://https://", "http://")

            if (_ALLOWED_URL.match(ret_url) and
                    ret_url not in url_list and
                    link not in url_tested and
                    "https" not in ret_url):
                # Context manager so the append handle is closed each time.
                with open('urls_list', 'a') as out:
                    out.write(ret_url + "\n")
                url_list.append(ret_url)
                url_tested.append(link)
                q_urls.put(ret_url)
    except Exception as error:
        # Deliberate best-effort: one bad page must not kill the worker.
        if opts.error:
            print(error)
def control_c(signal, frame):
    """SIGINT handler: ignore both handler arguments and call finish_job()."""
    finish_job()
def finish_job():
    """Wait for the workers to wind down, print the summary and exit.

    Silences per-url output by clearing ``opts.scrn`` and ``opts.error``,
    polls once per second until at most two threads are alive, prints the
    totals taken from ``url_tested`` and ``url_list``, then terminates the
    whole process. This function never returns.
    """
    # Stop the workers from printing while the summary is being produced.
    opts.scrn = False
    opts.error = False

    while threading.active_count() > 2:
        print(" Waiting thread finish the job...\n")
        print(" Threads alive: {}\n".format(threading.active_count()))
        time.sleep(1)

    print("\n\n")
    print("####################################")
    print("")
    print(" This script runs for {} minutes.".format(opts.time))
    print("")
    print(" {} urls was tested.".format(len(url_tested)))
    print(" {} urls that pass in the test.".format(len(url_list)))
    print("")
    print("####################################\n\n")

    # The process used to SIGKILL itself, which drops any buffered output
    # (the summary vanished when stdout was piped) and reports a normal
    # finish as "Killed". Flush explicitly, then leave immediately;
    # os._exit() also ends the process while worker threads are alive.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)
def main():
    """Dispatch loop: start one check_url thread per queued url.

    A worker is started only while the queue is non-empty and the number
    of live threads does not exceed ``opts.th_max``; otherwise the loop
    sleeps for a second. After every pass the elapsed time is compared
    with ``opts.time`` minutes and finish_job() is called once exceeded.
    """
    started_at = time.time()
    while True:
        # Short-circuit keeps the thread count unread while the queue is
        # empty, exactly as in the nested form.
        can_dispatch = (not q_urls.empty() and
                        not threading.activeCount() > int(opts.th_max))
        if can_dispatch:
            worker = threading.Thread(target=check_url, args=[q_urls.get()])
            worker.start()
        else:
            time.sleep(1)

        if time.time() > (started_at + (int(opts.time) * 60)):
            finish_job()
if __name__ == "__main__":
    # Script entry point: parse options, set up the shared crawl state
    # used by check_url/finish_job/main, then start the dispatch loop.
    opts = set_parser()

    # Without a start url the queue would be seeded with None and every
    # fetch would fail silently until the timer ran out.
    if not opts.url:
        sys.exit("No url given; use -u to define the start url.")

    url_list = []      # urls accepted by check_url
    url_tested = []    # raw links check_url has already accepted
    q_urls = Queue.Queue()
    q_urls.put(opts.url)

    signal.signal(signal.SIGINT, control_c)

    if opts.proxy:
        # Refuse to build "http://None:None" when the address is missing.
        if not (opts.ip and opts.port):
            sys.exit("Proxy enabled; use -pip and -pp to define ip and port.")
        proxy = {"http": "http://{}:{}".format(opts.ip, opts.port)}
        auth = requests.auth.HTTPProxyAuth(opts.user, opts.passwd)

    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment