Downloads, archives, analyzes and plots Facebook Messenger conversations (individual and group)
#!/usr/bin/env python3

__author__ = 'Sushain K. Cherivirala'

import argparse
import cmd
import collections
import contextlib
import copy
import datetime
import functools
import getpass
import glob
import gzip
import http.cookiejar
import importlib.util
import itertools
import json
import logging
import math
import operator
import os
import pprint
import re
import shlex
import shutil
import statistics
import subprocess
import sys
import tempfile
import typing
import urllib
import urllib.parse
import urllib.request  # needed for build_opener/HTTPCookieProcessor below

import numpy  # statistics doesn't provide a weighted average function (yet)
import lxml.html

# Postgres, while far less portable/lightweight, has far better support for JSON than SQLite
# and Facebook's thread info responses change far too quickly to keep up with. The --bare
# option was added in order to somewhat compensate for this (very) breaking change.
psycopg2_installed = importlib.util.find_spec('psycopg2')
if psycopg2_installed:
    import psycopg2
    import psycopg2.extensions
    import psycopg2.extras
else:
    logging.warning('Failed to import psycopg2, only bare mode supported (no database).')
    psycopg2_polyfill = collections.namedtuple('psycopg2', ['extensions', 'extras'])
    psycopg2_extensions_polyfill = collections.namedtuple('psycopg2_extensions', ['cursor', 'connection'])
    psycopg2 = psycopg2_polyfill(extensions=psycopg2_extensions_polyfill(None, None), extras=None)

if importlib.util.find_spec('matplotlib'):
    import matplotlib
    import matplotlib.pyplot as plot
else:
    logging.warning('Failed to import matplotlib, plotting will not be available.')
    matplotlib = plot = None

if importlib.util.find_spec('wordcloud'):
    import wordcloud
else:
    logging.warning('Failed to import wordcloud, word clouds will not be available.')
    wordcloud = None

try:
    import selenium  # noqa: F401
    from selenium import webdriver
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.common.action_chains import ActionChains
    from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
except ImportError:
    logging.warning('Failed to import selenium, browser authentication will not be available.')
    webdriver = None

# TODO: Break up lines and decrease max-line-length
# TODO: CI config
# TODO: Prevent errors in interactive mode (functools.wrap)

#############
# Constants #
#############

id_to_user_map = {
    '777993547': 'Jean',
    '1253817276': 'Alexander',
    '1311112684': 'Keerthana',
    '1333603699': 'Saloni',
    '1338262658': 'Sushain',
    '1412264090': 'Michelle H.',
    '1626215140': 'Maxine',
    '1694710481': 'Sameer',
    '1814644642': 'Devin',
    '1841753743': 'Christina',
    '100000241495175': 'Eric',
    '100000284933876': 'Ani',
    '100000534453859': 'Ashley',
    '100000986269083': 'Prachi',
    '100001184052364': 'Shreya',
    '100002398916527': 'Amisha',
    '100002421417870': 'Vijay',
    '100002475584038': 'Ben',
    '100002576434633': 'Snigdha',
    '100002628181062': 'Pallavi',
    '100002827417675': 'Rohan',
    '100002878482600': 'Tiffany',
    '100003127069904': 'Tiffany Do',
    '100003355055997': 'Karen',
    '100003971632140': 'Sara',
    '100004252695708': 'Michelle N.',
    '100004322110944': 'Rowena',
    '100004476751719': 'Benji',
    '100006790448156': 'Serena',
    '100009196845865': 'Brittany',
    '100012341320095': 'Spoorthi',
    '100012529272199': 'Nikki',
    '100025545846385': 'Brittany 2',
}
groups = [
    494248544089735, 1513200892278424, 322604171221575, 1021123947914529,
    879550675408978, 940010492714431, 1700273163527834, 1097674336985252,
    888706481258668, 851545464945488, 1378878545487465,
]
profanity = [
    r'\bfuck+(?:ing|ed|er)?\b', r'\b(?:dip)?shit+(?:ty+)?\b', r'\bdamn(?:it+)?\b',
    r'\bgoddamn\b', r'\bdick\b', r'\bbullshit+\b', r'\bbastard\b', r'\bhell+\b',
    r'\bbitch(?:ass)?\b', r'\bass+\b', r'\ba(?:ss)?hole\b', r'\bmotherfuck+(?:ing|ed|er)\b',
]
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.112 Safari/537.36'

###########
# Globals #
###########

Session = collections.namedtuple('Session', ['username', 'id', 'opener', 'dtsg'])
session = Session(None, None, None, None)
postgres_connection_string = None

########
# Code #
########


def id_to_user(userid: typing.Union[str, int]) -> str:
    userid = str(userid).replace('fbid:', '')
    if userid in id_to_user_map:
        return id_to_user_map[userid]
    else:
        return 'Unknown_' + str(hash(str(userid))).replace('-', '')[:4]


def user_to_id(user: str) -> typing.Optional[str]:
    return dict(zip(map(str.lower, id_to_user_map.values()), id_to_user_map.keys())).get(user.lower())
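
# A quick sanity check of the two helpers above, using values from
# id_to_user_map (illustrative only):
#   id_to_user('fbid:1338262658') -> 'Sushain'
#   user_to_id('sushain')         -> '1338262658'
#   id_to_user('12345')           -> 'Unknown_NNNN' (hash-derived placeholder)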


def init_db() -> None:
    conn = psycopg2.connect(postgres_connection_string)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE threads (
            id VARCHAR(100) PRIMARY KEY,
            info JSONB
        )
    """)
    cursor.execute("""
        CREATE TABLE messages (
            id VARCHAR(100) PRIMARY KEY,
            thread_id VARCHAR(100) REFERENCES threads(id) NOT NULL,
            info JSONB
        )
    """)
    cursor.execute("CREATE INDEX ON messages(((info->>'timestamp_precise')::TIMESTAMP))")
    cursor.execute("CREATE INDEX ON messages((info->'message_sender'->>'id'))")
    conn.commit()
    conn.close()
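
# Sketch of a query the sender expression index above can serve (table layout
# as created in init_db; the ID is illustrative):
#   SELECT COUNT(*) FROM messages WHERE info->'message_sender'->>'id' = '1338262658';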


def insert_messages(
    cursor: psycopg2.extensions.cursor,
    messages: typing.List[typing.Dict[str, typing.Any]],
    thread_info: typing.Dict[str, typing.Any],
) -> None:
    thread_key = thread_info['thread_key']
    thread_id = int(thread_key['thread_fbid'] or thread_key['other_user_id'])
    cursor.execute("""
        INSERT INTO threads (id, info)
        VALUES (%s, %s)
        ON CONFLICT (id) DO UPDATE
            SET info = EXCLUDED.info
    """, (thread_id, json.dumps(thread_info)))
    messages_values = list(map(lambda msg: (msg['message_id'], thread_id, json.dumps(msg)), messages))
    psycopg2.extras.execute_values(cursor, """
        INSERT INTO messages (id, thread_id, info)
        VALUES %s
        ON CONFLICT (id) DO UPDATE
            SET info = EXCLUDED.info
    """, messages_values)


def dump_db(path: str, database_name: str) -> None:
    logging.info('Starting database dump...')
    with tempfile.NamedTemporaryFile(mode='w+') as db:
        with subprocess.Popen(['pg_dump', database_name, '--verbose'], stdout=db, stderr=subprocess.PIPE, universal_newlines=True) as proc:
            for line in proc.stderr:
                logging.debug(line.rstrip())
        logging.info(f'Completed dumping the database ({os.path.getsize(db.name) / 1024 ** 2:.2f} MB).')
        with gzip.open(path, 'wb') as db_compressed:
            with open(db.name, 'rb') as db:
                db_compressed.writelines(db)
        logging.info(f'Completed compressing the database ({os.path.getsize(path) / 1024 ** 2:.2f} MB).')


def load_db(path: str, database_name: str) -> None:
    args = shlex.split(f'psql --set ON_ERROR_STOP=on --single-transaction {database_name}')
    subprocess.run(args, stdin=gzip.open(path), check=True)


def get_new_messages(
    conversation_id: typing.Union[str, int],
    group: bool = False,
    old_messages: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None,
    limit: int = 2000,
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.List[typing.Dict[str, typing.Any]]]:
    global session
    if not (session.opener and session.dtsg and session.id):
        session = login()
    opener, dtsg = session.opener, session.dtsg
    logging.info(f'Fetching messages from conversation {conversation_id}' + (f' ({id_to_user(conversation_id)}).' if not group else '.'))
    if old_messages:
        newest_message_date = datetime.datetime.fromtimestamp(old_messages[-1]['timestamp_precise'] / 1e3)
        logging.info(f'{len(old_messages)} messages currently downloaded.')
    start_time = datetime.datetime.now()
    new_messages: typing.List[typing.Dict[str, typing.Any]] = []
    info = None
    before = int(datetime.datetime.now().timestamp() * 1e3)
    failures = 0
    while True:
        # TODO: Parallelize downloads (multiple queries via graphql?)
        try:
            data = {
                'batch_name': 'MessengerGraphQLThreadFetcherRe',
                '__dyn': '7AzkXxaA4ojgDxyLqzGomzEbHGbGey8WhLFwgoqwWhE98nwgUaoepovHyodEbbxW4E4u3ucDBwJx62i2PxOcG4K1Zxa2m4oqyUf8oCK251G6XDwnU567oeo5m4pHxC326U6OfBwHx-8xubxy1by8sxeEgzU5m6dopUhwIUa8',
                '__req': '6',
                '__rev': '3473034',
                '__pc': 'PHASED:DEFAULT',
                '__user': session.id,
                'fb_dtsg': dtsg,
                'queries': json.dumps({
                    'o0': {
                        'doc_id': 1515220501901239,
                        'query_params': {
                            'id': str(conversation_id),
                            'message_limit': limit,
                            'load_messages': 1,
                            'load_read_receipts': False,
                            'before': before,
                        },
                    },
                }),
            }
            with opener.open('https://www.facebook.com/api/graphqlbatch/', data=urllib.parse.urlencode(data).encode()) as response:
                data = json.loads(response.read().decode().splitlines()[0])['o0']['data']['message_thread']
            data.pop('last_message', None)
            new_messages_batch = data.pop('messages')['nodes']
            info = data
            before = int(new_messages_batch[0]['timestamp_precise']) - 1
        except Exception as err:
            failures += 1
            logging.warning(f'Failed to fetch messages before {before} with limit {limit} (failure #{failures}): {err}.')
            if failures > 2:
                logging.info(f'Changing limit from {limit} to {limit / 2}.')
                limit //= 2
                failures = 0
                if limit < 10:
                    logging.error(f'Giving up after fetching {len(new_messages)} messages.')
                    raise err
            continue
        failures = 0
        new_messages = new_messages_batch + new_messages
        oldest_message_date = datetime.datetime.fromtimestamp(before / 1e3)
        if len(new_messages_batch) < limit or (old_messages and oldest_message_date < newest_message_date):
            logging.info(f'Completed fetching {len(new_messages)} messages in conversation {conversation_id}.')
            break
        else:
            logging.info(f'Fetched {limit} messages before {oldest_message_date}, fetched {len(new_messages)} messages so far.')
    if old_messages:
        new_messages = list(filter(lambda x: datetime.datetime.fromtimestamp(int(x['timestamp_precise']) / 1e3) > newest_message_date, new_messages))
        logging.info(f'Added {len(new_messages)} messages to existing {len(old_messages)} messages for a total of {len(new_messages) + len(old_messages)}.')
    logging.info(f'The data retrieval took {datetime.datetime.now() - start_time} seconds.')
    return info, new_messages
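
# Usage sketch (assumes an authenticated global `session`; see login() below):
#   thread_info, messages = get_new_messages('1338262658', limit=500)
# The loop pages backwards through history via the `before` timestamp until a
# short batch arrives or it passes the newest already-downloaded message.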


def get_messages(
    cursor: psycopg2.extensions.cursor,
    conversation_id: str,
    query: typing.Optional[str] = None,
    regular_expression: bool = False,
    case_sensitive: bool = False,
) -> typing.List[typing.Dict[str, typing.Any]]:
    if query and not regular_expression:
        if case_sensitive:
            cursor.execute(
                """
                SELECT id, info->'message_sender'->>'id', info->>'timestamp_precise', info->'message'->>'text', info->'sticker'
                FROM messages
                WHERE thread_id = %s AND info->'message'->>'text' LIKE %s
                ORDER BY info->>'timestamp_precise'
""", | |
(conversation_id, f'%{query}%'), | |
) | |
else: | |
cursor.execute( | |
""" | |
SELECT id, info->'message_sender'->>'id', info->>'timestamp_precise', info->'message'->>'text', info->'sticker' | |
FROM messages | |
WHERE thread_id = %s AND LOWER(info->'message'->>'text') LIKE %s | |
ORDER BY info->>'timestamp_precise' | |
""", | |
(conversation_id, f'%{query.lower()}%'), | |
) | |
else: | |
cursor.execute( | |
""" | |
SELECT id, info->'message_sender'->>'id', info->>'timestamp_precise', info->'message'->>'text', info->'sticker' | |
FROM messages | |
WHERE thread_id = %s | |
ORDER BY info->>'timestamp_precise' | |
""", | |
(conversation_id, ), | |
) | |
messages = cursor.fetchall() | |
    if query and regular_expression:  # TODO: use Postgres' built-in regex handling
        regex = re.compile(query, flags=(0 if case_sensitive else re.IGNORECASE))
        messages = list(filter(lambda x: x[3] and bool(regex.search(x[3])), messages))
    # TODO: consider psycopg2's DictCursor
    return list(map(
        lambda x: {
            'id': x[0],
            'author': x[1],
            'timestamp': int(x[2]),
            'body': x[3],
            'sticker': x[4],
        },
        messages,
    ))


def all_days_span(oldest: datetime.datetime, newest: datetime.datetime) -> typing.List[datetime.date]:
    all_days = []
    start_date = oldest.date()
    while start_date <= newest.date():
        all_days.append(start_date)
        start_date = start_date + datetime.timedelta(days=1)
    return all_days
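
# e.g. all_days_span(datetime(2018, 1, 1), datetime(2018, 1, 3)) returns the
# inclusive range [date(2018, 1, 1), date(2018, 1, 2), date(2018, 1, 3)].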


def termgraph(data: typing.Sequence[typing.Tuple[str, int]], buffer: int = 10) -> None:
    columns = shutil.get_terminal_size((80, 20)).columns
    x_padding = min(len(max(data, key=lambda x: len(x[0]))[0]), columns // 5)
    max_y = max(data, key=operator.itemgetter(1))[1]
    width = columns - x_padding - len(str(max_y)) - buffer
    step = max_y / width
    for x, y in data:
        padded_x = f'{x: >{x_padding}}:' if len(x) <= x_padding else f'{x}\n{" " * x_padding}:'
        bar = '|' if y < step else '▇' * int(y / step)
        print(f'{padded_x} {bar} {y}')
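
# For example, termgraph([('Jean', 120), ('Sushain', 480)]) renders roughly:
#      Jean: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 120
#   Sushain: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 480
# (bars scale to the terminal width; a value under one step renders as '|').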


# @profile
# NOTE: at one point (circa Python 3.3) this function used to be very slow,
# as of Python 3.6 it is very fast. Perhaps due to a new native dict implementation?
# Regardless, the profiling was a poor man's optimization technique rather than
# using more proper data structures. Here be dragons!
def messages_stats(
    messages: typing.List[typing.Dict[str, typing.Any]],
    plot_message_count: bool = False,
    plot_cumulative_message_count: bool = False,
    word_clouds: bool = False,
    limit_plot_to_streak: bool = False,
) -> None:
    start_time = datetime.datetime.now()
    oldest, newest = datetime.datetime.max, datetime.datetime.min
    # TODO: count more message types? reactions?
    message_counts: typing.DefaultDict[str, typing.Counter[str]] = collections.defaultdict(lambda: collections.Counter({'sticker': 0, 'text': 0, 'other': 0}))
    message_contents: typing.DefaultDict[str, typing.List[str]] = collections.defaultdict(list)
    days_spoken = set()
    daysMessages: typing.DefaultDict[datetime.date, typing.DefaultDict[str, int]] = collections.defaultdict(lambda: collections.defaultdict(int))
    stickerCounts: typing.DefaultDict[str, typing.Counter[str]] = collections.defaultdict(lambda: collections.Counter())
    responseTimes: typing.DefaultDict[str, typing.List[float]] = collections.defaultdict(list)
    message_streaks: typing.List[typing.Tuple[str, int]] = []
    users = set()
    last_message_user = None
    current_message_streak = 0
    last_timestamp = None
    for message in messages:
        date = datetime.datetime.fromtimestamp(message['timestamp'] / 1e3)
        oldest = min(oldest, date)
        newest = max(newest, date)
        user = id_to_user(message['author'])
        daysMessages[date.date()][user] += 1
        days_spoken.add(date.date())
        users.add(user)
        if last_message_user == user:
            current_message_streak += 1
            last_timestamp = date
        else:
            if last_message_user:
                message_streaks.append((last_message_user, current_message_streak))
            last_message_user = user
            current_message_streak = 1
            if last_timestamp:
                responseTimes[user].append((date - last_timestamp).total_seconds())
            last_timestamp = date
        text = message['body']
        message_counts[user]['all'] += 1
        if text and len(text):
            message_counts[user]['text'] += 1
            message_contents[user].append(text)
        else:
            if message['sticker']:
                message_counts[user]['sticker'] += 1
                stickerCounts[user][message['sticker']['url']] += 1
            else:
                message_counts[user]['other'] += 1
                # pprint.pprint(message)
    print(f"Conversations amongst {' & '.join(users)} between {oldest} and {newest}:\n")
    message_content = dict(map(lambda x: (x[0], '\n'.join(x[1])), message_contents.items()))
    total_counts = collections.Counter({'sticker': 0, 'text': 0})
    for person, counts in message_counts.items():
        total_counts['sticker'] += counts['sticker']
        total_counts['text'] += counts['text']
        total_counts['all'] += counts['all']
        print(f"{person} sent {counts['all']} total messages, {counts['text']} text messages ({float(counts['text']) / counts['all']:.2%}) and {counts['sticker']} stickers ({float(counts['sticker']) / counts['all']:.2%}). "
              f"On average, the text messages were {float(len(message_content[person])) / counts['text']:.2f} characters long which makes for a total of {len(message_content[person])} characters.")
    top_messages = dict(map(lambda x: (x[0], sorted(x[1].items(), key=operator.itemgetter(1), reverse=True)[0][0]), daysMessages.items()))
    top_messages_counts = sorted(list(collections.Counter(top_messages.values()).items()), key=operator.itemgetter(1), reverse=True)
    if len(top_messages_counts) == 1:
        print(f'{top_messages_counts[0][0]} talked the most every day...')
    else:
        print(f'{top_messages_counts[0][0]} talks the most, with {top_messages_counts[0][1]} day(s) when they sent the most messages, and {top_messages_counts[1][0]} is the quiet one with {top_messages_counts[1][1]} day(s).')
    print(f"\nSo, a total of {total_counts['all']} messages, {total_counts['text']} text messages ({float(total_counts['text']) / total_counts['all']:.2%}) and {total_counts['sticker']} stickers ({float(total_counts['sticker']) / total_counts['all']:.2%}).")
    all_days = all_days_span(oldest, newest)
    print(f"That makes for an average of {float(total_counts['all']) / len(all_days):.2f} messages per day!")
    print(f'Over the span of {len(all_days)} day(s), {len(set(all_days) - days_spoken)} day(s) went without conversation ({float(len(set(all_days) - days_spoken)) / len(all_days):.2%}).')
    print(f"So, if we take that into account, it makes for an average of {float(total_counts['all']) / len(days_spoken):.2f} messages on days with conversation!")
    profanity_counts: typing.DefaultDict[str, typing.Dict[str, int]] = collections.defaultdict(dict)
    for user in users:
        for word in profanity:
            matches = re.findall(word, message_content[user], flags=re.IGNORECASE)
            if matches:
                most_common = collections.Counter(map(str.lower, matches)).most_common(1)[0]
                profanity_counts[user][most_common[0]] = most_common[1]
    profanity_total_counts = list(reversed(sorted(list(map(lambda x: (x[0], sum(x[1].values())), profanity_counts.items())), key=operator.itemgetter(1))))
    print(f"\n{profanity_total_counts[0][0]} has the potty mouth with {profanity_total_counts[0][1]} profane word(s) said whereas {', '.join(map(lambda x: '%s has said %s profane word(s)' % x, profanity_total_counts[1:]))}.")
    for user in sorted(users, key=lambda x: - dict(profanity_total_counts).get(x, 0)):
        user_profanity_counts = list(reversed(sorted(profanity_counts[user].items(), key=operator.itemgetter(1))))
        if user_profanity_counts:
            print('%s\'s profanity of choice seems to be "%s" (%s occurrences), they\'re also fans of %s.' % (user, user_profanity_counts[0][0], user_profanity_counts[0][1], ', '.join(map(lambda x: '"%s" (%s)' % x, user_profanity_counts[1:])) or 'apparently not much else'))
        else:
            print(f'{user} hasn\'t been the slightest bit profane.')
    print('\nJust in case you\'re curious, the most eventful day was %s, when %s messages were sent :D' % typing.cast(typing.Tuple[str, str], tuple(max(map(lambda x: (x[0], functools.reduce(lambda s, a: s + a[1], x[1].items(), 0)), daysMessages.items()), key=operator.itemgetter(1)))))
    longest_seq: typing.List[datetime.date] = []
    current_seq: typing.List[datetime.date] = []
    for day in sorted(list(days_spoken)):
        if len(current_seq) > len(longest_seq):
            longest_seq = copy.copy(current_seq)
        if current_seq and current_seq[-1] + datetime.timedelta(days=1) == day:
            current_seq.append(day)
        else:
            current_seq = [day]
    if len(current_seq) > len(longest_seq):
        longest_seq = copy.copy(current_seq)
    print(f'The longest streak of days with at least one message lasted {len(longest_seq)} days, from {longest_seq[0]} to {longest_seq[-1]}!')
    if current_seq and datetime.datetime.now().date() - current_seq[-1] <= datetime.timedelta(days=1):
        print(f'On the other hand, the current streak is {len(current_seq)} days, from {current_seq[0]} to {current_seq[-1]}.')
    else:
        print(f'On the other hand, the current streak is 0 days, you haven\'t conversed since {current_seq[-1]} :(')
    print(f"\nNow, on to stickers. There were an average of {float(total_counts['sticker']) / len(days_spoken):.2f} stickers used on days with conversation!")
    for user in users:
        print(f"Out of {user}'s {message_counts[user]['sticker']} stickers, the five most used were: " + ', '.join(list(map(lambda x: '%s (%s)' % x, stickerCounts[user].most_common(5)))))
    message_streaks_per_user = {}
    for user in users:
        message_streaks_per_user[user] = collections.Counter(map(operator.itemgetter(1), filter(lambda x: x[0] == user, message_streaks)))
    if len(users) == 2 and len(message_streaks) > 1:
        print('\nSince there are only two people in this conversation, we can do some more calculations!')
        user1 = message_streaks[0][0]
        user2 = message_streaks[1][0]
        sum1, num1, sum2, num2 = 0.0, 0, 0.0, 0
        last_message_streak: typing.Tuple[typing.Optional[str], int] = (None, 0)
        for messageStreak in message_streaks:
            if last_message_streak[0] == user1 and messageStreak[0] == user2:
                sum1 += messageStreak[1] / last_message_streak[1]
                num1 += 1
            elif last_message_streak[0] == user2 and messageStreak[0] == user1:
                sum2 += messageStreak[1] / last_message_streak[1]
                num2 += 1
            last_message_streak = messageStreak
        print(f'{user1} sends {numpy.average(list(message_streaks_per_user[user1].keys()), weights=list(message_streaks_per_user[user1].values())):.2f} consecutive message on average and for each message, {user2} responds with {sum1 / num1:.2f} messages on average.')
        print(f'On the other hand, {user2} sends {numpy.average(list(message_streaks_per_user[user2].keys()), weights=list(message_streaks_per_user[user2].values())):.2f} consecutive message on average and for each message, {user1} responds with {sum2 / num2:.2f} messages on average.')
        print(f'When {user1} sends a message, {user2} tends to respond in {statistics.median(responseTimes[user2]):.1f} seconds (median response time).')
        print(f'On the other hand, when {user2} sends a message, {user1} tends to respond in {statistics.median(responseTimes[user1]):.1f} seconds (median response time).')
    print(f'\nThe data compilation took {datetime.datetime.now() - start_time} seconds.')
    colors = ['b', 'r', 'g', 'c']
    if plot_message_count or plot_cumulative_message_count:
        days_messages_list = sorted(daysMessages.items(), key=operator.itemgetter(0))
        fig = plot.figure()
        subplot_count = len(list(filter(operator.truth, [plot_message_count, plot_cumulative_message_count])))
        if plot_message_count:
            ax1 = fig.add_subplot(subplot_count, 1, 1)
            plot.xlabel('Date')
            plot.ylabel('Quantity')
            plot.title('Number of Messages')
            plots1 = []
        if plot_cumulative_message_count:
            ax2 = fig.add_subplot(subplot_count, 1, 2 if plot_message_count else 1)
            plot.xlabel('Date')
            plot.ylabel('Quantity')
            plot.title('Number of Messages over Time')
            plots2 = []
        for i, user in enumerate(users):
            user_messages = list(map(lambda x: (x[0], x[1][user]), filter(lambda y: user in y[1], days_messages_list)))
            user_days = list(map(operator.itemgetter(0), user_messages))
            for day in filter(lambda x: x not in user_days, all_days):
                user_messages.append((day, 0))
            user_messages = sorted(user_messages, key=operator.itemgetter(0))
            if limit_plot_to_streak:
                user_messages = list(filter(lambda x: x[0] >= longest_seq[0] and x[0] <= longest_seq[-1], user_messages))
            if plot_message_count:
                plt, = ax1.plot(*zip(*user_messages), f'.{colors[i % len(colors)]}-', label=user)
                ax1.set_ylim(bottom=0)  # TODO: the top gets cut off sometimes
                plots1.append(plt)
            if plot_cumulative_message_count:
                cumulative_user_messages = list(itertools.accumulate(user_messages, func=lambda x, y: (y[0], x[1] + y[1])))
                plt, = ax2.plot(*zip(*cumulative_user_messages), f'.{colors[i % len(colors)]}-', label=user + ' (cumulative)')
                ax2.set_ylim(bottom=0)  # TODO: the top gets cut off sometimes
                plots2.append(plt)
        if plot_message_count:
            ax1.legend(handles=plots1)
        if plot_cumulative_message_count:
            ax2.legend(handles=plots2, loc='lower right')
        plot.show()
    if word_clouds:
        wordcloud.STOPWORDS.update(['T', 't', 'P', ':P', 'im', 'p', 'http', 'https', 'd', 'o'])
        wordcloud.STOPWORDS.update(['u', 'ur', 'i'])
        wordcloud.STOPWORDS.update(['T', 't', 'P', ':P', 'lol', 'LOL', 'yeah', 'okay', 'oh', 'im', 'p', 'http', 'https', 'd', 'o', 'want', 'go', 'png', 'skc'])
        wordcloud.STOPWORDS.update(['dont', 'hes', 'whens', 'weve', 'hed', 'theres', 'havent', 'theyll', 'whos', 'theyd', 'youve', 'well', 'theyve', 'wont', 'mustnt', 'isnt', 'ill', 'whys', 'youd', 'wasnt', 'shouldnt', 'youre', 'arent', 'id', 'werent', 'im', 'cant', 'hadnt', 'couldnt', 'doesnt', 'hows', 'its', 'wheres', 'ive', 'didnt', 'whats', 'heres', 'theyre', 'hasnt', 'wouldnt', 'wed', 'shant', 'lets', 'hell', 'shed', 'youll', 'were', 'shes', 'thats'])  # noqa: E501
        wordcloud.STOPWORDS.update(['think', 'make', 'one', 'wait', 'people', 'yea', 'ok', 'haha', 'guy', 'right', 'good', 'know', 'also', 'guys', 'mean', 'also'])
        fig = plot.figure()
        word_cloud_kwargs = {'background_color': 'white', 'max_words': 500, 'stopwords': wordcloud.STOPWORDS, 'font_path': '/Library/Fonts/Futura.ttc'}
        if len(users) > 10:
            all_content = '\n'.join(message_content.values())
            wc = wordcloud.WordCloud(width=1000, height=1500, **word_cloud_kwargs)
            wc.generate(all_content)
            f = fig.add_subplot(1, 2, 1)
            f.axes.get_xaxis().set_visible(False)
            f.axes.get_yaxis().set_visible(False)
            f.set_title('Everyone')
            plot.imshow(wc)
            if session.id:
                wc = wordcloud.WordCloud(width=1000, height=1500, **word_cloud_kwargs)
                wc.generate(message_content[id_to_user(session.id)])
                f = fig.add_subplot(1, 2, 2)
                f.axes.get_xaxis().set_visible(False)
                f.axes.get_yaxis().set_visible(False)
                f.set_title('Me')
                plot.imshow(wc)
        else:
            rows = (len(users) // 5) + 1
            for i, user in enumerate(users):
                wc = wordcloud.WordCloud(width=1000, height=1000, **word_cloud_kwargs)
                wc.generate(message_content[user])
                f = fig.add_subplot(rows, math.ceil(len(users) / rows), i + 1)
                f.axes.get_xaxis().set_visible(False)
                f.axes.get_yaxis().set_visible(False)
                f.set_title(user)
                plot.imshow(wc)
        plot.axis('off')
        plot.show()
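
# Typical use (sketch): feed in the output of get_messages() above, e.g.
#   messages_stats(get_messages(cursor, '1338262658'), plot_message_count=True)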


def all_messages_stats(cursor: psycopg2.extensions.cursor, plot_message_count: bool = False) -> None:
    cursor.execute("SELECT thread_id, info->>'timestamp_precise', info->'message_sender'->>'id' FROM messages")
    messages = list(map(lambda x: {'id': x[0], 'timestamp': x[1], 'author': x[2]}, cursor.fetchall()))
    oldest, newest = datetime.datetime.max, datetime.datetime.min
    day_message_user_counts: typing.DefaultDict[datetime.date, typing.DefaultDict[str, int]] = collections.defaultdict(lambda: collections.defaultdict(int))
    for message in messages:
        date = datetime.datetime.fromtimestamp(int(message['timestamp']) / 1e3)
        oldest = min(oldest, date)
        newest = max(newest, date)
        user = id_to_user(message['author'])
        day_message_user_counts[date.date()][user] += 1
    days_message_counts = dict(map(lambda x: (x[0], sum(x[1].values())), day_message_user_counts.items()))
    missed_days = set(all_days_span(oldest, newest)) - set(days_message_counts.keys())
    day_span_length = (newest - oldest).days
    print(f'You have a total of {len(messages)} messages spanning {oldest} through {newest} ({day_span_length} days)!')
    print(f'That makes an average of {len(messages) / day_span_length:.02f} messages per day unless you account for the {len(missed_days)} days without conversation, which makes it {len(messages) / (day_span_length - len(missed_days)):.02f} per day.')
    last_missed_days = sorted(missed_days)[-1]
    most_eventful_day, most_eventful_day_num_messages = max(days_message_counts.items(), key=operator.itemgetter(1))
    print(f'The last day without messages was {last_missed_days}, {(datetime.datetime.now().date() - last_missed_days).days} days ago, and the most eventful day was {most_eventful_day} with {most_eventful_day_num_messages} messages.')
    print('\nConversations:\n')
    cursor.execute("SELECT DISTINCT thread_id, info->'message_sender'->>'id' FROM messages")
    conversation_authors: typing.DefaultDict[str, typing.Set[str]] = collections.defaultdict(set)
    for msg in cursor.fetchall():
        conversation_authors[msg[0]].add(msg[1])
    user = id_to_user(session.id if session.id else collections.Counter(itertools.chain.from_iterable(conversation_authors.values())).most_common(1)[0][0])
    cursor.execute('SELECT thread_id, COUNT(*) FROM messages GROUP BY thread_id')
    message_nums = list(sorted(map(lambda x: (', '.join(sorted(map(id_to_user, conversation_authors[x[0]]), key=lambda x: (x != user, x))), x[1]), cursor.fetchall()), key=operator.itemgetter(1)))
    termgraph(message_nums)
    print('\nMessage authors:\n')
    cursor.execute("SELECT info->'message_sender'->>'id', COUNT(*) FROM messages GROUP BY info->'message_sender'->>'id'")
    message_nums = list(sorted(filter(lambda x: not x[0].startswith('Unknown_'), map(lambda x: (id_to_user(x[0]), x[1]), cursor.fetchall())), key=operator.itemgetter(1)))
    termgraph(message_nums)
    if plot_message_count:
        days_message_counts.update(dict(map(lambda x: (x, 0), missed_days)))
        days_message_counts_list = sorted(days_message_counts.items(), key=operator.itemgetter(0))
        plt, = plot.plot(list(map(operator.itemgetter(0), days_message_counts_list)), list(map(operator.itemgetter(1), days_message_counts_list)), '.b-', label='All')
        plot.xlabel('Date')
        plot.ylabel('Number of Messages')
        plot.title('Number of Messages over time')
        plot.legend(handles=[plt])
        plot.show()


def update_conversation(
    cursor: psycopg2.extensions.cursor,
    conversation_id: typing.Union[str, int],
    group: bool = False,
    limit: int = 500,
    save: bool = True,
) -> typing.Tuple[int, int]:
    if cursor:
        cursor.execute("SELECT info->>'timestamp_precise' FROM messages WHERE thread_id = %s ORDER BY info->>'timestamp_precise'", (str(conversation_id), ))
        old_message_timestamps = list(map(lambda x: {'timestamp_precise': int(x[0])}, cursor.fetchall()))
    else:
        old_message_timestamps = []
    if len(old_message_timestamps):
        thread_info, new_messages = get_new_messages(conversation_id, old_messages=old_message_timestamps, limit=limit, group=group)
    else:
        thread_info, new_messages = get_new_messages(conversation_id, group=group, limit=limit * 10)
    if save:
        if len(new_messages) != 0:
            insert_messages(cursor, new_messages, thread_info)
            logging.info(f'Inserted {len(new_messages)} messages in database.')
        else:
            logging.info('No new messages to insert in database.')
    if cursor is None:
        pprint.pprint(  # type: ignore # since pprint types are wrong
            {
                'thread_info': thread_info,
                'messages': new_messages,
            },
            width=shutil.get_terminal_size((80, 20)).columns,
            compact=True,
        )
    return len(new_messages) + len(old_message_timestamps), len(new_messages)
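
# Returns (total message count, newly fetched count). Without a cursor (--bare
# mode) nothing is persisted and the raw thread info plus messages are
# pretty-printed to STDOUT instead.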


def update_conversations(
    cursor: psycopg2.extensions.cursor,
    conversation_ids: typing.List[str],
    limit: int = 500,
    save: bool = True,
) -> typing.Tuple[int, int]:
    num_conversation_messages = {}
    num_added_messages = 0
    for conversation_id in conversation_ids:
        num_messages, num_new_messages = update_conversation(cursor, conversation_id, limit=limit, save=save)
        num_conversation_messages[conversation_id] = num_messages
        num_added_messages += num_new_messages
    message_nums = list(map(lambda x: (id_to_user(x[0]), x[1]), sorted(num_conversation_messages.items(), key=operator.itemgetter(1))))
    termgraph(message_nums)
    return num_added_messages, sum(map(operator.itemgetter(1), message_nums))


def update_all_individual_conversations(cursor: psycopg2.extensions.cursor, limit: int = 500, save: bool = True) -> None:
    conversation_ids = list(id_to_user_map.keys())
    total_num_messages_inserted, total_num_messages = update_conversations(cursor, conversation_ids, limit=limit, save=save)
    logging.info(f'Inserted {total_num_messages_inserted} new messages in all individual conversations for a total of {total_num_messages} messages.')


def print_messages(messages: typing.Sequence[typing.Union[str, typing.Dict[str, typing.Any]]], interactive: bool = False) -> None:
    current_line = 0
    max_author_length = max(map(len, map(id_to_user, map(operator.itemgetter('author'), filter(lambda x: not isinstance(x, str), messages)))), default=0)
    page_size = shutil.get_terminal_size((80, 20))
    for message in messages:
        if isinstance(message, str):
            output = message
        else:
            author = id_to_user(message['author'])
            if message['body']:
                body = message['body']
            elif message['sticker']:
                body = f"[STICKER: {message['sticker']['url']}]"
            else:
                body = str(message)
            timestamp = datetime.datetime.fromtimestamp(message['timestamp'] / 1e3).strftime('%m-%d-%Y %H:%M:%S')
            output = f'{timestamp}\t{author.rjust(max_author_length)}: {body}'
        output_lines = sum(map(lambda x: math.ceil(len(x) / page_size.columns), output.split('\n')))
        if current_line + output_lines > (page_size.lines - 2):
            if input('\nPress enter to continue (q to quit)... ').lower() in {'q', 'quit', 'exit', 'no', 'n'}:
                return
            if not interactive:
                os.system('cls' if os.name == 'nt' else 'clear')
            current_line = 0
        print(output)
        current_line += output_lines


def search_conversation(
    cursor: psycopg2.extensions.cursor,
    conversation_id: str,
    query: str,
    latest_first: bool = False,
    author_filter: typing.Optional[typing.Union[str, int]] = None,
    regular_expression: bool = False,
    case_sensitive: bool = False,
    interactive: bool = False,
) -> None:
    messages = get_messages(cursor, conversation_id, query=query, regular_expression=regular_expression, case_sensitive=case_sensitive)
    print(f'{len(messages)} message results found for search query {repr(query)}.\n')
    if messages:
        regex = re.compile(query, flags=(0 if case_sensitive else re.IGNORECASE))
        instance_counts = reversed(sorted(collections.Counter(itertools.chain.from_iterable(map(lambda x: map(repr, regex.findall(x['body'])), messages))).items(), key=operator.itemgetter(1)))
        termgraph(list(instance_counts))
        print('\n')
        author_counts = collections.Counter(map(lambda x: id_to_user(x['author']), messages)).items()
        termgraph(list(author_counts))
        if input('\nPress enter to continue (q to quit)... ').lower() in {'q', 'quit', 'exit', 'no', 'n'}:
            return
        if not interactive:
            os.system('cls' if os.name == 'nt' else 'clear')
        if latest_first:
            messages = list(reversed(messages))
        if author_filter:
            author_user = id_to_user(author_filter)
            messages = list(filter(lambda x: id_to_user(x['author']) == author_user, messages))
        print_messages(messages, interactive=interactive)


def grep_conversation(
    cursor: psycopg2.extensions.cursor,
    conversation_id: str,
    args: typing.List[str],
    latest_first: bool = False,
    interactive: bool = False,
) -> None:
    grep_output_re = re.compile(r'(?P<line_num>\d+)[:-](?P<message>.*)')

    def process_grep_line(grep_output: str) -> typing.Union[str, typing.Dict[str, typing.Any]]:
        message_line_matches = grep_output_re.match(grep_output)
        if message_line_matches:
            line_num_string, message_body = message_line_matches.groups()
            message = messages[int(line_num_string) - 1]
            return dict(message, **{'body': message_body})
        return grep_output

    messages = list(filter(operator.itemgetter('body'), get_messages(cursor, conversation_id)))
    grep_input = '\n'.join(map(operator.itemgetter('body'), messages))
    proc = subprocess.run(['grep', '-n', '--color=always'] + args, universal_newlines=True, stdout=subprocess.PIPE, check=True, input=grep_input)  # TODO: add color
    filtered_messages = list(map(process_grep_line, proc.stdout.splitlines()))
    print_messages(filtered_messages, interactive=interactive)
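
# Example (sketch): a case-insensitive grep with two lines of context,
#   grep_conversation(cursor, '1338262658', ['-i', '-C', '2', 'pizza'])
# Each message body becomes one line of grep input, so grep's reported line
# numbers (and '-' context markers) map back to message indices.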


def login(password: typing.Optional[str] = None) -> Session:
    if not password:
        password = getpass.getpass(f'Password ({session.username}): ')
    cookie_jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar))
    opener.addheaders = [('User-agent', user_agent)]  # type: ignore # since OpenerDirector types are incomplete
    opener.open('https://m.facebook.com/login.php')
    login_page = lxml.html.parse(opener.open('https://m.facebook.com/login.php'))
    login_form = dict(login_page.forms[0].fields)
    login_form.update({
        'email': session.username,
        'pass': password,
    })
    opener.open('https://m.facebook.com/login.php?refsrc=https%3A%2F%2Fm.facebook.com%2Flogin.php&lwv=100&refid=9', data=urllib.parse.urlencode(login_form).encode())
    dtsg = re.findall(r'\["DTSGInitialData",\[\],\{"token":"(.*?)"}', opener.open('https://www.facebook.com/').read().decode())
    if not dtsg:
        logging.critical('Login failed.')
        sys.exit(1)
    else:
        dtsg = dtsg[0]
        logging.info(f'Login successful (dtsg: {dtsg}).\n')
    cookies = {cookie.name: cookie.value for cookie in cookie_jar}
    return Session(
        username=session.username,
        id=cookies['c_user'],
        opener=opener,
        dtsg=dtsg,
    )


def browser_login(username: str) -> typing.Tuple[str, typing.Dict[str, str]]:
    firefox_capabilities = DesiredCapabilities.FIREFOX
    firefox_capabilities['marionette'] = True
    with contextlib.closing(webdriver.Firefox(capabilities=firefox_capabilities)) as driver:
        driver.get('https://www.facebook.com/login.php')
        action_chain = ActionChains(driver)
        action_chain.send_keys_to_element(driver.find_element_by_name('email'), username)
        action_chain.move_to_element(driver.find_element_by_name('pass')).click()
        action_chain.perform()
        dtsg = WebDriverWait(driver, 2 * 60).until(
            lambda driver:
                re.findall(r'\["DTSGInitialData",\[\],\{"token":"(.*?)"}', driver.page_source) if driver and 'checkpoint' not in driver.current_url else False,
        )[0]
        cookies = dict(map(lambda x: (x['name'], x['value']), driver.get_cookies()))
        return dtsg, cookies


def valid_conversation(id_or_name: typing.Union[str, int]) -> str:
    if not id_to_user(id_or_name).startswith('Unknown') or str(id_or_name) in map(str, groups):
        return str(id_or_name).replace('fbid:', '')
    user_as_id = user_to_id(str(id_or_name))
    if user_as_id is not None:
        return user_as_id
    else:
        raise argparse.ArgumentTypeError(f'{id_or_name} is not a valid Facebook ID or recognized name')


def valid_path(path: str) -> str:
    if os.access(os.path.dirname(path) or '.', os.W_OK):
        return path
    else:
        raise argparse.ArgumentTypeError(f'{path} is not a valid path for dumping the message database')


def valid_cookies(cookie_string: str) -> typing.Dict[str, str]:
    try:
        cookie_string = cookie_string.strip(';')
        return dict(map(lambda x: typing.cast(typing.Tuple[str, str], tuple(x.strip().split('='))), cookie_string.split(';')))
    except Exception:
        raise argparse.ArgumentTypeError(f'{cookie_string} is an invalid cookie string')
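
# e.g. valid_cookies('c_user=123; xs=abc;') -> {'c_user': '123', 'xs': 'abc'}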


class Shell(cmd.Cmd):
    intro = 'Welcome to the shell. Type help or ? to list commands.\n'
    prompt = '\033[92m(shell)\033[0m '

    def __init__(
        self,
        cursor: psycopg2.extensions.cursor,
        conn: psycopg2.extensions.connection,
        args: argparse.Namespace,
        save: bool,
    ) -> None:
        super().__init__()
        self.cursor = cursor
        self.conn = conn
        self.args = args
        self.save = save

    def do_update(self, arg: str) -> None:
        """Update conversation(s) (all without argument): update Joe"""
        group_limit = self.args.limit * self.args.group_limit_multiplier
        if arg:
            for conversation in map(valid_conversation, shlex.split(arg)):
                if int(conversation) in groups:
                    update_conversation(self.cursor, conversation, group=True, limit=group_limit, save=self.save)
                else:
                    update_conversation(self.cursor, conversation, limit=self.args.limit, save=self.save)
        else:
            update_all_individual_conversations(self.cursor, limit=self.args.limit, save=self.save)
            for group in groups:
                update_conversation(self.cursor, group, group=True, limit=group_limit, save=self.save)

    def complete_update(self, text: str, line: str, begidx: int, endidx: int) -> typing.List[str]:
        return list(filter(lambda x: x.startswith(text), list(id_to_user_map.values()) + list(map(str, groups))))

    def do_search(self, arg: str) -> None:
        """Search a conversation: search Joe term"""
        user, query = shlex.split(arg)
        search_conversation(
            self.cursor,
            valid_conversation(user),
            query,
            latest_first=self.args.latest_first,
            author_filter=self.args.author,
            regular_expression=self.args.regular_expression,
            case_sensitive=self.args.case_sensitive,
            interactive=True,
        )

    def complete_search(self, text: str, line: str, begidx: int, endidx: int) -> typing.List[str]:
        return self.complete_update(text, line, begidx, endidx) if line.count(' ') == 1 else []

    def do_grep(self, arg: str) -> None:
        user, *grep_args = shlex.split(arg)
        grep_conversation(
            self.cursor,
            valid_conversation(user),
            grep_args,
            latest_first=self.args.latest_first,
            interactive=True,
        )

    def do_dump(self, arg: str) -> None:
        """Dump the database: dump"""
        dump_db(self.args.dump_db or arg, self.args.db_name)

    def do_load(self, arg: str) -> None:
        """Load compressed database: load filename"""
        load_db(self.args.load_db or arg, self.args.db_name)

    def complete_load(self, text: str, line: str, begidx: int, endidx: int) -> typing.List[str]:
        return glob.glob(text + '*')

    def do_statistics(self, arg: str) -> None:
        """Show statistics for conversations (all without an argument): statistics Joe Jane"""
        conversations = shlex.split(arg)
        if len(conversations):
            for conversation in conversations:
                messages_stats(
                    get_messages(self.cursor, valid_conversation(conversation)),
                    plot_message_count=getattr(self.args, 'plot_message_count', None),
                    plot_cumulative_message_count=getattr(self.args, 'plot_cumulative_message_count', None),
                    word_clouds=getattr(self.args, 'word_clouds', None),
                    limit_plot_to_streak=getattr(self.args, 'limit_plot_to_streak', None),
                )
        else:
            all_messages_stats(self.cursor, plot_message_count=getattr(self.args, 'plot_message_count', None))

    def complete_statistics(self, text: str, line: str, begidx: int, endidx: int) -> typing.List[str]:
        return self.complete_update(text, line, begidx, endidx)

    def do_exit(self, arg: str) -> None:
        """Exits the shell: exit"""

    def postcmd(self, stop: bool, line: str) -> bool:
        if self.save:
            self.conn.commit()
        return line == 'exit'


def main(args: argparse.Namespace) -> None:
    levels = [logging.WARNING, logging.INFO, logging.DEBUG]
    logging.basicConfig(
        format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
        level=levels[min(len(levels) - 1, args.verbose)],
    )
    if args.init_db:
        init_db()
        return logging.info('Database initialization complete.')
    global session
    session = Session(username=args.username, opener=None, id=None, dtsg=None)
    if getattr(args, 'browser', None):
        try:
            args.dtsg, args.cookies = browser_login(args.username)
            logging.info(f"Login successful (dtsg: {args.dtsg}, cookies: {';'.join(map(lambda x: '%s=%s' % x, args.cookies.items()))}).\n")
        except Exception as e:
            return logging.error(f'Login failed: {repr(e)}.')
    if args.dtsg and args.cookies:
        cookie_jar = http.cookiejar.CookieJar()
        cookies = [
            http.cookiejar.Cookie(  # type: ignore # since Cookie types are broken
                0, name, value, None, False, '', False, False, '',
                False, False, None, True, None, None, {'HttpOnly': None},
            ) for (name, value) in args.cookies.items()
        ]
        for cookie in cookies:
            cookie_jar.set_cookie(cookie)
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar))
        opener.addheaders = [('User-agent', user_agent)]  # type: ignore # since OpenerDirector types are incomplete
        session = Session(
            username=args.username,
            opener=opener,
            id=args.cookies['c_user'],
            dtsg=args.dtsg,
        )
    elif args.password:
        session = login(password=args.password)
    if not args.bare:
        global postgres_connection_string
        db_password = getpass.getpass(f'Postgres password ({args.db_username}): ') if args.db_password is None else args.db_password  # can be empty
        postgres_connection_string = "dbname='%s' user='%s' host='%s' password='%s'" % (args.db_name, args.db_username, args.db_host, db_password)
    if args.bare:
        conn, cursor = None, None
    else:
        conn = psycopg2.connect(postgres_connection_string)
        cursor = conn.cursor()
    save = not args.dry_run
    group_limit = args.limit * args.group_limit_multiplier
    if args.interactive:
        Shell(cursor, conn, args, save).cmdloop()
    elif args.update:
        for conversation in args.update:
            if int(conversation) in groups:
                update_conversation(cursor, conversation, group=True, limit=group_limit, save=save)
            else:
                update_conversation(cursor, conversation, limit=args.limit, save=save)
    elif args.update_all or args.update_individuals or args.update_groups:
        if args.update_all or args.update_individuals:
            update_all_individual_conversations(cursor, limit=args.limit, save=save)
        if args.update_all or args.update_groups:
            for group in groups:
                update_conversation(cursor, group, group=True, limit=group_limit, save=save)
    elif args.search:
        search_conversation(
            cursor,
            args.search[0],
            args.search[1],
            latest_first=args.latest_first,
            author_filter=args.author,
            regular_expression=args.regular_expression,
            case_sensitive=args.case_sensitive,
        )
    elif args.grep:
        grep_conversation(
            cursor,
            args.grep[0],
            shlex.split(args.grep[1]),
            latest_first=args.latest_first,
        )
    elif args.dump_db:
        dump_db(args.dump_db, args.db_name)
    elif args.load_db:
        load_db(args.load_db, args.db_name)
    elif args.statistics is not None:
        if len(args.statistics):
            for conversation in args.statistics:
                messages_stats(
                    get_messages(cursor, conversation),
                    plot_message_count=getattr(args, 'plot_message_count', None),
                    plot_cumulative_message_count=getattr(args, 'plot_cumulative_message_count', None),
                    word_clouds=getattr(args, 'word_clouds', None),
                    limit_plot_to_streak=getattr(args, 'limit_plot_to_streak', None),
                )
        else:
            all_messages_stats(cursor, plot_message_count=getattr(args, 'plot_message_count', None))
    if save and conn:
        conn.commit()
    if conn:
        conn.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Download, archive, analyze and plot Facebook Messenger conversations (individual and group)',
        epilog=', '.join(typing.cast(typing.List[str], filter(None, [
            'selenium not installed, browser authentication disabled' if not webdriver else None,
            'matplotlib not installed, message plotting disabled' if not matplotlib else None,
            'wordcloud not installed, word clouds disabled' if not wordcloud else None,
        ]))),
    )
    parser.add_argument('--verbose', '-v', action='count', help='Add verbosity (maximum -vv, default -v)', default=1)
    mode_named_group = parser.add_argument_group('mode')
    mode_group = mode_named_group.add_mutually_exclusive_group(required=True)
    mode_group.add_argument('-i', '--interactive', action='store_true', help='Interactive mode')
    mode_group.add_argument('-k', '--update', type=valid_conversation, nargs='+', metavar='ID/NAME', help='Update a conversation')
    mode_group.add_argument('-a', '--update-all', action='store_true', help='Update all conversations')
    mode_group.add_argument('-n', '--update-individuals', action='store_true', help='Update all individual conversations')
    mode_group.add_argument('-g', '--update-groups', action='store_true', help='Update all group conversations')
    if psycopg2_installed:
        mode_group.add_argument('-s', '--search', metavar=('ID/NAME', 'QUERY'), nargs=2, help='Search a conversation')
        mode_group.add_argument('-gr', '--grep', metavar=('ID/NAME', 'ARGS'), nargs=2, help='Grep a conversation')
        mode_group.add_argument('-t', '--statistics', type=valid_conversation, nargs='*', metavar='ID/NAME', help='Display conversation(s) statistics (statistics for all conversations when no argument is given)')
        mode_group.add_argument('--init-db', action='store_true', help='Initialize Postgres database')
        mode_group.add_argument('--dump-db', metavar='PATH', type=valid_path, help='Dump compressed Postgres database here')
        mode_group.add_argument('--load-db', metavar='PATH', type=open, help='Load compressed Postgres database from here')
    database_group = parser.add_argument_group('database', 'Postgres connection options')
    database_group.add_argument('-du', '--db-username', metavar='USERNAME', default=getpass.getuser(), help='Postgres username')
    database_group.add_argument('-dn', '--db-name', metavar='DATABASE', default='fb_messages', help='Postgres database name')
    database_group.add_argument('-dh', '--db-host', metavar='HOST', default='localhost', help='Postgres host URL')
    database_group.add_argument('-dp', '--db-password', metavar='PASSWORD', default=os.environ.get('POSTGRES_PASSWORD'), help='Postgres password (default from environment variable POSTGRES_PASSWORD; interactive if unspecified)')
    auth_group = parser.add_argument_group('authentication', 'Conversation authentication options')
    auth_group.add_argument('-u', '--username', default='sushain97', help='Facebook account username')
    auth_group.add_argument('-p', '--password', help='Facebook account password')
    if webdriver:
        auth_group.add_argument('--browser', action='store_true', default=False, help='Facebook browser authentication')
        auth_group.add_argument('--browser-timeout', type=int, default=2, help='Facebook browser authentication timeout in minutes')
    auth_group.add_argument('--dtsg', help='Facebook dtsg value (must use --cookies as well)')
    auth_group.add_argument('--cookies', type=valid_cookies, help='Facebook cookies value (must use --dtsg as well)')
    stats_group = parser.add_argument_group('statistics', 'Conversation statistics options')
    if matplotlib:
        stats_group.add_argument('-P', '--plot-message-count', action='store_true', default=False, help='Plot individual message count over time')
        stats_group.add_argument('-Q', '--plot-cumulative-message-count', action='store_true', default=False, help='Plot individual cumulative message count over time')
        stats_group.add_argument('-S', '--limit-plot-to-streak', action='store_true', default=False, help='Limit message plot to time since streak started')
    if wordcloud:
        stats_group.add_argument('-W', '--word-clouds', action='store_true', default=False, help='Display individual message word clouds')
    search_group = parser.add_argument_group('search', 'Conversation search options')
    search_group.add_argument('-F', '--latest-first', action='store_true', default=False, help='Show latest messages first')
    search_group.add_argument('-I', '--regular-expression', action='store_true', default=False, help='Treat search query as regular expression')
    search_group.add_argument('-A', '--author', type=valid_conversation, metavar='ID/NAME', help='Show only messages from this author')
    search_group.add_argument('-C', '--case-sensitive', action='store_true', default=False, help='Case sensitive search')
    download_group = parser.add_argument_group('download', 'Conversation download options')
    download_group.add_argument('-L', '--limit', type=int, default=500, help='Message download limit')
    download_group.add_argument('-M', '--group-limit-multiplier', action='count', default=2, help='Multiply message download limit for groups')
    download_group.add_argument('-D', '--dry-run', action='store_true', default=False, help='Don\'t save to database')
    download_group.add_argument('-B', '--bare', action='store_true', default=not psycopg2_installed, help='Run without database (update results redirected to STDOUT)')
    args = parser.parse_args()
    if not psycopg2_installed:
        args.search = args.statistics = None
        args.init_db = args.dump_db = args.load_db = False
    if bool(args.dtsg) ^ bool(args.cookies):
        parser.error('--dtsg and --cookies must both be set for manual authentication.')
    args.dry_run = args.dry_run or args.bare
    if args.bare and (args.search or args.statistics or args.init_db or args.dump_db or args.load_db):
        parser.error('--bare is incompatible with any operations that require a database (search, statistics, init_db, dump_db, load_db)')
    if args.search:
        args.search[0] = valid_conversation(args.search[0])
    plot_messages_arg_required = getattr(args, 'limit_plot_to_streak', False)
    plotting_messages = getattr(args, 'plot_cumulative_message_count', False) or getattr(args, 'plot_message_count', False)
    if plot_messages_arg_required and not plotting_messages:
        parser.error('--plot-message-count or --plot-cumulative-message-count must be set when --limit-plot-to-streak is.')
    main(args)
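
# Example invocations (sketch; the script name, account and targets are
# placeholders):
#   ./messages.py --init-db
#   ./messages.py -a -u me@example.com          # update every conversation
#   ./messages.py -t Sushain -P                 # statistics with a plot
#   ./messages.py -s Sushain pizza -F           # search, latest messages first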
requirements.txt:
certifi==2018.1.18
chardet==3.0.4
cycler==0.10.0
flake8==3.5.0
flake8-commas==1.0.0
flake8-polyfill==1.0.2
flake8-quotes==1.0.0
idna==2.6
lxml==4.1.1
matplotlib==2.1.2
mccabe==0.6.1
numpy==1.14.0
pep8-naming==0.7.0
Pillow==5.0.0
psycopg2==2.7.3.2
pycodestyle==2.3.1
pyflakes==1.6.0
pyparsing==2.2.0
python-dateutil==2.6.1
pytz==2017.3
selenium==3.8.1
six==1.11.0
urllib3==1.22
wordcloud==1.3.1 |
flake8 configuration:
[flake8]
max-line-length = 290 |