Last active
January 6, 2025 20:46
-
-
Save V3ntus/f3c37ca44cb15b353c8fa15ef7de5298 to your computer and use it in GitHub Desktop.
Shodan ALPR Scraper and Enumerator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Based on Matt Brown's ALPR videos: https://www.youtube.com/watch?v=BQTy9XVeSaE

This script can:
1. Grab public ALPRs from Shodan using the query below, then save the IPs to a file.
2. Enumerate a file of ALPR IPs and attempt to find all cameras associated with the ALPR aggregator found on Shodan.

Supply the Shodan API key of your paid account via the "SHODAN_API_KEY" environment variable.
https://account.shodan.io/billing

### USE THIS SCRIPT AT YOUR OWN RISK ###
ALPRs are often used by law enforcement. What you do with the information you find is your own responsibility.
https://www.shodan.io/search?query=port%3A8080+country%3AUS+http.html%3A%22PLease+contact+administrator+to+get+correct+stream+name%22 | |
""" | |
import re | |
import os | |
import sys | |
from multiprocessing.pool import ThreadPool | |
import requests | |
import time | |
# Root of the Shodan REST API; endpoint paths such as "/host/search" are appended to it.
BASE_API_URL: str = "https://api.shodan.io/shodan"

# Shodan search filter sent by search(). The "PLease" spelling is part of the
# page text being matched and must stay exactly as written.
ALPR_QUERY: str = 'port:8080 country:US http.html:"PLease contact administrator to get correct stream name"'

# Highest camera index probed per IP: enum_cams() tries cam0ir .. cam<MAX_CAM_NUMBER>ir.
MAX_CAM_NUMBER: int = 10
def search(ip_path: str):
    """Query Shodan for public ALPR aggregators and append their IPs to a file.

    Pages through the ``/host/search`` results for ``ALPR_QUERY`` and appends
    one IP address per line to ``ip_path``. If a page after the first one
    fails, paging stops and the addresses collected so far are still written.

    Reads the module-level ``API_KEY``, which must be set before the call
    (the ``__main__`` block takes it from the ``SHODAN_API_KEY`` environment
    variable).

    Args:
        ip_path: File the discovered IP addresses are appended to.

    Raises:
        RuntimeError: If the first request fails or Shodan answers it with an
            error, so nothing could be collected.
    """
    results_per_page = 100  # page size the page calculation is based on
    delay_between_requests = 3  # seconds; keeps consecutive requests apart
    request_timeout = 30  # seconds; without it a stalled connection hangs forever
    ip_addresses: list[str] = []

    def _request(page: int = 1) -> dict:
        """Fetch one result page and return the decoded JSON payload."""
        print("[search] Requesting IP addresses from Shodan...")
        try:
            # `params=` lets requests URL-encode the query (spaces, quotes, colons).
            response = requests.get(
                f"{BASE_API_URL}/host/search",
                params={"query": ALPR_QUERY, "key": API_KEY, "page": page},
                timeout=request_timeout,
            )
            payload = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Only the exception type is reported and the cause is suppressed:
            # requests' messages can embed the full URL, which carries the API key.
            raise RuntimeError(
                f"Request for page {page} failed ({type(exc).__name__})"
            ) from None
        if not response.ok or "error" in payload:
            raise RuntimeError(
                f"Shodan API error (HTTP {response.status_code}): "
                f"{payload.get('error', 'unknown error')}"
            )
        return payload

    def _parse(matches: list[dict]):
        """Collect the IP address of every match."""
        ip_addresses.extend(match['ip_str'] for match in matches)

    # Make initial search
    first_page = _request(page=1)

    # Ceiling division: an exact multiple of the page size must not trigger
    # a request for an extra, empty page.
    total_results: int = first_page["total"]
    pages: int = -(-total_results // results_per_page)

    matches = first_page.get("matches", [])
    print(f"[search] Parsing {len(matches)} results on page 1...")
    _parse(matches)

    for page in range(2, pages + 1):
        # Sleep *before* each follow-up request so every pair of requests is
        # spaced out and no time is wasted after the final page.
        time.sleep(delay_between_requests)
        try:
            matches = _request(page=page).get("matches", [])
        except RuntimeError as exc:
            # Keep what was already collected instead of losing it all.
            print(f"[search] Stopping early: {exc}")
            break
        print(f"[search] Parsing {len(matches)} results on page {page}...")
        _parse(matches)

    print(f"[search] Done parsing. Writing to {ip_path}...")
    with open(ip_path, "a") as f:
        f.writelines(f"{ip}\n" for ip in ip_addresses)
    print(f"[search] Done writing {len(ip_addresses)} IP addresses to {ip_path}.")
def enum_cams(ip_list: list[str], http_path: str):
    """Probe each IP for camera endpoints and append the hits to a file.

    For every address in ``ip_list`` the URLs
    ``http://<ip>:8080/cam<i>ir`` with ``i`` from 0 to ``MAX_CAM_NUMBER``
    (inclusive) are requested; each URL answering with HTTP 200 is recorded.
    The IPs are processed concurrently by a pool of 10 threads.

    Args:
        ip_list: IP addresses of the aggregators to probe.
        http_path: File the discovered camera URLs are appended to, one per line.
    """
    # (connect, read) seconds. Without a timeout an unresponsive host blocks
    # its worker thread forever.
    request_timeout = (5, 10)

    def _executor(ip: str) -> list[str]:
        """Probe all camera indices on one IP and return the URLs that answered 200."""
        http_cam_links: list[str] = []
        tag = f'[{ip}]'.ljust(20)  # fixed-width prefix so log columns line up
        print(f"[enum] {tag} New thread")

        for i in range(MAX_CAM_NUMBER + 1):
            print(f"[enum] {tag} Trying: cam{i}ir...")
            this_http = f"http://{ip}:8080/cam{i}ir"

            try:
                # stream=True reads only the status line and headers; the body
                # is never downloaded, so an endless stream cannot stall the
                # worker. The context manager releases the connection.
                with requests.get(this_http, stream=True, timeout=request_timeout) as res:
                    found = res.status_code == 200
            except Exception as e:
                # Best-effort per URL: a failure here must not abort the
                # remaining probes or the other worker threads.
                print(f"[enum] {tag} Error: {e}")
                continue

            if found:
                print(f"[enum] {tag} Success: cam{i}ir")
                http_cam_links.append(this_http)
            else:
                print(f"[enum] {tag} Fail: cam{i}ir")

        print(f"[enum] Completed thread for IP address {ip}")
        return http_cam_links

    print(
        f"[enum] Starting thread pool for {len(ip_list)} IP addresses (attempting to find {MAX_CAM_NUMBER} cameras)...")

    # The context manager tears the pool down even if map() raises.
    with ThreadPool(10) as pool:
        results = pool.map(_executor, ip_list)
    print("[enum] Done enumerating all IP addresses in pool.")

    all_results: list[str] = [link for links in results for link in links]

    print(f"[enum] Writing {len(all_results)} HTTP cam links to {http_path}...")
    with open(http_path, "a") as f:
        f.writelines(f"{r}\n" for r in all_results)
    print("[enum] Done writing HTTP cam links.")
def parse_ip_lines(lines: list[str]) -> list[str]:
    """Validate the lines of an IP file and return the IPv4 addresses in them.

    Surrounding whitespace (including ``\\r\\n`` line endings) is stripped
    before validation and blank lines are skipped, so files written on any
    platform or ending in an empty line are accepted.

    Args:
        lines: Raw lines as read from the IP file.

    Returns:
        The validated addresses, in file order.

    Raises:
        ValueError: If a non-blank line is not a dotted-quad IPv4 address;
            the message names the 1-based line number.
    """
    # Compiled once instead of on every loop iteration.
    ipv4_pattern = re.compile(r"^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$")

    addresses: list[str] = []
    for idx, line in enumerate(lines, start=1):
        candidate = line.strip()
        if not candidate:
            continue
        if not ipv4_pattern.fullmatch(candidate):
            raise ValueError(f"Invalid IP address on line {idx}: {candidate}")
        addresses.append(candidate)
    return addresses


if __name__ == "__main__":
    cwd = os.path.abspath(os.getcwd())

    def _print_usage():
        """Print the command-line help and exit with status 1."""
        print("Usage: grab_alprs_from_shodan.py search|cams\n")
        print("- search: Search for ALPR IPs from Shodan.")
        print("- cams: Enumerate ALPR aggregator IPs and find all cameras.")
        # sys.exit instead of the `exit` builtin, which is only injected by
        # the `site` module and is meant for interactive sessions.
        sys.exit(1)

    if len(sys.argv) <= 1:
        _print_usage()
    elif sys.argv[1] == "search":
        # Bound at module level on purpose: search() reads API_KEY as a global.
        if not (API_KEY := os.environ.get('SHODAN_API_KEY')):
            raise OSError("SHODAN_API_KEY environment variable not set")

        # Ask the user where they want to store results; empty input selects the default.
        ip_path = input(f"Path to save IP's to [{cwd}/ips.txt]: ") or "ips.txt"

        search(ip_path)
    elif sys.argv[1] == "cams":
        ip_path = input(f"Path of saved IP addresses [{cwd}/ips.txt]: ") or "ips.txt"

        if not os.path.exists(ip_path):
            raise FileNotFoundError(f"File '{ip_path}' not found.")

        # Read IP file and validate. Parse into list of IP addresses
        with open(ip_path, "r") as f:
            ip_list: list[str] = parse_ip_lines(f.readlines())

        http_path = input(f"Path to save discovered cams to [{cwd}/cams.txt]: ") or "cams.txt"

        enum_cams(ip_list, http_path)
    else:
        _print_usage()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment