Skip to content

Instantly share code, notes, and snippets.

@watson0x90
Created December 16, 2025 14:01
Show Gist options
  • Select an option

  • Save watson0x90/3ff6748bc75a4970b078a59064b568bc to your computer and use it in GitHub Desktop.

Select an option

Save watson0x90/3ff6748bc75a4970b078a59064b568bc to your computer and use it in GitHub Desktop.
import asyncio
import os
import subprocess
import time
import sys
import shutil
import argparse
from kasa import Discover, Credentials
# ==========================================
# ⚙️ USER CONFIGURATION
# ==========================================
# KASA PLUG CREDENTIALS
# cycle_power() connects to the plug at DEVICE_IP and authenticates with
# USERNAME / PASSWORD (both are empty placeholders and must be filled in).
DEVICE_IP = "192.168.1.144"
USERNAME = "" # <--- ENTER EMAIL
PASSWORD = "" # <--- ENTER PASSWORD
# HARDWARE ID
# find_drive() searches the `udevadm info` output of each candidate disk for
# this string to recognise the drive being recovered.
TARGET_VID = "0634" # Crucial Vendor ID
# PATHS
# Output image and ddrescue mapfile; both are handed to ddrescue, and the
# mapfile is also parsed (and backed up) by this script.
IMG_FILE = "/mnt/recovery_drive/sniper.img"
MAP_FILE = "/mnt/recovery_drive/sniper.map"
# SETTINGS
# PROBE_TIMEOUT: timeout for the single-sector dd read in probe_with_dd().
# RUN_DURATION:  seconds before `timeout` sends SIGINT to a ddrescue run.
# COOL_DOWN:     power-off time passed to cycle_power() after each ddrescue run.
# SKIP_SIZE:     bytes to advance after a failed probe.
PROBE_TIMEOUT = 5 # Seconds to wait for a dd probe
RUN_DURATION = 45 # Shorter runs (45s) since we are hitting bad areas often
COOL_DOWN = 60 # Seconds to turn OFF and cool down
SKIP_SIZE = 512 * 1024 # 512 KB Jumps (High Precision for cleanup)
# ==========================================
# 🛠️ UTILITIES & MAP PARSER
# ==========================================
def get_cleanup_targets(map_path):
    """
    Collect the regions of a ddrescue mapfile that still need recovery work.

    A data line is expected to hold at least three whitespace-separated
    fields: position (hex), size (hex) and a status character. Regions whose
    status is '?' (non-tried), '*' (non-trimmed) or '/' (non-scraped) are
    selected. Comment lines, short lines and lines whose first two fields
    are not valid hex are ignored.

    Returns a list of (start, end) tuples where end = start + size. An empty
    list is returned when the file is missing or cannot be read.
    """
    if not os.path.exists(map_path):
        print("❌ Mapfile not found.")
        return []

    # ? = skipped, * = non-trimmed, / = non-scraped
    wanted_statuses = ('?', '*', '/')
    regions = []
    try:
        with open(map_path, 'r') as mapfile:
            for raw_line in mapfile:
                if raw_line.startswith('#'):
                    continue
                fields = raw_line.strip().split()
                if len(fields) < 3:
                    continue
                try:
                    begin = int(fields[0], 16)
                    length = int(fields[1], 16)
                except ValueError:
                    # Not a "pos size status" row (e.g. the current-status line).
                    continue
                if fields[2] in wanted_statuses:
                    regions.append((begin, begin + length))
        print(f"πŸ” Cleanup Analysis: Found {len(regions)} specific areas to recover.")
        return regions
    except Exception as err:
        print(f"⚠️ Error parsing mapfile: {err}")
        return []
async def cycle_power(off_duration=10):
    """Power-cycle the drive through the Kasa smart plug.

    Connects to the plug at DEVICE_IP using USERNAME / PASSWORD, switches it
    off, shows a one-line countdown for `off_duration` seconds, switches it
    back on and then waits 15 seconds for the drive to boot.

    Returns True when the whole sequence completed. If anything raises, the
    error is printed, the coroutine sleeps 5 seconds and returns False.
    """
    print(f"\nπŸ”Œ [POWER] Connecting to Smart Plug at {DEVICE_IP}...")
    try:
        plug = await Discover.discover_single(
            DEVICE_IP, credentials=Credentials(USERNAME, PASSWORD)
        )
        await plug.update()

        print(f" [POWER] Turning OFF for {off_duration}s thermal rest...")
        await plug.turn_off()

        # Countdown redrawn in place via the leading carriage return.
        for seconds_left in reversed(range(1, off_duration + 1)):
            print(f"\r ❄️ Cooling: {seconds_left}s... ", end="", flush=True)
            await asyncio.sleep(1)
        print("")

        print(" [POWER] Turning ON...")
        await plug.turn_on()

        print(" [POWER] Waiting 15s for drive boot...")
        await asyncio.sleep(15)
    except Exception as err:
        print(f"❌ [ERROR] Kasa Failed: {err}")
        await asyncio.sleep(5)
        return False
    return True
def find_drive(vid=None, letters="cdefgh"):
    """Find the /dev/sdX node whose udev attributes mention a vendor ID.

    For each candidate letter, the disk is considered only if
    /sys/block/sdX/device exists. `udevadm info -a -n /dev/sdX` is then run
    and its output searched for the vendor ID as a plain substring.

    Args:
        vid: Vendor ID string to look for. Defaults to TARGET_VID.
        letters: Iterable of drive letters to try, in order. The default
            skips sda/sdb (presumably the system disks - confirm for your
            machine).

    Returns:
        The device path (e.g. "/dev/sdc") of the first match, or None.
    """
    if vid is None:
        vid = TARGET_VID
    # Compare as bytes so odd characters in udev output cannot raise a
    # decoding error.
    needle = str(vid).encode()

    for letter in letters:
        dev = f"sd{letter}"
        if not os.path.exists(f"/sys/block/{dev}/device"):
            continue
        try:
            # Argument list instead of a shell pipeline into grep: nothing is
            # interpolated into a shell command line.
            res = subprocess.run(
                ["udevadm", "info", "-a", "-n", f"/dev/{dev}"],
                stdout=subprocess.PIPE,
            )
        except (OSError, subprocess.SubprocessError):
            # udevadm missing or failed to launch: treat as "not this disk".
            # KeyboardInterrupt / SystemExit deliberately propagate.
            continue
        if needle in res.stdout:
            return f"/dev/{dev}"
    return None
def probe_with_dd(device_path, offset_bytes):
    """Try to read one 512-byte sector at a byte offset using dd.

    dd is invoked with `iflag=skip_bytes`, so `offset_bytes` is passed as a
    byte offset rather than a block count. The read must finish within
    PROBE_TIMEOUT seconds.

    A one-character progress marker is written to stdout for a successful
    read, a failed read and a timeout. Any other error returns False
    silently.

    Returns True only when dd exits with status 0.
    """
    dd_args = [
        "dd",
        f"if={device_path}",
        "of=/dev/null",
        "bs=512",
        "count=1",
        f"skip={offset_bytes}",
        "iflag=skip_bytes",
        "status=none",
    ]
    try:
        completed = subprocess.run(dd_args, timeout=PROBE_TIMEOUT)
        readable = completed.returncode == 0
        print("βœ…" if readable else "❌", end="", flush=True)
        return readable
    except subprocess.TimeoutExpired:
        print("⏳", end="", flush=True)
        return False
    except Exception:
        return False
# ==========================================
# 🚀 CLEANUP LOOP
# ==========================================
async def hunt_in_range(start_offset, end_offset):
    """Probe one unrecovered gap and run ddrescue wherever a read succeeds.

    Walks from `start_offset` to `end_offset` (byte offsets, end exclusive).
    At each step it waits until find_drive() reports the drive, then probes
    the current offset with probe_with_dd():

    * Hit: ddrescue is run on the rest of the gap, bounded to RUN_DURATION
      seconds by `timeout -s INT`. The drive is then power-cycled with a
      COOL_DOWN rest and the cursor advances by 1 MiB.
    * Miss: the cursor advances by SKIP_SIZE.

    Returns None once the cursor reaches `end_offset`.
    """
    current_offset = start_offset
    gap_size_mb = (end_offset - start_offset) / (1024**2)
    # The position is labelled "TB", so divide by 1024**4. The previous
    # 1024**3 divisor produced a GiB figure under a TB label.
    print(f"\n βš”οΈ Scrubbing Sector ({gap_size_mb:.2f} MB) at {start_offset/1024**4:.4f} TB")
    sys.stdout.write(" Probing: ")
    # Make the prefix visible before the first (possibly slow) probe.
    sys.stdout.flush()

    while current_offset < end_offset:
        # 1. Acquire the drive; poll silently every 2 s until it shows up
        #    (it may still be re-enumerating after a power cycle).
        drive = None
        while not drive:
            drive = find_drive()
            if not drive:
                await asyncio.sleep(2)

        # 2. Probe the current offset.
        if probe_with_dd(drive, current_offset):
            print("\n 🎯 [HIT] Data found! Extracting...")
            # Only the remainder of THIS gap is handed to ddrescue.
            size_to_read = end_offset - current_offset
            # -n no-scrape, -d direct disc access, -r 1 one retry pass,
            # -c 32 sectors per cluster; `timeout` interrupts the run with
            # SIGINT after RUN_DURATION seconds.
            cmd = [
                "timeout", "-s", "INT", str(RUN_DURATION),
                "ddrescue", "-n", "-d", "-r", "1", "-c", "32",
                f"--input-position={current_offset}",
                f"--size={size_to_read}",
                drive, IMG_FILE, MAP_FILE,
            ]
            subprocess.run(cmd)
            # Thermal rest after every extraction attempt.
            await cycle_power(off_duration=COOL_DOWN)
            # Advance 1 MiB before probing again.
            current_offset += 1024 * 1024
            print(" Resume Probing: ", end="")
        else:
            # Miss: jump forward by the configured skip size.
            current_offset += SKIP_SIZE

    print(" -> Done.")
async def run_cleanup():
    """Run one cleanup pass over every unfinished region of the mapfile.

    If MAP_FILE exists it is first copied to MAP_FILE + ".bak". The target
    regions are then read with get_cleanup_targets() and each one is
    processed in order by hunt_in_range(). Returns early, with a message,
    when there are no target regions.
    """
    if os.path.exists(MAP_FILE):
        print("πŸ’Ύ Backing up mapfile...", end="")
        backup_path = f"{MAP_FILE}.bak"
        shutil.copy(MAP_FILE, backup_path)
        print(" Done.")

    targets = get_cleanup_targets(MAP_FILE)
    if not targets:
        print("βœ… No cleanup targets found! You might be done.")
        return

    total = len(targets)
    print(f"πŸ“‹ Mission: Scrub {total} imperfect areas.")
    for ordinal, (range_start, range_end) in enumerate(targets, start=1):
        print(f"πŸ”Ή Target {ordinal}/{total}...", end="")
        await hunt_in_range(range_start, range_end)
    print("\nπŸŽ‰ Cleanup Pass Complete.")
if __name__ == "__main__":
    # Refuse to start unless the effective UID is 0 (presumably because the
    # dd / ddrescue calls read the raw block device).
    if os.geteuid():
        sys.exit("❌ ERROR: Must run as root (sudo).")
    # Ctrl-C ends the run with a short notice instead of a traceback.
    try:
        asyncio.run(run_cleanup())
    except KeyboardInterrupt:
        print("\nπŸ›‘ Stopped by user.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment