Skip to content

Instantly share code, notes, and snippets.

@M1noa
Created April 5, 2026 02:43
Show Gist options
  • Select an option

  • Save M1noa/f00ccc6153a0964a245b364d4ea9b587 to your computer and use it in GitHub Desktop.

Select an option

Save M1noa/f00ccc6153a0964a245b364d4ea9b587 to your computer and use it in GitHub Desktop.
Cochlea Remake
#!/usr/bin/env python3
# Remake of Benn Jordan's script, as shown in https://www.youtube.com/watch?v=UMIwNiwQewQ
# This script works. Do NOT use it maliciously: it is intended for research purposes only.
import sys
import os
import time
import csv
import re
import platform
from datetime import datetime
from scapy.all import *
from scapy.layers.dot11 import Dot11
# MAC address prefixes (first three octets, lower-case, colon-separated) that
# the original author lists as registered to Ring LLC.
# NOTE(review): not re-checked against the IEEE OUI registry here -- confirm.
RING_OUI_PREFIXES = [
    "00:b4:63",
    "18:7f:88",
    "24:2b:d6",
    "34:3e:a4",
    "54:e0:19",
    "5c:47:5e",
    "64:9a:63",
    "90:48:6c",
    "9c:76:13",
    "ac:9f:c3",
    "c4:db:ad",
    "cc:3b:fb",
]

# Module-level capture state shared by the functions below.
packet_counts = {}  # not read or written anywhere else in the visible code
packet_sizes = {}  # MAC -> bytes accumulated since that MAC's counter was zeroed
last_reset = time.time()  # timestamp of the most recent counter reset
spinner_idx = 0  # position in the console spinner animation
def print_logo():
    """Print the Cochlea start-up banner to standard output."""
    banner_rows = (
        "",
        "################################################",
        "# #",
        "# COCHLEA #",
        "# Passive IoT Activity Detector #",
        "# #",
        "################################################",
        "",
    )
    # The empty first/last rows give the banner its leading and trailing
    # blank line; print() supplies the final newline.
    print("\n".join(banner_rows))
def get_interfaces():
    """Return whatever scapy's ``get_if_list()`` reports.

    ``main()`` presents the returned items as the selectable capture
    interfaces.
    """
    available = get_if_list()
    return available
def is_ring_mac(mac):
    """Report whether *mac* begins with one of the ``RING_OUI_PREFIXES``.

    Only the first three colon-separated groups are compared, and the
    comparison is case-insensitive because the input is lower-cased first.
    """
    leading_groups = mac.lower().split(':')[:3]
    return ':'.join(leading_groups) in RING_OUI_PREFIXES
def scan_for_ring_devices(interface, duration=3):
    """Sniff beacon frames on *interface* and collect senders with a Ring OUI.

    Args:
        interface: Capture interface name, passed to scapy's ``sniff`` as
            ``iface``.
        duration: How long to capture, in seconds (passed as ``timeout``).

    Returns:
        A dict mapping the upper-cased transmitter MAC to the SSID carried
        by the first matching beacon, or ``"Unknown"`` when the SSID is
        empty or cannot be read.
    """
    print(f"[*] Scanning the airwaves on {interface} for {duration} seconds...")
    found_devices = {}

    def packet_handler(pkt):
        if not pkt.haslayer(Dot11):
            return
        # 802.11 type 0 / subtype 8 is a management beacon frame.
        if pkt.type != 0 or pkt.subtype != 8:
            return
        # addr2 may be None; test it *before* calling .upper() so such a
        # frame is skipped instead of raising AttributeError.
        if not pkt.addr2:
            return
        mac = pkt.addr2.upper()
        if ':' not in mac or not is_ring_mac(mac):
            return
        if mac in found_devices:
            return  # keep the SSID from the first beacon seen
        try:
            ssid = pkt.info.decode('utf-8', errors='ignore')
        except (AttributeError, UnicodeError):
            # No readable SSID field on this frame; still record the device.
            ssid = ""
        found_devices[mac] = ssid if ssid else "Unknown"

    sniff(iface=interface, prn=packet_handler, timeout=duration, store=0)
    return found_devices
def validate_mac(mac):
    """Normalise a user-supplied MAC address.

    Surrounding whitespace is removed and letters are upper-cased. The
    value must then consist of six two-digit hex groups separated by
    ``:`` or ``-``.

    Returns:
        The address in upper case with ``:`` separators, or ``None`` when
        the input does not have that shape.
    """
    candidate = mac.strip().upper()
    well_formed = re.match(r'^([0-9A-F]{2}[:-]){5}([0-9A-F]{2})$', candidate)
    if not well_formed:
        return None
    return candidate.replace('-', ':')
# Start time of the current measurement window, tracked separately per MAC.
_window_started = {}


def calculate_throughput(mac, window_seconds=1):
    """Return the KB/s accumulated for *mac* and start a new window for it.

    Each MAC has its own measurement window. A single shared reset
    timestamp would let a reset for one MAC make every other MAC's window
    look unexpired, so those MACs would always report 0 and their byte
    counters would never be cleared.

    Args:
        mac: Key into the module-level ``packet_sizes`` dict.
        window_seconds: Minimum seconds between two non-zero readings for
            the same MAC; also the divisor used to convert bytes to KB/s.

    Returns:
        ``0`` when *mac* has no entry in ``packet_sizes`` or its window has
        not yet elapsed; otherwise ``packet_sizes[mac] / 1024 /
        window_seconds``. In that case the MAC's byte counter is reset to 0.
    """
    global last_reset
    if mac not in packet_sizes:
        return 0
    now = time.time()
    started = _window_started.get(mac)
    # A MAC with no recorded window yet is reported immediately.
    if started is not None and now - started < window_seconds:
        return 0
    throughput = (packet_sizes[mac] / 1024) / window_seconds
    packet_sizes[mac] = 0
    _window_started[mac] = now
    # Still maintained as "time of the most recent reset for any MAC".
    last_reset = now
    return throughput
def _report_alert(alert_type, target_mac, target_kbps, viewer_mac, viewer_kbps,
                  csv_writer=None, csv_file=None):
    """Print one alert and, when a CSV writer is given, append it as a row."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"\n[{timestamp}] ALERT: {alert_type}")
    print(f" -> Target : {target_kbps:8.1f} KB/s")
    print(f" -> Viewer : {viewer_kbps:8.1f} KB/s")
    if csv_writer is not None:
        csv_writer.writerow([
            timestamp,
            alert_type,
            target_mac,
            f"{target_kbps:.1f}",
            viewer_mac or "N/A",
            f"{viewer_kbps:.1f}",
        ])
        # Flush per alert so rows survive an abrupt termination.
        csv_file.flush()


def monitor_traffic(interface, target_mac, viewer_mac, threshold, save_csv):
    """Watch 802.11 traffic for *target_mac* and report throughput bursts.

    Runs until interrupted with Ctrl+C. Once per loop iteration it sniffs
    for one second, adds the size of every Dot11 frame sent to or from the
    target (and the viewer, if given) to ``packet_sizes``, then asks
    ``calculate_throughput`` for the resulting KB/s values.

    Args:
        interface: Capture interface name passed to scapy's ``sniff``.
        target_mac: MAC address of the device to watch.
        viewer_mac: Optional second MAC address, or a falsy value to skip.
        threshold: Target KB/s at or above which an alert is raised. The
            viewer counts as active at half this value.
        save_csv: When true, alerts are appended to ``cochlea_alerts.csv``
            in the current working directory.
    """
    global spinner_idx

    csv_path = 'cochlea_alerts.csv'
    csv_file = None
    csv_writer = None
    if save_csv:
        # The file is opened in append mode, so only a new/empty file gets
        # the header row; otherwise each run would insert a duplicate.
        needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
        csv_file = open(csv_path, 'a', newline='')
        csv_writer = csv.writer(csv_file)
        if needs_header:
            csv_writer.writerow(['Timestamp', 'Alert Type', 'Target MAC', 'Target KB/s', 'Viewer MAC', 'Viewer KB/s'])

    try:
        print("[*] INITIALIZING COCHLEA AUTO-AUDIT")
        print(f"[*] Target MAC: {target_mac} [Amazon Technologies Inc.]")
        print(f"[*] Threshold: {threshold} KB/s")
        if viewer_mac:
            print(f"[*] Viewer MAC: {viewer_mac}")
        print()
        print("[*] Listening for Wi-Fi activity... |", end='', flush=True)

        # Frame addresses are upper-cased in the handler, so compare against
        # upper-cased targets.
        target_mac = target_mac.upper()
        viewer_mac = viewer_mac.upper() if viewer_mac else None
        spinner = ['|', '/', '-', '\\']
        spinner_idx = 0

        def packet_handler(pkt):
            if not pkt.haslayer(Dot11):
                return
            src_mac = pkt.addr2.upper() if pkt.addr2 else None
            dst_mac = pkt.addr1.upper() if pkt.addr1 else None
            pkt_size = len(pkt)
            if target_mac in (src_mac, dst_mac):
                packet_sizes[target_mac] = packet_sizes.get(target_mac, 0) + pkt_size
            if viewer_mac and viewer_mac in (src_mac, dst_mac):
                packet_sizes[viewer_mac] = packet_sizes.get(viewer_mac, 0) + pkt_size

        while True:
            # NOTE(review): scapy's sniff() may handle Ctrl+C itself and just
            # return, in which case this loop keeps running -- confirm
            # against the installed scapy version.
            sniff(iface=interface, prn=packet_handler, timeout=1, store=0)
            target_throughput = calculate_throughput(target_mac)
            viewer_throughput = calculate_throughput(viewer_mac) if viewer_mac else 0
            print(f"\r[*] Listening for Wi-Fi activity... {spinner[spinner_idx]}", end='', flush=True)
            spinner_idx = (spinner_idx + 1) % len(spinner)

            if target_throughput < threshold:
                continue
            if viewer_mac and viewer_throughput >= threshold / 2:
                alert_type = "MOVEMENT IN FRONT OF CAMERA & OWNER WATCHING CAMERA (Download/Receive)"
            else:
                alert_type = "MOVEMENT IN FRONT OF CAMERA (Upload/Transmit)"
            _report_alert(alert_type, target_mac, target_throughput,
                          viewer_mac, viewer_throughput, csv_writer, csv_file)
            print()
            print("[*] Listening for Wi-Fi activity... |", end='', flush=True)
    except KeyboardInterrupt:
        print("\n\n[*] Stopping Cochlea...")
        if csv_file:
            csv_file.close()
            print("[*] Alerts saved to cochlea_alerts.csv")
        print("[*] Goodbye!")
    finally:
        # Also release the file when sniffing fails with any other
        # exception; close() on an already-closed file is a no-op.
        if csv_file:
            csv_file.close()
def _prompt_for_mac(prompt, invalid_message="[!] Invalid MAC address"):
    """Ask for a MAC address; exit with status 1 if validate_mac rejects it."""
    mac = validate_mac(input(prompt))
    if not mac:
        print(invalid_message)
        sys.exit(1)
    return mac


def _prompt_for_choice(prompt, option_count):
    """Read a 1-based menu number and return it 0-based; exit(1) if unusable."""
    try:
        index = int(input(prompt)) - 1
    except ValueError:
        print("[!] Invalid input")
        sys.exit(1)
    if not 0 <= index < option_count:
        print("[!] Invalid selection")
        sys.exit(1)
    return index


def _discover_target(interface):
    """Return the target MAC, picked from a beacon scan or typed in manually."""
    scan_choice = input("Do you want to scan for Ring cameras? (Y/n): ").strip().lower()
    if scan_choice == 'n':
        return _prompt_for_mac("Enter MAC address: ")

    ring_devices = scan_for_ring_devices(interface)
    if not ring_devices:
        print("[!] No Ring devices found")
        return _prompt_for_mac("Enter MAC address manually: ")

    print("\nFound Potential Ring/Amazon Devices:")
    macs = list(ring_devices)
    for number, mac in enumerate(macs, 1):
        print(f"{number}. {mac} ({ring_devices[mac]})")
    # The entry after the last discovered device is the manual-entry option.
    manual_option = len(macs)
    print(f"{manual_option + 1}. Enter MAC manually")
    index = _prompt_for_choice("Select the Target to monitor (#): ", manual_option + 1)
    if index == manual_option:
        return _prompt_for_mac("Enter MAC address: ")
    return macs[index]


def _prompt_for_threshold(default=500):
    """Read the alert threshold in KB/s, falling back to *default* with a notice."""
    raw = input(f"Set Sensitivity Threshold in KB/s [Default: {default}]: ").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        print(f"[!] '{raw}' is not a whole number; using default of {default} KB/s")
        return default
    if value <= 0:
        # A non-positive threshold would raise an alert on every iteration.
        print(f"[!] Threshold must be positive; using default of {default} KB/s")
        return default
    return value


def main():
    """Run the interactive set-up prompts, then start monitoring.

    Prompts, in order, for: capture interface, target MAC (optionally via
    a scan), optional viewer MAC, alert threshold, and whether to log
    alerts to CSV. Invalid interface, menu or MAC input terminates the
    program with exit status 1.
    """
    print_logo()

    print("--- Interface Selection ---")
    interfaces = get_interfaces()
    if not interfaces:
        print("[!] No network interfaces found. Make sure Npcap is installed on Windows.")
        sys.exit(1)
    for number, iface in enumerate(interfaces, 1):
        print(f"{number}. {iface}")
    selected_idx = _prompt_for_choice("Select your Monitor Mode interface (#): ", len(interfaces))
    interface = interfaces[selected_idx]
    print()

    print("--- Target Discovery ---")
    target_mac = _discover_target(interface)
    print()

    print("--- Viewer Discovery ---")
    viewer_input = input("Enter the Viewer MAC [Leave blank to skip]: ").strip()
    if viewer_input:
        viewer_mac = validate_mac(viewer_input)
        if not viewer_mac:
            print("[!] Invalid Viewer MAC address")
            sys.exit(1)
    else:
        viewer_mac = None
        # Show the tip to users who skipped the viewer MAC; they are the
        # ones it is addressed to.
        print("[Tip] Entering a Viewer MAC is recommended so you can see the dual-burst.")
    print()

    print("--- Threshold Configuration ---")
    threshold = _prompt_for_threshold()
    save_csv = input("Would you like to save alerts to a CSV file? (Y/n): ").strip().lower() != 'n'
    print()

    monitor_traffic(interface, target_mac, viewer_mac, threshold, save_csv)
if __name__ == "__main__":
    # Script entry point: print usage for --help, otherwise run main() and
    # translate the possible failures into a message plus an exit status.
    if sys.argv[1:2] == ['--help']:
        usage_lines = (
            "Cochlea - Passive IoT Activity Detector",
            "Usage: sudo python3 cochlea.py",
            "\nRequirements:",
            "- Wi-Fi adapter capable of Monitor Mode",
            "- Python 3 with scapy installed",
            "- Root/sudo privileges (or Administrator on Windows)",
            "- Windows: Npcap installed (https://npcap.com)",
            "\nInstall dependencies:",
            " pip install scapy",
        )
        print("\n".join(usage_lines))
        sys.exit(0)

    print(f"[*] Running on {platform.system()} {platform.release()}")
    try:
        main()
    except PermissionError:
        print("[!] Error: This script requires root/sudo privileges to capture packets.")
        on_windows = platform.system() == "Windows"
        hint = (
            " Run as Administrator or use Npcap in WinPcap API-compatible mode."
            if on_windows
            else " Run with: sudo python3 cochlea.py"
        )
        print(hint)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n[*] Interrupted by user")
        sys.exit(0)
    except Exception as e:
        # Last-resort boundary: report the error text and exit non-zero.
        print(f"[!] Error: {e}")
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment