Last active
December 14, 2025 23:45
-
-
Save watson0x90/fc50fc0aa403e44e059fd587c029b558 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| import subprocess | |
| import time | |
| import sys | |
| import shutil | |
| import argparse | |
| import re | |
| from kasa import Discover, Credentials | |
| # ========================================== | |
| # βοΈ USER CONFIGURATION | |
| # ========================================== | |
| # KASA PLUG CREDENTIALS | |
| DEVICE_IP = "192.168.1.144" | |
| USERNAME = "" # <--- ENTER YOUR EMAIL | |
| PASSWORD = "" # <--- ENTER YOUR PASSWORD | |
| # HARDWARE ID | |
| TARGET_VID = "0634" # Crucial Vendor ID | |
| # PATHS | |
| IMG_FILE = "/mnt/recovery_drive/sniper.img" | |
| MAP_FILE = "/mnt/recovery_drive/sniper.map" | |
| # DEFAULT SEARCH LIMITS (If not overridden by flags) | |
| DEFAULT_START = 1200 * 1024**3 # 1.2 TB | |
| DEFAULT_END = 1999 * 1024**3 # 1.99 TB | |
| # JUMP SETTINGS | |
| MIN_JUMP = 1 * 1024**3 # 1 GB | |
| MAX_JUMP = 50 * 1024**3 # 50 GB | |
| # TIMING | |
| PROBE_TIMEOUT = 5 # Seconds to wait for a dd probe | |
| RUN_DURATION = 90 # Seconds to run ddrescue | |
| COOL_DOWN = 60 # Seconds to turn OFF and cool down | |
| # ========================================== | |
| # π οΈ UTILITIES & MAP PARSER | |
| # ========================================== | |
| def parse_size(size_str): | |
| """Converts strings like '1.5T', '500G' into bytes.""" | |
| size_str = size_str.upper().strip() | |
| match = re.match(r"([0-9\.]+)([TGMK]?)", size_str) | |
| if not match: return 0 | |
| val, unit = match.groups() | |
| val = float(val) | |
| if unit == 'T': return int(val * 1024**4) | |
| if unit == 'G': return int(val * 1024**3) | |
| if unit == 'M': return int(val * 1024**2) | |
| if unit == 'K': return int(val * 1024) | |
| return int(val) | |
| def get_last_mapfile_pos(map_path): | |
| """ | |
| Parses your ddrescue mapfile to find the absolute furthest point | |
| you have attempted (successfully or failed). | |
| """ | |
| if not os.path.exists(map_path): | |
| print(" [MAP] No mapfile found. Starting from scratch.") | |
| return 0 | |
| max_pos = 0 | |
| try: | |
| with open(map_path, 'r') as f: | |
| for line in f: | |
| if line.startswith('#'): continue | |
| parts = line.strip().split() | |
| if len(parts) < 3: continue | |
| # Line Format: Position Size Status | |
| try: | |
| pos = int(parts[0], 16) | |
| size = int(parts[1], 16) | |
| status = parts[2] | |
| except ValueError: continue | |
| end_pos = pos + size | |
| # We look for the highest block that has been touched (+, -, *, /) | |
| if status in ['+', '-', '*', '/']: | |
| if end_pos > max_pos: | |
| max_pos = end_pos | |
| print(f" [MAP] Analysis Complete. Highest recovered block ends at: {max_pos / 1024**3:.3f} TB") | |
| return max_pos | |
| except Exception as e: | |
| print(f"β οΈ Error parsing mapfile: {e}") | |
| return 0 | |
| async def cycle_power(off_duration=10): | |
| """Authenticates and cycles the smart plug.""" | |
| print(f"\nπ [POWER] Connecting to Smart Plug at {DEVICE_IP}...") | |
| try: | |
| creds = Credentials(USERNAME, PASSWORD) | |
| # Using Discover logic as requested | |
| dev = await Discover.discover_single(DEVICE_IP, credentials=creds) | |
| await dev.update() | |
| print(f" [POWER] Turning OFF for {off_duration}s thermal rest...") | |
| await dev.turn_off() | |
| # Visual Countdown | |
| for i in range(off_duration, 0, -1): | |
| sys.stdout.write(f"\r βοΈ Cooling: {i}s... ") | |
| sys.stdout.flush() | |
| await asyncio.sleep(1) | |
| print("") # Newline | |
| print(f" [POWER] Turning ON...") | |
| await dev.turn_on() | |
| print(f" [POWER] Waiting 15s for drive boot...") | |
| await asyncio.sleep(15) | |
| return True | |
| except Exception as e: | |
| print(f"β [ERROR] Kasa Failed: {e}") | |
| await asyncio.sleep(5) | |
| return False | |
| def find_drive(): | |
| """Finds /dev/sdX based on Vendor ID.""" | |
| for letter in "cdefgh": | |
| dev = f"sd{letter}" | |
| path = f"/sys/block/{dev}/device" | |
| if not os.path.exists(path): continue | |
| try: | |
| cmd = f"udevadm info -a -n /dev/{dev} | grep {TARGET_VID}" | |
| res = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE) | |
| if res.returncode == 0: return f"/dev/{dev}" | |
| except: continue | |
| return None | |
| def probe_with_dd(device_path, offset_bytes): | |
| """Runs dd to test if a specific sector is readable.""" | |
| gb_pos = offset_bytes / (1024**3) | |
| print(f"π‘ [SONAR] Pinging {gb_pos:.3f} TB... ", end="", flush=True) | |
| cmd = [ | |
| "dd", f"if={device_path}", "of=/dev/null", | |
| "bs=512", "count=1", f"skip={offset_bytes}", | |
| "iflag=skip_bytes", "status=none" | |
| ] | |
| try: | |
| res = subprocess.run(cmd, timeout=PROBE_TIMEOUT) | |
| if res.returncode == 0: | |
| print("β ALIVE!") | |
| return True | |
| else: | |
| print("β Dead (Exit 1)") | |
| return False | |
| except subprocess.TimeoutExpired: | |
| print("β Dead (Timeout)") | |
| return False | |
| except Exception: | |
| print("β οΈ Error") | |
| return False | |
| # ========================================== | |
| # π MODES | |
| # ========================================== | |
| async def test_kasa(): | |
| """Mode -k: Tests the smart plug connection then exits.""" | |
| print("π§ͺ [TEST MODE] Testing Kasa Connection...") | |
| print(f" Target: {DEVICE_IP}") | |
| print(f" User: {USERNAME}") | |
| success = await cycle_power(off_duration=5) | |
| if success: | |
| print("\nβ SUCCESS: Smart Plug is communicating and cycling correctly.") | |
| else: | |
| print("\nβ FAILURE: Check IP, Username, and Password.") | |
| async def run_recovery(start_offset, end_offset): | |
| """Mode -r: The Island Hunter Loop.""" | |
| # 1. Mapfile Backup | |
| if os.path.exists(MAP_FILE): | |
| print("πΎ Backing up mapfile...", end="") | |
| shutil.copy(MAP_FILE, f"{MAP_FILE}.bak") | |
| print(" Done.") | |
| current_offset = start_offset | |
| current_jump = MIN_JUMP | |
| consecutive_fails = 0 | |
| print("------------------------------------------------") | |
| print("ποΈ SNIPER PRO ACTIVATED") | |
| print(f" Start: {start_offset/1024**3:.3f} TB") | |
| print(f" End: {end_offset/1024**3:.3f} TB") | |
| print("------------------------------------------------") | |
| while current_offset < end_offset: | |
| # 2. Acquire Drive | |
| drive = None | |
| while not drive: | |
| drive = find_drive() | |
| if not drive: await asyncio.sleep(2) | |
| # 3. Probe | |
| is_alive = probe_with_dd(drive, current_offset) | |
| if is_alive: | |
| # --- HIT --- | |
| print(f"π― [HIT] Found Data! Launching ddrescue...") | |
| current_jump = MIN_JUMP | |
| consecutive_fails = 0 | |
| cmd = [ | |
| "timeout", "-s", "INT", str(RUN_DURATION), | |
| "ddrescue", "-n", "-d", "-r", "0", "-c", "64", | |
| f"--input-position={current_offset}", | |
| drive, IMG_FILE, MAP_FILE | |
| ] | |
| subprocess.run(cmd) | |
| # Thermal Shutdown | |
| await cycle_power(off_duration=COOL_DOWN) | |
| # Advance to avoid re-probing the same start | |
| current_offset += (500 * 1024 * 1024) | |
| else: | |
| # --- MISS --- | |
| print(f"β© [SKIP] Jumping {current_jump/1024**3:.2f} GB") | |
| current_offset += current_jump | |
| consecutive_fails += 1 | |
| # Acceleration Logic | |
| if consecutive_fails >= 3: | |
| current_jump = min(current_jump * 2, MAX_JUMP) | |
| # Safety Reset | |
| if consecutive_fails > 20: | |
| print("β οΈ [RESET] Too many sequential fails. Resetting controller...") | |
| await cycle_power(off_duration=10) | |
| consecutive_fails = 0 | |
| # ========================================== | |
| # π MAIN ENTRY | |
| # ========================================== | |
| if __name__ == "__main__": | |
| # Root Check | |
| if os.geteuid() != 0: | |
| sys.exit("β ERROR: This script must be run as root (sudo).") | |
| # Arguments | |
| parser = argparse.ArgumentParser(description="Sniper Pro: Adaptive Data Recovery") | |
| # Mode flags | |
| group = parser.add_mutually_exclusive_group(required=True) | |
| group.add_argument('-r', '--recover', action='store_true', help="Start Recovery Mode") | |
| group.add_argument('-k', '--kasa-test', action='store_true', help="Test Smart Plug Connection") | |
| # Positioning flags | |
| parser.add_argument('--start', type=str, help="Start offset (e.g., 1.5T, 1500G)") | |
| parser.add_argument('--end', type=str, help="End offset (e.g., 2T)") | |
| parser.add_argument('--resume', action='store_true', help="Auto-detect start from mapfile") | |
| args = parser.parse_args() | |
| # Determine Start/End | |
| start_pos = DEFAULT_START | |
| end_pos = DEFAULT_END | |
| if args.start: | |
| start_pos = parse_size(args.start) | |
| elif args.resume: | |
| print("π Scanning mapfile for resume point...") | |
| # Fixed: Name matches definition now | |
| found_pos = get_last_mapfile_pos(MAP_FILE) | |
| if found_pos > 0: | |
| print(f" Resuming from: {found_pos/1024**3:.3f} TB") | |
| start_pos = found_pos | |
| else: | |
| print(f" Mapfile empty. Using default {start_pos/1024**3:.3f} TB") | |
| if args.end: | |
| end_pos = parse_size(args.end) | |
| try: | |
| if args.kasa_test: | |
| asyncio.run(test_kasa()) | |
| elif args.recover: | |
| asyncio.run(run_recovery(start_pos, end_pos)) | |
| except KeyboardInterrupt: | |
| print("\nπ Stopped by user.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment