Last active
November 26, 2024 18:42
-
-
Save iahuang/cc0399ba349bc3160720257e4820535b to your computer and use it in GitHub Desktop.
A command line tool for Kali Linux that uses apfs-fuse to crack encrypted MacOS drives. Run with -h to see usage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from subprocess import Popen, PIPE, STDOUT | |
from multiprocessing import Process, Queue | |
import queue | |
import os | |
import re | |
from termcolor import colored | |
import argparse | |
import time | |
from datetime import datetime | |
from dataclasses import dataclass | |
parser = argparse.ArgumentParser(description='Crack drive partitions encrypted with FileVault') | |
parser.add_argument( | |
"disk", help="the disk to decrypt. use fdisk -l to list all available disks") | |
parser.add_argument( | |
"wordlist", help="a plaintext file containing a list of passwords to try") | |
parser.add_argument("-n", help="number of threads to use", | |
action="store", dest="numthreads", type=int, default=8) | |
parser.add_argument("-v", help="show passwords being tried", | |
action="store_true", dest="verbose", default=False) | |
def am_i_root(): | |
return os.getuid() == 0 | |
try: | |
os.mkdir("/tmp/mount") | |
except OSError as e: | |
pass | |
def initial_test(disk): | |
p = Popen(["./apfs-fuse", disk, "/tmp/mount"], | |
stdout=PIPE, stdin=PIPE, stderr=PIPE) | |
r = p.communicate(input=bytes("", "utf8"))[0] | |
return p.returncode | |
def try_password(password, disk): | |
p = Popen(["./apfs-fuse", disk, "/tmp/mount"], | |
stdout=PIPE, stdin=PIPE, stderr=PIPE) | |
r = p.communicate(input=bytes(password, "utf8"))[0] | |
return not p.returncode | |
@dataclass | |
class WorkerStatus: | |
thread: int = 0 | |
try_rate: float = 0 | |
found: str = "" | |
progress: int = 0 | |
ran_out: bool = False | |
def worker(i, words, q, args): | |
start = time.time() | |
num_tried = 0 | |
found_password = None | |
for word in words: | |
if args.verbose: | |
print( | |
colored(f"[thread {str(i).zfill(2)}]", "magenta"), | |
"trying password", | |
colored(f'"{word}"', "cyan") | |
) | |
if try_password(word, args.disk): | |
found_password = word | |
q.put(WorkerStatus(thread=i, try_rate=num_tried / (time.time()-start), found=found_password, progress=num_tried)) | |
if found_password: | |
break | |
num_tried += 1 | |
if not found_password: | |
q.put(WorkerStatus(ran_out=True)) | |
def chunkIt(seq, num): | |
avg = len(seq) / float(num) | |
out = [] | |
last = 0.0 | |
while last < len(seq): | |
out.append(seq[int(last):int(last + avg)]) | |
last += avg | |
return out | |
if __name__ == "__main__": | |
if not am_i_root(): | |
print(colored("error: script must be run as root", "red")) | |
exit(1) | |
args = parser.parse_args() | |
if initial_test(args.disk) == 0: | |
print(colored("error: disk isn't even encrypted", "red")) | |
exit(1) | |
with open(args.wordlist) as fl: | |
words = fl.read().split("\n") | |
batches = chunkIt( | |
words, args.numthreads | |
) | |
queue = Queue() | |
processes = [] | |
for i in range(args.numthreads): | |
p = Process(target=worker, args=( | |
i, | |
batches[i], | |
queue, | |
args | |
)) | |
processes.append(p) | |
for p in processes: | |
p.start() | |
worker_statuses = {} | |
last_status_report = time.time() | |
start_time = datetime.now() | |
while 1: | |
new_status = queue.get() | |
worker_statuses[new_status.thread] = new_status | |
if new_status.found: | |
print(colored(f'found password: "{new_status.found}"', "green")) | |
for p in processes: | |
p.terminate() | |
break | |
if new_status.ran_out: | |
print(colored("ran out of passwords to try", "red")) | |
for p in processes: | |
p.terminate() | |
break | |
statuses_recieved = set() | |
for status in worker_statuses.values(): | |
statuses_recieved.add(status.thread) | |
has_all_worker_statuses = statuses_recieved == set( | |
range(args.numthreads)) | |
if time.time()-last_status_report >= 1 and has_all_worker_statuses: | |
print("\n--- status ---") | |
print(colored("start time:", "magenta"), start_time.strftime("%I:%M %p, %A")) | |
cuml_rate = 0 | |
total_done = 0 | |
for status in worker_statuses.values(): | |
cuml_rate+=status.try_rate | |
total_done+=status.progress | |
print(colored("attempt rate:", "magenta"), int(cuml_rate*60), "per minute") | |
print(colored("progress:", "magenta"), str(int(total_done/len(words)*100))+"%") | |
last_status_report = time.time() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment