Last active
January 26, 2023 17:12
-
-
Save typemytype/d056992b1a68b84f6abc3b458d77bad1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Based on https://github.com/andrejbauer/slack-to-discord

Bot commands (shown with the default "!" prefix):
    !slackpreview
    !slackimport
"""
import argparse
import datetime
import html
import json
import os
import re
import sys
import urllib.request

import discord
from discord.ext import commands
# Command-line interface: bot settings as options, followed by one or more
# Slack export files to import.
arg_parser = argparse.ArgumentParser(
    description="A Discord bot for transferring a Slack archive to a channel",
)
arg_parser.add_argument(
    '--prefix',
    dest='prefix',
    default='!',
    help='bot command prefix (default !)',
)
arg_parser.add_argument(
    '--token',
    required=True,
    help='bot access token',
)
arg_parser.add_argument(
    '--users',
    required=True,
    type=argparse.FileType('r'),
    help='slack users.json file',
)
arg_parser.add_argument('file', nargs='+')

# With no explicit argument list, argparse reads sys.argv[1:].
args = arg_parser.parse_args()
# Load user data: map each non-deleted Slack user id to a display name.
# argparse.FileType has already opened --users; the with-block makes sure the
# handle is closed again once the JSON has been read.
with args.users as users_file:
    users = {
        # Fall back to the short "name" (then "Unknown") when an entry carries
        # no usable "real_name", instead of aborting with a KeyError.
        u['id']: u.get('real_name') or u.get('name') or 'Unknown'
        for u in json.load(users_file)
        # An entry without a "deleted" flag is treated as an active user.
        if not u.get('deleted')
    }
# Process the input files.
#   messagesMap: Slack "ts" -> message dict; used to look up thread replies.
#   messages:    every chunk to post, sorted by timestamp after all files
#                have been read.
messagesMap = dict()
messages = []

# Chunk size used when splitting long texts.
# NOTE(review): format_message() prepends a timestamp and a user name, so a
# full-size chunk is longer than this once formatted -- confirm that still
# fits the destination's message limit.
MAX_CHUNK_LENGTH = 2000

# Channel housekeeping notices that are not imported.
IGNORED_SUFFIXES = (
    " has joined the channel",
    " has left the channel",
)
IGNORED_PREFIXES = (
    "added an integration to this channel: ",
    "set the channel description: ",
)

for fn in args.file:
    # Attachments are stored in a "_downloads" folder next to the export file.
    downloadFolder = os.path.join(os.path.dirname(fn), "_downloads")
    os.makedirs(downloadFolder, exist_ok=True)

    with open(fn, "rb") as fh:
        for msg in json.load(fh):
            # Unfold mentions: "<@ID>" becomes "@" + the user's name.
            txt = re.sub(r'<@(\w+)>',
                         (lambda m: '@' + users.get(m.group(1), 'Unknown')),
                         msg["text"])

            # Unescape HTML characters (&amp;, &lt;, &gt;). html.unescape works
            # in a single pass, so "&amp;lt;" becomes "&lt;" and not "<".
            txt = html.unescape(txt)

            # ignore
            if txt.endswith(IGNORED_SUFFIXES) or txt.startswith(IGNORED_PREFIXES):
                continue

            if txt == "This content can't be displayed.":
                # It's a release: the real text lives in the message blocks.
                txt = "".join(
                    block.get("text", {}).get("text", "")
                    for block in msg.get("blocks", [])
                )
                # NOTE(review): the indentation of the original was lost; these
                # three clean-ups are assumed to belong to the release case
                # only (they target "\/" escapes and "<http:...dmg| N>" links).
                txt = re.sub(r"\\\/", "/", txt)
                txt = re.sub(r"<http:", "http:", txt)
                txt = re.sub(r"\.dmg\| ([0-9]+)>", ".dmg ", txt)

            # extract image, files, not docs
            if "files" in msg:
                msg["localFiles"] = []
                for attachment in msg["files"]:
                    if attachment.get("mode") == "docs":
                        continue
                    if "url_private_download" not in attachment:
                        continue
                    downloadPath = os.path.join(
                        downloadFolder,
                        f"{attachment['id']}_{attachment['name']}")
                    # Skip the download when an earlier run already fetched it.
                    if not os.path.exists(downloadPath):
                        urllib.request.urlretrieve(
                            attachment["url_private_download"], downloadPath)
                    msg["localFiles"].append(
                        dict(path=downloadPath, filename=attachment['name']))

            # Split messages longer than 2000 characters.
            chunks = [
                txt[start:start + MAX_CHUNK_LENGTH]
                for start in range(0, len(txt), MAX_CHUNK_LENGTH)
            ]

            # NOTE(review): a message whose text is empty yields no chunk, so
            # it is only reachable through messagesMap (as a thread reply).
            if chunks:
                # The first chunk reuses the message dict itself, so the entry
                # in messagesMap carries the cleaned-up text as well.
                msg["text"] = chunks[0]
                messages.append(msg)
            messagesMap[msg["ts"]] = msg

            for overflow in chunks[1:]:
                # Each further chunk needs its own dict: appending the same
                # dict again would leave every chunk with the same text.
                continuation = dict(msg, text=overflow)
                # Attachments and the reply thread belong to the first chunk.
                continuation.pop("localFiles", None)
                if continuation.pop("replies", None) is not None:
                    # Overflow of a thread starter: drop "thread_ts" too, so
                    # slackimport posts it as a plain channel message.
                    continuation.pop("thread_ts", None)
                messages.append(continuation)

# Sort the messages by timestamp. The sort is stable, so the chunks of one
# message (same "ts") keep their order.
messages.sort(key=(lambda msg: msg['ts']))
print("Read {0} messages.".format(len(messages)))
# Create the bot: default intents plus message_content, listening for
# commands that start with the --prefix value.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
    command_prefix=args.prefix,
    intents=intents,
)
print("Activating the bot. Press Ctrl-C to exit.")
def getFiles(msg):
    """Build the Discord attachments for a message.

    Returns None when the message has no "localFiles" entry; otherwise a list
    with one discord.File per downloaded file (possibly empty).
    """
    if "localFiles" not in msg:
        return None
    return [
        discord.File(entry["path"], filename=entry["filename"])
        for entry in msg["localFiles"]
    ]
def format_message(msg):
    """Render a message as Markdown for posting on Discord.

    The result is "<timestamp> **<user>**: <text>", where the timestamp is the
    message's "ts" shown as local time and the user falls back to "Unknown"
    when the id is not in the users table.
    """
    posted_at = datetime.datetime.fromtimestamp(float(msg['ts']))
    timestamp = posted_at.strftime('%Y-%m-%d %H:%M')
    author = users.get(msg.get('user'), 'Unknown')
    body = msg['text']
    return f"{timestamp} **{author}**: {body}"
# Set up the bot listener
@bot.command(pass_context=True)
async def slackimport(ctx):
    """Post every loaded message through the invoking context.

    A message with "replies" is posted and then turned into a thread that
    receives its replies (looked up in messagesMap). A message without
    "thread_ts" is posted as is. Anything else is skipped here, because it is
    a reply that gets posted via its thread. A failure on one message is
    reported on stdout and the import carries on.
    """
    total = len(messages)
    print ("Sending {0} messages ...".format(total))

    for position, msg in enumerate(messages, start=1):
        if position % 10 == 0:
            print("{0}/{1} messages sent ...".format(position, total))

        try:
            if "replies" not in msg:
                if "thread_ts" not in msg:
                    # Send the message to Discord (Markdown format)
                    await ctx.send(format_message(msg), files=getFiles(msg))
                continue

            # It's a thread: name it after the message text, shortened to
            # stay within 100 characters.
            title = msg['text']
            if len(title) > 100:
                title = title[:96] + "..."

            starter = await ctx.send(format_message(msg), files=getFiles(msg))
            thread = await starter.create_thread(name=title)

            for ref in msg["replies"]:
                reply = messagesMap.get(ref["ts"])
                if not reply:
                    continue
                await thread.send(format_message(reply), files=getFiles(reply))
        except Exception as e:
            print(f"Message {position} could not be sent: {e}")

    print("Finished sending messages. My job is done, kill me now.")
@bot.command(pass_context=True)
async def slackpreview(ctx):
    """Print every loaded message and its local files to stdout.

    Nothing is sent through ctx.
    """
    separator = "-" * 50
    for msg in messages:
        print(separator)
        print(format_message(msg))
        for attachment in msg.get("localFiles", []):
            print(f" {attachment['filename']} ({attachment['path']})")
@bot.command(pass_context=True)
async def slackexit(ctx):
    """Shut the bot down.

    Closes the bot's connection, then terminates the process with exit
    status 0.
    """
    print("Logging out ...")
    # Bot.logout() no longer exists in discord.py 2.x, which this file already
    # requires (Intents.message_content, Message.create_thread); close() is
    # its replacement.
    await bot.close()
    print("Stopping (do not worry about the error messages printed below) ...")
    # sys.exit rather than exit(): the latter is a helper installed by the
    # site module and is not guaranteed to be available.
    sys.exit(0)
# Run the bot with the access token given via --token.
bot.run(args.token)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment