Skip to content

Instantly share code, notes, and snippets.

@maximusfox
Last active March 6, 2023 09:28
Show Gist options
  • Select an option

  • Save maximusfox/7413ce85448236ea232c to your computer and use it in GitHub Desktop.

Select an option

Save maximusfox/7413ce85448236ea232c to your computer and use it in GitHub Desktop.
Multi-threaded URL scanner
#!/usr/bin/env python3
import argparse
import re
import queue
import requests
import threading
def _positive_int(value):
    """argparse ``type=`` callable: parse *value* as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid int value: {value!r}"
        ) from None
    if number < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {value!r}"
        )
    return number


def _positive_seconds(value):
    """argparse ``type=`` callable: parse *value* as a finite number > 0."""
    try:
        seconds = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid number of seconds: {value!r}"
        ) from None
    # The chained comparison also rejects NaN (all comparisons are False).
    if not 0 < seconds < float("inf"):
        raise argparse.ArgumentTypeError(
            f"must be a positive number of seconds, got {value!r}"
        )
    return seconds


def get_args(argv=None):
    """Parse the scanner's command-line arguments.

    Positional arguments are the paths of the URL list, the suffix list and
    the regular-expression list.  ``-t/--threads`` (default 100) must be a
    positive integer; ``-w/--timeout`` (default 5) must be a positive number
    of seconds and may be fractional.

    Args:
        argv: Optional list of argument strings to parse instead of
            ``sys.argv[1:]``.  ``None`` (the default) keeps the original
            behaviour of reading the process arguments.

    Returns:
        The ``argparse.Namespace`` with ``urls_file``, ``suffix_file``,
        ``regex_file``, ``threads`` and ``timeout`` attributes.

    Raises:
        SystemExit: Raised by argparse (exit status 2) when the arguments
            are missing or invalid.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "urls_file",
        help="File with list of URLs to scan",
    )
    parser.add_argument(
        "suffix_file",
        help="File with list of suffixes to append to URLs",
    )
    parser.add_argument(
        "regex_file",
        help="File with list of regular expressions to match against URLs' content",
    )
    parser.add_argument(
        "-t", "--threads",
        # Zero or negative counts would start no workers and scan nothing.
        type=_positive_int,
        default=100,
        help="Number of threads to use",
    )
    parser.add_argument(
        "-w", "--timeout",
        type=_positive_seconds,
        default=5,
        help="Request timeout in seconds",
    )
    return parser.parse_args(argv)
def load_list_from_file(file_name):
    """Read *file_name* and return its lines with surrounding whitespace removed.

    The file is decoded as UTF-8 regardless of the platform's default
    encoding; ``utf-8-sig`` additionally drops a leading byte-order mark so
    it does not end up glued to the first entry.

    Blank lines are kept as empty strings, exactly as before.

    Args:
        file_name: Path of the text file to read.

    Returns:
        A list with one stripped string per line of the file.
    """
    with open(file_name, encoding="utf-8-sig") as file:
        return [line.strip() for line in file]
def load_regex_list_from_file(file_name):
    """Compile every non-blank line of *file_name* into a regular expression.

    Blank (or whitespace-only) lines are skipped: an empty pattern matches
    any text, so a stray empty line would make every scanned URL count as a
    hit.  The file is decoded as UTF-8 (a leading byte-order mark is dropped)
    instead of using the platform's default encoding.

    Args:
        file_name: Path of the text file holding one pattern per line.

    Returns:
        A list of compiled pattern objects, in file order.

    Raises:
        re.error: If a line is not a valid regular expression.
    """
    with open(file_name, encoding="utf-8-sig") as file:
        stripped_lines = (line.strip() for line in file)
        return [re.compile(pattern) for pattern in stripped_lines if pattern]
def scanner_thread(queue, regex_list, timeout):
    """Worker loop: fetch queued URLs and report whether their body matches.

    URLs are taken from *queue* until a ``get`` waits one second without
    receiving an item, at which point the worker returns.  For every URL one
    line is printed: ``[GOOD]`` if any pattern is found in the response body,
    ``[BAD]`` if none is, ``[TIMEOUT]`` if the request timed out and
    ``[ERROR]`` for any other failure while fetching.

    Args:
        queue: Queue-like object whose ``get(timeout=...)`` yields URL
            strings and raises ``queue.Empty`` when drained.
        regex_list: Iterable of compiled patterns to look for in the body.
        timeout: Timeout in seconds passed to ``requests.get``.
    """
    # The ``queue`` parameter shadows the ``queue`` module inside this
    # function, so ``queue.Empty`` would be looked up on the Queue instance
    # and raise AttributeError.  Import the exception under its own name.
    from queue import Empty

    while True:
        try:
            url = queue.get(timeout=1)
        except Empty:
            break

        try:
            response = requests.get(url, timeout=timeout)
            # Undecodable bytes are replaced rather than failing the whole
            # page, so non-UTF-8 responses are still scanned.
            decoded_content = response.content.decode("utf-8", errors="replace")
        except requests.exceptions.Timeout:
            print(f"[TIMEOUT] {url}")
            continue
        except Exception as e:
            # Boundary of the worker: report the failure and keep scanning.
            print(f"[ERROR] {url}: {e}")
            continue

        # ``search`` finds the pattern anywhere in the body; ``match`` would
        # only succeed when the pattern matches at the very first character.
        if any(regex.search(decoded_content) for regex in regex_list):
            print(f"[GOOD] {url}")
        else:
            print(f"[BAD] {url}")
def main():
    """Run the scan: queue every URL/suffix combination and process it in threads.

    Reads the URL and suffix lists named on the command line, enqueues each
    ``url + suffix`` combination (announcing it on stdout), loads the regular
    expressions, then starts the requested number of ``scanner_thread``
    workers and waits for all of them to finish.
    """
    options = get_args()
    base_urls = load_list_from_file(options.urls_file)
    url_suffixes = load_list_from_file(options.suffix_file)

    pending = queue.Queue()
    # Every base URL is combined with every suffix, URLs varying slowest.
    targets = (base + tail for base in base_urls for tail in url_suffixes)
    for target in targets:
        print(f"[ADD TO QUEUE] {target}")
        pending.put(target)

    patterns = load_regex_list_from_file(options.regex_file)

    worker_args = (pending, patterns, options.timeout)
    workers = []
    for _ in range(options.threads):
        worker = threading.Thread(target=scanner_thread, args=worker_args)
        workers.append(worker)
        worker.start()

    # Block until every worker has drained the queue and returned.
    for worker in workers:
        worker.join()

    print("[INFO] Work finished!")
# Start the scanner only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment