Created
August 29, 2024 10:46
-
-
Save rxwx/c968b3324e74058208fe6e168fd8730f to your computer and use it in GitHub Desktop.
Encrypt a DPAPI blob with arbitrary master key (using Python)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from Crypto.Cipher import AES, DES3 | |
from Crypto.Hash import HMAC, SHA1, SHA512, SHA256 | |
from Crypto.Util.Padding import pad | |
from io import BytesIO | |
import argparse | |
import string | |
import base64 | |
import uuid | |
import os | |
class DPAPIBlob: | |
CALG_3DES = 0x6603 | |
CALG_AES_256 = 0x6610 | |
CALG_SHA1 = 0x8004 | |
CALG_SHA_256 = 0x800c | |
CALG_SHA_512 = 0x800e | |
def combine_bytes(self, *arrays): | |
return b''.join(arrays) | |
def hmac_sha512(self, key, data): | |
hmac = HMAC.new(key, digestmod=SHA512) | |
hmac.update(data) | |
return hmac.digest() | |
def derive_key_raw(self, hash_bytes, alg_hash): | |
ipad = bytearray([0x36] * 64) | |
opad = bytearray([0x5C] * 64) | |
for i in range(len(hash_bytes)): | |
ipad[i] ^= hash_bytes[i] | |
opad[i] ^= hash_bytes[i] | |
if alg_hash == self.CALG_SHA1: | |
sha1 = SHA1.new() | |
ipad_sha1bytes = sha1.new(ipad).digest() | |
opad_sha1bytes = sha1.new(opad).digest() | |
return self.combine_bytes(ipad_sha1bytes, opad_sha1bytes) | |
else: | |
raise Exception(f"Unsupported alg_hash: {alg_hash}") | |
def derive_key2(self, key, nonce, hash_algorithm, blob, entropy=None): | |
""" | |
Derive a key using the provided key, nonce, hash algorithm, blob, and optional entropy. | |
:param key: The base key material. | |
:param nonce: The nonce (salt) value. | |
:param hash_algorithm: The hash algorithm identifier (SHA1, SHA256, SHA512). | |
:param blob: The additional data to include in the key derivation. | |
:param entropy: Optional entropy to include in the key derivation. | |
:return: The derived key as a byte array. | |
""" | |
if hash_algorithm == self.CALG_SHA1: | |
hmac = HMAC.new(key, digestmod=SHA1) | |
elif hash_algorithm == self.CALG_SHA_256: | |
hmac = HMAC.new(key, digestmod=SHA256) | |
elif hash_algorithm == self.CALG_SHA_512: | |
hmac = HMAC.new(key, digestmod=SHA512) | |
else: | |
raise Exception(f"Unsupported hash algorithm: {hash_algorithm}") | |
key_material = bytearray() | |
key_material.extend(nonce) | |
if entropy is not None: | |
key_material.extend(entropy) | |
key_material.extend(blob) | |
hmac.update(key_material) | |
return hmac.digest() | |
def derive_key(self, key_bytes, salt_bytes, alg_hash, entropy=None): | |
if alg_hash == self.CALG_SHA_512: | |
if entropy is not None: | |
return self.hmac_sha512(key_bytes, self.combine_bytes(salt_bytes, entropy)) | |
else: | |
return self.hmac_sha512(key_bytes, salt_bytes) | |
elif alg_hash == self.CALG_SHA1: | |
ipad = bytearray([0x36] * 64) | |
opad = bytearray([0x5C] * 64) | |
for i in range(len(key_bytes)): | |
ipad[i] ^= key_bytes[i] | |
opad[i] ^= key_bytes[i] | |
buffer_i = self.combine_bytes(ipad, salt_bytes) | |
sha1 = SHA1.new() | |
sha1.update(buffer_i) | |
sha1_buffer_i = sha1.digest() | |
buffer_o = self.combine_bytes(opad, sha1_buffer_i) | |
if entropy is not None: | |
buffer_o = self.combine_bytes(buffer_o, entropy) | |
sha1.update(buffer_o) | |
sha1_buffer_o = sha1.digest() | |
return self.derive_key_raw(sha1_buffer_o, alg_hash) | |
else: | |
raise Exception("Unsupported Hash Algorithm") | |
def encrypt(self, plaintext, key, algCrypt): | |
if algCrypt == self.CALG_3DES: | |
iv = b'\x00' * 8 | |
cipher = DES3.new(key, DES3.MODE_CBC, iv) | |
elif algCrypt == self.CALG_AES_256: | |
iv = b'\x00' * 16 | |
cipher = AES.new(key, AES.MODE_CBC, iv) | |
else: | |
raise Exception(f"Unsupported encryption algorithm: {algCrypt}") | |
padded_data = pad(plaintext, cipher.block_size) | |
return cipher.encrypt(padded_data) | |
def create_blob(self, plaintext, masterKey, algCrypt, algHash, masterKeyGuid, flags=0, entropy=None, description=""): | |
descBytes = description.encode('utf-16le') if description else b'\x00\x00' | |
saltBytes = os.urandom(32) | |
hmac2KeyLen = 32 | |
if algCrypt == self.CALG_3DES: | |
algCryptLen = 192 | |
elif algCrypt == self.CALG_AES_256: | |
algCryptLen = 256 | |
else: | |
raise Exception(f"Unsupported encryption algorithm: {algCrypt}") | |
if algHash == self.CALG_SHA1: | |
signLen = 20 | |
elif algHash == self.CALG_SHA_256: | |
signLen = 32 | |
elif algHash == self.CALG_SHA_512: | |
signLen = 64 | |
else: | |
raise Exception(f"Unsupported hash algorithm: {algHash}") | |
# Derive key | |
derivedKeyBytes = self.derive_key(masterKey, saltBytes, algHash, entropy) | |
finalKeyBytes = derivedKeyBytes[:algCryptLen // 8] | |
# Encrypt data | |
encData = self.encrypt(plaintext, finalKeyBytes, algCrypt) | |
# Construct the BLOB using BytesIO | |
blob = BytesIO() | |
# Version | |
blob.write((1).to_bytes(4, 'little')) | |
# Provider GUID | |
providerGuid = uuid.UUID("df9d8cd0-1501-11d1-8c7a-00c04fc297eb").bytes_le | |
blob.write(providerGuid) | |
# MasterKey version | |
blob.write((1).to_bytes(4, 'little')) | |
# MasterKey GUID | |
blob.write(masterKeyGuid.bytes_le) | |
# Flags | |
blob.write((flags).to_bytes(4, 'little')) | |
# Description length | |
blob.write(len(descBytes).to_bytes(4, 'little')) | |
# Description | |
blob.write(descBytes) | |
# Algorithm ID | |
blob.write(algCrypt.to_bytes(4, 'little')) | |
# Algorithm key length | |
blob.write(algCryptLen.to_bytes(4, 'little')) | |
# Salt length | |
blob.write(len(saltBytes).to_bytes(4, 'little')) | |
# Salt | |
blob.write(saltBytes) | |
# HMAC key length (always 0) | |
blob.write((0).to_bytes(4, 'little')) | |
# Hash algorithm ID | |
blob.write(algHash.to_bytes(4, 'little')) | |
# Hash length | |
blob.write((len(derivedKeyBytes) * 8).to_bytes(4, 'little')) | |
# HMAC2 key length | |
blob.write(hmac2KeyLen.to_bytes(4, 'little')) | |
# HMAC2 key | |
hmac2Key = os.urandom(hmac2KeyLen) | |
blob.write(hmac2Key) | |
# Data length | |
blob.write(len(encData).to_bytes(4, 'little')) | |
# Encrypted Data | |
blob.write(encData) | |
# Create the HMAC (sign) over the entire blob except for the sign field | |
signBlob = blob.getvalue()[20:] # Skip the first 20 bytes for the HMAC calculation | |
sign = self.derive_key2(masterKey, hmac2Key, algHash, signBlob, entropy) | |
# Sign length | |
blob.write(signLen.to_bytes(4, 'little')) | |
# Sign | |
blob.write(sign) | |
return blob.getvalue() | |
def main(): | |
parser = argparse.ArgumentParser(description="Encrypt data with DPAPI and output the BLOB") | |
parser.add_argument("input_file", help="Path to the plaintext data to encrypt") | |
parser.add_argument("master_key_guid", help="The master key GUID") | |
parser.add_argument("master_key", help="The master key (hex formatted). " \ | |
"Note: you can dump the master key(s) using mimikatz dpapi::masterkey command") | |
parser.add_argument("--base64", action="store_true", help="Output the encrypted data in base64") | |
parser.add_argument("--output", help="Write the output to a file (provide a file path)") | |
parser.add_argument("--local", help="Encrypt data with the local machine scope. " \ | |
"Note: This requires a SYSTEM master key decrypted using the DPAPI_SYSTEM LSA secret " \ | |
"(use mimikatz lsadump::secrets and dpapi::masterkey to dump this)", | |
action='store_true') | |
args = parser.parse_args() | |
if not os.path.exists(args.input_file): | |
print (f' Input file does not exist: {args.input_file}') | |
return | |
if not all(c in string.hexdigits for c in args.master_key): | |
print (f' Provided master key is not valid: {args.master_key}') | |
return | |
try: | |
uuid.UUID(args.master_key_guid) | |
except ValueError: | |
print (f' Provided master key GUID is not valid: {args.master_key_guid}') | |
return | |
with open(args.input_file, "rb") as f: | |
plaintext = f.read() | |
# Parse the master key and GUID | |
masterKey = bytes.fromhex(args.master_key) | |
masterKeyGuid = uuid.UUID(args.master_key_guid) | |
algCrypt = DPAPIBlob.CALG_AES_256 | |
algHash = DPAPIBlob.CALG_SHA_512 | |
flags = 0 | |
if args.local: | |
flags |= 4 # CRYPTPROTECT_LOCAL_MACHINE | |
dpapi = DPAPIBlob() | |
encrypted_blob = dpapi.create_blob(plaintext, masterKey, algCrypt, algHash, masterKeyGuid, flags) | |
if args.base64: | |
output_data = base64.b64encode(encrypted_blob).decode('utf-8') | |
else: | |
output_data = encrypted_blob.hex(' ') | |
print(f" Encrypted BLOB: {output_data}") | |
if args.output: | |
with open(args.output, 'wb') as f: | |
if args.base64: | |
f.write(base64.b64encode(encrypted_blob)) | |
else: | |
f.write(encrypted_blob) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment