# Source: GitHub Gist watson0x90/efe012e6f3eae397a06ca53cedbe87a0
# Author: @watson0x90 -- created December 14, 2025 22:08
# (GitHub page navigation text from the web export has been reduced to this header.)
import asyncio
import mmap
import os
import re
import subprocess
import time

from kasa import Discover, Credentials
# --- CONFIGURATION ---
# USB identity of the target drive.
TARGET_VID = "0634"  # Crucial Vendor ID
TARGET_PID = "5603"  # Crucial Product ID

# Address of the Kasa smart plug used to power-cycle the drive.
KASA_IP = "192.168.1.144"

# ddrescue output image and its mapfile.
IMG_FILE = "/mnt/recovery_drive/sniper.img"
MAP_FILE = "/mnt/recovery_drive/sniper.map"

# SKIP SETTINGS
SKIP_SIZE_BYTES = 10 * 1024**3  # jump forward 10 GiB after a failed probe
DRIVE_SIZE_BYTES = 2_000_398_934_016  # 2TB (approximate, can be adjusted)

# TIMING
RUN_LIMIT_SECONDS = 90  # cap on each ddrescue run, to prevent overheating
COOL_DOWN_SECONDS = 60  # rest period that lets the controller cool off
async def cycle_power():
    """Power-cycle the drive by toggling the Kasa smart plug at KASA_IP.

    Sequence: connect, switch the plug off, wait 10 seconds, switch it back
    on. Any failure is reported on stdout and swallowed so the caller's
    recovery loop keeps running (best-effort by design).

    The plug connection is always closed before returning; this function is
    called once per recovery cycle, so an unclosed connection would
    accumulate for as long as the script runs.

    NOTE(review): no credentials are passed to discovery even though the
    file imports `Credentials` -- confirm the plug does not require them.
    """
    print(f"\nπŸ”Œ [POWER] Connecting to Smart Plug at {KASA_IP}...")
    dev = None
    try:
        dev = await Discover.discover_single(KASA_IP)
        await dev.update()
        print(" [POWER] Turning OFF...")
        await dev.turn_off()
        await asyncio.sleep(10)  # Hard wait for capacitors to drain
        print(" [POWER] Turning ON...")
        await dev.turn_on()
        print(" [POWER] Cycle Complete. Waiting for drive to boot...")
    except Exception as e:
        # Deliberately broad: a flaky plug must not abort the recovery loop.
        print(f"❌ [ERROR] Kasa failed: {e}")
    finally:
        # Looked up dynamically so library versions without disconnect()
        # still work; `dev` is None when discovery itself failed.
        disconnect = getattr(dev, "disconnect", None)
        if disconnect is not None:
            try:
                await disconnect()
            except Exception as e:
                print(f"❌ [ERROR] Kasa disconnect failed: {e}")
def find_drive():
    """Return the /dev/sdX path of the target USB drive, or None if absent.

    Checks sdc through sdh. For every candidate that has a
    /sys/block/<dev>/device entry, `udevadm info -a` is run and the device
    is accepted only when a single node in the udev parent chain reports
    both ATTRS{idVendor} == TARGET_VID and ATTRS{idProduct} == TARGET_PID.

    udevadm is invoked with an argument list (no shell), and the IDs are
    matched as exact attribute values rather than as bare substrings, so
    an unrelated "0634" elsewhere in the output cannot select the wrong
    disk.

    Returns:
        The device path (e.g. "/dev/sdc") of the first match, else None.
    """
    vid_attr = f'ATTRS{{idVendor}}=="{TARGET_VID}"'
    pid_attr = f'ATTRS{{idProduct}}=="{TARGET_PID}"'

    for letter in "cdefgh":
        dev = f"sd{letter}"
        if not os.path.exists(f"/sys/block/{dev}/device"):
            continue
        try:
            res = subprocess.run(
                ["udevadm", "info", "-a", "-n", f"/dev/{dev}"],
                stdout=subprocess.PIPE,
                text=True,
                errors="replace",
                timeout=10,  # a wedged device must not stall discovery
            )
        except (OSError, subprocess.SubprocessError):
            # udevadm missing, not executable, or timed out: try next letter.
            continue
        if res.returncode != 0:
            continue
        # udevadm separates each device in the parent chain with a blank
        # line; require both IDs on the same node.
        for node in res.stdout.split("\n\n"):
            if vid_attr in node and pid_attr in node:
                return f"/dev/{dev}"
    return None
def get_resume_point(map_file_path):
    """Report where imaging should resume, given the ddrescue mapfile path.

    This is a placeholder: the mapfile contents are never parsed.

    Returns:
        0 when no mapfile exists at `map_file_path` (start from the
        beginning); None when one exists, meaning "let ddrescue work out
        the position from its own mapfile unless a skip is forced".
    """
    if os.path.exists(map_file_path):
        # A mapfile is present; defer to ddrescue's own bookkeeping.
        return None
    # No mapfile yet, so there is nothing to resume from.
    return 0
def probe_drive(device_path, offset, sector_size=512):
    """
    The Sonar: attempts an unbuffered read of one sector at `offset`.

    The device is opened with O_DIRECT so the read is not served from the
    page cache. O_DIRECT can require the user buffer to be aligned, so the
    data is read into an anonymous mmap (page-aligned) via os.preadv
    instead of os.read(), whose bytes buffer carries no alignment
    guarantee and can fail with EINVAL on a perfectly healthy sector.

    Args:
        device_path: Block device to probe, e.g. "/dev/sdc".
        offset: Byte offset to read at; should be a multiple of the
            device's logical sector size.
        sector_size: Number of bytes to read (default 512).

    Returns:
        True if exactly `sector_size` bytes were read; False on any error
        or short read. A read that blocks inside the kernel is NOT timed
        out here -- this call simply blocks for as long as the read does.
    """
    print(f"πŸ“‘ [SONAR] Probing {device_path} at {offset / (1024**3):.2f} GB...")
    fd = None
    buf = None
    try:
        fd = os.open(device_path, os.O_RDONLY | os.O_DIRECT)
        buf = mmap.mmap(-1, sector_size)  # anonymous mapping: page-aligned
        bytes_read = os.preadv(fd, [buf], offset)
        if bytes_read == sector_size:
            print(" [SONAR] Ping successful! Water is clear.")
            return True
    except OSError:
        print(" [SONAR] Hit rock! (IO Error)")
        return False
    except Exception as e:
        print(f" [SONAR] Error: {e}")
        return False
    finally:
        # Release the buffer and descriptor on every path, including
        # failed reads.
        if buf is not None:
            buf.close()
        if fd is not None:
            os.close(fd)
    return False
async def _wait_for_drive():
    """Poll find_drive() until the target appears and survives a 3 s settle delay."""
    print("\n⏳ [WAIT] Looking for drive...")
    while True:
        drive_path = find_drive()
        if drive_path is None:
            await asyncio.sleep(2)
            continue
        print(f"βœ… [FOUND] Target detected at {drive_path}")
        # Wait for /dev node to settle (non-blocking, like every other wait here).
        await asyncio.sleep(3)
        if os.path.exists(drive_path):
            return drive_path
        print(" [GHOST] Drive vanished immediately.")


def _seek_readable_offset(drive_path, offset):
    """Step `offset` forward by SKIP_SIZE_BYTES until a probe succeeds.

    Returns (offset, readable). `readable` is False when probing must stop:
    either `offset` has reached DRIVE_SIZE_BYTES, or the device node
    disappeared (in which case `offset` is left unadvanced).
    """
    while offset < DRIVE_SIZE_BYTES:
        if probe_drive(drive_path, offset):
            return offset, True
        if not os.path.exists(drive_path):
            # The failure came from a missing device, not a bad region;
            # skipping ahead would wrongly discard the rest of the drive.
            return offset, False
        print(f"⏭️ [SKIP] Jumping forward {SKIP_SIZE_BYTES / (1024**3)} GB...")
        offset += SKIP_SIZE_BYTES
    return offset, False


async def main():
    """Run the recovery loop: wait for drive, probe, image, cool down, power-cycle.

    Each iteration:
      1. Waits until the target drive is detected.
      2. Probes at the current offset, jumping forward SKIP_SIZE_BYTES per
         failed probe. Returns once the offset reaches DRIVE_SIZE_BYTES.
         If the device node disappears mid-probe, the plug is power-cycled
         and the same offset is retried once the drive is back.
      3. Runs ddrescue from that offset under `timeout`, limited to
         RUN_LIMIT_SECONDS. Exit code 0 ends the loop; 124 (timeout
         expired) and any other code continue to the next step.
      4. Sleeps COOL_DOWN_SECONDS, then power-cycles the drive.

    The offset is not advanced after a ddrescue run; the mapfile records
    what was already read and the next probe re-validates the position.
    """
    # Start at 1.99 TB (decimal), rounded down to a 1 MiB boundary so the
    # O_DIRECT probe stays sector-aligned. The earlier binary form
    # (1990 * 1024**3 = 2,136,746,229,760) is larger than DRIVE_SIZE_BYTES,
    # which ended the run before anything was probed.
    current_skip_offset = (1990 * 1000**3) // 2**20 * 2**20

    while True:
        # 1. WAIT FOR DRIVE
        drive_path = await _wait_for_drive()

        # 2. PROBE PHASE (dynamic skipping)
        current_skip_offset, readable = _seek_readable_offset(
            drive_path, current_skip_offset
        )
        if not readable:
            if current_skip_offset >= DRIVE_SIZE_BYTES:
                print("🏁 [FINISH] Reached end of drive capacity.")
                return
            print(" [GHOST] Drive dropped out while probing; power cycling and retrying the same offset.")
            await cycle_power()
            continue

        # 3. EXTRACTION PHASE
        print(f"πŸš€ [LAUNCH] Starting ddrescue at {current_skip_offset}...")
        # `timeout` enforces the thermal limit by sending SIGINT to ddrescue.
        cmd = [
            "timeout", "-s", "INT", str(RUN_LIMIT_SECONDS),
            "ddrescue", "-n", "-d", "-r", "0", "-c", "64",
            f"--input-position={current_skip_offset}",
            drive_path, IMG_FILE, MAP_FILE,
        ]
        try:
            # Blocking call; acceptable because this coroutine is the only
            # task on the event loop.
            process = subprocess.run(cmd)
        except Exception as e:
            print(f"❌ [ERROR] subprocess failed: {e}")
        else:
            if process.returncode == 0:
                # ddrescue finished on its own, i.e. it reached the end of
                # the drive from this starting position.
                print("πŸŽ‰ [SUCCESS] ddrescue finished a run successfully.")
                break
            elif process.returncode == 124:  # Timeout (normal behavior for us)
                print("clock [TIME] Thermal pause triggered.")
            else:
                print(f"⚠️ [WARN] ddrescue exited with code {process.returncode}")

        # 4. COOL DOWN / RESET
        print(f"❄️ [COOL] Resting for {COOL_DOWN_SECONDS}s...")
        await asyncio.sleep(COOL_DOWN_SECONDS)
        # Power cycle to reset controller state for the next round.
        await cycle_power()
# Script entry point: drive the async recovery loop to completion.
if __name__ == "__main__":
    asyncio.run(main())
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment