extract entries from the OS X sticky database
'''
parse osx sticky databases.

author: Willi Ballenthin <[email protected]>
license: Apache 2.0

usage:

    $ python extract_stickies.py /path/to/input.bin /path/to/output/directory/
'''
import re
import sys
import struct
import hashlib
import logging
import os.path
import datetime


logger = logging.getLogger('osx.stickydatabase')
logging.basicConfig(level=logging.INFO)


# recovered empirically
STREAMTYPED_HEADER = b'\x04\x0Bstreamtyped'
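# note: this header appears to match the magic of the NeXTSTEP/Cocoa
# "typedstream" serialization format (as written by NSArchiver); treat that
# identification as an assumption based on the header bytes, not ground truth.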


def md5(buf):
    m = hashlib.md5()
    m.update(buf)
    return m.hexdigest()


def carve_databases(buf):
    '''
    carve sticky databases from the given binary data.
    assume the databases are separated by the header; the last one runs
    until end of file.
    '''
    if not buf.startswith(STREAMTYPED_HEADER):
        # scan forward until the first header
        _, _, buf = buf.partition(STREAMTYPED_HEADER)
    else:
        buf = buf[len(STREAMTYPED_HEADER):]

    while buf:
        db, _, buf = buf.partition(STREAMTYPED_HEADER)
        db = STREAMTYPED_HEADER + db
        yield db
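
# a minimal sketch of the carving behavior, using hypothetical data rather
# than a real database:
#
#   >>> blob = STREAMTYPED_HEADER + b'db1' + STREAMTYPED_HEADER + b'db2'
#   >>> [db[len(STREAMTYPED_HEADER):] for db in carve_databases(blob)]
#   [b'db1', b'db2']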


def read_u32(buf, offset):
    v = struct.unpack_from('<I', buf, offset)[0]
    return v, offset + 4


def read_bytes(buf, offset, size):
    return buf[offset:offset + size], offset + size


def read_str(buf, offset):
    # layout:
    #
    #   length  u32
    #   s       [char]
    size, offset = read_u32(buf, offset)
    s, offset = read_bytes(buf, offset, size)
    return s.decode('utf-8'), offset
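
# worked example (hypothetical bytes): a little-endian u32 length of 5,
# followed by that many UTF-8 bytes:
#
#   >>> read_str(b'\x05\x00\x00\x00hello', 0x0)
#   ('hello', 9)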


def read_dict(buf, offset):
    # layout:
    #
    #   count          u32
    #   keys           [str]
    #   value lengths  [u32]
    #   values         [value]
    keys = []
    sizes = []
    values = []

    count, offset = read_u32(buf, offset)
    logger.debug('dict: found %d items', count)

    for _ in range(count):
        key, offset = read_str(buf, offset)
        logger.debug('dict: read key: %s', key)
        keys.append(key)

    for _ in range(count):
        size, offset = read_u32(buf, offset)
        logger.debug('dict: read size: 0x%x', size)
        sizes.append(size)

    for i in range(count):
        value, offset = read_bytes(buf, offset, sizes[i])
        logger.debug('dict: read value, 0x%x bytes', len(value))
        values.append(value)

    return {keys[i]: values[i] for i in range(count)}, offset
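
# worked example (hypothetical bytes): a single entry mapping 'a' to b'xyz'.
# the count comes first, then all keys, then all value sizes, then the raw values:
#
#   >>> buf = (b'\x01\x00\x00\x00'    # count == 1
#   ...        b'\x01\x00\x00\x00a'   # key: length 1, then 'a'
#   ...        b'\x03\x00\x00\x00'    # value length == 3
#   ...        b'xyz')                # value bytes
#   >>> read_dict(buf, 0x0)
#   ({'a': b'xyz'}, 16)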


def read_buffer(buf, offset):
    size, offset = read_u32(buf, offset)
    if size == 0x80000000:
        # the buffer has some padding first.
        #
        # layout:
        #
        #   flags         u32 == 0x80000000
        #   size          u32
        #   padding-size  u32
        #   padding       [u8]
        #   b             [u8]
        flags = size
        size, offset = read_u32(buf, offset)
        padsize, offset = read_u32(buf, offset)
        padding = buf[offset:offset + padsize]
        offset += padsize
        b = buf[offset:offset + size]
        offset += size
        return b, offset
    else:
        # the buffer is directly inline.
        #
        # layout:
        #
        #   size  u32
        #   b     [u8]
        b, offset = read_bytes(buf, offset, size)
        return b, offset
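
# worked examples (hypothetical bytes). the inline form is a size followed by data:
#
#   >>> read_buffer(b'\x03\x00\x00\x00abc', 0x0)
#   (b'abc', 7)
#
# the padded form: flags 0x80000000, size 2, padding-size 1, one pad byte, then data:
#
#   >>> read_buffer(b'\x00\x00\x00\x80\x02\x00\x00\x00\x01\x00\x00\x00\xffhi', 0x0)
#   (b'hi', 15)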


def read_value(buf, offset):
    # i'm not 100% confident in these interpretations,
    # but they seem to make sense for now.
    tag, offset = read_u32(buf, offset)
    if tag == 0x01:
        logger.debug('value: found buffer')
        return read_buffer(buf, offset)
    elif tag == 0x03:
        logger.debug('value: found dict')
        return read_dict(buf, offset)
    else:
        raise NotImplementedError('value type: %08x' % tag)
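
# worked example (hypothetical bytes): tag 0x01 introduces an inline buffer:
#
#   >>> read_value(b'\x01\x00\x00\x00\x02\x00\x00\x00ok', 0x0)
#   (b'ok', 10)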


def parse_sticky(buf):
    # example of header:
    #
    #   08 5B 31 30 37 30 31 63 5D 72 74 66 64
    #   .  [  1  0  7  0  1  c  ]  r  t  f  d
    #   |  ------- name --------  -- magic --
    #   |  1  2  3  4  5  6  7  8
    #   +-----------------------^
    namelen = buf[0]
    if sys.version_info[0] < 3:
        # on python 2, indexing bytes yields a one-character str
        namelen = ord(namelen)
    header_descriptor = '<b%ds4sI' % (namelen)
    header_size = struct.calcsize(header_descriptor)
    header = buf[:header_size]
    body = buf[header_size:]

    namelen, name, magic, zero = struct.unpack(header_descriptor, header)
    name = name.decode('ascii')
    assert magic == b'rtfd'
    assert zero == 0x0
    logger.info('found sticky: %s', name)

    # the top level object is a value (specifically, a dict)
    sticky, offset = read_value(body, 0x0)
    assert isinstance(sticky, dict)

    # each entry of that dict is itself a value.
    # for files (name != '.'), it is file content (a buffer).
    # for the metadata file (name == '.'), it is a dict serialized into a buffer.
    for key in sticky.keys():
        sticky[key], offset = read_value(sticky[key], 0x0)

    # unwrap the metadata
    sticky['.'], _ = read_dict(sticky['.'], 0x0)

    # the metadata file contains a mapping from filename to timestamp (time_t)
    for filename, val in sticky['.'].items():
        # 0D F0 29 54 B6 01 00 00 00 00 00 00 00 00 00 00
        # ^^ ^^ ^^ ^^ ?? ??
        #   time_t    unk
        q = struct.unpack_from('<I', val, 0x0)[0]
        try:
            ts = datetime.datetime.fromtimestamp(q)
        except (OSError, ValueError):
            ts = datetime.datetime.min
        sticky['.'][filename] = ts
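
    # for example, the bytes shown above begin 0D F0 29 54: the little-endian
    # u32 0x5429F00D, a unix timestamp from late September 2014 (note that
    # fromtimestamp() renders it in local time).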
    for filename in sticky.keys():
        if filename == '.':
            continue
        logger.info('found file: %s timestamp: %s', filename, sticky['.'][filename].isoformat('T'))

    return {
        'name': name,
        'stickies': {
            filename: {
                'buf': sticky[filename],
                'ts': sticky['.'][filename],
            }
            for filename in sticky.keys()
            if filename != '.'
        }
    }
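
# parse_sticky() returns a structure roughly like this (all values hypothetical;
# 'TXT.rtf' is a typical member of an rtfd package):
#
#   {
#       'name': '[10701c]',
#       'stickies': {
#           'TXT.rtf': {
#               'buf': b'{\\rtf1...',
#               'ts': datetime.datetime(2014, 9, 29, ...),
#           },
#       },
#   }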


def carve_stickies(buf):
    # use a raw bytes pattern so the backslash escapes are taken literally
    for match in re.finditer(br'(.)\[([0-9a-f]+)\]rtfd', buf):
        start = match.start()
        try:
            yield parse_sticky(buf[start:])
        except Exception:
            logger.warning('failed to parse sticky', exc_info=True)


outdir = sys.argv[2]
with open(sys.argv[1], 'rb') as f:
    buf = f.read()

for i, db in enumerate(carve_databases(buf)):
    logger.debug('found database, size: 0x%08x bytes, hash: %s', len(db), md5(db))

    dbdir = os.path.join(outdir, 'database-%d' % i)
    logger.info('creating database directory: %s', dbdir)
    try:
        os.makedirs(dbdir)
    except OSError:
        # the directory may already exist
        pass

    with open(os.path.join(dbdir, 'metadata.txt'), 'wb') as f:
        f.write(('input file: %s\n' % (sys.argv[1])).encode('utf-8'))
        f.write(('input md5: %s\n' % (md5(buf))).encode('ascii'))
        f.write(('recovered database index: %d\n' % (i)).encode('ascii'))
        f.write(('recovered database md5: %s\n' % (md5(db))).encode('ascii'))

    for sticky in carve_stickies(db):
        stickydir = os.path.join(dbdir, 'sticky-' + sticky['name'].strip('[]'))
        logger.info('creating sticky directory: %s', stickydir)
        try:
            os.makedirs(stickydir)
        except OSError:
            # the directory may already exist
            pass

        with open(os.path.join(stickydir, 'metadata.txt'), 'wb') as f:
            f.write(('input file: %s\n' % (sys.argv[1])).encode('utf-8'))
            f.write(('input md5: %s\n' % (md5(buf))).encode('ascii'))
            f.write(('recovered database index: %d\n' % (i)).encode('ascii'))
            f.write(('recovered database md5: %s\n' % (md5(db))).encode('ascii'))

            for filename, s in sticky['stickies'].items():
                f.write(('recovered filename: %s\n' % (filename)).encode('utf-8'))
                f.write(('recovered timestamp for %s: %s\n' % (filename, s['ts'].isoformat('T'))).encode('utf-8'))
                f.write(('recovered md5 for %s: %s\n' % (filename, md5(s['buf']))).encode('utf-8'))

                with open(os.path.join(stickydir, filename), 'wb') as g:
                    g.write(s['buf'])