Created
July 26, 2019 07:15
-
-
Save honno/4e6e4790e14c43bf5d6fa822fb56b6ea to your computer and use it in GitHub Desktop.
Multiprocessing bruteforcing script to find a valid CRC32 checksum in a file which quotes said CRC32 value, i.e. a self-referential CRC32.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import logging | |
import argparse | |
import sys | |
import binascii | |
import multiprocessing as mp | |
from time import sleep | |
SLEEP_TIME = 0.1 | |
CRC32_BYTES = 4 | |
CRC32_MAX = 2**32 | |
TEMPLATE = None | |
DESTINATION = None | |
CRC_QUEUE = None | |
def crc_pos_bytes_print(start_pos): | |
end_pos = start_pos + CRC32_BYTES | |
bytes_str = "{} >> {} << {}".format(TEMPLATE[start_pos-CRC32_BYTES:start_pos].hex(), TEMPLATE[start_pos:end_pos].hex(), TEMPLATE[end_pos:end_pos+CRC32_BYTES].hex()) | |
return "byte positions {} to {} ({})".format(start_pos, end_pos, bytes_str) | |
def crc_queue_handler(start_crc): | |
for crc in range(start_crc, CRC32_MAX): | |
CRC_QUEUE.put(crc) | |
# print("Queued CRC {}".format(crc)) | |
def bruteforce_crc_process(): | |
while True: | |
crc = CRC_QUEUE.get() | |
if crc == None: | |
sleep(SLEEP_TIME) | |
continue | |
crc_bytes = crc.to_bytes(4, 'little') | |
# print("Trying CRC {} ({})".format(str(crc), crc_bytes.hex())) | |
new_infile = TEMPLATE[0:qpos] + crc_bytes + TEMPLATE[qpos+CRC32_BYTES:pos] + crc_bytes + TEMPLATE[pos+CRC32_BYTES:] | |
new_infile_crc = binascii.crc32(new_infile) | |
# print("Modified source using CRC {} has the CRC value {}".format(str(crc), str(new_infile_crc))) | |
if crc == new_infile_crc: | |
return (crc, new_infile) | |
def crc_match(crc_and_new_infile): | |
crc = crc_and_new_infile[0] | |
new_infile = crc_and_new_infile[1] | |
print("CRC {} ({}) is a match!".format(crc, crc.to_bytes(4, 'little').hex())) | |
outfile = open(DESTINATION, 'wb') | |
outfile.write(new_infile) | |
outfile.close() | |
print("Infile with self-referencing CRC written to {}".format(DESTINATION)) | |
# pool and crc_queue_handler_process are visible because it's global in __main__ | |
pool.terminate() | |
crc_queue_handler_process.terminate() | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument('infile') | |
parser.add_argument('outfile') | |
parser.add_argument('pos', type=int) | |
parser.add_argument('qpos', type=int) | |
parser.add_argument('-s', '--start_at', type=int, default=0) | |
args = parser.parse_args(sys.argv[1:]) | |
source = open(args.infile, 'rb') | |
TEMPLATE = bytearray(source.read()) | |
source.close() | |
if len(TEMPLATE) == 0: | |
print("Infile contains no bytes") | |
sys.exit(-1) | |
DESTINATION = args.outfile | |
try: | |
dest = open(DESTINATION, 'w+') | |
dest.close() | |
except IOError: | |
print("Cannot open {} outfile".format(DESTINATION)) | |
sys.exit(-1) | |
pos = args.pos | |
qpos = args.qpos | |
print("Finding CRC for {}".format(crc_pos_bytes_print(pos))) | |
print("Quoted CRC at {}".format(crc_pos_bytes_print(qpos))) | |
start_crc = args.start_at | |
print("Starting at CRC {}".format(start_crc)) | |
cpus = mp.cpu_count() | |
CRC_QUEUE = mp.Queue(maxsize=cpus) | |
crc_queue_handler_process = mp.Process(target=crc_queue_handler, args=(start_crc,)) | |
crc_queue_handler_process.start() | |
pool = mp.Pool(cpus) | |
pool.apply_async(bruteforce_crc_process, callback=crc_match) | |
pool.close() | |
pool.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment