Skip to content

Instantly share code, notes, and snippets.

@watson0x90
Last active December 14, 2025 23:45
Show Gist options
  • Select an option

  • Save watson0x90/fc50fc0aa403e44e059fd587c029b558 to your computer and use it in GitHub Desktop.

Select an option

Save watson0x90/fc50fc0aa403e44e059fd587c029b558 to your computer and use it in GitHub Desktop.
import asyncio
import os
import subprocess
import time
import sys
import shutil
import argparse
import re
from kasa import Discover, Credentials
# ==========================================
# βš™οΈ USER CONFIGURATION
# ==========================================
# KASA PLUG CREDENTIALS
# Address and login that cycle_power() uses to reach the smart plug.
DEVICE_IP = "192.168.1.144"
USERNAME = "" # <--- ENTER YOUR EMAIL
PASSWORD = "" # <--- ENTER YOUR PASSWORD
# HARDWARE ID
# find_drive() searches the `udevadm info` output of each candidate disk
# for this text.
TARGET_VID = "0634" # Crucial Vendor ID
# PATHS
# Output image and ddrescue mapfile handed to ddrescue by run_recovery().
IMG_FILE = "/mnt/recovery_drive/sniper.img"
MAP_FILE = "/mnt/recovery_drive/sniper.map"
# DEFAULT SEARCH LIMITS (If not overridden by flags)
# Both values are byte offsets.
DEFAULT_START = 1200 * 1024**3 # 1200 GiB
DEFAULT_END = 1999 * 1024**3 # 1999 GiB
# JUMP SETTINGS
# After a failed probe run_recovery() skips forward by the current jump; the
# jump starts at MIN_JUMP and doubles from the third consecutive miss onward,
# capped at MAX_JUMP.
MIN_JUMP = 1 * 1024**3 # 1 GiB
MAX_JUMP = 50 * 1024**3 # 50 GiB
# TIMING
PROBE_TIMEOUT = 5 # Seconds to wait for a dd probe
RUN_DURATION = 90 # Seconds ddrescue runs before `timeout` sends it SIGINT
COOL_DOWN = 60 # Seconds the plug stays OFF after each ddrescue run
# ==========================================
# πŸ› οΈ UTILITIES & MAP PARSER
# ==========================================
def parse_size(size_str):
    """Convert a human-readable size such as '1.5T' or '500G' into bytes.

    The text is a non-negative integer or decimal number, optionally followed
    by one of the unit letters K, M, G or T (powers of 1024).  The unit may
    carry a trailing 'B' or 'iB' ('2TB', '2TiB'), whitespace may separate the
    number from the unit, and matching is case-insensitive.  Without a unit
    the number is a plain byte count.

    Args:
        size_str: The size text to parse.

    Returns:
        The size in bytes as an int, or 0 when the text is not a valid size.
    """
    multipliers = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
    text = size_str.upper().strip()
    # A strict number grammar plus fullmatch: malformed numbers ("1.2.3", ".")
    # used to reach float() and raise, and trailing garbage was silently
    # ignored.  Anything that is not a clean size now yields the 0 sentinel.
    match = re.fullmatch(r"(\d+(?:\.\d*)?|\.\d+)\s*([KMGT]?)(?:I?B)?", text)
    if match is None:
        return 0
    number, unit = match.groups()
    return int(float(number) * multipliers[unit])
def get_last_mapfile_pos(map_path):
    """Return the end offset of the furthest block ddrescue has attempted.

    Reads the mapfile at *map_path* and looks at lines of the form
    ``position size status`` where position and size are hexadecimal.  Among
    blocks whose status is ``+``, ``-``, ``*`` or ``/`` (recovered or tried
    and failed), the highest ``position + size`` wins.  Comment lines, lines
    with fewer than three fields and lines whose first two fields are not
    hexadecimal numbers are skipped.

    Args:
        map_path: Path of the ddrescue mapfile.

    Returns:
        The offset in bytes, or 0 when the mapfile does not exist, cannot be
        read, or contains no such block.
    """
    touched = {"+", "-", "*", "/"}
    max_pos = 0
    try:
        # errors="replace" keeps stray non-text bytes in a damaged mapfile
        # from aborting the scan; such lines simply fail the hex parse below.
        with open(map_path, "r", errors="replace") as f:
            for line in f:
                if line.startswith("#"):
                    continue
                parts = line.split()
                if len(parts) < 3:
                    continue
                # Line format: position size status
                try:
                    pos = int(parts[0], 16)
                    size = int(parts[1], 16)
                except ValueError:
                    continue
                if parts[2] in touched:
                    max_pos = max(max_pos, pos + size)
    except FileNotFoundError:
        print(" [MAP] No mapfile found. Starting from scratch.")
        return 0
    except OSError as e:
        print(f"⚠️ Error parsing mapfile: {e}")
        return 0
    # The value is divided by 1024**3, so the unit shown is GB (it used to be
    # mislabelled as TB).
    print(f" [MAP] Analysis Complete. Highest recovered block ends at: {max_pos / 1024**3:.3f} GB")
    return max_pos
async def cycle_power(off_duration=10):
    """Power-cycle the drive through the Kasa smart plug.

    Connects to the plug at DEVICE_IP using USERNAME/PASSWORD, switches it
    off, counts down *off_duration* seconds on a single console line,
    switches it back on and then waits 15 seconds for the drive to boot.

    Args:
        off_duration: Seconds to keep the plug switched off.

    Returns:
        True when the whole cycle completed.  False when any step raised; the
        error is printed and the call pauses 5 seconds before returning.
    """
    print(f"\n🔌 [POWER] Connecting to Smart Plug at {DEVICE_IP}...")
    dev = None
    try:
        creds = Credentials(USERNAME, PASSWORD)
        dev = await Discover.discover_single(DEVICE_IP, credentials=creds)
        await dev.update()
        print(f" [POWER] Turning OFF for {off_duration}s thermal rest...")
        await dev.turn_off()
        # Visual countdown, redrawn in place with a carriage return.
        for i in range(off_duration, 0, -1):
            sys.stdout.write(f"\r ❄️ Cooling: {i}s... ")
            sys.stdout.flush()
            await asyncio.sleep(1)
        print("")  # Finish the countdown line.
        print(" [POWER] Turning ON...")
        await dev.turn_on()
        print(" [POWER] Waiting 15s for drive boot...")
        await asyncio.sleep(15)
        return True
    except Exception as e:
        # Deliberately broad: a plug failure must not abort the recovery loop.
        print(f"❌ [ERROR] Kasa Failed: {e}")
        await asyncio.sleep(5)
        return False
    finally:
        # This function runs many times during a recovery; release the
        # connection each call opens instead of letting them pile up.
        # getattr() tolerates a failed discovery (dev is None) and library
        # versions whose device objects have no disconnect().
        disconnect = getattr(dev, "disconnect", None)
        if disconnect is not None:
            try:
                await disconnect()
            except Exception as e:
                print(f"⚠️ [POWER] Could not close plug connection: {e}")
def find_drive():
    """Locate the target drive by searching udev attributes for TARGET_VID.

    Checks sdc through sdh in order.  For every name that has a
    /sys/block/<name>/device entry, the output of
    ``udevadm info -a -n /dev/<name>`` is searched for the TARGET_VID text.

    Returns:
        The device path (for example '/dev/sdc') of the first match, or None
        when no candidate matches.
    """
    # NOTE(review): sda and sdb are never considered - presumably they are
    # the host's own disks; confirm before widening this range.
    for letter in "cdefgh":
        dev = f"sd{letter}"
        if not os.path.exists(f"/sys/block/{dev}/device"):
            continue
        try:
            # Argument list instead of a shell pipeline: no shell parsing of
            # the command, and no dependency on an external grep.
            res = subprocess.run(
                ["udevadm", "info", "-a", "-n", f"/dev/{dev}"],
                stdout=subprocess.PIPE,
                text=True,
                errors="replace",
            )
        except OSError:
            # udevadm could not be launched; try the next candidate.
            continue
        if TARGET_VID in res.stdout:
            return f"/dev/{dev}"
    return None
def probe_with_dd(device_path, offset_bytes):
    """Check whether a single 512-byte read at *offset_bytes* succeeds.

    Runs ``dd`` on *device_path*, reading one 512-byte block into /dev/null
    starting at the given byte offset (``iflag=skip_bytes`` makes ``skip`` a
    byte count), and gives up after PROBE_TIMEOUT seconds.  The outcome is
    printed on the same console line as the probe message.

    Args:
        device_path: Block device to read from, e.g. '/dev/sdc'.
        offset_bytes: Byte offset of the probe.

    Returns:
        True when dd exits with status 0; False on a non-zero exit, a
        timeout, or a failure to launch dd.
    """
    gb_pos = offset_bytes / (1024**3)
    # The position is computed in GB, so label it as GB (was "TB").
    print(f"📡 [SONAR] Pinging {gb_pos:.3f} GB... ", end="", flush=True)
    cmd = [
        "dd", f"if={device_path}", "of=/dev/null",
        "bs=512", "count=1", f"skip={offset_bytes}",
        "iflag=skip_bytes", "status=none",
    ]
    try:
        res = subprocess.run(cmd, timeout=PROBE_TIMEOUT)
    except subprocess.TimeoutExpired:
        print("❌ Dead (Timeout)")
        return False
    except (OSError, ValueError) as exc:
        # dd could not be started at all; show why instead of a bare "Error".
        print(f"⚠️ Error ({exc})")
        return False
    if res.returncode == 0:
        print("✅ ALIVE!")
        return True
    # Report the real exit status rather than a hard-coded "1".
    print(f"❌ Dead (Exit {res.returncode})")
    return False
# ==========================================
# 🚀 MODES
# ==========================================
async def test_kasa():
    """Mode -k: run one short power cycle to check the smart plug, then return.

    Prints the configured plug address and user name, performs a power cycle
    with a 5-second off period, and reports success or failure according to
    the value cycle_power() returns.  Note that this really switches the plug
    off and on again.
    """
    print("🧪 [TEST MODE] Testing Kasa Connection...")
    print(f" Target: {DEVICE_IP}")
    print(f" User: {USERNAME}")
    if await cycle_power(off_duration=5):
        print("\n✅ SUCCESS: Smart Plug is communicating and cycling correctly.")
    else:
        print("\n❌ FAILURE: Check IP, Username, and Password.")
async def run_recovery(start_offset, end_offset):
    """Mode -r: probe forward through the drive and image readable regions.

    The existing mapfile, if any, is first copied to ``MAP_FILE + '.bak'``.
    Then, while the current offset is below *end_offset*:

    * wait (polling every 2 seconds) until find_drive() returns a device;
    * probe the current offset with probe_with_dd();
    * on a hit, run ddrescue from that offset under
      ``timeout -s INT RUN_DURATION``, power the drive off for COOL_DOWN
      seconds, and advance 500 MiB;
    * on a miss, advance by the current jump.  From the third consecutive
      miss the jump doubles (capped at MAX_JUMP), and after more than 20
      consecutive misses the drive is power-cycled and the miss counter
      cleared.  A hit resets the jump to MIN_JUMP.

    Args:
        start_offset: Byte offset of the first probe.
        end_offset: Byte offset at which scanning stops (exclusive).
    """
    # Bytes to advance after a ddrescue run, so the same start is not probed
    # again on the next pass.
    hit_step = 500 * 1024 * 1024

    # 1. Mapfile backup
    if os.path.exists(MAP_FILE):
        print("💾 Backing up mapfile...", end="")
        shutil.copy(MAP_FILE, f"{MAP_FILE}.bak")
        print(" Done.")

    current_offset = start_offset
    current_jump = MIN_JUMP
    consecutive_fails = 0
    print("------------------------------------------------")
    print("🏝️ SNIPER PRO ACTIVATED")
    # Offsets are divided by 1024**3, so they are shown in GB (was "TB").
    print(f" Start: {start_offset/1024**3:.3f} GB")
    print(f" End: {end_offset/1024**3:.3f} GB")
    print("------------------------------------------------")

    while current_offset < end_offset:
        # 2. Acquire drive.  This wait has no timeout: it polls until the
        # device shows up again.
        drive = None
        while not drive:
            drive = find_drive()
            if not drive:
                await asyncio.sleep(2)

        # 3. Probe
        if probe_with_dd(drive, current_offset):
            # --- HIT ---
            print("🎯 [HIT] Found Data! Launching ddrescue...")
            current_jump = MIN_JUMP
            consecutive_fails = 0
            cmd = [
                "timeout", "-s", "INT", str(RUN_DURATION),
                "ddrescue", "-n", "-d", "-r", "0", "-c", "64",
                f"--input-position={current_offset}",
                drive, IMG_FILE, MAP_FILE,
            ]
            subprocess.run(cmd)
            # Thermal shutdown
            await cycle_power(off_duration=COOL_DOWN)
            current_offset += hit_step
        else:
            # --- MISS ---
            print(f"⏩ [SKIP] Jumping {current_jump/1024**3:.2f} GB")
            current_offset += current_jump
            consecutive_fails += 1
            # Acceleration: widen the stride once misses start to pile up.
            if consecutive_fails >= 3:
                current_jump = min(current_jump * 2, MAX_JUMP)
            # Safety reset
            if consecutive_fails > 20:
                print("⚠️ [RESET] Too many sequential fails. Resetting controller...")
                await cycle_power(off_duration=10)
                consecutive_fails = 0
# ==========================================
# 🏁 MAIN ENTRY
# ==========================================
if __name__ == "__main__":
    # Root check
    if os.geteuid() != 0:
        sys.exit("❌ ERROR: This script must be run as root (sudo).")

    # Arguments
    parser = argparse.ArgumentParser(description="Sniper Pro: Adaptive Data Recovery")
    # Mode flags: exactly one of -r / -k must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--recover', action='store_true', help="Start Recovery Mode")
    group.add_argument('-k', '--kasa-test', action='store_true', help="Test Smart Plug Connection")
    # Positioning flags
    parser.add_argument('--start', type=str, help="Start offset (e.g., 1.5T, 1500G)")
    parser.add_argument('--end', type=str, help="End offset (e.g., 2T)")
    parser.add_argument('--resume', action='store_true', help="Auto-detect start from mapfile")
    args = parser.parse_args()

    # Determine start/end.  An explicit --start takes precedence over
    # --resume; without either, the defaults apply.
    start_pos = DEFAULT_START
    end_pos = DEFAULT_END
    if args.start:
        start_pos = parse_size(args.start)
    elif args.resume:
        print("🔍 Scanning mapfile for resume point...")
        found_pos = get_last_mapfile_pos(MAP_FILE)
        # Offsets below are divided by 1024**3, so they are shown in GB
        # (they used to be mislabelled as TB).
        if found_pos > 0:
            print(f" Resuming from: {found_pos/1024**3:.3f} GB")
            start_pos = found_pos
        else:
            print(f" Mapfile empty. Using default {start_pos/1024**3:.3f} GB")
    if args.end:
        end_pos = parse_size(args.end)

    # parse_size() yields 0 for text it cannot parse, and a resume point may
    # already lie past the end.  Either way recovery would exit without doing
    # anything, so say so instead of finishing silently.
    if args.recover and start_pos >= end_pos:
        parser.error(
            f"nothing to scan: start offset ({start_pos} bytes) is not below "
            f"end offset ({end_pos} bytes); check --start/--end"
        )

    try:
        if args.kasa_test:
            asyncio.run(test_kasa())
        elif args.recover:
            asyncio.run(run_recovery(start_pos, end_pos))
    except KeyboardInterrupt:
        print("\n🛑 Stopped by user.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment