Created
May 20, 2015 05:04
-
-
Save pedramamini/51f450fe8ec24a8705d3 to your computer and use it in GitHub Desktop.
Extract URLs and related contact information from your OSX Messages.app database.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Extract URLs and related contact information from your OSX Messages.app database.
#
# TODO
#   - automatically resolve the username and discover the contacts database (by largest item count if there is
#     more than one).
#   - emit a machine-parseable output format.
#   - keep track of the last found URL (by hash?) so the script can be run periodically and append to its output.
#   - update to the latest Gruber regex.
import re | |
import sys | |
import time | |
import sqlite3 | |
# change these:
MESSAGES_DB = "/Users/pedram/Library/Messages/chat.db"
CONTACTS_DB = "/Users/pedram/Library/Application Support/AddressBook/Sources/CADDFCF3-FE1E-40E4-A1D0-5F3E011A9AB9/AddressBook-v22.abcddb"

# no need to edit below this line.
DEBUG = False

# Liberal URL matcher, see https://gist.github.com/gruber/8891611
#
# The pattern is assembled from adjacent string literals instead of a single ur'' literal: the "ur" prefix only
# exists in Python 2, while mixing r'' and u'' pieces yields the same unicode pattern on Python 2 and a valid,
# equivalent pattern on Python 3. Only the typographic quotes need a non-raw literal; every other escape is left
# for the regex engine to interpret. findall() returns one tuple per hit, with the whole URL in element 0.
URL_EXTRACT = re.compile(
    # scheme, "www." prefix, or a bare domain followed by a slash.
    r'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)'
    # body: runs of non-space/non-bracket characters or balanced parentheses (nested one level deep).
    r'(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+'
    # tail: balanced parentheses, or any character that is not trailing punctuation.
    r'(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb'
    u'\u201c\u201d\u2018\u2019'
    r']))'
)

# apple epoch is 1/1/2001, whereas unix is 1/1/1970. this is the number of seconds between the two.
# NOTE(review): adding this offset assumes message.date is stored in seconds -- confirm for your OSX version.
UNIX_TS_MOD = 978307200

# SQL queries.

# every message whose text looks like it might contain a URL, newest first, joined to the handle it belongs to.
QUERY_MESSAGES = """
    SELECT
        message.date AS date,
        handle.id    AS recipient,
        message.text AS text
    FROM
        message, handle
    WHERE
        message.handle_id = handle.ROWID AND
        (
            LOWER(text) LIKE '%www.%'    OR
            LOWER(text) LIKE '%.com%'    OR
            LOWER(text) LIKE '%.net%'    OR
            LOWER(text) LIKE '%.org%'    OR
            LOWER(text) LIKE '%http://%' OR
            LOWER(text) LIKE '%https://%'
        )
    ORDER BY
        message.date DESC
"""

# address book records whose phone number matches a LIKE pattern (the single bound parameter).
QUERY_CONTACTS = """
    SELECT
        ZABCDRECORD.Z_PK             AS record_id,
        ZABCDRECORD.ZFIRSTNAME       AS first,
        ZABCDRECORD.ZLASTNAME        AS last,
        ZABCDPHONENUMBER.ZOWNER      AS number_id,
        ZABCDPHONENUMBER.ZFULLNUMBER AS number
    FROM
        ZABCDRECORD,
        ZABCDPHONENUMBER
    WHERE
        ZABCDRECORD.Z_PK = ZABCDPHONENUMBER.ZOWNER AND
        ZABCDPHONENUMBER.ZFULLNUMBER LIKE ?
"""
######################################################################################################################## | |
class db_wrapper(object):
    """
    Thin convenience wrapper around a single sqlite3 connection and cursor.

    Rows come back as sqlite3.Row objects, so columns can be addressed by name.
    """

    def __init__ (self, db_path, mode="rw"):
        """
        Open the database at db_path and prepare a cursor on it.

        @type  db_path: str
        @param db_path: Filesystem path of the sqlite database to open.
        @type  mode:    str
        @param mode:    Recorded on the instance and folded into conn_str only; it does not affect how the
                        database is opened.
        """

        self.db_path  = db_path
        self.mode     = mode

        # kept for reference / backwards compatibility. the connection below is deliberately opened by plain
        # path: callers in this file pass mode="memory", which (per the SQLite URI documentation) would yield
        # an empty in-memory database if this string were actually handed to sqlite.
        self.conn_str = "file:" + self.db_path + "?mode=" + self.mode

        self.conn              = sqlite3.connect(self.db_path)
        self.conn.row_factory  = sqlite3.Row
        self.conn.text_factory = str
        self.cursor            = self.conn.cursor()

    ####################################################################################################################
    def execute (self, query, params=(), MAX_ATTEMPTS=5):
        """
        Execute the supplied query, retrying on sqlite errors.

        @type  query:        str
        @param query:        SQL query to execute.
        @type  params:       Tuple, list, dict or single value
        @param params:       Optional SQL parameters to bind. A lone value is wrapped into a one-item tuple.
        @type  MAX_ATTEMPTS: int
        @param MAX_ATTEMPTS: Number of times to try the query before giving up.

        @raise Exception: If the query failed MAX_ATTEMPTS times.

        @rtype:  self
        @return: self.
        """

        query = query.strip()

        # only wrap genuine scalars. lists, dicts (named parameters) and tuple subclasses are already valid
        # parameter collections and must be passed through untouched.
        if not isinstance(params, (tuple, list, dict)):
            params = (params, )

        for attempts in range(MAX_ATTEMPTS):
            try:
                self.cursor.execute(query, params)
                self.conn.commit()
                return self

            except sqlite3.Error as e:
                finding  = "\n[FAIL] sqlite3 #%d: %s\n\n" % (attempts, e)
                finding += "query: %s\n"    % query
                finding += "params: %s\n\n" % str(params)

                sys.stderr.write(finding)

                # back off before the next try, but don't stall when there is no next try.
                if attempts + 1 < MAX_ATTEMPTS:
                    time.sleep(.5)

        # if execute() failed, now is the time to whine about it.
        raise Exception("sqlite failure count exceeded max of %d" % MAX_ATTEMPTS)

    ####################################################################################################################
    def get (self):
        """
        Retrieve the next row from the underlying cursor.

        @rtype:  sqlite3.Row
        @return: Next row of the last executed query, or None when there are no more rows.
        """

        return self.cursor.fetchone()

    ####################################################################################################################
    def iterate (self):
        """
        Iterate over the remaining rows of the last executed query.

        The rows are fetched in one go when iteration starts, so the cursor is free to be reused while the
        caller is still consuming the results.

        @rtype:  Generator
        @return: Yields sqlite3.Row objects.
        """

        for row in self.cursor.fetchall():
            yield row
######################################################################################################################## | |
def _like_pattern(recipient):
    """
    Turn a message handle into a SQL LIKE pattern that tolerates phone number formatting.

    The first two characters of the handle are dropped and every remaining character (up to 20 of them) is
    followed by the '%' wild card, with one more wild card in front, e.g. "+15551234" -> "%5%5%5%1%2%3%4%".
    NOTE(review): dropping two characters presumably strips a "+1" country prefix -- confirm for other handles.
    """
    return "%" + "".join(char + "%" for char in recipient[2:22])


def _display_name(contact):
    """Build "first last" from a contact row, skipping empty parts; "unknown" if both are empty."""
    parts = [part for part in (contact['first'], contact['last']) if part]
    return " ".join(parts) or "unknown"


def main():
    """
    Print one "timestamp name url" line for every URL found in the Messages database.

    With DEBUG enabled, the intermediate values (message, LIKE pattern, matched contacts, URLs) are printed
    instead of the synopsis lines.

    @rtype:  set
    @return: The unique URLs that were discovered.
    """
    messages = db_wrapper(MESSAGES_DB, mode="memory")
    contacts = db_wrapper(CONTACTS_DB, mode="memory")

    # keep a set of URLs.
    urls = set()

    for record in messages.execute(QUERY_MESSAGES).iterate():
        timestamp = time.ctime(UNIX_TS_MOD + record['date'])

        if DEBUG:
            print(timestamp)
            print("\t%s %s" % (record['recipient'], record['text']))

        # splice and wrap the generic number with the sql wild card '%'.
        recipient_normalized = _like_pattern(record['recipient'])

        if DEBUG:
            print("\t%s" % recipient_normalized)

        # start from a known value for every message: without this, a message whose sender is not in the
        # address book would either crash (first record) or be attributed to the previous message's contact.
        name = "unknown"

        # query the contact db using the normalized recipient we made above. if several contacts match, the
        # last one wins.
        for contact in contacts.execute(QUERY_CONTACTS, recipient_normalized).iterate():
            name = _display_name(contact)

            if DEBUG:
                print("\t%s" % name)

        # apply the gruber regex to detect and extract URLs from the SMS body.
        for match in URL_EXTRACT.findall(record['text']):
            # findall() yields one tuple of groups per hit; the full URL is the first group.
            url = match[0]
            urls.add(url)

            if DEBUG:
                print("\t\t%s" % url)
            else:
                # output the gleaned URL synopsis.
                print("%s %s %s" % (timestamp, name, url))

    # the unique set of discovered URLs is handed back rather than printed; to show it:
    #     for url in urls:
    #         print(url)
    return urls


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment