Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Created March 2, 2020 02:06
Show Gist options
  • Save adiroiban/b70f5257b10439c90aaa9dd6f6fda6ca to your computer and use it in GitHub Desktop.
Save adiroiban/b70f5257b10439c90aaa9dd6f6fda6ca to your computer and use it in GitHub Desktop.
AES Crypt Sans I/O
"""
Implementation of AES Crypt witout any I/O operations.
"""
_AES_BLOCK_SIZE = 16
_AES_BUFFER_SIZE = 16 * 1024
_IV_SIZE = 16
_KEY_SIZE = 32
def _aes_crypt_derive(password, iv):
"""
Return the derivated internal key used to encrypt the payload.
This is slow.
"""
if len(password) > 1024:
raise ValueError('AESCrypt password is too long.')
# hash the external iv and the password 8192 times
digest = iv + (16 * b'\x00')
for i in range(8192):
passHash = hashes.Hash(hashes.SHA256(), backend=default_backend())
passHash.update(digest)
passHash.update(bytearray(password, 'utf_16_le'))
digest = passHash.finalize()
return digest
class AESCryptWriter(object):
"""
Sans I/O implementation for AESCrypt encryption.
1. Call `getHeader` to obtain the header data.
2. Continue calling `encryptChunk` to get encrypted data.
3. When no more data needs to be encrypted, call `getFooter` to get the
final encryption, padding, HMAC calculated and actual footer data.
Based on the implementation from
https://github.com/marcobellaccini/pyAesCrypt
"""
_CREATED_BY = b'CREATED_BY\x00AESCrypt Sans I/O 0.1.0'
def __init__(self, password):
self._password = password
self._encryptor = None
self._hmac_internal = None
self._buffer = b''
@staticmethod
def _random(n): # pragma: no cover
"""
Return random bytes.
Here to help with testing.
"""
return urandom(n)
def getHeader(self):
"""
Return the header data.
"""
result = [
b'AES', # Marker
b'\x02', # Version.
b'\x00', # Reserved. Set to 0.
# Extension length.
b'\x00' + struct.pack('B', len(self._CREATED_BY)),
self._CREATED_BY, # Extension payload.
b'\x00\x80', # Container extension length.
b'\x00' * 128, # Container payload.
b'\x00\x00', # End of extension.
]
iv_external = self._random(_IV_SIZE)
iv_internal = self._random(_IV_SIZE)
external_key = _aes_crypt_derive(self._password, iv_external)
internal_key = self._random(_KEY_SIZE)
# The cipher for the payload.
self._encryptor = Cipher(
algorithms.AES(internal_key),
modes.CBC(iv_internal),
backend=default_backend(),
).encryptor()
self._hmac_internal = hmac.HMAC(
internal_key,
hashes.SHA256(),
backend=default_backend(),
)
# The cipher for encrypting the secrets used to encrypt the payload.
encryptor_external = Cipher(
algorithms.AES(external_key),
modes.CBC(iv_external),
backend=default_backend(),
).encryptor()
hmac_external = hmac.HMAC(
external_key, hashes.SHA256(), backend=default_backend())
c_iv_key = (
encryptor_external.update(iv_internal + internal_key) +
encryptor_external.finalize()
)
hmac_external.update(c_iv_key)
result.extend([
iv_external,
c_iv_key, # IV used to encrypted the main (internal) IV + KEY.
hmac_external.finalize(), # HMAC for internal encrypted IV + KEY.
])
return b''.join(result)
def encryptChunk(self, data):
"""
Return the encrypted chunk for data.
Might return an empty data is there is not enough data to create
an AES block.
"""
self._buffer += data
if len(self._buffer) < _AES_BUFFER_SIZE:
# We don't have yet enough data.
return b''
data = self._buffer[:_AES_BUFFER_SIZE]
self._buffer = self._buffer[_AES_BUFFER_SIZE:]
result = self._encryptor.update(data)
self._hmac_internal.update(result)
return result
def getFooter(self):
"""
Return the footer data associated with the data encrypted so far.
It will also finalize/pad the last block and return the encrypted data
for it.
"""
result = []
if self._buffer:
# Still have to write the last chunk.
size_modulo = len(self._buffer) % _AES_BLOCK_SIZE
if size_modulo == 0:
pad_size = 0
else:
pad_size = 16 - size_modulo
self._buffer += struct.pack('B', size_modulo) * pad_size
data = self._encryptor.update(self._buffer)
else:
# Nothing left to write.
data = b''
size_modulo = 0
data += self._encryptor.finalize()
self._hmac_internal.update(data)
result.extend([
data, # Remaining data... padded. If any.
struct.pack('B', size_modulo), # Payload size module 16.
self._hmac_internal.finalize(), # Payload HMAC
])
return b''.join(result)
class AESCryptReader(object):
"""
Sans I/O implementation for AESCrypt decryption.
1. Call `decryptChunk` for each encrypted chunk.
2. When there are no more chunks call `getFinal` to get the final
decrypted chunk and run file validations.
Based on the implementation from
https://github.com/marcobellaccini/pyAesCrypt
"""
def __init__(self, password):
self._password = password
self._buffer = b''
self._state = self._parseHeader
def decryptChunk(self, data):
"""
Redirect to the current state.
"""
return self._state(data)
def getFinal(self):
"""
Get the final data from the stream and do the final security checks.
No more data will be read.
"""
size_modulo = struct.unpack('B', self._buffer[-33])[0]
pad_size = 16 - size_modulo
hmac_payload = self._buffer[-32:]
chunk = self._buffer[:-33]
self._buffer = b''
self._hmac_internal.update(chunk)
result = (
self._decryptor.update(chunk) + self._decryptor.finalize()
)
if hmac_payload != self._hmac_internal.finalize():
raise ServerException(
'Bad AESCrypt HMAC. File is corrupted.')
return result[:-pad_size]
def _parseHeader(self, data):
"""
Read the header
"""
self._buffer += data
if len(self._buffer) < 5:
# We don't yet have a full header.
return b''
if self._buffer[:3] != b'AES':
raise ServerException(
'Invalid AESCrypt format. Unknown type marker.')
if self._buffer[3] != b'\x02':
raise ServerException(
'Invalid AESCrypt format. Only V2 file format is supported.')
# We skip the file marker and reserved byte.
self._buffer = self._buffer[5:]
self._state = self._parseExtension
return self._parseExtension(b'')
def _parseExtension(self, data):
"""
Read the extension section.
No payload is returned and for now all extensions are ignored.
"""
self._buffer += data
if len(self._buffer) < 2:
# continue reading more data from the extension.
return b''
extension_size = struct.unpack('>H', self._buffer[:2])[0]
if extension_size == 0:
# We are at the end of the extension.
# continue with reading the IV and KEY for payload.
self._state = self._readKey
self._buffer = self._buffer[2:]
return self._readKey(b'')
if len(self._buffer) < 2 + extension_size:
# Still need to read more data for the extension
return b''
# Ignore the data for the current extension
self._buffer = self._buffer[2 + extension_size:]
return self._parseExtension(b'')
def _readKey(self, data):
"""
Read the key and payload from the header.
"""
self._buffer += data
if len(self._buffer) < 96:
# Still need to read more data.
return b''
iv_external = self._buffer[:16]
external_key = _aes_crypt_derive(self._password, iv_external)
c_iv_key = self._buffer[16:64]
hmac_external = self._buffer[64:96]
hmac_external_test = hmac.HMAC(
external_key,
hashes.SHA256(),
backend=default_backend(),
)
hmac_external_test.update(c_iv_key)
if hmac_external != hmac_external_test.finalize():
raise ServerException(
'Wrong AESCrypt password (or file is corrupted).')
decryptor_external = Cipher(
algorithms.AES(external_key),
modes.CBC(iv_external),
backend=default_backend(),
).decryptor()
iv_key = (
decryptor_external.update(c_iv_key) +
decryptor_external.finalize()
)
iv_internal = iv_key[:16]
internal_key = iv_key[16:]
self._decryptor = Cipher(
algorithms.AES(internal_key),
modes.CBC(iv_internal),
backend=default_backend(),
).decryptor()
self._hmac_internal = hmac.HMAC(
internal_key, hashes.SHA256(), backend=default_backend())
self._buffer = self._buffer[96:]
self._state = self._decryptChunk
return self._decryptChunk(b'')
def _decryptChunk(self, data):
"""
Extract decrypted data.
Might return an empty data if no data could be decrypted yet.
"""
self._buffer += data
if len(self._buffer) < _AES_BUFFER_SIZE:
return b''
chunk = self._buffer[:_AES_BUFFER_SIZE]
self._buffer = self._buffer[_AES_BUFFER_SIZE:]
self._hmac_internal.update(chunk)
result = self._decryptor.update(chunk)
return result + self._decryptChunk(b'')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment