#!/usr/bin/env python3
"""Crawl a site and report broken links, missing #fragment targets, and
mailto: addresses that are not on the known-good list."""
import argparse
import re
import threading
from urllib.error import HTTPError, URLError
from urllib.parse import urldefrag, urljoin, urlparse
from urllib.request import Request, urlopen

import requests
from bs4 import BeautifulSoup
# Addresses already known to be valid; any other mailto: target is reported.
all_mails = [
    "[email protected]"
]
# Only pages whose path ends in one of these extensions are fetched and
# parsed for further links.
ALLOWED_FILE_EXTENSIONS = [
    "html",
    "js",
]
# Present a browser-like User-Agent; some servers answer 403 to the default
# Python client agents.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.3'
}
def check_fragment(soup, fragment) -> bool:
    """Return True when the fragment resolves to an element id on the page
    (or when there is no fragment to resolve)."""
    if fragment:
        return bool(soup.select(f'#{fragment}'))
    return True
def is_mailto_link(link) -> bool:
    # Match the literal "mailto:" scheme prefix; "mailto:*" would also
    # accept "mailto" with no colon at all.
    return bool(re.match(r"mailto:", link))
def return_mail_address(link) -> str:
    # Split on the first colon only, keeping the address itself intact.
    return link.split(":", 1)[1]
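# Example with a hypothetical address: is_mailto_link("mailto:user@example.com")
# is True, and return_mail_address("mailto:user@example.com") returns
# "user@example.com".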
def worker(parent_url, url):
    """Check an external URL on its own thread and report failures."""
    try:
        req = Request(url, headers=headers)
        urlopen(req)
    except HTTPError as e:
        print(f"({e.code} error) {parent_url}, {url}")
    except URLError as e:
        print(f"(unreachable) {parent_url}, {url}: {e.reason}")
class Crawler:
    def __init__(self, site, exclude_external=False,
                 allowed_file_ext_for_soup=ALLOWED_FILE_EXTENSIONS):
        self.site = site
        self.exclude_external = exclude_external
        self.allowed_file_ext_for_soup = allowed_file_ext_for_soup
        self.site_namespace = urlparse(self.site).netloc
        self.visited_urls = []
        # Maps each defragmented URL to its parsed soup, or to None for
        # external links, which are checked but never parsed.
        self.visited_urls_soup = {}
        self.threads = []

        # Fetch and parse the start page to seed the crawl.
        result = requests.get(self.site)
        soup = BeautifulSoup(result.text, 'lxml')
        self.urls = self.get_urls_from_page(soup)
    def in_namespace(self, url) -> bool:
        return urlparse(url).netloc == self.site_namespace

    def get_urls_from_page(self, soup) -> list:
        # Collect the href attribute of every tag that carries one.
        return [tag.get("href") for tag in soup.find_all(href=True)]

    def in_allowed_file_soup(self, url) -> bool:
        # Treat everything after the last '.' in the path as the extension.
        return (urlparse(url).path.split('.')[-1]
                in self.allowed_file_ext_for_soup)
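    # Note: an extensionless path such as "/docs/" splits to "/docs/", which
    # is not in the allow list, so such pages are status-checked but never
    # parsed for further links.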
    def start_crawl(self):
        self.crawl_and_report(self.site, self.urls)
        # The crawl itself is synchronous; external links queued along the
        # way are checked concurrently afterwards.
        for thread in self.threads:
            thread.start()
        for thread in self.threads:
            thread.join()
    def crawl_and_report(self, parent_url, urls):
        for link in urls:
            if is_mailto_link(link):
                mail_link = return_mail_address(link)
                if mail_link not in all_mails:
                    print(f"(unknown mail) {parent_url}, {mail_link}")
            else:
                url = urljoin(parent_url, link)
                if url not in self.visited_urls:
                    self.visited_urls.append(url)
                    defraged_url, fragment = urldefrag(url)
                    if defraged_url not in self.visited_urls_soup:
                        if not self.in_namespace(defraged_url):
                            if not self.exclude_external:
                                # Verify external links later, each on its
                                # own worker thread.
                                self.threads.append(
                                    threading.Thread(
                                        target=worker,
                                        args=(parent_url, defraged_url)))
                            self.visited_urls_soup[defraged_url] = None
                        else:
                            response = requests.head(defraged_url)
                            if response.ok:
                                if self.in_allowed_file_soup(url):
                                    result = requests.get(defraged_url)
                                    soup = BeautifulSoup(result.text, 'lxml')
                                    self.visited_urls_soup[defraged_url] = soup
                                    if not check_fragment(soup, fragment):
                                        print("(fragment missing)",
                                              parent_url, url, fragment)
                                    urls_from_page = self.get_urls_from_page(soup)
                                    if urls_from_page:
                                        # Recurse into the page just fetched;
                                        # response.url follows any redirects.
                                        self.crawl_and_report(
                                            response.url, urls_from_page)
                            else:
                                print(f"({response.status_code} error) "
                                      f"{parent_url}, {url}")
                    else:
                        # Page already fetched; only the fragment is new.
                        soup = self.visited_urls_soup[defraged_url]
                        if soup is not None:  # fragments of external links
                                              # are not checked
                            if not check_fragment(soup, fragment):
                                print("(fragment missing)",
                                      parent_url, url, fragment)
if __name__ == "__main__":
    # The root URL can be given on the command line; it defaults to a local
    # test server.
    parser = argparse.ArgumentParser(
        description="Report broken links, missing fragments, and unknown "
                    "mailto: addresses on a site.")
    parser.add_argument("site", nargs="?", default="http://localhost:8080/",
                        help="root URL to start crawling from")
    args = parser.parse_args()

    crawler = Crawler(args.site)
    crawler.start_crawl()
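# A typical run against a local test server (started e.g. with
# `python3 -m http.server 8080`), assuming the script is saved as crawler.py:
#
#   $ ./crawler.py http://localhost:8080/
#   (unknown mail) http://localhost:8080/, someone@example.com
#   (404 error) http://localhost:8080/, http://localhost:8080/missing.html
#
# The report lines above are illustrative; actual output depends on the
# links present on the crawled site.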