Last active
December 17, 2021 17:01
-
-
Save brianv0/35f36a32366a2c34be8d to your computer and use it in GitHub Desktop.
iOS messages html and json dump with file copying
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import json | |
import sys | |
import os | |
import datetime | |
import codecs | |
import sqlite3 | |
import re | |
import struct | |
import sha | |
import shutil | |
from dateutil import parser | |
from dateutil.tz import tzlocal, tzutc | |
# --- Command line and backup layout ------------------------------------------
if len(sys.argv) < 3:
    # Exit with a usage hint instead of an opaque IndexError.
    sys.exit("usage: python %s [backup dir] [target dir]" % sys.argv[0])

# Directory containing the backup files (first command-line argument).
dirname = sys.argv[1]
# Directory that receives the generated dumps (second command-line argument).
targetdir = sys.argv[2]

# File names, inside the backup directory, of the two SQLite databases and of
# the manifest that Reader parses.
messages_db = '3d0d7e5fb2ce288813306e4d4636395e047a3d28'
contacts_db = '31bb7ba8914766d4ba40d6dfb6113c8b614be442'
mbdb = "Manifest.mbdb"

# When True, attachments referenced by messages are copied into the target dir.
COPY_FILES = True

_messages_path = os.path.join(dirname, messages_db)
if not os.path.isfile(_messages_path):
    # sqlite3.connect() would silently create an empty database here and the
    # first query would then fail with a confusing "no such table" error.
    sys.exit("Error: messages database not found: %s" % _messages_path)
conn = sqlite3.connect(_messages_path)
def dict_factory(cursor, row):
    """Build a ``{column name: value}`` dict for one result row.

    Meant to be installed as a ``sqlite3`` ``row_factory``: the first item of
    each ``cursor.description`` entry is used as the key and the value at the
    same position in ``row`` as the value.
    """
    return {column[0]: row[position]
            for position, column in enumerate(cursor.description)}
# Rows come back as dicts keyed by column name; `curs` is the shared cursor
# that open_chats() and dump_messages() run their queries on.
conn.row_factory = dict_factory
curs = conn.cursor()
class Record:
    """Plain attribute holder whose text forms are those of its attribute dict."""

    def __repr__(self):
        return repr(vars(self))

    def __str__(self):
        return str(vars(self))
class Reader:
    """Iterator over the file records of a backup manifest.

    The manifest file (named by the module-level ``mbdb``) is read from
    ``dirname``. Iterating yields one ``Record`` per entry; iteration stops at
    the first empty/missing domain string, which is how end-of-file shows up.

    Strings read from the manifest are returned as byte strings.
    """

    # Number of header bytes that precede the first record.
    HEADER_SIZE = 6

    def __init__(self, dirname):
        """Open the manifest inside ``dirname`` and skip its header."""
        self.dir = dirname
        # Read-only: the manifest is only parsed, never modified, so write
        # permission on the backup must not be required.
        self.dat = open(os.path.join(dirname, mbdb), "rb")
        self.dat.read(self.HEADER_SIZE)

    def __iter__(self):
        # Restart from the first record so the reader can be iterated again.
        self.dat.seek(self.HEADER_SIZE)
        return self

    def next(self):
        """Return the next record or raise ``StopIteration`` at the end."""
        rec = self.readRecord()
        if rec is None:
            raise StopIteration
        return rec

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def close(self):
        """Close the underlying manifest file."""
        self.dat.close()

    def readRecord(self):
        """Parse one record at the current file position.

        Returns a ``Record`` with ``domain``, ``path``, ``link``, ``sha``,
        ``hash``, ``encKey``, ``mode``, ``inode``, ``uid``, ``gid``, ``mtime``,
        ``atime``, ``ctime``, ``fsize``, ``prot``, ``propCount`` and ``prop``,
        or ``None`` when no further record is available.

        Raises ``struct.error`` if the file ends in the middle of a record's
        fixed-size fields.
        """
        import hashlib  # local import: the file header only imports `sha`

        domain = self._recString()
        if not domain:
            return None

        rec = Record()
        rec.domain = domain
        rec.path = self._recString()
        rec.link = self._recString()
        # Hex SHA-1 of "<domain>-<path>"; verifyFileOnDisk() uses it as the
        # file's name inside the backup directory.
        rec.sha = hashlib.sha1(rec.domain + b"-" + rec.path).hexdigest()
        rec.hash = self._recString()
        rec.encKey = self._recString()
        (rec.mode, rec.inode, rec.uid, rec.gid) = self._unpack(">HQII")
        (rec.mtime, rec.atime, rec.ctime) = self._unpack(">III")
        (rec.fsize, rec.prot, rec.propCount) = self._unpack(">QBB")
        # Always present so consumers need not guard against a missing attribute.
        rec.prop = {}
        for _ in range(rec.propCount):
            key = self._recString()
            rec.prop[key] = self._recString()
        return rec

    def _unpack(self, fmt):
        """Read exactly the bytes ``fmt`` describes and unpack them."""
        return struct.unpack(fmt, self.dat.read(struct.calcsize(fmt)))

    def _recString(self):
        """Read a string prefixed by a big-endian 16-bit length.

        Returns ``b''`` for a length of 0 or 0xFFFF and when fewer than two
        bytes remain in the file.
        """
        rd = self.dat.read(2)
        if len(rd) < 2:
            return b''
        dsize = struct.unpack(">H", rd)[0]
        if dsize == 0xFFFF or dsize == 0:
            return b''
        return self.dat.read(dsize)

    def verifyFileOnDisk(self, rec):
        """Print a report if the backup file for ``rec`` is missing or has a
        size different from the one recorded in the manifest."""
        spath = os.path.join(self.dir, rec.sha)
        try:
            stat = os.stat(spath)
        except OSError:
            # A missing file is a mismatch to report, not a reason to crash.
            print("File missing: " + rec.sha)
            return
        if stat.st_size != rec.fsize:
            print(stat)
            print(rec.fsize)
            print("File mismatch: " + rec.sha)
            print(rec.hash)
# Shared state filled in by the functions below:
#   files    - chat identifier -> output file names and chat metadata
#   contacts - normalized contact id -> {'first': ..., 'last': ...}
files, contacts = {}, {}
# Compiled once; strips everything except ASCII digits.
_NON_DIGITS = re.compile(r'[^0-9]+')


def normalize_contact_id(value):
    """Normalize an address-book value into a key for ``contacts``.

    * Values with an ``@`` after the first character, or starting with
      ``http``, are returned unchanged.
    * Values starting with ``itunes`` and values without any digit yield
      ``None`` (they are not stored).
    * Anything else is treated as a phone number: all non-digits are removed
      and a leading ``+`` is added. A number written with an explicit ``+``
      keeps its own country code; otherwise ``1`` is prefixed unless the
      number already starts with ``1``.
    """
    if value.find("@") > 0 or value.startswith("http"):
        return value
    if value.startswith("itunes"):
        return None

    digits = _NON_DIGITS.sub('', value)
    if not digits:
        return None
    if value.lstrip().startswith("+"):
        # The country code is already spelled out; prefixing "1" would
        # corrupt the number.
        return "+" + digits
    if digits[0] != "1":
        digits = "1" + digits
    return "+" + digits


def build_contacts():
    """Fill the module-level ``contacts`` dict from the contacts database.

    Every non-null ``ABMultiValue.value`` joined to its ``ABPerson`` row is
    normalized with ``normalize_contact_id``; usable ids are mapped to
    ``{'first': ..., 'last': ...}``. The database connection is closed even if
    the query fails.
    """
    contacts_conn = sqlite3.connect(dirname + "/" + contacts_db)
    try:
        contacts_conn.row_factory = dict_factory
        cursor = contacts_conn.cursor()
        sql = """SELECT first first, last last, value FROM ABMultiValue, ABPerson WHERE record_id = ROWID AND value is not null"""
        cursor.execute(sql)
        for row in cursor.fetchall():
            contact_id = normalize_contact_id(row['value'])
            if contact_id is not None:
                contacts[contact_id] = {'first': row['first'], 'last': row['last']}
    finally:
        contacts_conn.close()
def full_name(contact):
    """Return the display name for ``contact`` or ``None`` without an alias.

    ``contact['alias']`` is expected to hold ``'first'`` and ``'last'``
    entries; missing parts are left out and the two are separated by a single
    space only when both are present.
    """
    alias = contact['alias']
    if not alias:
        return None
    first = alias['first'] or ''
    last = alias['last'] or ''
    separator = ' ' if first and last else ''
    return first + separator + last
def full_id(contact):
    """Return ``"<name> (<id>)"`` when the contact has a non-empty name,
    otherwise just the contact's id."""
    name = full_name(contact)
    if not name:
        return contact['id']
    return "%s (%s)" % (name, contact['id'])
def open_chats():
    """Create each chat's HTML and JSON file and write its header.

    Reads every distinct (chat identifier, participant handle) pair from the
    messages database, groups the participants per chat and, for each chat:

    * registers it in the module-level ``files`` dict
      (``'html'``, ``'json'``, ``'meta'`` and ``'init'`` entries);
    * writes ``<chat>.json`` up to and including the opening of the
      ``"messages"`` array (``dump_messages`` appends the rest);
    * writes the head of ``<chat>.html`` with a line listing the participants.

    Participants are grouped by chat identifier regardless of row order, since
    the query orders by ``chat.rowid`` and an identifier may therefore show up
    in non-adjacent rows.
    """
    from xml.sax.saxutils import escape

    chat_meta = """
        SELECT distinct chat.chat_identifier, h.id
        from chat chat
        JOIN chat_handle_join chj on (chat.rowid = chj.chat_id)
        join handle h on (chj.handle_id = h.rowid)
        order by chat.rowid
        """
    html_head = (
        u'\n'
        u'<html>\n'
        u'<head>\n'
        u'<meta charset="utf-8" />\n'
        u'<link rel="stylesheet" href="./theme.css">\n'
        u'</head>\n'
        u'<body>\n'
        u'<div class="info">%s</div><br>\n'
    )

    curs.execute(chat_meta)

    # Collect participants per chat, remembering the order chats first appear.
    chat_order = []
    participants = {}
    for row in curs.fetchall():
        cname = row["chat_identifier"]
        contact = {'id': row['id'], 'alias': contacts.get(row['id'], None)}
        if cname not in participants:
            participants[cname] = []
            chat_order.append(cname)
        participants[cname].append(contact)

    for cname in chat_order:
        meta = {'chat': cname, 'contacts': participants[cname]}
        meta['group'] = len(meta['contacts']) > 1
        files[cname] = {'html': cname + ".html", 'json': cname + ".json", 'meta': meta}

        joutput = codecs.open(os.path.join(targetdir, cname + ".json"), "w+b", 'utf-8')
        try:
            joutput.write(u'{"meta":')
            joutput.write(json.dumps(meta))
            joutput.write(u',\n"messages":[')
        finally:
            joutput.close()

        contactlist = u", ".join([full_id(contact) for contact in meta['contacts']])
        houtput = codecs.open(os.path.join(targetdir, cname + ".html"), "w+b", 'utf-8')
        try:
            # Names come from the address book; escape them for HTML.
            houtput.write(html_head % escape(contactlist))
        finally:
            houtput.close()

        files[cname]['init'] = True
def rewrite_path(path):
    """Rewrite an attachment path so it starts at its ``Library`` component.

    * ``None`` and the empty string are returned unchanged.
    * A path starting with ``~`` loses its first two characters (``~/``).
    * Otherwise, if ``Library`` occurs after the first character, everything
      before it is dropped.
    * Any other path is returned as is.
    """
    if not path:
        # Covers None and u'' (indexing an empty string would raise).
        return path
    if path.startswith(u'~'):
        return path[2:]
    library_at = path.find(u"Library")
    if library_at > 0:
        return path[library_at:]
    return path
def _find_manifest_record(records, filename):
    """Return the manifest record for ``filename`` or ``None`` if unknown."""
    record = records.get(filename)
    if record is None:
        # Try without the file extension
        record = records.get(u'.'.join(filename.split(u'.')[0:-1]))
    return record


def _archive_attachment(records, dname, filename):
    """Copy one attachment into the chat's folder and return its HTML snippet.

    ``records`` maps manifest paths to records, ``dname`` is the chat
    identifier and ``filename`` the rewritten attachment path. When
    ``COPY_FILES`` is set, the backup file is copied to
    ``<targetdir>/<dname>/<sha>-<basename>`` unless it is already there.
    Returns a link to the copied file, or an error ``<span>`` when the
    attachment is unknown or no copy exists in the target directory.
    """
    from xml.sax.saxutils import escape

    print("Processing attachment: %s" % (filename))
    error_html = u'<span>Attachment Error:%s</span>' % escape(filename)

    record = _find_manifest_record(records, filename)
    if record is None:
        print("Error: Unable to find attachment for %s with %s" % (dname, filename))
        print("repr:" + repr(filename))
        return error_html

    sha_file = record.sha
    link = record.link
    if isinstance(link, bytes):
        # Manifest strings are bytes while `records` is keyed by text.
        link = link.decode('utf-8')
    if link:
        linked_record = records.get(link)
        if linked_record is not None:
            sha_file = linked_record.sha

    mms_dir = os.path.join(targetdir, dname)
    oldname = os.path.join(dirname, sha_file)
    newfile = u"%s-%s" % (sha_file, os.path.split(filename)[1])
    newname = os.path.join(mms_dir, newfile)

    if COPY_FILES:
        if not os.path.exists(mms_dir):
            os.mkdir(mms_dir)
        if os.path.exists(newname):
            print("Found file %s, skipping" % newname)
        elif os.path.exists(oldname):
            print("Archiving attachment: %s to %s" % (filename, newname))
            shutil.copy2(oldname, newname)
        else:
            print("Error: Unable to find attachment for %s with %s" % (dname, filename))
            print("repr:" + repr(filename))

    if not os.path.exists(newname):
        return error_html
    # The HTML page is written directly into targetdir, so the link must be
    # relative to that directory rather than to the current working directory.
    href = dname + u"/" + newfile
    return u'<a href="%s">Attachment </a>' % escape(href, {u'"': u'&quot;'})


def dump_messages(records):
    """Append every message to its chat's HTML and JSON file.

    ``records`` maps manifest paths to manifest records and is used to locate
    attachments. Messages are read from the shared cursor in batches of 1000,
    ordered by chat identifier. For each chat registered in ``files`` the
    output files are reopened in append mode, one JSON object and one HTML
    block are written per row, and the closing markup is written when the chat
    ends. Chats registered in ``files`` that have no messages are terminated
    as well, so that every output file is complete.

    Rows whose chat is not registered in ``files`` are skipped with a warning.
    """
    from xml.sax.saxutils import escape

    message_dump = """
        SELECT
        chat.chat_identifier as conversation,
        h.id AS user_id,
        case when m.service = 'SMS' then 1 else null end sms,
        m.cache_roomnames as chatroom,
        is_from_me from_me,
        CASE
        WHEN date > 0 THEN strftime('%Y-%m-%dT%H:%M:%SZ', date + 978307200, 'unixepoch')
        ELSE NULL
        END as timestamp,
        text as text,
        atch.filename filename
        FROM chat chat
        join chat_message_join cmj on chat.rowid = cmj.chat_id
        join message m on cmj.message_id = m.rowid
        LEFT JOIN handle h ON h.rowid = m.handle_id
        LEFT JOIN message_attachment_join maj
        ON maj.message_id = m.rowid
        LEFT JOIN attachment atch on maj.attachment_id = atch.rowid
        ORDER BY chat.chat_identifier, m.rowid asc, timestamp
        """
    curs.execute(message_dump)

    def reopen(chat):
        # Append mode: open_chats() has already written the file headers.
        houtput = codecs.open(os.path.join(targetdir, chat['html']), "a+b", 'utf-8')
        joutput = codecs.open(os.path.join(targetdir, chat['json']), "a+b", 'utf-8')
        return houtput, joutput

    def finish(houtput, joutput):
        houtput.write(u'\n </body>\n</html>')
        joutput.write(u']}')
        houtput.close()
        joutput.close()

    houtput = joutput = None
    chat_meta = None
    last = None
    first_message = True
    completed = set()

    try:
        while True:
            rows = curs.fetchmany(1000)
            if not rows:
                break
            for row in rows:
                dname = row['conversation']

                if dname != last:
                    # A new chat starts: finish the previous one, open this one.
                    if houtput is not None:
                        finish(houtput, joutput)
                        houtput = joutput = None
                    last = dname
                    chat = files.get(dname)
                    if chat is None:
                        print("Warning: no output files for chat %s, skipping its messages" % dname)
                    else:
                        chat_meta = chat['meta']
                        houtput, joutput = reopen(chat)
                        completed.add(dname)
                        first_message = True
                if houtput is None:
                    continue

                dat = dict(row)
                del dat['conversation']  # redundant in json dump
                dat['filename'] = rewrite_path(dat['filename'])

                # Separator goes *between* objects; a trailing comma would make
                # the JSON array invalid.
                if not first_message:
                    joutput.write(u',')
                joutput.write(json.dumps(dat))
                first_message = False

                clz = u"me" if dat["from_me"] == 1 else u"them"
                if dat['sms'] is not None:
                    clz += u' sms'  # extra class if sms

                if chat_meta['group'] and dat['user_id'] is not None:
                    user = contacts.get(dat['user_id'], {'first': dat['user_id']})
                    label = user.get('first') or user.get('last') or dat['user_id']
                    houtput.write(u'<span class="group">%s</span>' % escape(label))

                houtput.write(u'<div class="' + clz + u'">')
                if dat['filename'] is not None:
                    houtput.write(_archive_attachment(records, dname, dat['filename']))
                if dat['text'] is not None:
                    # Message text is arbitrary user input; escape it for HTML.
                    houtput.write(escape(dat['text']))
                houtput.write(u'</div>')

                # The query yields NULL when the stored date is not positive.
                if dat['timestamp'] is not None:
                    when = parser.parse(dat['timestamp'])
                    houtput.write(u'<time datetime="%s">%s</time>' % (
                        dat['timestamp'],
                        when.astimezone(tzlocal()).strftime("%a %b %d %y %I:%M %p")))
    finally:
        if houtput is not None:
            finish(houtput, joutput)

    # Chats without any message still have open headers; terminate them too.
    for name, chat in files.items():
        if name not in completed:
            finish(*reopen(chat))
# Output files are opened directly inside targetdir, so it has to exist.
if not os.path.isdir(targetdir):
    os.makedirs(targetdir)

# Index the manifest by (decoded) path; the file handle is no longer needed
# once every record has been read.
manifest_reader = Reader(dirname)
try:
    records = {i.path.decode('utf-8'): i for i in manifest_reader}
finally:
    manifest_reader.dat.close()

try:
    build_contacts()
    open_chats()
    dump_messages(records)
finally:
    # The messages database is not used after the dump.
    conn.close()
# Stylesheet referenced by every generated HTML page (href="./theme.css").
theme_css = u"""
.info {
font-size: 20px;
}
.them {
clear: both;
margin: 4px;
padding: 5px;
background-color: #cef;
border: 1px solid #bde;
border-radius: 8px 8px 8px 0px;
}
.sms {
background-color: #cfc;
border-color: #9e9;
}
.me {
clear: both;
margin: 4px;
padding: 5px;
background-color: #eee;
text-align: right;
border: 1px solid #ddd;
border-radius: 8px 8px 0px 8px;
}
time {
margin: 0px 4px 0px 0px;
float: right;
text-align: right;
font-size: 12px;
}
.group {
margin: 6px;
float: left;
}
"""

# `with <expression> as <name>`: the original had the two operands reversed,
# which is a syntax error.
with codecs.open(os.path.join(targetdir, "theme.css"), "w+b", 'utf-8') as f:
    f.write(theme_css)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You might need to install dateutil.
usage:
python message_backup.py [backup dir] [target dir]