Created
December 16, 2025 14:01
-
-
Save watson0x90/3ff6748bc75a4970b078a59064b568bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| import subprocess | |
| import time | |
| import sys | |
| import shutil | |
| import argparse | |
| from kasa import Discover, Credentials | |
# ==========================================
# USER CONFIGURATION
# ==========================================
# KASA PLUG CREDENTIALS
# The account e-mail and password may be supplied through the KASA_USERNAME
# and KASA_PASSWORD environment variables so that secrets do not have to be
# written into this file. When a variable is unset, the literal fallback
# below (empty by default) is used, exactly as before.
DEVICE_IP = "192.168.1.144"
USERNAME = os.environ.get("KASA_USERNAME", "")  # <--- ENTER EMAIL (fallback literal)
PASSWORD = os.environ.get("KASA_PASSWORD", "")  # <--- ENTER PASSWORD (fallback literal)
# HARDWARE ID
TARGET_VID = "0634"  # Crucial Vendor ID
# PATHS
IMG_FILE = "/mnt/recovery_drive/sniper.img"  # rescue image passed to ddrescue
MAP_FILE = "/mnt/recovery_drive/sniper.map"  # ddrescue mapfile for that image
# SETTINGS
PROBE_TIMEOUT = 5  # Seconds to wait for a dd probe
RUN_DURATION = 45  # Shorter runs (45s) since we are hitting bad areas often
COOL_DOWN = 60  # Seconds to turn OFF and cool down
SKIP_SIZE = 512 * 1024  # 512 KB Jumps (High Precision for cleanup)
| # ========================================== | |
| # 🛠️ UTILITIES & MAP PARSER | |
| # ========================================== | |
def get_cleanup_targets(map_path):
    """
    Collect the mapfile blocks that still need work.

    Every non-comment line with at least three whitespace-separated fields
    is read as ``<pos> <size> <status>`` with ``pos`` and ``size`` in
    hexadecimal. Blocks whose status is '?' (non-tried), '*' (non-trimmed)
    or '/' (non-scraped) are kept.

    Returns a list of (start, end) byte-offset tuples, where
    ``end = pos + size``. An empty list is returned when the mapfile is
    missing or cannot be read.
    """
    if not os.path.exists(map_path):
        print("β Mapfile not found.")
        return []

    # ? = Skipped (Gray), * = Failed/Non-trimmed (Yellow),
    # / = Failed/Non-scraped (Yellow)
    wanted_statuses = ('?', '*', '/')
    found = []
    try:
        with open(map_path, 'r') as handle:
            for raw_line in handle:
                if raw_line.startswith('#'):
                    continue
                fields = raw_line.strip().split()
                if len(fields) < 3:
                    continue
                try:
                    begin = int(fields[0], 16)
                    length = int(fields[1], 16)
                except ValueError:
                    # Lines whose first two fields are not both hex numbers
                    # (such as the current-status line) are not block entries.
                    continue
                if fields[2] in wanted_statuses:
                    found.append((begin, begin + length))
        print(f"π Cleanup Analysis: Found {len(found)} specific areas to recover.")
        return found
    except Exception as err:
        print(f"β οΈ Error parsing mapfile: {err}")
        return []
async def cycle_power(off_duration=10):
    """Authenticate against the smart plug and power-cycle it.

    The plug at DEVICE_IP is switched off, kept off for ``off_duration``
    seconds (with a one-line countdown), switched back on, and then given
    15 seconds for the drive to boot.

    Args:
        off_duration: Seconds to keep the plug switched off.

    Returns:
        True when the whole cycle completed, False when any step failed.
        Failures are printed rather than raised.
    """
    print(f"\nπ [POWER] Connecting to Smart Plug at {DEVICE_IP}...")
    dev = None
    try:
        creds = Credentials(USERNAME, PASSWORD)
        dev = await Discover.discover_single(DEVICE_IP, credentials=creds)
        await dev.update()
        print(f" [POWER] Turning OFF for {off_duration}s thermal rest...")
        await dev.turn_off()
        # Visual countdown, rewritten in place with a carriage return
        for i in range(off_duration, 0, -1):
            sys.stdout.write(f"\r βοΈ Cooling: {i}s... ")
            sys.stdout.flush()
            await asyncio.sleep(1)
        print("")
        print(" [POWER] Turning ON...")
        await dev.turn_on()
        print(" [POWER] Waiting 15s for drive boot...")
        await asyncio.sleep(15)
        return True
    except Exception as e:
        print(f"β [ERROR] Kasa Failed: {e}")
        await asyncio.sleep(5)
        return False
    finally:
        # A new device object is created on every call; release its
        # connection so repeated power cycles do not accumulate open
        # connections. Failures here are reported but never raised, so the
        # function keeps its "returns True/False, never raises" behaviour.
        if dev is not None:
            try:
                await dev.disconnect()
            except Exception as e:
                print(f" [POWER] Disconnect failed: {e}")
def find_drive():
    """Find the /dev/sdX node whose udev attributes mention TARGET_VID.

    Only sdc..sdh are considered. For each candidate that exists under
    /sys/block, the output of ``udevadm info -a -n /dev/sdX`` is searched
    for TARGET_VID as a plain substring.

    Returns:
        The device path (e.g. "/dev/sdc") of the first match, or None when
        no candidate matches.
    """
    for letter in "cdefgh":
        dev = f"sd{letter}"
        if not os.path.exists(f"/sys/block/{dev}/device"):
            continue
        try:
            # Argument list instead of a shell pipeline: nothing is
            # interpreted by a shell, and TARGET_VID is matched literally
            # rather than as a grep pattern.
            res = subprocess.run(
                ["udevadm", "info", "-a", "-n", f"/dev/{dev}"],
                stdout=subprocess.PIPE,
            )
        except (OSError, subprocess.SubprocessError):
            # udevadm missing or not runnable: treat this candidate as "not
            # found" without swallowing KeyboardInterrupt/SystemExit the way
            # a bare except would.
            continue
        attributes = res.stdout.decode(errors="replace")
        if TARGET_VID in attributes:
            return f"/dev/{dev}"
    return None
def probe_with_dd(device_path, offset_bytes):
    """Check whether one 512-byte block at a byte offset can be read.

    Runs ``dd`` reading a single 512-byte block from ``device_path`` into
    /dev/null, skipping ``offset_bytes`` bytes of input
    (``iflag=skip_bytes``), limited to PROBE_TIMEOUT seconds. A one-glyph
    progress marker is written to stdout for success, failure, and timeout.

    Returns:
        True when dd exits with status 0, otherwise False (non-zero exit,
        timeout, or any other error).
    """
    dd_argv = [
        "dd",
        f"if={device_path}",
        "of=/dev/null",
        "bs=512",
        "count=1",
        f"skip={offset_bytes}",
        "iflag=skip_bytes",
        "status=none",
    ]
    try:
        finished = subprocess.run(dd_argv, timeout=PROBE_TIMEOUT)
        readable = finished.returncode == 0
        sys.stdout.write("β " if readable else "β")
        sys.stdout.flush()
        return readable
    except subprocess.TimeoutExpired:
        # dd did not finish within PROBE_TIMEOUT seconds
        sys.stdout.write("β³")
        sys.stdout.flush()
        return False
    except Exception:
        return False
| # ========================================== | |
| # π CLEANUP LOOP | |
| # ========================================== | |
async def hunt_in_range(start_offset, end_offset):
    """Probe one mapfile gap in small steps and rescue whatever responds.

    Walks from ``start_offset`` towards ``end_offset`` (byte offsets). At
    each position the drive is located and probed with a single-block dd
    read:

    * hit  -> ddrescue is run from the current position to the end of the
      gap for at most RUN_DURATION seconds, the drive is power-cycled with
      a COOL_DOWN rest, and the position advances by 1 MB;
    * miss -> the position advances by SKIP_SIZE.

    Args:
        start_offset: First byte offset of the gap.
        end_offset: Byte offset one past the end of the gap.

    Returns:
        None. " -> Done." is printed once the end of the gap is reached.
    """
    current_offset = start_offset
    gap_size_mb = (end_offset - start_offset) / (1024**2)
    # The position is divided by 1024**3, i.e. expressed in gigabyte units,
    # so it is labelled GB (it was previously mislabelled as TB).
    print(f"\n βοΈ Scrubbing Sector ({gap_size_mb:.2f} MB) at {start_offset / 1024**3:.4f} GB")
    sys.stdout.write(" Probing: ")
    while current_offset < end_offset:
        # 1. Acquire Drive (Silent check): poll every 2s until it shows up
        drive = find_drive()
        while not drive:
            await asyncio.sleep(2)
            drive = find_drive()
        # 2. Probe
        if probe_with_dd(drive, current_offset):
            print("\n π― [HIT] Data found! Extracting...")
            # Remaining size of THIS specific gap
            size_to_read = end_offset - current_offset
            # -r 1 (one retry pass) encourages ddrescue to try harder on
            # these small bits; `timeout -s INT` stops it after RUN_DURATION.
            # NOTE(review): -n is ddrescue's no-scrape option, yet '/'
            # (non-scraped) blocks are among the cleanup targets; confirm
            # that skipping the scraping phase here is intended.
            cmd = [
                "timeout", "-s", "INT", str(RUN_DURATION),
                "ddrescue", "-n", "-d", "-r", "1", "-c", "32",
                f"--input-position={current_offset}",
                f"--size={size_to_read}",
                drive, IMG_FILE, MAP_FILE,
            ]
            subprocess.run(cmd)
            # Thermal shutdown after a successful extraction attempt
            await cycle_power(off_duration=COOL_DOWN)
            # Advance 1MB
            current_offset += 1024 * 1024
            print(" Resume Probing: ", end="")
        else:
            # MISS - jump forward a small amount (SKIP_SIZE)
            current_offset += SKIP_SIZE
    print(" -> Done.")
async def run_cleanup():
    """Coordinate one cleanup pass over the mapfile.

    Copies MAP_FILE to ``MAP_FILE + ".bak"`` when it exists, collects the
    outstanding ranges with get_cleanup_targets(), and hands each range to
    hunt_in_range() in order. Returns None; returns early when there is
    nothing to clean up.
    """
    mapfile_present = os.path.exists(MAP_FILE)
    if mapfile_present:
        print("πΎ Backing up mapfile...", end="")
        shutil.copy(MAP_FILE, f"{MAP_FILE}.bak")
        print(" Done.")

    pending = get_cleanup_targets(MAP_FILE)
    if not pending:
        print("β No cleanup targets found! You might be done.")
        return

    total = len(pending)
    print(f"π Mission: Scrub {total} imperfect areas.")
    for ordinal, (gap_start, gap_end) in enumerate(pending, start=1):
        print(f"πΉ Target {ordinal}/{total}...", end="")
        await hunt_in_range(gap_start, gap_end)
    print("\nπ Cleanup Pass Complete.")
if __name__ == "__main__":
    # Script entry point: refuse to start unless the effective user is root
    # (uid 0), then run the cleanup pass until it finishes or the user
    # interrupts it.
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        sys.exit("β ERROR: Must run as root (sudo).")
    try:
        asyncio.run(run_cleanup())
    except KeyboardInterrupt:
        print("\nπ Stopped by user.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment