Last active
November 29, 2023 12:26
-
-
Save punchagan/f0e182bb9cbdcb16f82126490829d092 to your computer and use it in GitHub Desktop.
Whatsapp Broadcast Export to Zulip
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
*.zip | |
*.jpg | |
/*.txt |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from datetime import datetime | |
from os.path import abspath, dirname, join | |
import re | |
import sys | |
import zulip | |
# strptime pattern for timestamps: month/day/two-digit year, then 24-hour
# hour:minute (e.g. "12/31/20, 14:05"). Used both for the timestamps found in
# the export text and for the --start-date command-line value.
DATE_FORMAT = "%m/%d/%y, %H:%M"
def is_message(text):
    """Return True when *text* starts with a ``sender:`` prefix.

    The prefix must be one or more word characters immediately followed by
    a colon at the very start of the string.
    """
    # Raw string: "\w" inside a plain literal is an invalid escape sequence
    # and produces a warning on current Python versions.
    return bool(re.match(r"^\w+:", text))
def extract_messages(text):
    """Parse the text of a chat export into a list of message dicts.

    The text is split on timestamps of the form ``MM/DD/YY, HH:MM - ``.
    Entries whose body does not pass ``is_message`` are skipped.

    Returns:
        A list of dicts with the keys ``"date"`` (a ``datetime`` parsed with
        ``DATE_FORMAT``), ``"text"`` (the body without attachment lines),
        ``"sender"`` (the part before the first colon) and ``"files"``
        (names taken from lines ending in ``(file attached)``).
    """
    attachment_marker = "(file attached)"

    # The capture group makes re.split keep the timestamps in the result;
    # element 0 is whatever precedes the first timestamp and is dropped, so
    # the remainder alternates timestamp, body, timestamp, body, ...
    date_messages = re.split(
        r"(\d{2}/\d{2}/\d{2}, \d{2}:\d{2}) - ", text.strip()
    )[1:]

    messages = []
    for date, message in zip(date_messages[::2], date_messages[1::2]):
        date = datetime.strptime(date, DATE_FORMAT)
        message = message.strip()
        if not is_message(message):
            continue

        sender, body = message.split(":", 1)

        body_lines = []
        files = []
        for line in body.strip().splitlines():
            if line.endswith(attachment_marker):
                # Strip the marker rather than cutting at the first space,
                # so file names that contain spaces are kept whole.
                files.append(line[: -len(attachment_marker)].rstrip())
            else:
                body_lines.append(line)

        messages.append(
            {
                "date": date,
                "text": "\n".join(body_lines),
                "sender": sender,
                "files": files,
            }
        )
    return messages
def assert_broadcast(messages):
    """Check that the parsed messages do not come from several senders.

    Args:
        messages: iterable of dicts that each have a ``"sender"`` key.

    Raises:
        AssertionError: if more than one distinct sender is present.
    """
    senders = {message["sender"] for message in messages}
    # Raised explicitly instead of via ``assert`` so the check still runs
    # under ``python -O``. An empty list has no senders and therefore passes.
    if len(senders) > 1:
        raise AssertionError("Multiple senders - not a WhatsApp Broadcast!")
def upload_image(client, path):
    """Upload the file at *path* through *client* and return its URI.

    The file is opened in binary mode and posted to the ``user_uploads``
    endpoint with ``client.call_endpoint``; the ``"uri"`` entry of the
    response is returned.
    """
    with open(path, "rb") as image_file:
        response = client.call_endpoint(
            "user_uploads",
            method="POST",
            files=[image_file],
        )
        return response["uri"]
def send_zulip_message(client, zulip_info, message):
    """Send one parsed message to a Zulip stream and print the response.

    Args:
        client: object providing ``send_message(request)``.
        zulip_info: dict with the ``"stream"`` and ``"topic"`` to post to.
        message: dict whose ``"text"`` entry is sent, stripped of
            surrounding whitespace, as the message content.
    """
    payload = dict(
        type="stream",
        to=zulip_info["stream"],
        subject=zulip_info["topic"],
        content=message["text"].strip(),
    )
    print(client.send_message(payload))
def filter_messages(messages, start_date):
    """Return the messages dated strictly after *start_date*.

    When *start_date* is falsy the input list is returned unchanged (the
    same object); otherwise a new list is built.
    """
    if not start_date:
        return messages
    return [entry for entry in messages if entry["date"] > start_date]
def confirm_send(messages, start_date):
    """Ask the user on stdin whether the messages should be sent.

    Prints a notice and returns False without prompting when there is
    nothing to send. Otherwise returns True only if the answer, stripped
    and lower-cased, is exactly ``"y"``.
    """
    if not messages:
        print("No messages to send")
        return False

    template = (
        "Sending {count} messages after {start_date}. [y/N]? "
        if start_date
        else "Sending all {count} messages. [y/N]? "
    )
    prompt = template.format(start_date=start_date, count=len(messages))
    answer = input(prompt)
    return answer.strip().lower() == "y"
def main(export_path, start_date, zuliprc_path, zulip_stream):
    """Read a chat export and post its messages to a Zulip stream.

    Args:
        export_path: path of the exported chat text file. Attachments are
            looked up in the same directory.
        start_date: if truthy, only messages dated after it are sent.
        zuliprc_path: configuration file handed to ``zulip.Client``.
        zulip_stream: name of the stream to post to.

    The topic is the part of the export's file name after the first "with"
    (extension removed); if the name has no "with", the whole name is used.
    Nothing is sent unless the user confirms at the prompt.
    """
    # Local import: the file's top-level imports do not include these names.
    from os.path import basename, splitext

    export_dir = dirname(export_path)
    # Explicit encoding so decoding does not depend on the platform default.
    with open(export_path, encoding="utf-8") as f:
        text = f.read()

    messages = extract_messages(text)
    assert_broadcast(messages)
    messages = filter_messages(messages, start_date)
    if not confirm_send(messages, start_date):
        return

    client = zulip.Client(config_file=zuliprc_path)

    # Work on the file name only, so a "." or "with" in a directory name
    # cannot leak into the topic, and a name without "with" does not raise.
    export_name = splitext(basename(export_path))[0]
    _, found, recipient = export_name.partition("with")
    zulip_info = {
        "stream": zulip_stream,
        "topic": (recipient if found else export_name).strip(),
    }

    for message in messages:
        # FIXME: Skip messages that have been already sent? For now we assume
        # that the archive file is cleaned up manually, of the older messages.
        if message["files"]:
            # Only the first attachment of a message is uploaded and linked.
            image_path = join(export_dir, message["files"][0])
            image_uri = upload_image(client, image_path)
            message["text"] = "[]({})\n\n{}".format(image_uri, message["text"])
        send_zulip_message(client, zulip_info, message)
if __name__ == "__main__":
    import argparse
    from os.path import expanduser

    parser = argparse.ArgumentParser()
    # Taken as a plain string: with type=open argparse would open the
    # (unexpanded) default path while parsing and never close the handle.
    parser.add_argument("--zuliprc", type=str, default="~/.zuliprc")
    parser.add_argument("--stream", type=str, default="general")
    parser.add_argument(
        "--start-date",
        type=str,
        default="",
        help='only send messages after this date, e.g. "12/31/20, 14:05"',
    )
    # Underscore in the dest so the value is a normal attribute; the metavar
    # keeps the name shown in the usage text as before.
    parser.add_argument("export_path", metavar="export-path", type=str)
    options = parser.parse_args()

    # strptime("") raises ValueError, so only parse when a date was given;
    # None makes the later filtering step keep every message.
    start_date = (
        datetime.strptime(options.start_date, DATE_FORMAT)
        if options.start_date
        else None
    )
    main(
        options.export_path,
        start_date,
        expanduser(options.zuliprc),
        options.stream,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment