Created
September 20, 2024 11:13
-
-
Save vitqst/d9c5294e468dcae56d282a7cf907dfef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
""" | |
This script attempts to crack a password hash using a dictionary attack. Despite the 'blowfish' wording in some names below, hashes are verified with passlib's phpass (PHPass portable hash) scheme.
It utilizes multiprocessing to speed up the process by dividing the password list into chunks and | |
distributing them across multiple CPU cores. | |
Functions: | |
read_password_file(file_path): | |
Reads a file containing passwords, one per line, and returns a list of passwords. | |
check_password_blowfish(stored_hash, input_password): | |
Verifies whether the input password matches the stored hash using passlib's phpass scheme (despite the function's name, no Blowfish hashing is involved).
check_password_chunk(stored_hash, passwords, process_id, progress_queue, result_queue): | |
Checks a chunk of passwords against the stored hash. Updates progress and result queues. | |
main(): | |
Main function that sets up multiprocessing, reads the password file, divides the passwords | |
into chunks, and attempts to find the matching password. Displays progress and results. | |
Usage: | |
Run the script directly to start the password cracking process. | |
""" | |
from passlib.hash import phpass | |
import os | |
from tqdm import tqdm | |
import time | |
def read_password_file(file_path):
    """Read candidate passwords from a text file, one per line.

    Args:
        file_path: Path of the wordlist to read.

    Returns:
        A list with one entry per line of the file, in file order, with the
        line terminator removed. Blank lines are kept as empty strings.
    """
    # An explicit encoding keeps the result independent of the platform's
    # locale; errors="replace" stops a single undecodable byte from aborting
    # the whole run with UnicodeDecodeError.
    with open(file_path, "r", encoding="utf-8", errors="replace") as file:
        # Iterating the file splits only on real newlines. str.splitlines()
        # would additionally split on characters such as form feed or U+2028,
        # cutting a password that contains one into two candidates.
        return [line.rstrip("\n") for line in file]
def check_password_blowfish(stored_hash, input_password):
    """Return whether ``input_password`` verifies against ``stored_hash``.

    Despite the function's name, verification is delegated to
    ``passlib.hash.phpass`` (the PHPass portable hash scheme).
    """
    is_match = phpass.verify(input_password, stored_hash)
    return is_match
def check_password_chunk(stored_hash, passwords, process_id, progress_queue, result_queue):
    """Try every password in one chunk against the stored hash.

    Args:
        stored_hash: Hash string the candidates are verified against.
        passwords: Iterable of candidate passwords for this worker.
        process_id: Identifier of this chunk, echoed back with a match.
        progress_queue: Queue receiving integer counts of passwords checked
            since the previous report.
        result_queue: Queue receiving ``(password, process_id)`` on a match.

    Returns:
        None. The function stops at the first matching password.
    """
    report_every = 1000
    unreported = 0  # candidates checked since the last progress report
    for password in passwords:
        matched = check_password_blowfish(stored_hash, password)
        unreported += 1
        if matched:
            result_queue.put((password, process_id))
            progress_queue.put(unreported)
            return
        # Report only work that has actually been done, so the counts posted
        # by this worker always sum to the number of candidates it checked.
        if unreported == report_every:
            progress_queue.put(report_every)
            unreported = 0
    if unreported:
        progress_queue.put(unreported)  # final partial batch
def main(stored_hash="$P$BpTFA3Qt.D2gx6ntNzgQzDMs7Eur24/",
         password_file="10-million-password-list-top-1000000.txt"):
    """Run the dictionary attack across a pool of worker processes.

    The wordlist is split into contiguous chunks, one per worker. Workers
    report progress and any match through manager queues, which this
    function polls to drive the progress bar and to stop on the first match.

    Args:
        stored_hash: Hash to crack; defaults to the script's original value.
        password_file: Path of the wordlist; defaults to the original file.

    Raises:
        Exception: Any error raised inside a worker is re-raised here rather
            than being reported as "Password not found".
    """
    # Manager queue proxies raise the standard-library queue.Empty; importing
    # it here avoids relying on the mp.queues submodule having been loaded.
    import queue

    input_passwords = read_password_file(password_file)
    if not input_passwords:
        print("\nPassword not found")
        return

    # os.cpu_count() may return None, and there is no point in starting more
    # workers than there are candidates.
    num_cores = min(os.cpu_count() or 1, len(input_passwords))
    # Ceiling division: never a zero step, and at most num_cores chunks.
    chunk_size = -(-len(input_passwords) // num_cores)
    password_chunks = [
        input_passwords[start:start + chunk_size]
        for start in range(0, len(input_passwords), chunk_size)
    ]

    with mp.Manager() as manager:
        progress_queue = manager.Queue()
        result_queue = manager.Queue()
        with mp.Pool(processes=num_cores) as pool:
            args = [
                (stored_hash, chunk, i, progress_queue, result_queue)
                for i, chunk in enumerate(password_chunks)
            ]
            async_results = pool.starmap_async(check_password_chunk, args)

            match = None
            with tqdm(total=len(input_passwords), desc="Checking passwords") as pbar:
                while match is None:
                    # Sample completion BEFORE looking at the queues: if the
                    # workers had already finished, nothing new can arrive
                    # after the checks below, so an empty result queue is final.
                    workers_done = async_results.ready()
                    try:
                        # Timeout keeps the loop responsive to a posted result.
                        pbar.update(progress_queue.get(timeout=0.1))
                    except queue.Empty:
                        pass
                    try:
                        match = result_queue.get_nowait()
                    except queue.Empty:
                        if workers_done and progress_queue.empty():
                            break

            if match is not None:
                matching_password, process_id = match
                print(f"\nPassword found: {matching_password}")
                print(f"Found by process: {process_id}")
            else:
                # Workers have finished here; get() re-raises a worker
                # failure instead of masking it as a negative result.
                async_results.get()
                print("\nPassword not found")

            # Ensure all processes are terminated
            pool.terminate()
            pool.join()
# Run the attack only when executed as a script. The guard also keeps worker
# processes that re-import this module from starting the attack again.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment