Gets the list of all URLs in a website and outputs a CSV with url, title. Uses Python 3.
beautifulsoup4==4.3.2
requests==2.3.0
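These are the pinned dependencies for the crawler below; it is assumed they are installed beforehand, for example with pip install beautifulsoup4==4.3.2 requests==2.3.0 or from a requirements file.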
from urllib import parse
from concurrent import futures
import csv
import functools

from bs4 import BeautifulSoup
import requests

# Crawl is restricted to these domains; adjust DOMAINS and START_URL as needed.
DOMAINS = ('example.com', 'www.example.com')
START_URL = 'http://example.com/'

# Maps each successfully fetched URL to its <title> text.
VISITED_URLS = {}
def url_in_domains(url):
    '''Checks whether a link belongs to one of the expected domains.

    Relative links (empty netloc) are accepted as well.
    '''
    o = parse.urlparse(url)
    url_domain = o.netloc.split(':')[0]  # drop any :port suffix
    return url_domain == '' or any(domain == url_domain for domain in DOMAINS)
def get_urls(html):
    '''Yields the absolute, fragment-free URLs of all in-domain links in the page.'''
    soup = BeautifulSoup(html, 'html.parser')
    links = soup.find_all('a', href=True)
    for link in links:
        if url_in_domains(link.attrs['href']):
            # Drop the #fragment and resolve relative links against START_URL.
            yield parse.urljoin(START_URL, parse.urldefrag(link.attrs['href']).url)
def get_title(html):
    '''Returns the page <title> text, or an empty string if there is none.'''
    soup = BeautifulSoup(html, 'html.parser')
    try:
        return soup.title.text
    except AttributeError:  # soup.title is None when the page has no <title>
        return ''
def request_url(url, session):
    '''Fetches a URL and returns (url, html); html is '' for non-HTML resources or errors.'''
    try:
        # HEAD first so non-HTML resources (images, PDFs, ...) are not downloaded.
        response = session.head(url, verify=False)
        response.raise_for_status()
        if 'text/html' in response.headers.get('content-type', ''):
            response = session.get(url, verify=False)
            response.raise_for_status()
            return url, response.text
    except requests.exceptions.RequestException:
        pass
    return url, ''
def process_urls(urls, session):
    '''Fetches the given URLs concurrently, records their titles and yields the links found.'''
    with futures.ThreadPoolExecutor(max_workers=10) as e:
        i = 0
        _request_url = functools.partial(request_url, session=session)
        for url, html in e.map(_request_url, urls):
            i += 1
            # Simple progress indicator: running count of processed URLs on one line.
            print('{},'.format(i), end='', flush=True)
            if html:
                title = get_title(html)
                VISITED_URLS[url] = title
                yield from get_urls(html)
if __name__ == '__main__':
    session = requests.Session()

    # Seed the crawl with the links found on the start page.
    response = session.get(START_URL)
    response.raise_for_status()
    processed_urls = {START_URL}  # URLs that have already been crawled
    urls = set(get_urls(response.text))
    urls_to_process = urls - processed_urls

    level = 1
    print('Level {}, {} urls'.format(level, len(urls_to_process)))
    try:
        # Breadth-first crawl: each level processes the URLs discovered in the previous one.
        while urls_to_process:
            new_urls = process_urls(urls_to_process, session)
            processed_urls.update(urls_to_process)
            urls_to_process = set(new_urls) - processed_urls
            level += 1
            print('\nLevel {}, {} urls'.format(level, len(urls_to_process)))
            if 0 < len(urls_to_process) < 10:
                import pprint
                pprint.pprint(urls_to_process)
    except KeyboardInterrupt:
        print('-------------------')
        print('Stopping Scraping')
        print('-------------------')

    with open('urls.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(('url', 'title'))
        writer.writerows(VISITED_URLS.items())
    print('Output written to urls.csv')
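Once the crawl has finished (or been interrupted), the generated urls.csv can be inspected with the standard-library csv module. A minimal sketch, assuming the script above has already written urls.csv to the working directory, that lists the crawled pages which came back without a title:

import csv

with open('urls.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)  # header row: url, title
    untitled = [row['url'] for row in reader if not row['title']]

print('{} pages without a title'.format(len(untitled)))
for url in untitled:
    print(url)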