Skip to content

Instantly share code, notes, and snippets.

@V3ntus
Last active January 6, 2025 20:46
Show Gist options
  • Save V3ntus/f3c37ca44cb15b353c8fa15ef7de5298 to your computer and use it in GitHub Desktop.
Save V3ntus/f3c37ca44cb15b353c8fa15ef7de5298 to your computer and use it in GitHub Desktop.
Shodan ALPR Scraper and Enumerator
"""
Based on Matt Brown's ALPR videos: https://www.youtube.com/watch?v=BQTy9XVeSaE
This script can:
1. Grab public ALPRs from Shodan using the query below then save the IPs to a file.
2. Enumerate a file of ALPR IPs and attempt to find all cameras associated with the ALPR aggregator found on Shodan.
Supply your paid account Shodan API key with the "SHODAN_API_KEY" environment variable.
https://account.shodan.io/billing
### USE THIS SCRIPT AT YOUR OWN RISK ###
ALPRs are often used by law enforcement. What you do with the information you find is your own responsibility.
https://www.shodan.io/search?query=port%3A8080+country%3AUS+http.html%3A%22PLease+contact+administrator+to+get+correct+stream+name%22
"""
import re
import os
import sys
from multiprocessing.pool import ThreadPool
import requests
import time
# Root of the Shodan REST API; endpoint paths such as "/host/search" are appended to it.
BASE_API_URL = "https://api.shodan.io/shodan"
# Shodan search filter used to locate candidate hosts. The "PLease" capitalisation is
# kept verbatim on purpose -- presumably it mirrors the text served by the devices, so
# "fixing" the typo would change what the query matches.
ALPR_QUERY = 'port:8080 country:US http.html:"PLease contact administrator to get correct stream name"'
MAX_CAM_NUMBER = 10 # highest camera index probed per IP; enumeration tries cam0ir through cam<MAX_CAM_NUMBER>ir inclusive
def search(ip_path: str):
    """Query Shodan for hosts matching ``ALPR_QUERY`` and append their IPs to a file.

    Every result page is fetched in turn, the ``ip_str`` of each match is
    collected, and the addresses are appended (one per line) to ``ip_path``.
    The file is opened in append mode, so repeated runs add to existing content.

    Relies on the module-level ``API_KEY`` being set before the call.

    Args:
        ip_path: Path of the text file the IP addresses are appended to.

    Raises:
        RuntimeError: If the Shodan API answers with an ``error`` payload.
        requests.RequestException: On network failures or timeouts.
    """
    # The original paging arithmetic assumed 100 matches per result page.
    page_size = 100
    ip_addresses: list[str] = []

    def _request(page: int = 1) -> dict:
        """Fetch one page of search results and return the decoded JSON body."""
        print("[search] Requesting IP addresses from Shodan...")
        # Pass the query through `params` so spaces and quotes are URL-encoded
        # correctly instead of being interpolated raw into the URL.
        response = requests.get(
            f"{BASE_API_URL}/host/search",
            params={"query": ALPR_QUERY, "key": API_KEY, "page": page},
            timeout=30,
        )
        payload = response.json()
        # Shodan reports failures (bad key, no credits, ...) as {"error": "..."}.
        # Surface that message instead of failing later with a bare KeyError.
        if isinstance(payload, dict) and "error" in payload:
            raise RuntimeError(f"Shodan API error: {payload['error']}")
        return payload

    # Make initial search
    first_page = _request(page=1)

    # Ceiling division: an exact multiple of the page size must not trigger
    # a request for an extra, empty page.
    total_results: int = first_page["total"]
    pages: int = (total_results + page_size - 1) // page_size

    print(f"[search] Parsing {len(first_page['matches'])} results on page 1...")
    ip_addresses.extend(match["ip_str"] for match in first_page["matches"])

    for page in range(2, pages + 1):
        # Pause between requests to stay under the API rate limit; sleeping
        # before (rather than after) each call avoids a useless final wait.
        time.sleep(3)
        matches = _request(page=page)["matches"]
        print(f"[search] Parsing {len(matches)} results on page {page}...")
        ip_addresses.extend(match["ip_str"] for match in matches)

    print(f"[search] Done parsing. Writing to {ip_path}...")
    with open(ip_path, "a") as f:
        f.writelines(f"{ip}\n" for ip in ip_addresses)
    print(f"[search] Done writing {len(ip_addresses)} IP addresses to {ip_path}.")
def enum_cams(ip_list: list[str], http_path: str, timeout: float = 10.0):
    """Probe each IP for camera endpoints and append the working URLs to a file.

    For every address, ``http://<ip>:8080/cam<i>ir`` is requested for
    ``i`` in ``0..MAX_CAM_NUMBER`` (inclusive). URLs answering with HTTP 200
    are collected and appended, one per line, to ``http_path``. Hosts are
    probed concurrently by a pool of 10 worker threads.

    Args:
        ip_list: IP addresses to probe.
        http_path: Path of the text file discovered URLs are appended to.
        timeout: Seconds to wait for each HTTP request before giving up on
            that URL. Without it, one unresponsive host can block a worker
            thread indefinitely.
    """

    def _executor(ip: str) -> list[str]:
        """Probe every candidate camera URL on one host; return the hits."""
        tag = f"[{ip}]".ljust(20)
        http_cam_links: list[str] = []
        print(f"[enum] {tag} New thread")
        for i in range(MAX_CAM_NUMBER + 1):
            print(f"[enum] {tag} Trying: cam{i}ir...")
            this_http = f"http://{ip}:8080/cam{i}ir"
            try:
                # Only the status code is needed, so stream=True avoids
                # downloading the body (which may be a long-lived stream --
                # unverified) and the context manager releases the connection.
                with requests.get(this_http, timeout=timeout, stream=True) as res:
                    status = res.status_code
            except Exception as e:
                # Deliberately broad: this is a worker-thread boundary, and an
                # escaping exception would abort pool.map and drop all results.
                print(f"[enum] {tag} Error: {e}")
                continue
            if status == 200:
                print(f"[enum] {tag} Success: cam{i}ir")
                http_cam_links.append(this_http)
            else:
                print(f"[enum] {tag} Fail: cam{i}ir")
        print(f"[enum] Completed thread for IP address {ip}")
        return http_cam_links

    print(
        f"[enum] Starting thread pool for {len(ip_list)} IP addresses (attempting to find {MAX_CAM_NUMBER} cameras)...")
    # The context manager guarantees worker threads are cleaned up even if
    # map() raises; map() itself blocks until every host has been processed.
    with ThreadPool(10) as pool:
        results = pool.map(_executor, ip_list)
    print("[enum] Done enumerating all IP addresses in pool.")

    all_results: list[str] = [link for host_links in results for link in host_links]

    print(f"[enum] Writing {len(all_results)} HTTP cam links to {http_path}...")
    with open(http_path, "a") as f:
        f.writelines(f"{r}\n" for r in all_results)
    print("[enum] Done writing HTTP cam links.")
if __name__ == "__main__":
    # Command-line entry point: dispatch on the first argument ("search" or "cams").
    cwd = os.path.abspath(os.getcwd())

    # Compiled once here rather than on every line of the IP file.
    ipv4_pattern = re.compile(r"^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$")

    def _print_usage():
        """Print the command-line help and exit with status 1."""
        print("Usage: grab_alprs_from_shodan.py search|cams\n")
        print("- search: Search for ALPR IPs from Shodan.")
        print("- cams: Enumerate ALPR aggregator IPs and find all cameras.")
        # sys.exit instead of the builtin exit(): the latter is injected by the
        # `site` module for interactive use and is not guaranteed to exist.
        sys.exit(1)

    if len(sys.argv) <= 1:
        _print_usage()
    elif sys.argv[1] == "search":
        # API_KEY is bound at module level because search() reads it as a global.
        if not (API_KEY := os.environ.get('SHODAN_API_KEY')):
            raise OSError("SHODAN_API_KEY environment variable not set")

        # Ask the user where they want to store results
        ip_path = input(f"Path to save IP's to [{cwd}/ips.txt]: ")
        if not ip_path:
            ip_path = "ips.txt"

        search(ip_path)
    elif sys.argv[1] == "cams":
        ip_list: list[str] = []

        ip_path = input(f"Path of saved IP addresses [{cwd}/ips.txt]: ")
        if not ip_path:
            ip_path = "ips.txt"

        if not os.path.exists(ip_path):
            raise FileNotFoundError(f"File '{ip_path}' not found.")

        # Read IP file and validate. Parse into list of IP addresses
        with open(ip_path, "r") as f:
            for idx, line in enumerate(f, start=1):
                # Validate the stripped text so surrounding whitespace and the
                # trailing newline do not affect the result.
                candidate = line.strip()
                if not candidate:
                    # Tolerate blank lines (e.g. a trailing empty line).
                    continue
                if not ipv4_pattern.match(candidate):
                    raise ValueError(f"Invalid IP address on line {idx}: {candidate}")
                ip_list.append(candidate)

        http_path = input(f"Path to save discovered cams to [{cwd}/cams.txt]: ")
        if not http_path:
            http_path = "cams.txt"

        enum_cams(ip_list, http_path)
    else:
        _print_usage()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment