Created
April 5, 2026 02:43
-
-
Save M1noa/f00ccc6153a0964a245b364d4ea9b587 to your computer and use it in GitHub Desktop.
Cochlea Remake
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Remake of Benn Jordan's script as shown in: https://www.youtube.com/watch?v=UMIwNiwQewQ | |
| # This does work. DO NOT USE THIS MALICIOUSLY, PLEASE ONLY USE THIS FOR RESEARCH PURPOSES!! | |
| import sys | |
| import os | |
| import time | |
| import csv | |
| import re | |
| import platform | |
| from datetime import datetime | |
| from scapy.all import * | |
| from scapy.layers.dot11 import Dot11 | |
# Verified Ring LLC MAC prefixes (official IEEE registrations)
# NOTE(review): the "verified" claim is the original author's; confirm the
# list against the IEEE OUI registry before relying on it.
# Entries are lower-case, colon-separated three-octet prefixes; is_ring_mac()
# lower-cases the candidate prefix before testing membership in this list.
RING_OUI_PREFIXES = [
    '00:b4:63', '18:7f:88', '24:2b:d6', '34:3e:a4',
    '54:e0:19', '5c:47:5e', '64:9a:63', '90:48:6c',
    '9c:76:13', 'ac:9f:c3', 'c4:db:ad', 'cc:3b:fb'
]
# Not read or written anywhere else in the visible code.
# NOTE(review): presumably intended for per-MAC packet counts -- confirm or remove.
packet_counts = {}
# Maps MAC address -> bytes captured for that MAC; incremented by the packet
# handler in monitor_traffic() and zeroed by calculate_throughput().
packet_sizes = {}
# time.time() stamp that calculate_throughput() measures its window against.
last_reset = time.time()
# Index of the spinner frame that monitor_traffic() prints next.
spinner_idx = 0
def print_logo():
    """Print the Cochlea start-up banner to stdout.

    The banner is surrounded by one blank line above and below.
    """
    banner_rows = (
        "",
        "################################################",
        "# #",
        "# COCHLEA #",
        "# Passive IoT Activity Detector #",
        "# #",
        "################################################",
        "",
    )
    print("\n".join(banner_rows))
def get_interfaces():
    """Return whatever scapy's ``get_if_list()`` reports for this host."""
    available = get_if_list()
    return available
def is_ring_mac(mac):
    """Tell whether the first three octets of *mac* match a listed Ring OUI.

    The prefix is lower-cased before the lookup, so the comparison is
    case-insensitive. The address is split on ':' only, so dash-separated
    input never matches.
    """
    octets = mac.split(':')
    oui = ':'.join(octets[:3]).lower()
    if oui in RING_OUI_PREFIXES:
        return True
    return False
def scan_for_ring_devices(interface, duration=3):
    """Sniff 802.11 beacons and collect senders whose MAC has a Ring prefix.

    Args:
        interface: Capture interface name passed to scapy's ``sniff``.
        duration: How long to capture, in seconds (used as the sniff timeout).

    Returns:
        A dict mapping each matching transmitter MAC (upper-cased) to the
        SSID decoded from the frame, or ``"Unknown"`` when the SSID is empty
        or cannot be read. Only the first beacon seen per MAC is recorded.
    """
    print(f"[*] Scanning the airwaves on {interface} for {duration} seconds...")
    found_devices = {}

    def packet_handler(pkt):
        if not pkt.haslayer(Dot11):
            return
        # Management frame (type 0) with subtype 8 is a beacon.
        if pkt.type != 0 or pkt.subtype != 8:
            return
        # addr2 can be None on some frames; calling .upper() on it would
        # raise inside the sniff callback and abort the whole scan.
        if not pkt.addr2:
            return
        mac = pkt.addr2.upper()
        if ':' not in mac or not is_ring_mac(mac) or mac in found_devices:
            return
        try:
            ssid = pkt.info.decode('utf-8', errors='ignore')
        except (AttributeError, UnicodeError):
            # No readable SSID element on this frame: keep the device but
            # label it as unknown rather than dropping it.
            ssid = ""
        found_devices[mac] = ssid if ssid else "Unknown"

    sniff(iface=interface, prn=packet_handler, timeout=duration, store=0)
    return found_devices
def validate_mac(mac):
    """Normalise a user-supplied MAC address, or reject it.

    Surrounding whitespace is stripped and the text is upper-cased. Six
    two-digit hex groups separated by ':' or '-' are accepted (separators
    may be mixed).

    Returns:
        The upper-case, colon-separated address, or ``None`` when the input
        does not have that shape.
    """
    candidate = mac.strip().upper()
    if re.match(r'^([0-9A-F]{2}[:-]){5}([0-9A-F]{2})$', candidate) is None:
        return None
    return candidate.replace('-', ':')
# Start time of the current measurement window for each MAC. A MAC that has
# never been reset falls back to the module-level ``last_reset`` stamp.
_last_reset_by_mac = {}


def calculate_throughput(mac, window_seconds=1):
    """Return the KB/s accumulated for *mac* once its window has elapsed.

    Each MAC keeps its own window start. Previously a single shared
    ``last_reset`` was overwritten by whichever MAC was queried first, so a
    second MAC queried right afterwards (the viewer, in the monitor loop)
    always saw an elapsed time of ~0 and reported 0.

    Args:
        mac: Key into the module-level ``packet_sizes`` byte counters.
        window_seconds: Minimum seconds between two reports for one MAC.

    Returns:
        ``0`` if *mac* has no counter yet or its window has not elapsed.
        Otherwise the counted bytes / 1024 / ``window_seconds``; the counter
        is then zeroed and the MAC's window restarted. The rate is
        normalised by ``window_seconds``, not by the measured elapsed time.
    """
    if mac not in packet_sizes:
        return 0
    now = time.time()
    window_start = _last_reset_by_mac.get(mac, last_reset)
    if now - window_start < window_seconds:
        return 0
    throughput = (packet_sizes[mac] / 1024) / window_seconds
    packet_sizes[mac] = 0
    _last_reset_by_mac[mac] = now
    return throughput
def monitor_traffic(interface, target_mac, viewer_mac, threshold, save_csv):
    """Watch per-second traffic for the target (and viewer) and print alerts.

    Runs until interrupted with Ctrl-C. Each pass sniffs for one second,
    adds the size of every 802.11 frame sent to or from the target/viewer
    MAC to ``packet_sizes``, then asks ``calculate_throughput`` for each
    rate. An alert is raised when the target rate reaches *threshold*; it is
    labelled as "owner watching" when a viewer MAC is set and its rate
    reaches half of *threshold*.

    Args:
        interface: Capture interface name passed to scapy's ``sniff``.
        target_mac: MAC of the monitored device (compared upper-cased).
        viewer_mac: Optional MAC of the viewing device, or a falsy value.
        threshold: Alert threshold for the target, in KB/s.
        save_csv: When true, alerts are appended to ``cochlea_alerts.csv``.
    """
    global packet_sizes, last_reset, spinner_idx, csv_file, csv_writer
    csv_file = None
    csv_writer = None
    if save_csv:
        csv_path = 'cochlea_alerts.csv'
        # The file is opened in append mode, so only write the header when
        # it is new or empty; otherwise every run adds another header row.
        needs_header = not (os.path.isfile(csv_path) and os.path.getsize(csv_path) > 0)
        csv_file = open(csv_path, 'a', newline='')
        csv_writer = csv.writer(csv_file)
        if needs_header:
            csv_writer.writerow(['Timestamp', 'Alert Type', 'Target MAC', 'Target KB/s', 'Viewer MAC', 'Viewer KB/s'])

    try:
        print("[*] INITIALIZING COCHLEA AUTO-AUDIT")
        print(f"[*] Target MAC: {target_mac} [Amazon Technologies Inc.]")
        print(f"[*] Threshold: {threshold} KB/s")
        if viewer_mac:
            print(f"[*] Viewer MAC: {viewer_mac}")
        print()
        print("[*] Listening for Wi-Fi activity... |", end='', flush=True)

        target_mac = target_mac.upper()
        viewer_mac = viewer_mac.upper() if viewer_mac else None
        spinner = ['|', '/', '-', '\\']
        spinner_idx = 0

        def packet_handler(pkt):
            if not pkt.haslayer(Dot11):
                return
            src_mac = pkt.addr2.upper() if pkt.addr2 else None
            dst_mac = pkt.addr1.upper() if pkt.addr1 else None
            pkt_size = len(pkt)
            if target_mac in (src_mac, dst_mac):
                packet_sizes[target_mac] = packet_sizes.get(target_mac, 0) + pkt_size
            if viewer_mac and viewer_mac in (src_mac, dst_mac):
                packet_sizes[viewer_mac] = packet_sizes.get(viewer_mac, 0) + pkt_size

        try:
            while True:
                sniff(iface=interface, prn=packet_handler, timeout=1, store=0)
                target_throughput = calculate_throughput(target_mac)
                viewer_throughput = calculate_throughput(viewer_mac) if viewer_mac else 0
                print(f"\r[*] Listening for Wi-Fi activity... {spinner[spinner_idx]}", end='', flush=True)
                spinner_idx = (spinner_idx + 1) % len(spinner)
                if target_throughput < threshold:
                    continue

                timestamp = datetime.now().strftime("%H:%M:%S")
                # The viewer only needs half the target threshold to count.
                if viewer_mac and viewer_throughput >= threshold / 2:
                    alert_type = "MOVEMENT IN FRONT OF CAMERA & OWNER WATCHING CAMERA (Download/Receive)"
                else:
                    alert_type = "MOVEMENT IN FRONT OF CAMERA (Upload/Transmit)"
                print(f"\n[{timestamp}] ALERT: {alert_type}")
                print(f" -> Target : {target_throughput:8.1f} KB/s")
                print(f" -> Viewer : {viewer_throughput:8.1f} KB/s")
                if save_csv and csv_writer:
                    csv_writer.writerow([timestamp, alert_type, target_mac, f"{target_throughput:.1f}", viewer_mac or "N/A", f"{viewer_throughput:.1f}"])
                    # Flush per alert so rows survive an abrupt termination.
                    csv_file.flush()
                print()
                print("[*] Listening for Wi-Fi activity... |", end='', flush=True)
        except KeyboardInterrupt:
            print("\n\n[*] Stopping Cochlea...")
            if csv_file:
                csv_file.close()
                print("[*] Alerts saved to cochlea_alerts.csv")
            print("[*] Goodbye!")
    finally:
        # Also release the handle when sniff() or anything else raises
        # (e.g. PermissionError); close() on a closed file is a no-op.
        if csv_file:
            csv_file.close()
def _prompt_for_mac(prompt):
    """Ask for a MAC address using *prompt*; exit with status 1 if malformed."""
    entered = validate_mac(input(prompt).strip())
    if not entered:
        print("[!] Invalid MAC address")
        sys.exit(1)
    return entered


def main():
    """Run the interactive setup, then start monitoring.

    Prompts, in order, for the capture interface, the target MAC (optionally
    chosen from a beacon scan), an optional viewer MAC, the alert threshold
    and whether to log alerts to CSV. Any invalid selection or MAC prints an
    error and exits with status 1. An unusable threshold falls back to the
    default of 500 KB/s.
    """
    default_threshold = 500

    print_logo()
    print("--- Interface Selection ---")
    interfaces = get_interfaces()
    if not interfaces:
        print("[!] No network interfaces found. Make sure Npcap is installed on Windows.")
        sys.exit(1)
    for i, iface in enumerate(interfaces, 1):
        print(f"{i}. {iface}")
    try:
        selected_idx = int(input("Select your Monitor Mode interface (#): ")) - 1
    except ValueError:
        print("[!] Invalid input")
        sys.exit(1)
    if not 0 <= selected_idx < len(interfaces):
        print("[!] Invalid selection")
        sys.exit(1)
    interface = interfaces[selected_idx]

    print()
    print("--- Target Discovery ---")
    scan_choice = input("Do you want to scan for Ring cameras? (Y/n): ").strip().lower()
    if scan_choice == 'n':
        target_mac = _prompt_for_mac("Enter MAC address: ")
    else:
        ring_devices = scan_for_ring_devices(interface)
        if not ring_devices:
            print("[!] No Ring devices found")
            target_mac = _prompt_for_mac("Enter MAC address manually: ")
        else:
            print("\nFound Potential Ring/Amazon Devices:")
            candidates = list(ring_devices)
            for i, mac in enumerate(candidates, 1):
                print(f"{i}. {mac} ({ring_devices[mac]})")
            # The entry after the last device is the manual-entry option.
            print(f"{len(candidates) + 1}. Enter MAC manually")
            try:
                target_idx = int(input("Select the Target to monitor (#): ")) - 1
            except ValueError:
                print("[!] Invalid input")
                sys.exit(1)
            if target_idx == len(candidates):
                target_mac = _prompt_for_mac("Enter MAC address: ")
            elif 0 <= target_idx < len(candidates):
                target_mac = candidates[target_idx]
            else:
                print("[!] Invalid selection")
                sys.exit(1)

    print()
    print("--- Viewer Discovery ---")
    viewer_mac = input("Enter the Viewer MAC [Leave blank to skip]: ").strip()
    if viewer_mac:
        viewer_mac = validate_mac(viewer_mac)
        if not viewer_mac:
            print("[!] Invalid Viewer MAC address")
            sys.exit(1)
        print("[Tip] Entering a Viewer MAC is recommended so you can see the dual-burst.")
    else:
        viewer_mac = None

    print()
    print("--- Threshold Configuration ---")
    threshold_input = input("Set Sensitivity Threshold in KB/s [Default: 500]: ").strip()
    try:
        threshold = int(threshold_input) if threshold_input else default_threshold
    except ValueError:
        print(f"[!] Invalid threshold, using default of {default_threshold} KB/s")
        threshold = default_threshold
    if threshold <= 0:
        # A zero or negative threshold makes "throughput >= threshold" true
        # even with no traffic, so an alert would fire on every pass.
        print(f"[!] Threshold must be positive, using default of {default_threshold} KB/s")
        threshold = default_threshold

    save_csv = input("Would you like to save alerts to a CSV file? (Y/n): ").strip().lower()
    save_csv = save_csv != 'n'
    print()
    monitor_traffic(interface, target_mac, viewer_mac, threshold, save_csv)
if __name__ == "__main__":
    # "--help" as the first argument prints usage and exits before any capture.
    if sys.argv[1:2] == ['--help']:
        help_lines = (
            "Cochlea - Passive IoT Activity Detector",
            "Usage: sudo python3 cochlea.py",
            "\nRequirements:",
            "- Wi-Fi adapter capable of Monitor Mode",
            "- Python 3 with scapy installed",
            "- Root/sudo privileges (or Administrator on Windows)",
            "- Windows: Npcap installed (https://npcap.com)",
            "\nInstall dependencies:",
            " pip install scapy",
        )
        for help_line in help_lines:
            print(help_line)
        sys.exit(0)

    print(f"[*] Running on {platform.system()} {platform.release()}")
    try:
        main()
    except PermissionError:
        # Packet capture was refused: explain how to elevate on this platform.
        print("[!] Error: This script requires root/sudo privileges to capture packets.")
        on_windows = platform.system() == "Windows"
        if on_windows:
            print(" Run as Administrator or use Npcap in WinPcap API-compatible mode.")
        else:
            print(" Run with: sudo python3 cochlea.py")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n[*] Interrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report any other failure and exit non-zero.
        print(f"[!] Error: {e}")
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment