Created
March 2, 2020 02:06
-
-
Save adiroiban/b70f5257b10439c90aaa9dd6f6fda6ca to your computer and use it in GitHub Desktop.
AES Crypt Sans I/O
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Implementation of AES Crypt without any I/O operations.
"""
# Size, in bytes, of one AES block. The final payload chunk is padded to a
# multiple of this size.
_AES_BLOCK_SIZE = 16
# Amount of buffered data, in bytes, handed to the cipher in one go.
_AES_BUFFER_SIZE = 16 * 1024
# Size, in bytes, of the randomly generated external and internal IVs.
_IV_SIZE = 16
# Size, in bytes, of the randomly generated internal key.
_KEY_SIZE = 32
def _aes_crypt_derive(password, iv):
    """
    Return the key derived from `password` and the external `iv`.

    The derived (external) key is the one protecting the internal IV and
    key stored in the header, not the payload itself.

    The result is obtained by chaining 8192 SHA-256 rounds. The first round
    hashes `iv` padded with 16 zero bytes, each following round hashes the
    previous digest. The UTF-16-LE encoded password is appended on every
    round.

    `password` is the password as text.
    `iv` is the external IV as bytes.

    Raise ValueError when the password has more than 1024 characters.

    This is slow.
    """
    if len(password) > 1024:
        raise ValueError('AESCrypt password is too long.')

    # The encoded password is identical for every round, so encode it once
    # instead of 8192 times.
    encoded_password = bytearray(password, 'utf_16_le')

    # Hash the external IV and the password 8192 times.
    digest = iv + (16 * b'\x00')
    for _ in range(8192):
        round_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
        round_hash.update(digest)
        round_hash.update(encoded_password)
        digest = round_hash.finalize()

    return digest
class AESCryptWriter(object):
    """
    Sans I/O implementation for AESCrypt encryption.

    1. Call `getHeader` to obtain the header data.
    2. Continue calling `encryptChunk` to get encrypted data.
    3. When no more data needs to be encrypted, call `getFooter` to get the
       final encryption, padding, HMAC calculated and actual footer data.

    `getHeader` must be called first as it creates the payload cipher and
    HMAC used by the other methods.

    Based on the implementation from
    https://github.com/marcobellaccini/pyAesCrypt
    """

    _CREATED_BY = b'CREATED_BY\x00AESCrypt Sans I/O 0.1.0'

    def __init__(self, password):
        self._password = password
        # Both are created by `getHeader`.
        self._encryptor = None
        self._hmac_internal = None
        # Plain data received but not yet encrypted.
        self._buffer = b''

    @staticmethod
    def _random(n):  # pragma: no cover
        """
        Return `n` random bytes.

        Here to help with testing.
        """
        return urandom(n)

    def getHeader(self):
        """
        Return the header data.

        As a side effect it initializes the cipher and the HMAC used for
        the payload.
        """
        result = [
            b'AES',  # Marker
            b'\x02',  # Version.
            b'\x00',  # Reserved. Set to 0.
            # Extension length, as 2 bytes big-endian.
            struct.pack('>H', len(self._CREATED_BY)),
            self._CREATED_BY,  # Extension payload.
            b'\x00\x80',  # Container extension length.
            b'\x00' * 128,  # Container payload.
            b'\x00\x00',  # End of extension.
            ]

        # The order of the `_random` calls is kept stable so that tests
        # replacing `_random` get predictable values.
        iv_external = self._random(_IV_SIZE)
        iv_internal = self._random(_IV_SIZE)
        external_key = _aes_crypt_derive(self._password, iv_external)
        internal_key = self._random(_KEY_SIZE)

        # The cipher for the payload.
        self._encryptor = Cipher(
            algorithms.AES(internal_key),
            modes.CBC(iv_internal),
            backend=default_backend(),
            ).encryptor()
        self._hmac_internal = hmac.HMAC(
            internal_key,
            hashes.SHA256(),
            backend=default_backend(),
            )

        # The cipher for encrypting the secrets used to encrypt the payload.
        encryptor_external = Cipher(
            algorithms.AES(external_key),
            modes.CBC(iv_external),
            backend=default_backend(),
            ).encryptor()
        hmac_external = hmac.HMAC(
            external_key, hashes.SHA256(), backend=default_backend())

        c_iv_key = (
            encryptor_external.update(iv_internal + internal_key) +
            encryptor_external.finalize()
            )
        hmac_external.update(c_iv_key)

        result.extend([
            iv_external,  # IV used to encrypt the internal IV + KEY.
            c_iv_key,  # Encrypted internal IV + KEY.
            hmac_external.finalize(),  # HMAC for internal encrypted IV + KEY.
            ])

        return b''.join(result)

    def encryptChunk(self, data):
        """
        Return the encrypted data available after adding `data`.

        Might return empty data if there is not enough buffered data.

        All the complete `_AES_BUFFER_SIZE` chunks found in the buffer are
        encrypted, so that large inputs don't pile up in memory until
        `getFooter` is called.
        """
        self._buffer += data

        usable = len(self._buffer) - len(self._buffer) % _AES_BUFFER_SIZE
        if not usable:
            # We don't have yet enough data.
            return b''

        pending = self._buffer[:usable]
        self._buffer = self._buffer[usable:]

        result = self._encryptor.update(pending)
        self._hmac_internal.update(result)
        return result

    def getFooter(self):
        """
        Return the footer data associated with the data encrypted so far.

        It will also finalize/pad the last block and return the encrypted data
        for it.

        The result is the remaining encrypted data (if any), followed by
        1 byte with the payload size modulo 16 and the payload HMAC.
        """
        data = b''
        size_modulo = 0

        if self._buffer:
            # Still have to write the last chunk.
            size_modulo = len(self._buffer) % _AES_BLOCK_SIZE
            pad_size = (_AES_BLOCK_SIZE - size_modulo) % _AES_BLOCK_SIZE
            # The padding is removed by the reader based on `size_modulo`.
            padding = struct.pack('B', size_modulo) * pad_size
            data = self._encryptor.update(self._buffer + padding)
            self._buffer = b''

        data += self._encryptor.finalize()
        self._hmac_internal.update(data)

        return b''.join([
            data,  # Remaining data... padded. If any.
            struct.pack('B', size_modulo),  # Payload size modulo 16.
            self._hmac_internal.finalize(),  # Payload HMAC
            ])
class AESCryptReader(object):
    """
    Sans I/O implementation for AESCrypt decryption.

    1. Call `decryptChunk` for each encrypted chunk.
    2. When there are no more chunks call `getFinal` to get the final
       decrypted chunk and run file validations.

    Based on the implementation from
    https://github.com/marcobellaccini/pyAesCrypt
    """

    # Size of the HMAC found at the end of the stream.
    _HMAC_SIZE = 32
    # The stream ends with 1 byte (payload size modulo 16) and the HMAC.
    # These bytes are not part of the encrypted payload.
    _TRAILER_SIZE = 1 + _HMAC_SIZE

    def __init__(self, password):
        self._password = password
        # Data received but not yet processed.
        self._buffer = b''
        # Both are created once the header was fully parsed.
        self._decryptor = None
        self._hmac_internal = None
        # Method handling the data for the current section of the stream.
        self._state = self._parseHeader

    def decryptChunk(self, data):
        """
        Redirect to the current state.

        Return the decrypted data available so far, which might be empty.
        """
        return self._state(data)

    def getFinal(self):
        """
        Get the final data from the stream and do the final security checks.

        No more data will be read.

        Raise ServerException if the stream is truncated or if the payload
        HMAC is not valid.
        """
        # Standard library helper. The module level `hmac` name is not
        # touched by this import.
        from hmac import compare_digest

        if (
            self._decryptor is None or
            len(self._buffer) < self._TRAILER_SIZE
                ):
            raise ServerException(
                'Invalid AESCrypt format. File is truncated.')

        trailer = self._buffer[-self._TRAILER_SIZE:]
        chunk = self._buffer[:-self._TRAILER_SIZE]
        self._buffer = b''

        # Slicing is used instead of indexing so that we get bytes on both
        # Python 2 and Python 3.
        size_modulo = struct.unpack('B', trailer[:1])[0]
        hmac_payload = trailer[1:]

        self._hmac_internal.update(chunk)
        if not compare_digest(hmac_payload, self._hmac_internal.finalize()):
            raise ServerException(
                'Bad AESCrypt HMAC. File is corrupted.')

        result = (
            self._decryptor.update(chunk) + self._decryptor.finalize()
            )

        # No padding is present when the payload size is a multiple of 16.
        pad_size = (16 - size_modulo) % 16
        if pad_size:
            result = result[:-pad_size]
        return result

    def _parseHeader(self, data):
        """
        Read the file marker, the version and the reserved byte.
        """
        self._buffer += data

        if len(self._buffer) < 5:
            # We don't yet have a full header.
            return b''

        if self._buffer[:3] != b'AES':
            raise ServerException(
                'Invalid AESCrypt format. Unknown type marker.')

        # Slice, don't index, so that bytes are compared on Python 3 too.
        if self._buffer[3:4] != b'\x02':
            raise ServerException(
                'Invalid AESCrypt format. Only V2 file format is supported.')

        # We skip the file marker and reserved byte.
        self._buffer = self._buffer[5:]
        self._state = self._parseExtension
        return self._parseExtension(b'')

    def _parseExtension(self, data):
        """
        Read the extension section.

        No payload is returned and for now all extensions are ignored.
        """
        self._buffer += data

        # Each extension starts with its size as 2 bytes big-endian.
        while len(self._buffer) >= 2:
            extension_size = struct.unpack('>H', self._buffer[:2])[0]

            if extension_size == 0:
                # We are at the end of the extension.
                # continue with reading the IV and KEY for payload.
                self._state = self._readKey
                self._buffer = self._buffer[2:]
                return self._readKey(b'')

            if len(self._buffer) < 2 + extension_size:
                # Still need to read more data for the extension
                break

            # Ignore the data for the current extension
            self._buffer = self._buffer[2 + extension_size:]

        # continue reading more data from the extension.
        return b''

    def _readKey(self, data):
        """
        Read the external IV and the encrypted internal IV and key.

        Raise ServerException when the HMAC of the encrypted internal IV and
        key does not match the one computed using the password.
        """
        from hmac import compare_digest

        self._buffer += data

        # 16 external IV + 48 encrypted internal IV and key + 32 HMAC.
        if len(self._buffer) < 96:
            # Still need to read more data.
            return b''

        iv_external = self._buffer[:16]
        external_key = _aes_crypt_derive(self._password, iv_external)
        c_iv_key = self._buffer[16:64]
        hmac_external = self._buffer[64:96]

        hmac_external_test = hmac.HMAC(
            external_key,
            hashes.SHA256(),
            backend=default_backend(),
            )
        hmac_external_test.update(c_iv_key)
        if not compare_digest(hmac_external, hmac_external_test.finalize()):
            raise ServerException(
                'Wrong AESCrypt password (or file is corrupted).')

        decryptor_external = Cipher(
            algorithms.AES(external_key),
            modes.CBC(iv_external),
            backend=default_backend(),
            ).decryptor()
        iv_key = (
            decryptor_external.update(c_iv_key) +
            decryptor_external.finalize()
            )
        iv_internal = iv_key[:16]
        internal_key = iv_key[16:]

        self._decryptor = Cipher(
            algorithms.AES(internal_key),
            modes.CBC(iv_internal),
            backend=default_backend(),
            ).decryptor()
        self._hmac_internal = hmac.HMAC(
            internal_key, hashes.SHA256(), backend=default_backend())

        self._buffer = self._buffer[96:]
        self._state = self._decryptChunk
        return self._decryptChunk(b'')

    def _decryptChunk(self, data):
        """
        Extract decrypted data.

        Might return an empty data if no data could be decrypted yet.

        The last `_TRAILER_SIZE` bytes are always kept in the buffer, as they
        might be the end of the stream, which is handled by `getFinal`.
        """
        self._buffer += data

        available = len(self._buffer) - self._TRAILER_SIZE
        if available < _AES_BUFFER_SIZE:
            return b''

        # Process all the complete chunks in one go.
        usable = available - available % _AES_BUFFER_SIZE
        chunk = self._buffer[:usable]
        self._buffer = self._buffer[usable:]

        self._hmac_internal.update(chunk)
        return self._decryptor.update(chunk)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment