Last active
July 13, 2023 00:41
-
-
Save jonsoini/06239ded5e2a4a296530d7a7b18828a7 to your computer and use it in GitHub Desktop.
Run some basic analytics on your signal desktop data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This takes the default text output from sigtop and parses it.
# https://github.com/tbvdm/sigtop
# This uses sigtop's default text format.
# It probably would have been better to start with the sigtop JSON or SQLite export; ah well, this was a fun experiment.
# Example CLI command:
# python3 signal-data-parser.py my-sigtop-file.txt --json my-output.json
import argparse
import re
from collections import defaultdict
from itertools import groupby
from operator import itemgetter
from pathlib import Path

import pandas as pd
def process_text_data(file_path):
    """Parse a sigtop plain-text export into a DataFrame, one row per message.

    A message block starts at a line beginning with ``From:``. Within a block,
    ``Sent:`` supplies the sent date, ``Reaction: <emoji> from <who>`` lines
    are collected as ``(emoji, who)`` tuples, lines beginning with
    ``Received:``, ``Attachment:`` or ``Type:`` are ignored, and every other
    non-empty line is treated as message text. Text lines are joined with a
    single space.

    A block is only stored when it has at least one line of message text, so
    blocks without any text (and their reactions) are not included.

    Args:
        file_path: Path to the text file to read.

    Returns:
        A ``pandas.DataFrame`` with the columns ``Name``, ``Sent Date``,
        ``Message Text``, ``Hashtags`` (list of tags without the ``#``) and
        ``Reactions`` (list of ``(emoji, who)`` tuples).
    """
    hashtag_pattern = re.compile(r"#([\w*]+)")
    reaction_pattern = re.compile(r"Reaction: (\S+) from (.*)")
    # Header lines that carry no information used by this report.
    ignored_prefixes = ("Received:", "Attachment:", "Type:")

    names = []
    sent_dates = []
    messages = []
    hashtags = []
    reactions = []

    def store_message(name, sent_date, body_lines, message_reactions):
        """Append one finished message block to the column lists."""
        message_text = " ".join(body_lines)
        names.append(name)
        sent_dates.append(sent_date)
        messages.append(message_text)
        hashtags.append(hashtag_pattern.findall(message_text))
        reactions.append(message_reactions)

    current_name = None
    current_sent_date = None
    current_message = []
    current_reactions = []

    # Read explicitly as UTF-8 (tolerating a leading BOM) instead of the
    # locale default, so emoji reactions decode the same on every platform.
    with open(file_path, "r", encoding="utf-8-sig") as file:
        for raw_line in file:
            line = raw_line.strip()

            if line.startswith("From:"):
                # A new block begins: flush the previous one if it had text.
                if current_message:
                    store_message(current_name, current_sent_date,
                                  current_message, current_reactions)
                # Keep only the part before the first " (" of the sender info.
                sender_info = line[len("From:"):].strip()
                current_name = sender_info.split(" (")[0].strip()
                current_sent_date = None
                current_message = []
                current_reactions = []
            elif line.startswith("Sent:"):
                current_sent_date = line[len("Sent:"):].strip()
            elif line.startswith("Reaction:"):
                match = reaction_pattern.search(line)
                if match:
                    current_reactions.append(match.groups())
                else:
                    # Not a well-formed reaction header; keep it as message
                    # text rather than crashing with an IndexError.
                    current_message.append(line)
            elif line and not line.startswith(ignored_prefixes):
                current_message.append(line)

    # The final block has no following "From:" line to trigger the flush.
    if current_message:
        store_message(current_name, current_sent_date,
                      current_message, current_reactions)

    return pd.DataFrame({
        "Name": names,
        "Sent Date": sent_dates,
        "Message Text": messages,
        "Hashtags": hashtags,
        "Reactions": reactions,
    })
def total_hashtags(df):
    """Count how often each hashtag occurs across all messages.

    Args:
        df: Table with a ``Hashtags`` column whose entries are lists of
            hashtag strings.

    Returns:
        A ``defaultdict(int)`` mapping each hashtag to its number of
        occurrences; a tag repeated within one message counts each time.
    """
    hashtag_counts = defaultdict(int)
    for message_tags in df['Hashtags']:
        for tag in message_tags:
            hashtag_counts[tag] += 1
    return hashtag_counts
def total_messages(df):
    """Count the number of messages per sender name.

    Args:
        df: Table with a ``Name`` column holding one sender name per message.

    Returns:
        A ``defaultdict(int)`` mapping each name to its message count.
    """
    message_counts = defaultdict(int)
    for sender in df['Name']:
        message_counts[sender] += 1
    return message_counts
def total_reactions(df):
    """Count reactions overall and per person who used them.

    Args:
        df: Table with a ``Reactions`` column whose entries are lists of
            ``(reaction, reactor)`` tuples.

    Returns:
        A tuple ``(reaction_counts, reactions_by_person)``:
        ``reaction_counts`` maps each reaction to its total count, and
        ``reactions_by_person`` maps each reactor name to a mapping of
        reaction -> count for the reactions that person used.
    """
    reaction_counts = defaultdict(int)
    reactions_by_person = defaultdict(lambda: defaultdict(int))
    for reaction_list in df['Reactions']:
        for reaction, reactor in reaction_list:
            # Attribute the reaction to the person who reacted, not to the
            # author of the message. Trim the name at the first " (" so it is
            # cut the same way as the sender name taken from "From:" lines.
            person = reactor.split(" (")[0].strip()
            reaction_counts[reaction] += 1
            reactions_by_person[person][reaction] += 1
    return reaction_counts, reactions_by_person
def _report_path(base, suffix):
    """Return *base* with *suffix* inserted just before its file extension."""
    path = Path(base)
    return str(path.with_name(f"{path.stem}{suffix}{path.suffix}"))


def _by_count_desc(counts):
    """Return the (key, count) pairs of *counts*, highest count first."""
    return sorted(counts.items(), key=itemgetter(1), reverse=True)


def main(argv=None):
    """Parse a sigtop text export, print summary reports and optionally save them.

    Args:
        argv: Optional list of command-line arguments; when ``None`` the
            arguments are taken from ``sys.argv`` by argparse.

    With ``--csv`` and/or ``--json`` the full table is written to the given
    file, and the hashtag, message and reaction reports are written next to it
    with ``-hashtag-report``, ``-message-report`` and ``-reaction-report``
    inserted before the file extension.
    """
    parser = argparse.ArgumentParser(description='Process a text message file.')
    parser.add_argument('filename', type=str, help='The path to the text file to process')
    parser.add_argument('--csv', type=str, help='The filename to output the DataFrame as a CSV file')
    parser.add_argument('--json', type=str, help='The filename to output the DataFrame as a JSON file')
    args = parser.parse_args(argv)

    df = process_text_data(args.filename)
    hashtag_counts = total_hashtags(df)
    message_counts = total_messages(df)
    reaction_counts, reactions_by_person = total_reactions(df)

    # Print DataFrame, hashtag counts, message counts, and reaction counts
    print(df.head())

    print("\nTotal hashtag counts:")
    for hashtag, count in _by_count_desc(hashtag_counts):
        print(f"{hashtag}: {count}")

    print("\nTotal message counts:")
    for name, count in _by_count_desc(message_counts):
        print(f"{name}: {count}")

    print("\nTotal reaction counts:")
    for reaction, count in _by_count_desc(reaction_counts):
        print(f"{reaction}: {count}")

    print("\nTop 5 reactions used by each person:")
    print("{:<20} {:<30}".format('Name', 'Top 5 Reactions'))
    for name, person_reactions in reactions_by_person.items():
        top_reactions = _by_count_desc(person_reactions)[:5]
        top_reactions_string = ', '.join(f"{reaction} {count}" for reaction, count in top_reactions)
        # str() because a None name does not accept the "<20" format spec.
        print("{:<20} {:<30}".format(str(name), top_reactions_string))

    if not (args.csv or args.json):
        return

    # Build the report tables once; both output formats share them.
    hashtag_df = pd.DataFrame(list(hashtag_counts.items()), columns=['Hashtag', 'Count'])
    message_df = pd.DataFrame(list(message_counts.items()), columns=['Name', 'Message Count'])
    reaction_df = pd.DataFrame(
        [
            (name, reaction, count)
            for name, person_reactions in reactions_by_person.items()
            for reaction, count in person_reactions.items()
        ],
        columns=['Name', 'Reaction', 'Count'],
    )
    reports = (
        ('-hashtag-report', hashtag_df),
        ('-message-report', message_df),
        ('-reaction-report', reaction_df),
    )

    # Report names are derived from the path's stem, so an output name without
    # the expected extension can no longer make the reports overwrite the
    # main output file.
    if args.csv:
        df.to_csv(args.csv, index=False)
        for suffix, report_df in reports:
            report_df.to_csv(_report_path(args.csv, suffix), index=False)

    if args.json:
        df.to_json(args.json, orient='records')
        for suffix, report_df in reports:
            report_df.to_json(_report_path(args.json, suffix), orient='records')
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment