Skip to content

Instantly share code, notes, and snippets.

@0xDBFB7
Created September 4, 2017 18:53
Show Gist options
  • Save 0xDBFB7/a46823a102fc843f188c50801ce358a7 to your computer and use it in GitHub Desktop.
Save 0xDBFB7/a46823a102fc843f188c50801ce358a7 to your computer and use it in GitHub Desktop.
MicroPython socket-only file download routine with cert pinning and hashing
#DESIGNED FOR MICROPYTHON
# List of possible servers and their accepted certificate fingerprints.
# download_file() selects an entry by index and compares each fingerprint
# against binascii.hexlify(sha256(peer certificate)), i.e. the lowercase hex
# SHA-256 digest, stored here as a bytearray.
# A bytes literal is used because bytearray() of a plain str without an
# encoding raises TypeError on CPython 3.
server_dict = [
    {
        "address": "test.com",
        "fingerprints": [
            bytearray(b'adlfasdfekre3r2309234902302lqkejroi23ro239842342304'),
        ],
    },
]
def _read_response_header(sock):
    """Read the HTTP response header from *sock*, terminator included.

    Returns the raw header bytes, ending with the blank line that separates
    the header from the body.

    Raises OSError if the connection ends before the terminator is seen.
    """
    terminator = b'\r\n\r\n'
    header = b''
    # One byte per read so that nothing belonging to the body is consumed.
    # Because of that, the terminator can only ever appear at the very end.
    while header[-4:] != terminator:
        byte = sock.read(1)
        if not byte:
            # Without this check an early close would loop forever.
            raise OSError("Connection closed before end of HTTP header")
        header += byte
    return header


def _parse_content_length(header):
    """Return the Content-Length value from raw HTTP header bytes as an int.

    The header name is matched case-insensitively.

    Raises ValueError if no Content-Length header is present or its value is
    not an integer.
    """
    prefix = b'content-length:'
    for line in header.split(b'\r\n'):
        if line.lower().startswith(prefix):
            return int(str(line[len(prefix):], 'utf-8').strip())
    raise ValueError("No Content-Length header in response")


def download_file(server_dir, filename, server_index=0, block_size=512):
    """Boneless file downloader: fetch a file over TLS with certificate pinning.

    Connects to port 443 of server_dict[server_index]['address'], checks the
    hex SHA-256 of the peer certificate against that entry's 'fingerprints',
    sends a bodyless POST for *server_dir*, and streams the response body
    into *filename* while hashing it.

    Expects socket, ssl, binascii, hashlib, os and base64 to be imported at
    module level.

    Args:
        server_dir: Request path placed on the HTTP request line.
        filename: Local path the response body is written to (overwritten).
        server_index: Index into the module-level server_dict list.
        block_size: Maximum number of bytes requested per socket read.

    Returns:
        False if the server certificate matches none of the accepted
        fingerprints; otherwise the base64-encoded SHA-256 digest of the
        bytes written to *filename*.

    Raises:
        ValueError: block_size is not positive, or the response carries no
            usable Content-Length header.
        OSError: the connection ended before the header or the announced
            body length was fully received.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")

    server = server_dict[server_index]

    # typical socket stuff
    s = socket.socket()
    try:
        s.settimeout(60)
        # FIXME: This will probably do a dns lookup every time - probably shouldn't.
        ai = socket.getaddrinfo(server['address'], 443)
        s.connect(ai[0][-1])
        s = ssl.wrap_socket(s)

        # get the certificate hash and compare it with the pinned fingerprints
        cert_hash = binascii.hexlify(hashlib.sha256(s.getpeercert(True)).digest())
        print("Certificate Hash: " + str(cert_hash))
        print("Accepted Hashes:")
        for fingerprint in server['fingerprints']:
            print(str(fingerprint))
        if bytearray(cert_hash) not in server['fingerprints']:
            print("Certificate doesn't match.")
            return False

        # tell the server what you want
        http_headers = (
            "POST " + server_dir + " HTTP/1.1\r\n"
            "Connection: Keep-Alive\r\n"
            "Accept: */*\r\n"
            "Host: " + server['address'] + "\r\n"
            "User-Agent: Seldon\r\n"
            "Content-Type: application/x-www-form-urlencoded\r\n"
            "Content-Length: 0\r\n\r\n"
        )
        # Encode explicitly so the write also works where only bytes are accepted.
        s.write(bytes(http_headers, 'utf-8'))

        # Extract the content length from the response header
        content_length = _parse_content_length(_read_response_header(s))

        # Actually write to file now
        hash_obj = hashlib.sha256()
        remaining = content_length
        with open(filename, 'wb') as f:
            while remaining > 0:
                chunk = s.read(min(block_size, remaining))
                if not chunk:
                    raise OSError(
                        "Connection closed with " + str(remaining)
                        + " bytes still expected"
                    )
                f.write(chunk)
                hash_obj.update(chunk)
                # Count what actually arrived rather than assuming every
                # read filled a whole block.
                remaining -= len(chunk)
                if len(chunk) == block_size:
                    print('Wrote full block')
                else:
                    print('Wrote ' + str(len(chunk)))
    finally:
        # Runs on success, on certificate mismatch and on any error.
        s.close()

    print("Final file size: ")
    print(os.stat(filename)[6])
    # return the base64-encoded SHA-256 digest of the downloaded data
    return base64.b64encode(hash_obj.digest())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment