- open up tdesktop (i.e. the Telegram desktop client)
- go to your favorite group
- export all messages as JSON -- you can skip all media
- head to wherever tdesktop downloaded it to -- maybe Downloads/ChatExport_yyyy-mm-dd/
- run generate.py there
- optionally: weep a little
Last active
October 21, 2021 00:44
-
-
Save zhongfu/fa2fe44634d7e8ff4f52f5a59de3c745 to your computer and use it in GitHub Desktop.
lobsterdao chat stats
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# for each RANK given on the command line, counts the number of rows
# whose value in COL_TO_READ is at least RANK
import csv
import sys


def count_rows_at_least(rows, col, ranks):
    """Count, for every threshold in ``ranks``, the rows that reach it.

    Every row is compared against every threshold independently, so the
    result does not depend on the order of the rows or of the thresholds,
    and an empty input simply yields zero for every threshold.

    Args:
        rows: iterable of mappings (e.g. the rows of a ``csv.DictReader``).
        col: key of the column holding the integer value to compare.
        ranks: iterable of integer thresholds.

    Returns:
        A list of ``(rank, count)`` tuples in the order of ``ranks``, where
        ``count`` is the number of rows with ``int(row[col]) >= rank``.

    Raises:
        KeyError: if a row has no ``col`` key.
        ValueError: if a value in ``col`` cannot be parsed as an integer.
    """
    values = [int(row[col]) for row in rows]
    return [
        (rank, sum(1 for value in values if value >= rank))
        for rank in ranks
    ]


def main(argv):
    """Run the command-line tool and return the process exit code.

    Args:
        argv: ``[program, FILENAME, COL_TO_READ, RANK, ...]``.

    Returns:
        0 on success, 1 when too few arguments were given, 2 when a RANK
        is not an integer.
    """
    if len(argv) < 4:
        print(f"{argv[0]} FILENAME COL_TO_READ RANKS...")
        return 1

    fn = argv[1]
    col = argv[2]
    try:
        ranks = [int(r) for r in argv[3:]]
    except ValueError:
        print("ranks should be numbers")
        return 2

    with open(fn, 'r', newline='') as f:
        results = count_rows_at_least(csv.DictReader(f), col, ranks)

    for rank, count in results:
        print(f"{count} users with {rank} {col}")
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import csv | |
import json | |
import pytz | |
from datetime import datetime | |
# tdesktop dumps messages in local tz
# so we change the tz from utc+8 to utc+1.. utc+2..? why is DST a thing
source_tz = pytz.timezone('Asia/Singapore')
target_tz = pytz.timezone('CET')

# change this to the message id that you'd like to stop at
id_limit = None

# message id -> message dict, with msg['date'] replaced by an aware datetime
# expressed in target_tz
messages = dict()

# read as UTF-8 (the standard JSON encoding) instead of relying on the
# platform's default text encoding
with open('result.json', 'r', encoding='utf-8') as f:
    messages_list = json.load(f)['messages']

for msg in messages_list:
    msgid = msg['id']
    # stops at the first id that reaches the limit, so this relies on the
    # export listing messages in ascending id order
    if id_limit and msgid >= id_limit:
        break
    messages[msgid] = msg
    parsed_date = datetime.fromisoformat(msg['date'])
    if parsed_date.tzinfo is None:
        # localize() attaches source_tz to the naive timestamp; calling
        # astimezone() on a naive datetime would instead assume the system's
        # local tz and make source_tz irrelevant
        parsed_date = source_tz.localize(parsed_date)
    msg['date'] = parsed_date.astimezone(target_tz)
def get_uid(msg):
    """Return the id string of whoever produced ``msg``.

    Looks at ``msg['from_id']`` first and falls back to ``msg['actor_id']``
    when the former is missing or falsy.

    Raises:
        AssertionError: if the resulting id is not a ``str``.
    """
    sender_id = msg.get('from_id')
    if not sender_id:
        sender_id = msg.get('actor_id')
    assert isinstance(sender_id, str), "uid is not str"
    return sender_id
# uid -> display name, taken from 'from' (or 'actor' as a fallback, or ''
# when neither is set); if a uid shows up in several messages, the name
# from the last one wins
uid_map = {
    get_uid(msg): msg.get('from') or msg.get('actor') or ''
    for msg in messages.values()
}
def parse_text(text):
    """Flatten a message ``text`` field into a plain string.

    Args:
        text: ``None``, a ``str``, or a list whose items are either ``str``
            or dicts carrying the string under the ``'text'`` key.

    Returns:
        ``text`` unchanged when it is ``None`` or a ``str``; otherwise the
        concatenation of all list components.

    Raises:
        ValueError: if ``text`` or one of its list components has an
            unsupported type.
    """
    if text is None or isinstance(text, str):
        return text
    if not isinstance(text, list):
        raise ValueError(f"Got msg with weird type {type(text)}")

    bits = []
    for bit in text:
        if isinstance(bit, str):
            bits.append(bit)
        elif isinstance(bit, dict):
            bits.append(bit['text'])
        else:
            # report the offending component's type, not the enclosing list's
            raise ValueError(f"Unknown msg text component with type {type(bit)}")
    return ''.join(bits)


def most_replies(chat_username='lobsters_chat'):
    """Write ``largest_threads.csv``: thread roots ordered by reply count.

    Every message in the module-level ``messages`` that has a
    ``reply_to_message_id`` is attributed to the root of its reply chain.
    Roots are then written out from most to fewest replies, together with a
    link, the sender, the date and the message text. Roots that are not in
    ``messages`` get empty sender/date columns and ``(deleted?)`` as text.

    Args:
        chat_username: public username of the chat, used to build the
            ``https://t.me/<chat_username>/<msg_id>`` links.

    Raises:
        ValueError: if a root message has an unknown ``type``, is a service
            message without a string ``action``, or has text that
            ``parse_text`` cannot handle.
    """
    # reply id -> root id of its chain; a chain only resolves to its root if
    # the replied-to message was processed earlier in the iteration
    parents = dict()
    reply_counts = dict()
    for msgid, msg in messages.items():
        if 'reply_to_message_id' not in msg:
            continue
        parent = msg['reply_to_message_id']
        while parent in parents:
            parent = parents[parent]
        parents[msgid] = parent
        reply_counts[parent] = reply_counts.get(parent, 0) + 1

    # stable sort: roots with equal counts keep first-seen order
    ranked = sorted(reply_counts.items(), key=lambda tup: tup[1], reverse=True)

    # explicit UTF-8 so non-ASCII names/messages do not depend on the
    # platform's default text encoding
    with open('largest_threads.csv', 'w', newline='', encoding='utf-8') as f:
        fields = ["replies", "msg_id", "msg_link", "sender", "sender_id", "date", "message"]
        writer = csv.writer(f)
        writer.writerow(fields)

        for msgid, replies in ranked:
            parts = [str(replies), str(msgid), f"https://t.me/{chat_username}/{msgid}"]
            msg = messages.get(msgid)
            if msg is None:
                # the root is not part of the loaded messages
                parts.extend(['', '', '', '(deleted?)'])
                writer.writerow(parts)
                continue

            parts.append(msg.get('from') or msg.get('actor') or '')
            parts.append(get_uid(msg))
            parts.append(msg['date'].strftime("%Y-%m-%dT%H:%M:%S"))
            if msg['type'] == 'message':
                parts.append(parse_text(msg.get('text')) or '(media?)')
            elif msg['type'] == 'service':
                action = msg.get('action')
                # explicit raise instead of assert so the check survives -O
                if not isinstance(action, str):
                    raise ValueError(f"service message with no action? {msgid}")
                parts.append(f"(action: {action})")
            else:
                raise ValueError(f"Message with weird type {msg['type']}!")
            writer.writerow(parts)
def rank_by_count(counts):
    """Rank the entries of ``counts`` from highest to lowest count.

    Entries with equal counts share a rank, and the next lower count gets
    its 1-based position as rank (1, 1, 3, ...). Entries with equal counts
    keep the order in which they appear in ``counts``.

    Args:
        counts: mapping of uid to an integer count.

    Returns:
        A list of ``(rank, count, uid)`` tuples in ranked order.
    """
    ordered = sorted(counts.items(), key=lambda tup: tup[1], reverse=True)
    rows = []
    rank = None
    current = None
    for position, (uid, cnt) in enumerate(ordered, start=1):
        if rank is None or cnt < current:
            rank = position
            current = cnt
        rows.append((rank, cnt, uid))
    return rows


def write_ranking(path, fields, counts):
    """Write the ranking of ``counts`` to the CSV file at ``path``.

    Each data row is ``rank, count, name, uid``; the name is looked up in
    the module-level ``uid_map``. ``fields`` is written as the header row.
    """
    # explicit UTF-8 so non-ASCII names do not depend on the platform's
    # default text encoding
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(fields)
        for rank, cnt, uid in rank_by_count(counts):
            writer.writerow([rank, cnt, uid_map[uid], uid])


def most_active():
    """Write four per-user activity rankings as CSV files.

    Only entries of the module-level ``messages`` whose ``type`` is
    ``'message'`` are considered. The files written are:

    - ``user_msg_count.csv``: number of messages per user
    - ``user_msg_count_replies.csv``: number of reply messages per user
    - ``user_msg_count_days.csv``: number of distinct days with a message
    - ``user_msg_count_replies_days.csv``: number of distinct days with a
      reply message

    A message counts as a reply when it has a ``reply_to_message_id`` key;
    days are the calendar dates of ``msg['date']``.
    """
    msg_count = {}
    reply_count = {}
    msg_days = {}
    reply_days = {}

    for msg in messages.values():
        if msg['type'] != 'message':
            continue

        uid = get_uid(msg)
        day = msg['date'].date()

        msg_count[uid] = msg_count.get(uid, 0) + 1
        msg_days.setdefault(uid, set()).add(day)

        if 'reply_to_message_id' in msg:
            reply_count[uid] = reply_count.get(uid, 0) + 1
            reply_days.setdefault(uid, set()).add(day)

    fields_msgs = ["rank", "messages", "name", "uid"]
    fields_days = ["rank", "days", "name", "uid"]

    write_ranking('user_msg_count.csv', fields_msgs, msg_count)
    write_ranking('user_msg_count_replies.csv', fields_msgs, reply_count)
    write_ranking(
        'user_msg_count_days.csv',
        fields_days,
        {uid: len(days) for uid, days in msg_days.items()},
    )
    write_ranking(
        'user_msg_count_replies_days.csv',
        fields_days,
        {uid: len(days) for uid, days in reply_days.items()},
    )
# when run as a script, generate every report in turn
if __name__ == '__main__':
    for report in (most_replies, most_active):
        report()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment