Created
September 4, 2017 18:53
-
-
Save 0xDBFB7/a46823a102fc843f188c50801ce358a7 to your computer and use it in GitHub Desktop.
MicroPython socket-only file download routine with cert pinning and hashing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#DESIGNED FOR MICROPYTHON | |
server_dict = [{"address": "test.com","fingerprints":[bytearray('adlfasdfekre3r2309234902302lqkejroi23ro239842342304')]}] #list of possible servers and accepted keys | |
def download_file(server_dir,filename,server_index=0,block_size=512): #Boneless file downloader | |
#typical socket stuff | |
s = socket.socket() | |
s.settimeout(60) | |
ai = socket.getaddrinfo(server_dict[server_index]['address'], 443) #FIXME: This will probably do a dns lookup every time - probably shouldn't. | |
addr = ai[0][-1] | |
s.connect(addr) | |
s = ssl.wrap_socket(s) | |
#get the hash and compare | |
certHash = binascii.hexlify(hashlib.sha256(s.getpeercert(True)).digest()) | |
print("Certificate Hash: " + str(certHash)) | |
print("Accepted Hashes:") | |
for i in server_dict[server_index]['fingerprints']: | |
print(str(i)) | |
if not bytearray(certHash) in server_dict[server_index]['fingerprints']: | |
print("Certificate doesn't match.") | |
s.close() | |
return False | |
#tell the server what you wnat | |
http_headers = "POST " + server_dir + " HTTP/1.1\r\nConnection: Keep-Alive\r\nAccept: */*\r\nHost: " + server_dict[server_index]['address'] + "\r\nUser-Agent: Seldon\r\nContent-Type: application/x-www-form-urlencoded\r\nContent-Length: 0\r\n\r\n" | |
s.write(http_headers) | |
#Extract the content_length from the response header | |
response_header = s.read(1) | |
while not '\r\n\r\n' in response_header: | |
response_header += s.read(1) | |
response_header = str(response_header).split('\\r\\n') | |
content_length = int([x for x in response_header if 'Content-Length' in x][0].replace('Content-Length: ',"")) | |
#Actually write to file now | |
f = open(filename,'wb+') | |
page_count = 0 | |
hash_obj = hashlib.sha256() | |
while(True): | |
if(content_length-(page_count*block_size) < block_size): | |
response = s.read(content_length%block_size) | |
f.write(response) | |
hash_obj.update(response) | |
print('Wrote ' + str(content_length%block_size)) | |
break | |
else: | |
response = s.read(block_size) | |
f.write(response) | |
hash_obj.update(response) | |
page_count+=1 | |
print('Wrote full block') | |
f.close() | |
s.close() | |
print("Final file size: ") | |
print(os.stat(filename)[6]) | |
#return the hex digest | |
return base64.b64encode(hash_obj.digest()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment