Last active
July 23, 2024 16:09
-
-
Save williballenthin/d6bf9f1553d9fa27e0cc6880a6d992b4 to your computer and use it in GitHub Desktop.
bling.py - extract keys from macOS keychains.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
bling.py - extract keys from macOS keychains. | |
installation: | |
pip install pytz hexdump vivisect-vstruct-wb tabulate argparse pycryptodome | |
usage: | |
python bling.py /path/to/keychain-db <password> ./path/to/output/directory | |
references: | |
- https://repo.zenk-security.com/Forensic/Keychain%20Analysis%20with%20Mac%20OS%20X%20Memory%20Forensics.pdf | |
- https://github.com/libyal/dtformats/blob/master/documentation/MacOS%20keychain%20database%20file%20format.asciidoc
author: Willi Ballenthin | |
email: [email protected] | |
license: Apache 2.0 | |
''' | |
# TODO: detect invalid password | |
import os
import os.path
import sys
import copy
import string
import struct
import hashlib
import logging
import argparse
import binascii
import datetime
import itertools
from pprint import pprint

import pytz
import hexdump
import vstruct
from vstruct.primitives import *
import tabulate

# from pycryptodome
from Crypto.Cipher import DES3
from Crypto.Util.Padding import unpad
import Crypto.Protocol.KDF
# module-level logger shared by the parsing code below.
# the name is the fixed string 'osx.bling' rather than being derived from __name__.
logger = logging.getLogger('osx.bling')
class v_greedy_bytes(v_bytes):
    '''
    a v_bytes field whose value is everything between the parse offset
    and the end of the buffer it is handed.
    '''
    def vsParse(self, fbytes, offset=0):
        '''
        capture the tail of `fbytes` starting at `offset`.

        Returns:
          int: the length of `fbytes`, i.e. the offset just past the consumed data.
        '''
        remainder = fbytes[offset:]
        self._vs_value = remainder
        return len(fbytes)
class RECORD_HEADER(vstruct.VStruct):
    '''
    generic header found at the start of every record, followed by the
    record's blob data and attribute data.

    the number of attribute offsets depends on the owning table's schema,
    so the attribute descriptions must be provided up front.
    '''
    def __init__(self, attrs):
        '''
        Args:
          attrs: sequence of attribute descriptions; only its length is used here.
        '''
        vstruct.VStruct.__init__(self)
        attr_count = len(attrs)

        self.RecordSize = v_uint32(bigend=True)
        self.RecordNumber = v_uint32(bigend=True)
        self.unk1 = v_uint32(bigend=True)
        self.unk2 = v_uint32(bigend=True)
        self.BlobSize = v_uint32(bigend=True)
        self.zero = v_uint32(bigend=True)
        # offset 0x18: one big-endian uint32 offset per schema attribute
        self.AttributeOffsets = vstruct.VArray([v_uint32(bigend=True) for _ in range(attr_count)])

        # the blob starts right after the fixed fields and the offset array.
        self.blob_data_offset = 0x18 + (4 * attr_count)
        self.BlobData = v_bytes(size=0)

        # until BlobSize is known, assume an empty blob.
        self.attribute_data_offset = self.blob_data_offset
        self.AttributeData = v_bytes(size=0)

    def pcb_BlobSize(self):
        '''
        once BlobSize has been parsed, size the two variable-length regions:
        the blob, and the attribute data that fills the rest of the record.
        '''
        blob_size = int(self.BlobSize)
        attr_size = int(self.RecordSize) - self.blob_data_offset - blob_size

        self['BlobData'].vsSetLength(blob_size)
        self['AttributeData'].vsSetLength(attr_size)
        self.attribute_data_offset = self.blob_data_offset + blob_size
class Record:
    '''
    a single parsed record (row) of a keychain table.

    Attributes:
      buf: the raw bytes of the record.
      header (RECORD_HEADER): the generic record header.
      attrs (dict): attribute name -> parsed attribute, for non-empty attributes only.
      blob: the parsed blob, whose type is chosen by the table's RelationID.
    '''
    def __init__(self, schema, buf):
        '''
        Args:
          schema: from `Database.get_table_schema()`.
          buf (bytes): buffer to parse for a record.

        Raises:
          NotImplementedError: the record uses an attribute format or belongs
            to a table for which no parser is implemented.
          KeyError: the attribute format or RelationID is not a known id.
        '''
        #
        # diagram:
        #
        # +-----------+-----------+-----------+-----------+
        # | rec size    rec index   unk1        unk2      |
        # +-----------+-----------+-----------+-----------+
        # | blob size   0x0       | attribute offsets     | <-- attributes are declared in schema,
        # +-----------+-----------+                       |     record structure based on table.
        # |                                               |
        # +-----------+-----------+-----------+-----------+
        # | blob data (parsed into "blob")                | \
        # |                                               |  > blob size
        # |                                               | /
        # +-----------+-----------+-----------+-----------+
        # | attribute data                                |
        # |                                               |
        # |                                               |
        # +-----------+-----------+-----------+-----------+ <-- rec size
        #
        self.buf = buf

        # this is the generic header, contains record size, record number, etc.
        self.header = RECORD_HEADER(schema['attrs'])
        self.header.vsParse(buf)

        self.attrs = {}
        for i, attr_desc in enumerate(schema['attrs']):
            attr_offset = int(self.header.AttributeOffsets[i])
            if attr_offset == 0:
                # offset == 0 signals the attribute is empty
                continue

            # stored offsets are shifted by one so that zero can mean "empty".
            # the offset is relative to the start of the record.
            attr_offset = attr_offset - 1

            attr_format = int(attr_desc['AttributeFormat'])
            attr_parser = ATTRIBUTE_PARSERS[attr_format]
            if attr_parser is NotImplemented:
                # the parser table uses NotImplemented as a placeholder;
                # calling it would only produce an opaque TypeError.
                raise NotImplementedError('unsupported attribute format: %d' % attr_format)

            attr = attr_parser()
            attr.vsParse(buf[attr_offset:])
            self.attrs[str(attr_desc['AttributeName'])] = attr

        relation_id = int(schema['RelationID'])
        blob_parser = BLOB_PARSERS[relation_id]
        if blob_parser is NotImplemented:
            raise NotImplementedError('unsupported table: 0x%x' % relation_id)

        self.blob = blob_parser()
        self.blob.vsParse(self.header.BlobData)
# record type (table id) constants.
# these values identify tables (see `TABLE_HEADER.TableId`) and select the
# blob parser for a record (see `BLOB_PARSERS`).
CSSM_DL_DB = v_enum()
# Schema Management
CSSM_DL_DB.SCHEMA_INFO = 0x00000000  # Schema information
CSSM_DL_DB.SCHEMA_INDEXES = 0x00000001  # Schema indexes
CSSM_DL_DB.SCHEMA_ATTRIBUTES = 0x00000002  # Schema attributes
CSSM_DL_DB.SCHEMA_PARSING_MODULE = 0x00000003  # Schema parsing module
# Open Group Application
CSSM_DL_DB.RECORD_ANY = 0x0000000A  # Temporary table type.
CSSM_DL_DB.RECORD_CERT = 0x0000000B  # Certificates
CSSM_DL_DB.RECORD_CRL = 0x0000000C  # Certificate Revocation List
CSSM_DL_DB.RECORD_POLICY = 0x0000000D  # Policy
CSSM_DL_DB.RECORD_GENERIC = 0x0000000E  # Generic information
CSSM_DL_DB.RECORD_PUBLIC_KEY = 0x0000000F  # Public key
CSSM_DL_DB.RECORD_PRIVATE_KEY = 0x00000010  # Private key
CSSM_DL_DB.RECORD_SYMMETRIC_KEY = 0x00000011  # Symmetric key
CSSM_DL_DB.RECORD_ALL_KEY = 0x00000012  # Temporary table type
# Industry at Large Applications
CSSM_DL_DB.RECORD_GENERIC_PASSWORD = 0x80000000  # User credential
CSSM_DL_DB.RECORD_INTERNET_PASSWORD = 0x80000001  # User credential on the Internet in particular
CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD = 0x80000002  # (Deprecated)
CSSM_DL_DB.RECORD_USER_TRUST = 0x80000003  # User-defined certificates
CSSM_DL_DB.RECORD_X509_CRL = 0x80000004  # X.509 Certificate Revocation List
CSSM_DL_DB.RECORD_UNLOCK_REFERRAL = 0x80000005  # Unlock referral
CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE = 0x80000006  # Extended attribute for database management
CSSM_DL_DB.RECORD_X509_CERTIFICATE = 0x80001000  # X.509 Certificates
CSSM_DL_DB.RECORD_METADATA = 0x80008000  # Metadata information
class EMPTY_BLOB(vstruct.VStruct):
    '''
    blob parser with no fields, used for tables whose content lives in attributes.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)

    def decrypt(self, keychain):
        '''
        there is nothing to decrypt; returns an empty dict so callers can
        merge the result like any other blob's.
        '''
        return {}
class COMMON_BLOB(vstruct.VStruct):
    '''
    header shared by the key and database blobs: a magic value and a version.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Magic = v_uint32(bigend=True)
        self.BlobVersion = v_uint32(bigend=True)

    def pcb_Magic(self):
        '''
        validate the magic as soon as it has been parsed.

        Raises:
          ValueError: the magic is not 0xfade0711.
        '''
        if self.Magic == 0xfade0711:
            return
        raise ValueError('invalid COMMON_BLOB magic')
class DB_PARAMETERS(vstruct.VStruct):
    '''
    database parameters embedded in `DB_BLOB`.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.IdleTimeout = v_uint32(bigend=True)  # uint32
        # NOTE(review): the trailing comment says uint8, but the field is parsed
        # as a 32-bit value here - confirm which is intended.
        self.LockOnSleep = v_uint32(bigend=True)  # uint8
class DB_BLOB(vstruct.VStruct):
    '''
    blob of the metadata record: holds the salt, IV, and the encrypted
    database key that is unlocked with the keychain password.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.RandomSignature = v_bytes(size=0x10)
        self.Sequence = v_uint32(bigend=True)
        self.Params = DB_PARAMETERS()
        self.Salt = v_bytes(size=0x14)
        self.IV = v_bytes(size=8)
        self.BlobSignature = v_bytes(size=0x14)
        self.unk2 = vstruct.VArray([v_uint32(bigend=True) for _ in range(7)])
        self.EncryptedDBKey = v_bytes(size=0x30)

    def decrypt(self, keychain):
        '''
        derive the master key from the keychain password and use it to
        decrypt the database key.

        Args:
          keychain: object exposing the user's `password`.

        Returns:
          dict: with keys `master_key`, `db_key`, and `plaintext` (same as `db_key`).
        '''
        # magic: number of rounds = 1000
        kdf_rounds = 1000
        # magic: key size = 24
        key_size = 24

        master_key = Crypto.Protocol.KDF.PBKDF2(keychain.password, self.Salt, count=kdf_rounds, dkLen=key_size)

        cipher = DES3.new(master_key, DES3.MODE_CBC, self.IV)
        padded = cipher.decrypt(self.EncryptedDBKey)
        # pkcs#7 padding, 3DES block size (8 bytes);
        # only the leading `key_size` bytes are the key.
        db_key = unpad(padded, 8)[:key_size]

        return dict(master_key=master_key, db_key=db_key, plaintext=db_key)
class SSGP(vstruct.VStruct):
    '''
    the `ssgp` header: a four byte magic followed by a 16 byte label.
    the label is used to look up the symmetric key for a password record.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Magic = v_bytes(size=4)
        self.Label = v_bytes(size=0x10)

    def pcb_Magic(self):
        '''
        validate the magic as soon as it has been parsed.

        Raises:
          ValueError: the magic is not `ssgp`.
        '''
        if self.Magic == b'ssgp':
            return
        raise ValueError('invalid SSGP header')
def parse_ssgp_label(label):
    '''
    interpret the `data` of the given attribute as an SSGP header
    and return the label (key id) it carries.

    Raises:
      ValueError: the data does not start with the SSGP magic.
    '''
    header = SSGP()
    header.vsParse(label.data)
    key_id = header.Label
    return key_id
class SYMMETRIC_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a symmetric key record: the key is wrapped twice with the database key.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        # 0x18 = sizeof(CommonBlob + StartCryptoBlob + TotalLength + IV)
        gap = int(self.StartCryptoBlob) - 0x18
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        # the encrypted region runs from StartCryptoBlob to the end of the blob.
        encrypted_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['EncryptedKey'].vsSetLength(encrypted_size)

    def decrypt(self, keychain):
        '''
        unwrap the symmetric key using the keychain's database key.

        Returns:
          dict: with key `plaintext`, the unwrapped key without its four byte header.

        Raises:
          ValueError: padding is invalid, or the plaintext has an unexpected shape.
        '''
        # the outer layer uses a fixed IV.
        magic_iv = binascii.unhexlify('4adda22c79e82105')
        outer = DES3.new(keychain.db_key, DES3.MODE_CBC, magic_iv)
        intermediate = unpad(outer.decrypt(self.EncryptedKey), 8)

        # the inner ciphertext is the first 32 bytes, reversed,
        # and is decrypted with the IV stored in the blob.
        inner_ciphertext = intermediate[:0x20][::-1]
        inner = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        plaintext = unpad(inner.decrypt(inner_ciphertext), 8)

        # example plaintext:
        #
        # 00000000: 00 00 00 00 C1 3D 0F F9 CB AC 6D AC D6 40 3A 98 .....=....m..@:.
        # 00000010: 4B 3C 5C F4 E8 12 F0 3E CB 31 83 6C K<\....>.1.l
        if len(plaintext) != 0x1C:
            raise ValueError('unexpected plaintext length')

        if plaintext[:4] != b'\x00\x00\x00\x00':
            raise ValueError('unexpected plaintext header')

        return {
            'plaintext': plaintext[4:],
        }
class GENERIC_PASSWORD_BLOB(vstruct.VStruct):
    '''
    blob of a generic password record: SSGP header, IV, then the ciphertext.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        decrypt the password with the symmetric key named by the SSGP label.

        Returns:
          dict: with key `plaintext`, or an empty dict when there is no ciphertext.

        Raises:
          KeyError: the keychain has no symmetric key for the label.
        '''
        # the key is looked up even when there turns out to be nothing to decrypt.
        symmetric_key = keychain.get_symmetric_key(self.SSGP.Label)

        if not self.EncryptedKey:
            # its possible for there to be no encrypted key,
            # e.g. the BlobSize is 0x1C, which only leaves space for:
            #   SSGP magic
            #   SSGP label
            #   IV
            # some entries for `Microsoft Office Identities Cache 3` look like this.
            # TODO: figure out how to interpret this.
            return {}

        cipher = DES3.new(symmetric_key, DES3.MODE_CBC, self.IV)
        return {
            'plaintext': unpad(cipher.decrypt(self.EncryptedKey), 8)
        }
class INTERNET_PASSWORD_BLOB(vstruct.VStruct):
    '''
    blob of an internet password record: SSGP header, IV, then the ciphertext.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        decrypt the password with the symmetric key named by the SSGP label.

        Returns:
          dict: with key `plaintext`, or an empty dict when there is no ciphertext.

        Raises:
          KeyError: the keychain has no symmetric key for the label.
        '''
        keyid = self.SSGP.Label
        key = keychain.get_symmetric_key(keyid)

        if not self.EncryptedKey:
            # a blob may end right after the IV, leaving nothing to decrypt.
            # unpadding an empty plaintext raises ValueError, which would abort
            # iteration over the whole table, so report "no plaintext" instead
            # (same handling as GENERIC_PASSWORD_BLOB).
            return {}

        des3 = DES3.new(key, DES3.MODE_CBC, self.IV)
        plaintext = unpad(des3.decrypt(self.EncryptedKey), 8)
        return {
            'plaintext': plaintext
        }
class PUBLIC_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a public key record: the key material is stored unencrypted.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.Padding = v_bytes(size=0)
        self.PublicKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        # 0x10 = sizeof(CommonBlob + StartCryptoBlob + TotalLength)
        gap = int(self.StartCryptoBlob) - 0x10
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        # the key runs from StartCryptoBlob to the end of the blob.
        key_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['PublicKey'].vsSetLength(key_size)

    def decrypt(self, keychain):
        '''
        nothing is encrypted; the stored key bytes are returned as-is.

        Returns:
          dict: with key `plaintext`.
        '''
        public_key = self.PublicKey
        return {
            'plaintext': public_key,
        }
class PRIVATE_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a private key record: the key is wrapped twice with the database key.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        # 0x18 = sizeof(CommonBlob + StartCryptoBlob + TotalLength + IV)
        gap = int(self.StartCryptoBlob) - 0x18
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        # the encrypted region runs from StartCryptoBlob to the end of the blob.
        encrypted_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['EncryptedKey'].vsSetLength(encrypted_size)

    def decrypt(self, keychain):
        '''
        unwrap the private key using the keychain's database key.

        Returns:
          dict: with key `plaintext`, the unwrapped key material.

        Raises:
          ValueError: padding is invalid at either layer.
        '''
        # the outer layer uses a fixed IV.
        magic_iv = binascii.unhexlify('4adda22c79e82105')
        outer = DES3.new(keychain.db_key, DES3.MODE_CBC, magic_iv)
        intermediate = unpad(outer.decrypt(self.EncryptedKey), 8)

        # the inner ciphertext is the entire intermediate plaintext, reversed,
        # and is decrypted with the IV stored in the blob.
        inner = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        plaintext = unpad(inner.decrypt(intermediate[::-1]), 8)

        return {
            'plaintext': plaintext,
        }
class X509_CERTIFICATE_BLOB(vstruct.VStruct):
    '''
    blob of an X.509 certificate record: the whole blob is the certificate.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        # consumes the entire blob.
        self.Certificate = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        nothing is encrypted; the stored certificate bytes are returned as-is.

        Returns:
          dict: with key `plaintext`.
        '''
        return {
            'plaintext': self.Certificate,
        }
# maps a table id (CSSM_DL_DB value) to the class used to parse that table's record blobs.
# `NotImplemented` marks tables for which no blob parser exists.
BLOB_PARSERS = {
    # schema structure is stored in attributes.
    CSSM_DL_DB.SCHEMA_INFO: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_INDEXES: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_ATTRIBUTES: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_PARSING_MODULE: EMPTY_BLOB,
    CSSM_DL_DB.RECORD_ANY: NotImplemented,
    CSSM_DL_DB.RECORD_CERT: NotImplemented,
    CSSM_DL_DB.RECORD_CRL: NotImplemented,
    CSSM_DL_DB.RECORD_POLICY: NotImplemented,
    CSSM_DL_DB.RECORD_GENERIC: NotImplemented,
    CSSM_DL_DB.RECORD_PUBLIC_KEY: PUBLIC_KEY_BLOB,
    CSSM_DL_DB.RECORD_PRIVATE_KEY: PRIVATE_KEY_BLOB,
    CSSM_DL_DB.RECORD_SYMMETRIC_KEY: SYMMETRIC_KEY_BLOB,
    CSSM_DL_DB.RECORD_ALL_KEY: NotImplemented,
    CSSM_DL_DB.RECORD_GENERIC_PASSWORD: GENERIC_PASSWORD_BLOB,
    CSSM_DL_DB.RECORD_INTERNET_PASSWORD: INTERNET_PASSWORD_BLOB,
    CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD: NotImplemented,
    CSSM_DL_DB.RECORD_USER_TRUST: NotImplemented,
    CSSM_DL_DB.RECORD_X509_CRL: NotImplemented,
    CSSM_DL_DB.RECORD_UNLOCK_REFERRAL: NotImplemented,
    CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE: NotImplemented,
    CSSM_DL_DB.RECORD_X509_CERTIFICATE: X509_CERTIFICATE_BLOB,
    CSSM_DL_DB.RECORD_METADATA: DB_BLOB,
}
# attribute format ids; these select the parser for an attribute's data (see ATTRIBUTE_PARSERS).
CSSM_DB_ATTRIBUTE_FORMAT= v_enum()
CSSM_DB_ATTRIBUTE_FORMAT.STRING = 0
CSSM_DB_ATTRIBUTE_FORMAT.SINT32 = 1
CSSM_DB_ATTRIBUTE_FORMAT.UINT32 = 2
CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM = 3
CSSM_DB_ATTRIBUTE_FORMAT.REAL = 4
CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE = 5
CSSM_DB_ATTRIBUTE_FORMAT.BLOB = 6
CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32 = 7
CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX = 8
class STRING(vstruct.VStruct):
    '''
    string attribute: a big-endian uint32 length followed by that many bytes of text.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.length = v_uint32(bigend=True)
        self.data = v_str()

    def pcb_length(self):
        # size the text field once its length prefix has been parsed.
        self['data'].vsSetLength(int(self.length))

    def __str__(self):
        '''
        the text with any trailing NUL bytes removed.
        '''
        return str(self.data).rstrip('\x00')

    def __repr__(self):
        '''
        the repr of the text with any trailing NUL bytes removed.
        '''
        # strip before taking the repr: a repr always ends with a quote
        # character, so stripping NULs from the repr itself can never match.
        return repr(str(self))
def SINT32():
    '''
    create a parser for a signed, big-endian, 32-bit integer attribute.
    '''
    return v_int32(bigend=True)


def UINT32():
    '''
    create a parser for an unsigned, big-endian, 32-bit integer attribute.
    '''
    return v_uint32(bigend=True)


# placeholder: no parser for big number attributes.
BIG_NUM = NotImplemented


def REAL():
    '''
    create a parser for a big-endian double attribute.
    '''
    return v_double(bigend=True)
class TIME_DATE(vstruct.VStruct):
    '''
    timestamp attribute: 16 bytes of text shaped like `YYYYMMDDhhmmssZ\\0`.

    timestamps that cannot be interpreted are represented as `datetime.datetime.min`.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.data = v_bytes(size=0x10)
        # the parsed timestamp is stored under the key 'it' once `data` has been parsed.
        self._ts = {}

    @property
    def ts(self):
        '''
        the parsed timestamp (datetime.datetime).

        Raises:
          KeyError: the structure has not been parsed yet.
        '''
        return self._ts['it']

    def pcb_data(self):
        '''
        interpret the raw bytes as a UTC timestamp once they have been parsed.
        '''
        raw = self.data
        # (start, end) of year, month, day, hour, minute, second
        spans = ((0, 4), (4, 6), (6, 8), (8, 10), (10, 12), (12, 14))

        timestamp = datetime.datetime.min
        try:
            fields = [int(raw[start:end]) for (start, end) in spans]
            if raw[14:16] == b'Z\x00':
                # constructing the datetime is inside the `try` because
                # out-of-range fields (e.g. month 00) raise ValueError, too.
                timestamp = datetime.datetime(*fields, tzinfo=pytz.utc)
        except ValueError:
            timestamp = datetime.datetime.min

        self._ts['it'] = timestamp

    def __repr__(self):
        '''
        the timestamp formatted like `2020-01-02T03:04:05Z`.
        '''
        # drop the tzinfo before formatting: isoformat() on an aware datetime
        # already emits `+00:00`, which would yield `...+00:00Z`.
        return self.ts.replace(tzinfo=None).isoformat('T') + 'Z'
def is_ascii(s):
    '''
    return True when every element of the byte string `s` is a character
    from `string.printable`. an empty input is considered printable.
    '''
    printable = string.printable

    if sys.version_info[0] >= 3:
        # on python 3, iterating bytes yields integers.
        return all(chr(code) in printable for code in s)

    # on python 2, iterating a byte string yields one-character strings.
    return all(char in printable for char in s)
def is_printable(buf):
    '''
    return True when the part of `buf` before the first NUL byte is
    valid UTF-8 that consists only of printable ASCII characters.
    '''
    try:
        text = buf.decode('utf-8')
        head = text.partition('\x00')[0]
        encoded = head.encode('ascii')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return False

    return is_ascii(encoded)
class BLOB(vstruct.VStruct):
    '''
    blob attribute: a big-endian uint32 length followed by that many bytes.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.length = v_uint32(bigend=True)
        self.data = v_bytes()

    def pcb_length(self):
        # size the data field once its length prefix has been parsed.
        self['data'].vsSetLength(int(self.length))

    def __str__(self):
        '''
        render printable data as text (up to the first NUL),
        and anything else as `hex:` followed by the hex encoded bytes.
        '''
        raw = self.data
        if not is_printable(raw):
            return 'hex:' + binascii.hexlify(raw).decode('ascii')
        return raw.decode('utf-8').partition('\x00')[0]

    def __repr__(self):
        return str(self)
# placeholders: no parsers for these attribute formats.
MULTI_UINT32 = NotImplemented
COMPLEX = NotImplemented

# maps an attribute format id to a callable that creates the parser for that format.
# `NotImplemented` entries mark formats that cannot be parsed.
ATTRIBUTE_PARSERS = {
    CSSM_DB_ATTRIBUTE_FORMAT.STRING: STRING,
    CSSM_DB_ATTRIBUTE_FORMAT.SINT32: SINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.UINT32: UINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM: BIG_NUM,
    CSSM_DB_ATTRIBUTE_FORMAT.REAL: REAL,
    CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE: TIME_DATE,
    CSSM_DB_ATTRIBUTE_FORMAT.BLOB: BLOB,
    CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32: MULTI_UINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX: COMPLEX,
}
# key class constants; the first four are written as an index plus a base of 0x0F.
CSSM_KEYCLASS = v_enum()
CSSM_KEYCLASS.PUBLIC_KEY = 0x00+0x0F
CSSM_KEYCLASS.PRIVATE_KEY = 0x01+0x0F
CSSM_KEYCLASS.SESSION_KEY = 0x02+0x0F
CSSM_KEYCLASS.SECRET_PART = 0x03+0x0F
CSSM_KEYCLASS.OTHER = 0xFFFFFFFF
# algorithm id constants, numbered consecutively from 0 (NONE) to 104 (RUNNING_COUNTER).
CSSM_ALGID = v_enum()
CSSM_ALGID.NONE = 0
CSSM_ALGID.CUSTOM = 1
CSSM_ALGID.DH = 2
CSSM_ALGID.PH = 3
CSSM_ALGID.KEA = 4
CSSM_ALGID.MD2 = 5
CSSM_ALGID.MD4 = 6
CSSM_ALGID.MD5 = 7
CSSM_ALGID.SHA1 = 8
CSSM_ALGID.NHASH = 9
CSSM_ALGID.HAVAL = 10
CSSM_ALGID.RIPEMD = 11
CSSM_ALGID.IBCHASH = 12
CSSM_ALGID.RIPEMAC = 13
CSSM_ALGID.DES = 14
CSSM_ALGID.DESX = 15
CSSM_ALGID.RDES = 16
CSSM_ALGID.THREEDES_3KEY_EDE = 17
CSSM_ALGID.THREEDES_2KEY_EDE = 18
CSSM_ALGID.THREEDES_1KEY_EEE = 19
CSSM_ALGID.THREEDES_3KEY_EEE = 20
CSSM_ALGID.THREEDES_2KEY_EEE = 21
CSSM_ALGID.IDEA = 22
CSSM_ALGID.RC2 = 23
CSSM_ALGID.RC5 = 24
CSSM_ALGID.RC4 = 25
CSSM_ALGID.SEAL = 26
CSSM_ALGID.CAST = 27
CSSM_ALGID.BLOWFISH = 28
CSSM_ALGID.SKIPJACK = 29
CSSM_ALGID.LUCIFER = 30
CSSM_ALGID.MADRYGA = 31
CSSM_ALGID.FEAL = 32
CSSM_ALGID.REDOC = 33
CSSM_ALGID.REDOC3 = 34
CSSM_ALGID.LOKI = 35
CSSM_ALGID.KHUFU = 36
CSSM_ALGID.KHAFRE = 37
CSSM_ALGID.MMB = 38
CSSM_ALGID.GOST = 39
CSSM_ALGID.SAFER = 40
CSSM_ALGID.CRAB = 41
CSSM_ALGID.RSA = 42
CSSM_ALGID.DSA = 43
CSSM_ALGID.MD5WithRSA = 44
CSSM_ALGID.MD2WithRSA = 45
CSSM_ALGID.ElGamal = 46
CSSM_ALGID.MD2Random = 47
CSSM_ALGID.MD5Random = 48
CSSM_ALGID.SHARandom = 49
CSSM_ALGID.DESRandom = 50
CSSM_ALGID.SHA1WithRSA = 51
CSSM_ALGID.CDMF = 52
CSSM_ALGID.CAST3 = 53
CSSM_ALGID.CAST5 = 54
CSSM_ALGID.GenericSecret = 55
CSSM_ALGID.ConcatBaseAndKey = 56
CSSM_ALGID.ConcatKeyAndBase = 57
CSSM_ALGID.ConcatBaseAndData = 58
CSSM_ALGID.ConcatDataAndBase = 59
CSSM_ALGID.XORBaseAndData = 60
CSSM_ALGID.ExtractFromKey = 61
CSSM_ALGID.SSL3PreMasterGen = 62
CSSM_ALGID.SSL3MasterDerive = 63
CSSM_ALGID.SSL3KeyAndMacDerive = 64
CSSM_ALGID.SSL3MD5_MAC = 65
CSSM_ALGID.SSL3SHA1_MAC = 66
CSSM_ALGID.PKCS5_PBKDF1_MD5 = 67
CSSM_ALGID.PKCS5_PBKDF1_MD2 = 68
CSSM_ALGID.PKCS5_PBKDF1_SHA1 = 69
CSSM_ALGID.WrapLynks = 70
CSSM_ALGID.WrapSET_OAEP = 71
CSSM_ALGID.BATON = 72
CSSM_ALGID.ECDSA = 73
CSSM_ALGID.MAYFLY = 74
CSSM_ALGID.JUNIPER = 75
CSSM_ALGID.FASTHASH = 76
CSSM_ALGID.THREEDES = 77
CSSM_ALGID.SSL3MD5 = 78
CSSM_ALGID.SSL3SHA1 = 79
CSSM_ALGID.FortezzaTimestamp = 80
CSSM_ALGID.SHA1WithDSA = 81
CSSM_ALGID.SHA1WithECDSA = 82
CSSM_ALGID.DSA_BSAFE = 83
CSSM_ALGID.ECDH = 84
CSSM_ALGID.ECMQV = 85
CSSM_ALGID.PKCS12_SHA1_PBE = 86
CSSM_ALGID.ECNRA = 87
CSSM_ALGID.SHA1WithECNRA = 88
CSSM_ALGID.ECES = 89
CSSM_ALGID.ECAES = 90
CSSM_ALGID.SHA1HMAC = 91
CSSM_ALGID.FIPS186Random = 92
CSSM_ALGID.ECC = 93
CSSM_ALGID.MQV = 94
CSSM_ALGID.NRA = 95
CSSM_ALGID.IntelPlatformRandom = 96
CSSM_ALGID.UTC = 97
CSSM_ALGID.HAVAL3 = 98
CSSM_ALGID.HAVAL4 = 99
CSSM_ALGID.HAVAL5 = 100
CSSM_ALGID.TIGER = 101
CSSM_ALGID.MD5HMAC = 102
CSSM_ALGID.PKCS5_PBKDF2 = 103
CSSM_ALGID.RUNNING_COUNTER = 104
class TABLE_HEADER(vstruct.VStruct):
    '''
    header at the start of each table, ending with the array of record offsets.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.TableSize = v_uint32(bigend=True)
        self.TableId = v_uint32(bigend=True, enum=CSSM_DL_DB)
        # number of offset entries with LSB not set (valid offset)
        # (contrast to `TotalRowCount`)
        self.AllocatedRowCount = v_uint32(bigend=True)
        self.Records = v_uint32(bigend=True)
        self.IndexesOffset = v_uint32(bigend=True)
        self.FreeListHead = v_uint32(bigend=True)
        # total number of offset entries
        # (contrast to `AllocatedRowCount`)
        self.TotalRowCount = v_uint32(bigend=True)
        # populated with `TotalRowCount` entries once the count has been parsed.
        self.RecordOffsets = vstruct.VArray()

    def pcb_TotalRowCount(self):
        '''
        grow the offset array to hold one big-endian uint32 per row.
        '''
        offsets = self.RecordOffsets
        row_count = self.TotalRowCount
        for _ in range(row_count):
            offsets.vsAddElement(v_uint32(bigend=True))
class Table:
    '''
    a table within the keychain database: a header followed by its records.
    '''
    def __init__(self, db, buf):
        '''
        Args:
          db (Database): the database that owns this table.
          buf (bytes): the data to parse for this table.
        '''
        self.db = db
        self.buf = buf
        self.header = TABLE_HEADER()
        self.header.vsParse(buf)

    def get_records(self):
        '''
        generate the allocated records of this table.

        offsets that are unallocated (LSB set), zero, or that point at a
        zero-length record are skipped. records that fail to parse with a
        ValueError are logged and skipped.

        Yields:
          Record: each record that parses.
        '''
        header = self.header
        logger.debug('get_records for %s, %d rows total, %d rows allocated',
                     CSSM_DL_DB.vsReverseMapping(int(header.TableId)),
                     header.TotalRowCount,
                     header.AllocatedRowCount)

        schema = self.db.get_table_schema(int(header.TableId))

        for i in range(header.TotalRowCount):
            record_offset = int(header.RecordOffsets[i])

            if record_offset & 0b1 > 0:
                # if LSB is set, then record is invalid/unallocated
                continue

            if record_offset == 0x0:
                continue

            # the record starts with its own size as a big-endian uint32.
            # unpack_from reads straight out of the buffer, so this works for
            # bytes as well as memoryview (a bytes slice has no `.tobytes()`).
            (record_length,) = struct.unpack_from('>I', self.buf, record_offset)
            if record_length == 0x0:
                continue

            record_buf = self.buf[record_offset:record_offset + record_length]
            if isinstance(record_buf, memoryview):
                # records are parsed from a bytes copy.
                record_buf = record_buf.tobytes()

            try:
                record = Record(schema, record_buf)
            except ValueError as e:
                logger.warning('failed to parse record (table: %s, record: %s): %s', header.TableId, i, e)
                continue

            yield record
class APPL_DB_SCHEMA(vstruct.VStruct):
    '''
    the database schema header: its size, the number of tables, and one offset per table.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SchemaSize = v_uint32(bigend=True)
        self.TableCount = v_uint32(bigend=True)
        # populated with `TableCount` entries once the count has been parsed.
        self.TableOffsets = vstruct.VArray()

    def pcb_TableCount(self):
        '''
        grow the offset array to hold one big-endian uint32 per table.
        '''
        offsets = self.TableOffsets
        table_count = self.TableCount
        for _ in range(table_count):
            offsets.vsAddElement(v_uint32(bigend=True))
# via: http://mirror.informatimago.com/next/developer.apple.com/documentation/Security/Reference/keychainservices/Reference/reference.html
#
# item attribute ids. each value is a four character code interpreted as a
# big-endian uint32 (pack the four bytes, then unpack them as `>I`).
SecItemAttr = v_enum()
SecItemAttr.CreationDate = struct.unpack('>I', struct.pack('>4s', b'cdat'))[0]
SecItemAttr.ModDate = struct.unpack('>I', struct.pack('>4s', b'mdat'))[0]
SecItemAttr.Description = struct.unpack('>I', struct.pack('>4s', b'desc'))[0]
SecItemAttr.Comment = struct.unpack('>I', struct.pack('>4s', b'icmt'))[0]
SecItemAttr.Creator = struct.unpack('>I', struct.pack('>4s', b'crtr'))[0]
SecItemAttr.Type = struct.unpack('>I', struct.pack('>4s', b'type'))[0]
SecItemAttr.ScriptCode = struct.unpack('>I', struct.pack('>4s', b'scrp'))[0]
SecItemAttr.Label = struct.unpack('>I', struct.pack('>4s', b'labl'))[0]
SecItemAttr.Invisible = struct.unpack('>I', struct.pack('>4s', b'invi'))[0]
SecItemAttr.Negative = struct.unpack('>I', struct.pack('>4s', b'nega'))[0]
SecItemAttr.CustomIcon = struct.unpack('>I', struct.pack('>4s', b'cusi'))[0]
SecItemAttr.Account = struct.unpack('>I', struct.pack('>4s', b'acct'))[0]
SecItemAttr.Service = struct.unpack('>I', struct.pack('>4s', b'svce'))[0]
SecItemAttr.Generic = struct.unpack('>I', struct.pack('>4s', b'gena'))[0]
SecItemAttr.SecurityDomain = struct.unpack('>I', struct.pack('>4s', b'sdmn'))[0]
SecItemAttr.Server = struct.unpack('>I', struct.pack('>4s', b'srvr'))[0]
SecItemAttr.AuthenticationType = struct.unpack('>I', struct.pack('>4s', b'atyp'))[0]
SecItemAttr.Port = struct.unpack('>I', struct.pack('>4s', b'port'))[0]
SecItemAttr.Path = struct.unpack('>I', struct.pack('>4s', b'path'))[0]
SecItemAttr.Volume = struct.unpack('>I', struct.pack('>4s', b'vlme'))[0]
SecItemAttr.Address = struct.unpack('>I', struct.pack('>4s', b'addr'))[0]
SecItemAttr.Signature = struct.unpack('>I', struct.pack('>4s', b'ssig'))[0]
SecItemAttr.Protocol = struct.unpack('>I', struct.pack('>4s', b'ptcl'))[0]
SecItemAttr.CertificateType = struct.unpack('>I', struct.pack('>4s', b'ctyp'))[0]
SecItemAttr.CertificateEncoding = struct.unpack('>I', struct.pack('>4s', b'cenc'))[0]
SecItemAttr.CrlType = struct.unpack('>I', struct.pack('>4s', b'crtp'))[0]
SecItemAttr.CrlEncoding = struct.unpack('>I', struct.pack('>4s', b'crnc'))[0]
SecItemAttr.Alias = struct.unpack('>I', struct.pack('>4s', b'alis'))[0]
# certificate type constants.
# note the numbering has gaps (0x07 and 0x0B are not assigned here).
CSSM_CERT = v_enum()
CSSM_CERT.UNKNOWN = 0x00
CSSM_CERT.X_509v1 = 0x01
CSSM_CERT.X_509v2 = 0x02
CSSM_CERT.X_509v3 = 0x03
CSSM_CERT.PGP = 0x04
CSSM_CERT.SPKI = 0x05
CSSM_CERT.SDSIv1 = 0x06
CSSM_CERT.Intel = 0x08
CSSM_CERT.X_509_ATTRIBUTE = 0x09
CSSM_CERT.X9_ATTRIBUTE = 0x0A
CSSM_CERT.ACL_ENTRY = 0x0C
CSSM_CERT.MULTIPLE = 0x7FFE
CSSM_CERT.LAST = 0x7FFF
CSSM_CERT.CUSTOM = 0x8000
# certificate encoding constants.
CSSM_CERT_ENCODING = v_enum()
CSSM_CERT_ENCODING.UNKNOWN = 0x00
CSSM_CERT_ENCODING.CUSTOM = 0x01
CSSM_CERT_ENCODING.BER = 0x02
CSSM_CERT_ENCODING.DER = 0x03
CSSM_CERT_ENCODING.NDR = 0x04
CSSM_CERT_ENCODING.SEXPR = 0x05
CSSM_CERT_ENCODING.PGP = 0x06
CSSM_CERT_ENCODING.MULTIPLE = 0x7FFE
CSSM_CERT_ENCODING.LAST = 0x7FFF
# authentication type codes.
# NOTE(review): these values are the raw four byte codes (bytes), whereas
# SecItemAttr and SecProtocolType store four character codes as big-endian
# integers - confirm the difference is intended.
SecAuthenticationType = v_enum()
SecAuthenticationType.NTLM = b'ntlm'
SecAuthenticationType.MSN = b'msna'
SecAuthenticationType.DPA = b'dpaa'
SecAuthenticationType.RPA = b'rpaa'
SecAuthenticationType.HTTPBasic = b'http'
SecAuthenticationType.HTTPDigest = b'httd'
SecAuthenticationType.HTMLForm = b'form'
SecAuthenticationType.Default = b'dflt'
SecAuthenticationType.Any = b'\x00\x00\x00\x00'
# protocol type ids. each value is a four character code interpreted as a
# big-endian uint32 (pack the four bytes, then unpack them as `>I`).
SecProtocolType = v_enum()
SecProtocolType.FTP = struct.unpack('>I', struct.pack('>4s', b'ftp '))[0]
SecProtocolType.FTPAccount = struct.unpack('>I', struct.pack('>4s', b'ftpa'))[0]
SecProtocolType.HTTP = struct.unpack('>I', struct.pack('>4s', b'http'))[0]
SecProtocolType.IRC = struct.unpack('>I', struct.pack('>4s', b'irc '))[0]
SecProtocolType.NNTP = struct.unpack('>I', struct.pack('>4s', b'nntp'))[0]
SecProtocolType.POP3 = struct.unpack('>I', struct.pack('>4s', b'pop3'))[0]
SecProtocolType.SMTP = struct.unpack('>I', struct.pack('>4s', b'smtp'))[0]
SecProtocolType.SOCKS = struct.unpack('>I', struct.pack('>4s', b'sox '))[0]
SecProtocolType.IMAP = struct.unpack('>I', struct.pack('>4s', b'imap'))[0]
SecProtocolType.LDAP = struct.unpack('>I', struct.pack('>4s', b'ldap'))[0]
SecProtocolType.AppleTalk = struct.unpack('>I', struct.pack('>4s', b'atlk'))[0]
SecProtocolType.AFP = struct.unpack('>I', struct.pack('>4s', b'afp '))[0]
SecProtocolType.Telnet = struct.unpack('>I', struct.pack('>4s', b'teln'))[0]
SecProtocolType.SSH = struct.unpack('>I', struct.pack('>4s', b'ssh '))[0]
SecProtocolType.FTPS = struct.unpack('>I', struct.pack('>4s', b'ftps'))[0]
SecProtocolType.HTTPS = struct.unpack('>I', struct.pack('>4s', b'htps'))[0]
SecProtocolType.HTTPProxy = struct.unpack('>I', struct.pack('>4s', b'htpx'))[0]
SecProtocolType.HTTPSProxy = struct.unpack('>I', struct.pack('>4s', b'htsx'))[0]
SecProtocolType.FTPProxy = struct.unpack('>I', struct.pack('>4s', b'ftpx'))[0]
SecProtocolType.CIFS = struct.unpack('>I', struct.pack('>4s', b'cifs'))[0]
SecProtocolType.SMB = struct.unpack('>I', struct.pack('>4s', b'smb '))[0]
SecProtocolType.RTSP = struct.unpack('>I', struct.pack('>4s', b'rtsp'))[0]
SecProtocolType.RTSPProxy = struct.unpack('>I', struct.pack('>4s', b'rtsx'))[0]
SecProtocolType.DAAP = struct.unpack('>I', struct.pack('>4s', b'daap'))[0]
SecProtocolType.EPPC = struct.unpack('>I', struct.pack('>4s', b'eppc'))[0]
SecProtocolType.IPP = struct.unpack('>I', struct.pack('>4s', b'ipp '))[0]
SecProtocolType.NNTPS = struct.unpack('>I', struct.pack('>4s', b'ntps'))[0]
SecProtocolType.LDAPS = struct.unpack('>I', struct.pack('>4s', b'ldps'))[0]
SecProtocolType.TelnetS = struct.unpack('>I', struct.pack('>4s', b'tels'))[0]
SecProtocolType.IMAPS = struct.unpack('>I', struct.pack('>4s', b'imps'))[0]
SecProtocolType.IRCS = struct.unpack('>I', struct.pack('>4s', b'ircs'))[0]
SecProtocolType.POP3S = struct.unpack('>I', struct.pack('>4s', b'pops'))[0]
SecProtocolType.CVSpserver = struct.unpack('>I', struct.pack('>4s', b'cvsp'))[0]
# this code used to be assigned to `CVSpserver` a second time, which
# overwrote the 'cvsp' value above and labelled 'svn ' as CVSpserver.
SecProtocolType.SVN = struct.unpack('>I', struct.pack('>4s', b'svn '))[0]
SecProtocolType.AdiumMessenger = struct.unpack('>I', struct.pack('>4s', b'AdIM'))[0]
SecProtocolType.Any = struct.unpack('>I', struct.pack('>4s', b'\x00\x00\x00\x00'))[0]
class Database:
    '''
    the keychain database: a schema header followed by a set of tables, keyed by table id.
    '''
    def __init__(self, buf):
        '''
        Args:
          buf (bytes): the data to parse, starting at the schema header.

        Raises:
          ValueError: two tables share the same table id.
        '''
        self.buf = buf
        self.schema = APPL_DB_SCHEMA()
        self.schema.vsParse(buf)

        tables = [
            self._get_table_by_index(i)
            for i in range(self.schema.TableCount)
        ]

        self.tables = {}
        for table in tables:
            if table.header.TableId in self.tables:
                raise ValueError("duplicate tables with id: " + hex(table.header.TableId))
            self.tables[table.header.TableId] = table

    def _get_table_by_index(self, index):
        # table offsets are relative to the start of the schema header.
        # the table is handed everything from its offset to the end of the buffer.
        table_offset = self.schema.TableOffsets[index]
        table_buf = self.buf[table_offset:]
        return Table(self, table_buf)

    def select(self, table, record_index=None, limit=sys.maxsize):
        '''
        generate records from the given table.

        Args:
          table: id of the table to read (a CSSM_DL_DB value).
          record_index: accepted but not used.
          limit (int): maximum number of records to generate.

        Raises:
          KeyError: there is no table with the given id.
        '''
        table = self.tables[table]
        for r in itertools.islice(table.get_records(), min(limit, table.header.TotalRowCount)):
            yield r

    @staticmethod
    def _bootstrap_schema(relation_id, relation_name, columns):
        # build a hardcoded schema description from (name, format) pairs;
        # attribute ids are assigned in order, starting from zero.
        return {
            'RelationID': relation_id,
            'RelationName': relation_name,
            'attrs': [
                {
                    'AttributeFormat': attr_format,
                    'AttributeID': attr_id,
                    'AttributeName': attr_name,
                    'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                    'RelationID': relation_id,
                }
                for attr_id, (attr_name, attr_format) in enumerate(columns)
            ],
        }

    def get_table_schema(self, table):
        '''
        fetch the description of the given table: its relation id, name, and
        the list of its attributes (under the key `attrs`).

        Args:
          table (int): id of the table to describe.

        Raises:
          KeyError: the schema does not describe the given table.
        '''
        # the schema is self-describing, so you can inspect the schema itself.
        # we provide hardcoded definitions of these tables to bootstrap the schema.
        logger.debug('fetching schema for 0x%x', table)

        if table == CSSM_DL_DB.SCHEMA_INFO:
            return self._bootstrap_schema(
                CSSM_DL_DB.SCHEMA_INFO,
                'CSSM_DL_DB_SCHEMA_INFO',
                [
                    ('RelationID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('RelationName', CSSM_DB_ATTRIBUTE_FORMAT.STRING),
                ])

        elif table == CSSM_DL_DB.SCHEMA_ATTRIBUTES:
            return self._bootstrap_schema(
                CSSM_DL_DB.SCHEMA_ATTRIBUTES,
                'CSSM_DL_DB_SCHEMA_ATTRIBUTES',
                [
                    ('RelationID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeNameFormat', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeName', CSSM_DB_ATTRIBUTE_FORMAT.STRING),
                    ('AttributeNameID', CSSM_DB_ATTRIBUTE_FORMAT.BLOB),
                    ('AttributeFormat', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                ])

        # every other table is described by rows of the two schema tables.
        for r in self.select(CSSM_DL_DB.SCHEMA_INFO):
            if int(r.attrs['RelationID']) != table:
                continue

            attrs = []
            for a in self.select(CSSM_DL_DB.SCHEMA_ATTRIBUTES):
                if int(a.attrs['RelationID']) != int(r.attrs['RelationID']):
                    continue

                if 'AttributeName' not in a.attrs:
                    # unnamed attributes are named after their four character id, when known.
                    a.attrs['AttributeName'] = SecItemAttr.vsReverseMapping(int(a.attrs['AttributeID']), default='Unknown')
                else:
                    a.attrs['AttributeName'] = str(a.attrs['AttributeName'])

                attrs.append(a.attrs)

            ret = copy.copy(r.attrs)
            ret['attrs'] = attrs
            return ret

        raise KeyError('failed to find table')
class APPL_DB_HEADER(vstruct.VStruct):
    '''
    header structure parsed from the very start of a keychain file.

    the fields are declared in the order they are parsed. the `pcb_*` methods
    are presumably vstruct parse callbacks that run once the field of the same
    name has been parsed - confirm against the vstruct documentation.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        # four byte magic, validated by pcb_Signature.
        self.Signature = v_bytes(size=4)
        # unlike the size/offset fields below, the two version fields are
        # declared without bigend=True.
        self.MajorVersion = v_uint16()
        self.MinorVersion = v_uint16()
        self.HeaderSize = v_uint32(bigend=True)
        # the schema offset is where Keychain starts parsing the database.
        self.SchemaOffset = v_uint32(bigend=True)
        self.AuthOffset = v_uint32(bigend=True)

    def pcb_Signature(self):
        '''
        ensure the buffer starts with the `kych` magic.

        raises:
          ValueError: if the signature is anything else.
        '''
        if self.Signature == b'kych':
            return
        raise ValueError('invalid header signature')

    def pcb_Version(self):
        '''
        ensure the format version is the supported one (0x100).

        raises:
          ValueError: if the version is anything else.
        '''
        # NOTE(review): this struct declares MajorVersion and MinorVersion but
        # no field named `Version`, so this check looks like it is never
        # triggered (and `self.Version` would not resolve if it were).
        # confirm the intended field and expected value before wiring it up.
        if self.Version == 0x100:
            return
        raise ValueError('unsupported version')
class Keychain:
    '''
    a keychain file plus the key material recovered from it.

    constructing an instance parses the header and database, decrypts the
    master keys, and then decrypts every symmetric key so that they can be
    looked up by label via `get_symmetric_key`.
    '''
    def __init__(self, buf, password):
        '''
        args:
          buf: the raw contents of the keychain file.
          password: the keychain password.
        '''
        self.buf = buf
        self.password = password

        self.header = APPL_DB_HEADER()
        self.header.vsParse(buf)

        # the database begins at the schema offset recorded in the header.
        schema_offset = self.header.SchemaOffset
        self.db = Database(buf[schema_offset:])

        # the master keys are populated before the symmetric keys are decrypted.
        master = self.get_master_keys()
        self.master_key = master['master_key']
        self.db_key = master['db_key']

        # index the decrypted symmetric keys by their parsed label.
        unlocked = {}
        for row in self.get_symmetric_keys():
            label = parse_ssgp_label(row['attrs']['Label'])
            unlocked[label] = row['plaintext']
        self.symmetric_keys = unlocked

    def get_decrypted_rows(self, table):
        '''
        yield one dict per record in `table`.

        each dict holds a shallow copy of the record attributes under `attrs`,
        merged with whatever the record blob's `decrypt` returns.
        '''
        for record in self.db.select(table):
            row = dict(attrs=copy.copy(record.attrs))
            row.update(record.blob.decrypt(self))
            yield row

    def get_master_keys(self):
        '''
        return the first decrypted row of the RECORD_METADATA table.
        '''
        # only the first row is used; index zero seems to be a magic constant.
        metadata_rows = self.get_decrypted_rows(CSSM_DL_DB.RECORD_METADATA)
        return next(metadata_rows)

    def get_symmetric_keys(self):
        '''
        yield every decrypted row of the RECORD_SYMMETRIC_KEY table.
        '''
        yield from self.get_decrypted_rows(CSSM_DL_DB.RECORD_SYMMETRIC_KEY)

    def get_symmetric_key(self, keyid):
        '''
        return the plaintext of the symmetric key labelled `keyid`.

        raises:
          KeyError: if no symmetric key with that label was decrypted.
        '''
        return self.symmetric_keys[keyid]
# attribute names whose values are rendered as `true`/`false` in the report.
# the list was collected empirically; it does not come from any database metadata.
BOOL_ATTRIBUTES = {
    'Permanent', 'Private', 'Modifiable',
    'Sensitive', 'AlwaysSensitive',
    'Extractable', 'NeverExtractable',
    'Encrypt', 'Decrypt', 'Derive',
    'Sign', 'Verify', 'SignRecover', 'VerifyRecover',
    'Wrap', 'Unwrap',
    'Invisible',
}
def render_cell(attr_name, attr_value):
    '''
    format one attribute value as the text of a report table cell.

    args:
      attr_name: the attribute (column) name, which selects the formatting.
      attr_value: the attribute value, or '' when there is nothing to show.

    returns: the cell content for the given value.
    '''
    # an empty value renders as an empty cell, whatever the column.
    if attr_value == '':
        return ''

    if attr_name in BOOL_ATTRIBUTES:
        return 'true' if bool(int(attr_value)) else 'false'

    # enumerated columns are rendered via their enum's reverse mapping.
    if attr_name == 'KeyClass':
        return CSSM_KEYCLASS.vsReverseMapping(int(attr_value))
    if attr_name == 'KeyType':
        return CSSM_ALGID.vsReverseMapping(int(attr_value))
    if attr_name == 'CertType':
        return CSSM_CERT.vsReverseMapping(int(attr_value))
    if attr_name == 'CertEncoding':
        return CSSM_CERT_ENCODING.vsReverseMapping(int(attr_value))
    if attr_name == 'AuthenticationType':
        # this one is looked up by the value's `.data`, not by its integer value.
        return SecAuthenticationType.vsReverseMapping(attr_value.data)
    if attr_name == 'Protocol':
        return SecProtocolType.vsReverseMapping(int(attr_value))

    # remaining numbers are shown in hex, except `Port` which is shown via str().
    if attr_name != 'Port' and isinstance(attr_value, v_number):
        return hex(attr_value).rstrip('L')

    return str(attr_value)
def render_plaintext(outdir, plaintext):
    '''
    render decrypted data for display in a report table cell.

    printable data shorter than 64 bytes is returned inline as text.
    anything else is written to `<outdir>/binary/<md5 of the data>` and a
    relative `file://binary/<md5>` reference is returned instead.

    args:
      outdir: the report output directory.
      plaintext: the decrypted bytes.

    returns: the inline text, or the `file://` reference.
    '''
    show_inline = is_printable(plaintext) and len(plaintext) < 64
    if show_inline:
        return plaintext.decode('ascii')

    # name the file after the hash of its content.
    digest = hashlib.md5(plaintext).hexdigest()
    relpath = os.path.join('binary', digest)
    outpath = os.path.join(outdir, relpath)

    logger.debug('writing binary blob to file %s', outpath)
    with open(outpath, 'wb') as f:
        f.write(plaintext)

    return 'file://' + relpath
def render_table(keychain, table, outdir):
    '''
    render every row of a keychain table as a text table.

    one column is emitted per attribute in the table schema. when at least
    one row decrypts to something with a `plaintext` entry, a trailing
    `plaintext` column is added for all rows (rows without one render b'').

    args:
      keychain: the Keychain to read rows from.
      table: the id of the table to render.
      outdir: the report output directory, passed on to render_plaintext.

    returns: the table formatted by tabulate.
    '''
    table_name = CSSM_DL_DB.vsReverseMapping(int(table))
    logger.debug('rendering table %s', table_name)

    schema = keychain.db.get_table_schema(table)
    column_names = [attr['AttributeName'] for attr in schema['attrs']]

    # decrypt the rows exactly once: they are needed both to decide whether
    # the plaintext column exists and to render the cells, and iterating
    # get_decrypted_rows() twice would decrypt every record twice.
    decrypted = list(keychain.get_decrypted_rows(table))
    has_plaintext = any('plaintext' in row for row in decrypted)

    rows = []
    for i, row in enumerate(decrypted):
        logger.debug('table %s row %d', table_name, i)
        # attributes absent from this row are rendered as empty cells.
        cells = [render_cell(name, row['attrs'].get(name, ''))
                 for name in column_names]
        if has_plaintext:
            cells.append(render_plaintext(outdir, row.get('plaintext', b'')))
        rows.append(cells)

    headers = list(column_names)
    if has_plaintext:
        headers.append('plaintext')

    return tabulate.tabulate(
        rows,
        headers=headers,
    )
def write_keychain_report(keychain, outdir):
    '''
    write a text report of every table in the keychain.

    the report goes to `<outdir>/report.txt`. the `<outdir>/binary` directory
    is created up front for the blobs written while rendering the tables.
    a table that fails to render is logged as a warning and skipped, so one
    bad table does not abort the whole report.

    args:
      keychain: the Keychain to report on.
      outdir: the directory to write the report and binary blobs into.

    returns: None
    '''
    binary_dir = os.path.join(outdir, 'binary')
    logger.info('writing binary blobs into directory %s', binary_dir)
    # exist_ok avoids the race between checking for the directory and creating it.
    os.makedirs(binary_dir, exist_ok=True)

    report_path = os.path.join(outdir, 'report.txt')
    logger.info('writing report into file %s', report_path)
    with open(report_path, 'wb') as f:
        for tableid in sorted(keychain.db.tables.keys()):
            table_name = CSSM_DL_DB.vsReverseMapping(int(tableid))
            banner = '%s TABLE %s %s\n' % ('#' * 20, table_name, '#' * 20)
            try:
                f.write(banner.encode('utf-8'))
                f.write(render_table(keychain, tableid, outdir).encode('utf-8'))
                f.write('\n'.encode('utf-8'))
            except TypeError:
                # a TypeError while rendering is reported as an unsupported table.
                logger.warning('table not supported: %s (submit to Willi for testing)', table_name)
            except Exception as e:
                # best effort: report the failure and carry on with the next table.
                logger.warning('failed to render table %s: %s', table_name, e)
    return None
def main(argv=None):
    '''
    command line entry point: parse arguments, load the keychain, write the report.

    args:
      argv: the argument list to parse; defaults to `sys.argv[1:]`.

    returns: 0 on success.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="extract keys from macOS keychains")
    positionals = (
        ("keychain", "Path to input keychain file"),
        ("password", "Keychain password"),
        ("output_directory", "Path into which to write binary data"),
    )
    for name, help_text in positionals:
        parser.add_argument(name, type=str, help=help_text)
    switches = (
        ("-v", "--verbose", "Enable debug logging"),
        ("-q", "--quiet", "Disable all output but errors"),
    )
    for short_flag, long_flag, help_text in switches:
        parser.add_argument(short_flag, long_flag, action="store_true", help=help_text)
    args = parser.parse_args(args=argv)

    # --verbose takes precedence when both --verbose and --quiet are given.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    if not os.path.exists(args.output_directory):
        os.makedirs(args.output_directory)

    # the whole file is read into memory and wrapped in a memoryview.
    with open(args.keychain, 'rb') as f:
        buf = memoryview(f.read())

    keychain = Keychain(buf, args.password)
    write_keychain_report(keychain, args.output_directory)
    return 0
# when run as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Quite a long script - would be nice as a multi-file gist or repo.
BTW the convention I've seen is passing args to main instead of getting from inside it.
Also, I don't see the benefit of returning `0` and then doing `sys.exit(0)`. And there are no other return statements in main. So you can just leave out the return and the exit - the default/implied behavior will be `return None` and exit with zero.