Created
December 14, 2025 22:08
-
-
Save watson0x90/efe012e6f3eae397a06ca53cedbe87a0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| import subprocess | |
| import time | |
| import re | |
| from kasa import Discover, Credentials | |
# --- CONFIGURATION ---
# USB identity of the drive being recovered.
TARGET_VID = "0634"  # Crucial Vendor ID
TARGET_PID = "5603"  # Crucial Product ID

# Address of the Kasa smart plug that powers the drive.
KASA_IP = "192.168.1.144"

# ddrescue output image and its mapfile.
IMG_FILE = "/mnt/recovery_drive/sniper.img"
MAP_FILE = "/mnt/recovery_drive/sniper.map"

# SKIP SETTINGS
SKIP_SIZE_BYTES = 10 * 1024**3  # 10 GB jump after each failed probe
DRIVE_SIZE_BYTES = 2_000_398_934_016  # 2TB (approx, can be adjusted)

# TIMING
RUN_LIMIT_SECONDS = 90  # Cap on one ddrescue run, to prevent overheating
COOL_DOWN_SECONDS = 60  # Rest between runs, to let the controller recover
async def cycle_power():
    """Power-cycle the drive through the Kasa smart plug at KASA_IP.

    Switches the plug off, waits 10 seconds, then switches it back on.
    This is best-effort: any failure while talking to the plug is printed
    and swallowed, so the caller always regains control.
    """
    print(f"\nπ [POWER] Connecting to Smart Plug at {KASA_IP}...")
    try:
        plug = await Discover.discover_single(KASA_IP)
        await plug.update()

        print(f" [POWER] Turning OFF...")
        await plug.turn_off()

        # Hard wait so the capacitors drain before power returns.
        await asyncio.sleep(10)

        print(f" [POWER] Turning ON...")
        await plug.turn_on()
        print(f" [POWER] Cycle Complete. Waiting for drive to boot...")
    except Exception as err:
        print(f"β [ERROR] Kasa failed: {err}")
def find_drive():
    """Return the /dev/sdX node of the USB drive matching TARGET_VID/TARGET_PID.

    Checks sdc through sdh. For each one that exists under /sys/block, the
    udev attribute walk (``udevadm info -a``) is searched for a single
    device stanza carrying both ``ATTRS{idVendor}`` and ``ATTRS{idProduct}``
    equal to the configured IDs.

    Returns:
        The device path (e.g. "/dev/sdc") of the first match, or None when
        no candidate matches or udevadm cannot be run.
    """
    # Match the exact attribute lines rather than grepping for the bare
    # digits, which could also appear in serial numbers, sizes or paths.
    want_vid = f'ATTRS{{idVendor}}=="{TARGET_VID.lower()}"'
    want_pid = f'ATTRS{{idProduct}}=="{TARGET_PID.lower()}"'

    for letter in "cdefgh":
        dev = f"sd{letter}"
        if not os.path.exists(f"/sys/block/{dev}/device"):
            continue

        try:
            res = subprocess.run(
                ["udevadm", "info", "-a", "-n", f"/dev/{dev}"],
                stdout=subprocess.PIPE,
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
        except (OSError, subprocess.SubprocessError):
            # udevadm missing, not runnable, or timed out: skip this
            # candidate instead of aborting the whole scan.
            continue

        if res.returncode != 0:
            continue

        # The attribute walk prints one blank-line-separated stanza per
        # device in the parent chain; both IDs must sit in the same stanza
        # so that they describe the same USB device.
        for stanza in res.stdout.split("\n\n"):
            if want_vid in stanza and want_pid in stanza:
                return f"/dev/{dev}"

    return None
def get_resume_point(map_file_path):
    """Return the explicit resume offset implied by the mapfile's presence.

    The mapfile is never parsed here; only its existence is checked.

    Returns:
        0 when no mapfile exists yet (start from the beginning), otherwise
        None, meaning "no forced position" - ddrescue is left to resume
        from its own mapfile unless the caller forces a skip.
    """
    mapfile_present = os.path.exists(map_file_path)
    return None if mapfile_present else 0
def probe_drive(device_path, offset, sector_size=512):
    """
    The Sonar: Attempts to read 1 sector at the given offset.

    The read bypasses the page cache (O_DIRECT) so it reflects what the
    drive can deliver right now.

    Args:
        device_path: Block device node to read from, e.g. "/dev/sdc".
        offset: Byte position to probe. It is rounded down to a multiple of
            sector_size, because O_DIRECT rejects unaligned offsets.
        sector_size: Number of bytes to read; also the alignment unit.

    Returns:
        True if a full sector was read, False on any error or short read.
    """
    # Local import so this function does not depend on the file's import block.
    import mmap

    print(f"π‘ [SONAR] Probing {device_path} at {offset / (1024**3):.2f} GB...")
    try:
        aligned_offset = offset - (offset % sector_size)
        fd = os.open(device_path, os.O_RDONLY | os.O_DIRECT)
        try:
            # O_DIRECT also restricts the buffer address. An anonymous mmap
            # is page-aligned, whereas the bytes object os.read() allocates
            # is not, which can fail with EINVAL and look like a bad sector.
            with mmap.mmap(-1, sector_size) as buf:
                bytes_read = os.preadv(fd, [buf], aligned_offset)
        finally:
            # Always release the descriptor, including when the read raises.
            os.close(fd)
    except OSError:
        print(" [SONAR] Hit rock! (IO Error)")
        return False
    except Exception as e:
        print(f" [SONAR] Error: {e}")
        return False

    if bytes_read == sector_size:
        print(" [SONAR] Ping successful! Water is clear.")
        return True
    return False
async def main():
    """Run the recovery loop until the drive is imaged or the offset runs out.

    Each round:
      1. Wait until find_drive() reports the target and its /dev node exists.
      2. Probe at the current offset; on failure jump forward by
         SKIP_SIZE_BYTES and probe again, stopping at DRIVE_SIZE_BYTES.
      3. Run ddrescue from the readable offset, capped at RUN_LIMIT_SECONDS
         by ``timeout``.
      4. Rest for COOL_DOWN_SECONDS, power-cycle the drive, and repeat.

    Returns when ddrescue exits with status 0 or when the probe offset
    reaches DRIVE_SIZE_BYTES.
    """
    # START WHERE WE KNOW IT WORKS (1.99TB).
    # This must be decimal terabytes: DRIVE_SIZE_BYTES is a decimal "2 TB"
    # (2,000,398,934,016 bytes), so 1990 * 1024**3 (2,136,746,229,760) would
    # already be past the end and the loop would finish without probing.
    # 1990 * 1000**3 is also a multiple of 512, as the O_DIRECT probe needs.
    current_skip_offset = 1990 * 1000**3

    while True:
        # 1. WAIT FOR DRIVE
        print("\nβ³ [WAIT] Looking for drive...")
        drive_path = None
        while drive_path is None:
            drive_path = find_drive()
            if drive_path is None:
                await asyncio.sleep(2)
                continue
            print(f"β [FOUND] Target detected at {drive_path}")
            # Wait for the /dev node to settle, without blocking the loop.
            await asyncio.sleep(3)
            if not os.path.exists(drive_path):
                print(" [GHOST] Drive vanished immediately.")
                drive_path = None

        # 2. PROBE PHASE (Dynamic Skipping)
        # Try to read at current_skip_offset; on failure move forward by
        # SKIP_SIZE_BYTES and retry, until a readable spot is found or the
        # offset runs off the end of the drive.
        drive_lost = False
        while True:
            if current_skip_offset >= DRIVE_SIZE_BYTES:
                print("π [FINISH] Reached end of drive capacity.")
                return
            if probe_drive(drive_path, current_skip_offset):
                break  # Found a good spot: go start ddrescue.
            if not os.path.exists(drive_path):
                # The device node is gone, so this failure says nothing
                # about the offset. Do not skip; reset the drive instead.
                print(" [GHOST] Drive vanished during probe.")
                drive_lost = True
                break
            print(f"βοΈ [SKIP] Jumping forward {SKIP_SIZE_BYTES / (1024**3)} GB...")
            current_skip_offset += SKIP_SIZE_BYTES

        if drive_lost:
            await cycle_power()
            continue

        # 3. EXTRACTION PHASE
        print(f"π [LAUNCH] Starting ddrescue at {current_skip_offset}...")
        # ddrescue starts at the known-good offset; `timeout` sends SIGINT
        # after RUN_LIMIT_SECONDS to enforce the thermal limit.
        cmd = [
            "timeout", "-s", "INT", str(RUN_LIMIT_SECONDS),
            "ddrescue", "-n", "-d", "-r", "0", "-c", "64",
            f"--input-position={current_skip_offset}",
            drive_path, IMG_FILE, MAP_FILE,
        ]
        try:
            process = subprocess.run(cmd)
            if process.returncode == 0:
                print("π [SUCCESS] ddrescue finished a run successfully.")
                # ddrescue finished on its own before the time limit.
                return
            elif process.returncode == 124:  # `timeout` fired (normal for us)
                print("clock [TIME] Thermal pause triggered.")
            else:
                print(f"β οΈ [WARN] ddrescue exited with code {process.returncode}")
        except Exception as e:
            print(f"β [ERROR] subprocess failed: {e}")

        # 4. COOL DOWN / RESET
        # current_skip_offset is deliberately left unchanged: the mapfile
        # records what ddrescue already copied, and the next round's probe
        # re-checks this offset before ddrescue is launched again.
        print(f"βοΈ [COOL] Resting for {COOL_DOWN_SECONDS}s...")
        await asyncio.sleep(COOL_DOWN_SECONDS)
        # Power cycle to reset controller state for the next round.
        await cycle_power()
# Script entry point: run the recovery loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment