Recursively find text within a site using multi-threading
# Recursively crawl a site and search for a string, using multiple threads.
# Requires: pip install requests beautifulsoup4 fake-useragent
import threading
from queue import Queue
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

URL = 'https://example.com'  # Replace with the website you want to start at
SEARCH_STRING = 'example'    # Replace with the string you want to search for
MAX_THREADS = 10             # Maximum number of worker threads

# Randomised User-Agent header for each request
ua = UserAgent()


def process_url(url, search_string, found_links, to_visit, mutex):
    """Fetch one page, report matching links, and queue links not yet seen."""
    try:
        headers = {'User-Agent': ua.random}
        response = requests.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.content, 'html.parser')
        for link in soup.find_all('a'):
            href = link.get('href')
            if href is None:
                continue  # skip anchors without an href attribute
            # Resolve relative links against the current page so they can be fetched
            href = urljoin(url, href)
            if not href.startswith(('http://', 'https://')):
                continue  # skip mailto:, javascript:, and similar schemes
            # found_links and to_visit are shared between threads, so guard them
            with mutex:
                if href not in found_links:
                    found_links.add(href)
                    to_visit.put(href)
            if search_string in href:
                print(f"Found link: {href}")
    except requests.exceptions.RequestException as e:
        print(f"Error processing {url}: {e}")


def crawler(start_url, search_string):
    found_links = {start_url}
    to_visit = Queue()
    to_visit.put(start_url)
    thread_list = []
    mutex = threading.Lock()
    while not to_visit.empty():
        # Launch up to MAX_THREADS workers, one per queued URL
        while len(thread_list) < MAX_THREADS and not to_visit.empty():
            url = to_visit.get()
            thread = threading.Thread(
                target=process_url,
                args=(url, search_string, found_links, to_visit, mutex))
            thread.start()
            thread_list.append(thread)
        # Join the whole batch before launching the next one; removing items
        # from the list while iterating over it would skip every other thread
        for t in thread_list:
            t.join()
        print(f"Batch of {len(thread_list)} threads finished || Queue size: {to_visit.qsize()}")
        thread_list.clear()


if __name__ == '__main__':
    crawler(URL, SEARCH_STRING)
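
As a design note, the manual thread bookkeeping above can also be written with the standard library's concurrent.futures module. Below is a minimal sketch of the same batch pattern, reusing the process_url function, Queue, and MAX_THREADS defined in the script; it is an illustration under those assumptions, not part of the original gist.

from concurrent.futures import ThreadPoolExecutor

def crawler_pool(start_url, search_string):
    found_links = {start_url}
    to_visit = Queue()
    to_visit.put(start_url)
    mutex = threading.Lock()
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
        while not to_visit.empty():
            # Drain the currently queued URLs into a batch and submit them all
            batch = []
            while not to_visit.empty():
                batch.append(to_visit.get())
            futures = [pool.submit(process_url, url, search_string,
                                   found_links, to_visit, mutex)
                       for url in batch]
            for f in futures:
                f.result()  # wait for the batch to finish before the next pass

The executor reuses its worker threads across batches instead of creating a fresh Thread per URL, which cuts thread start-up overhead on large crawls.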