Skip to content

Instantly share code, notes, and snippets.

@honno
Created July 26, 2019 07:15
Show Gist options
  • Save honno/4e6e4790e14c43bf5d6fa822fb56b6ea to your computer and use it in GitHub Desktop.
Save honno/4e6e4790e14c43bf5d6fa822fb56b6ea to your computer and use it in GitHub Desktop.
Multiprocessing brute-force script that finds a valid CRC32 checksum for a file which itself quotes that CRC32 value, i.e. a self-referential CRC32.
#!/usr/bin/python3
import logging
import argparse
import sys
import binascii
import multiprocessing as mp
from time import sleep
SLEEP_TIME = 0.1
CRC32_BYTES = 4
CRC32_MAX = 2**32
TEMPLATE = None
DESTINATION = None
CRC_QUEUE = None
def crc_pos_bytes_print(start_pos):
    """Describe the CRC slot starting at ``start_pos`` with its neighbours.

    Reads the module-level ``TEMPLATE`` buffer and renders, as hex, the
    ``CRC32_BYTES`` bytes before the slot, the slot itself (between ``>>``
    and ``<<``) and the ``CRC32_BYTES`` bytes after it.

    Args:
        start_pos: Offset of the first byte of the slot within ``TEMPLATE``.

    Returns:
        A string of the form
        ``"byte positions S to E (<before> >> <slot> << <after>)"``.
    """
    end_pos = start_pos + CRC32_BYTES

    # For a slot within the first CRC32_BYTES bytes the naive start index is
    # negative, which makes the slice wrap around to the end of the buffer
    # and yields an empty/wrong leading context. Clamp it to the buffer start.
    before_start = start_pos - CRC32_BYTES
    if start_pos >= 0 and before_start < 0:
        before_start = 0

    before = TEMPLATE[before_start:start_pos].hex()
    slot = TEMPLATE[start_pos:end_pos].hex()
    after = TEMPLATE[end_pos:end_pos + CRC32_BYTES].hex()
    return "byte positions {} to {} ({} >> {} << {})".format(
        start_pos, end_pos, before, slot, after
    )
def crc_queue_handler(start_crc):
    """Feed candidate CRC values into the shared work queue.

    Pushes every integer from ``start_crc`` up to, but not including,
    ``CRC32_MAX`` onto the module-level ``CRC_QUEUE``, in ascending order.

    Args:
        start_crc: First candidate value to enqueue.
    """
    enqueue = CRC_QUEUE.put
    candidates = range(start_crc, CRC32_MAX)
    for candidate in candidates:
        enqueue(candidate)
def bruteforce_crc_process():
    """Try queued CRC candidates until one makes the file self-referential.

    Takes candidate values from the module-level ``CRC_QUEUE``, writes each
    one (little-endian, ``CRC32_BYTES`` wide) into a copy of ``TEMPLATE`` at
    both ``pos`` and ``qpos``, and checks whether the CRC32 of the patched
    data equals the candidate itself.

    Reads the module globals ``TEMPLATE``, ``CRC_QUEUE``, ``pos`` and
    ``qpos``, which the ``__main__`` block assigns before workers start.

    Returns:
        A ``(crc, new_infile)`` tuple: the matching CRC value and the patched
        file contents. The loop only ends when a match is found.
    """
    # The two slots may be supplied in either order; splice them in file
    # order so the patched data keeps the length and layout of TEMPLATE.
    first, second = sorted((pos, qpos))

    # The bytes around the two slots never change, so slice them only once
    # instead of on every candidate.
    head = TEMPLATE[:first]
    middle = TEMPLATE[first + CRC32_BYTES:second]
    tail = TEMPLATE[second + CRC32_BYTES:]

    while True:
        crc = CRC_QUEUE.get()
        if crc is None:
            # None is not a candidate value; wait briefly and poll again.
            sleep(SLEEP_TIME)
            continue

        crc_bytes = crc.to_bytes(CRC32_BYTES, 'little')
        new_infile = head + crc_bytes + middle + crc_bytes + tail
        if crc == binascii.crc32(new_infile):
            return (crc, new_infile)
def crc_match(crc_and_new_infile):
    """Write out a successful result and stop the search.

    Passed as the ``callback`` of ``pool.apply_async`` in the ``__main__``
    block, so it receives the value returned by ``bruteforce_crc_process``.

    Args:
        crc_and_new_infile: ``(crc, new_infile)`` pair holding the matching
            CRC value and the patched file contents to write.
    """
    crc, new_infile = crc_and_new_infile
    print("CRC {} ({}) is a match!".format(crc, crc.to_bytes(4, 'little').hex()))

    # The context manager closes the handle even if the write itself fails.
    with open(DESTINATION, 'wb') as outfile:
        outfile.write(new_infile)
    print("Infile with self-referencing CRC written to {}".format(DESTINATION))

    # pool and crc_queue_handler_process are module globals assigned in the
    # __main__ block; stop the remaining workers and the queue producer.
    pool.terminate()
    crc_queue_handler_process.terminate()
if __name__ == "__main__":
    # Script entry point: parse arguments, load the template file, then
    # search for a self-referential CRC32 with a pool of worker processes.
    #
    # TEMPLATE, DESTINATION, CRC_QUEUE, pos, qpos, pool and
    # crc_queue_handler_process are deliberately assigned at module level:
    # the functions above read them as globals.
    # NOTE(review): child processes only see these values when they inherit
    # module state (the "fork" start method); under "spawn" this guarded
    # block is not re-run in children — confirm the target platform.
    parser = argparse.ArgumentParser()
    parser.add_argument('infile')
    parser.add_argument('outfile')
    parser.add_argument('pos', type=int)
    parser.add_argument('qpos', type=int)
    parser.add_argument('-s', '--start_at', type=int, default=0)
    args = parser.parse_args(sys.argv[1:])

    with open(args.infile, 'rb') as source:
        TEMPLATE = bytearray(source.read())
    if not TEMPLATE:
        print("Infile contains no bytes")
        sys.exit(-1)

    # Fail early if the destination cannot be opened for writing, rather
    # than after a potentially very long search.
    DESTINATION = args.outfile
    try:
        with open(DESTINATION, 'w+'):
            pass
    except IOError:
        print("Cannot open {} outfile".format(DESTINATION))
        sys.exit(-1)

    pos = args.pos
    qpos = args.qpos
    # Negative offsets and overlapping slots make the splice in
    # bruteforce_crc_process produce a file with a different layout, so
    # reject them up front.
    for name, offset in (('pos', pos), ('qpos', qpos)):
        if offset < 0:
            print("{} must not be negative (got {})".format(name, offset))
            sys.exit(-1)
    if abs(pos - qpos) < CRC32_BYTES:
        print("pos and qpos must be at least {} bytes apart".format(CRC32_BYTES))
        sys.exit(-1)

    print("Finding CRC for {}".format(crc_pos_bytes_print(pos)))
    print("Quoted CRC at {}".format(crc_pos_bytes_print(qpos)))

    start_crc = args.start_at
    # Values outside the 32-bit range can never match and cannot be encoded
    # into the 4-byte slot by the workers.
    if not 0 <= start_crc < CRC32_MAX:
        print("start_at must be between 0 and {}".format(CRC32_MAX - 1))
        sys.exit(-1)
    print("Starting at CRC {}".format(start_crc))

    cpus = mp.cpu_count()
    CRC_QUEUE = mp.Queue(maxsize=cpus)
    crc_queue_handler_process = mp.Process(target=crc_queue_handler, args=(start_crc,))
    crc_queue_handler_process.start()

    pool = mp.Pool(cpus)
    # Submit one task per worker: a single apply_async call runs the
    # function exactly once, leaving every other pool process idle.
    for _ in range(cpus):
        pool.apply_async(bruteforce_crc_process, callback=crc_match)
    pool.close()
    # NOTE: if the producer exhausts the CRC range without a match, the
    # workers stay blocked on CRC_QUEUE.get() and this join does not return.
    pool.join()

    # crc_match normally stops the producer; make sure it is also stopped
    # when the pool ends without a match callback (e.g. a worker raised),
    # otherwise interpreter shutdown would wait on it.
    if crc_queue_handler_process.is_alive():
        crc_queue_handler_process.terminate()
    crc_queue_handler_process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment