Skip to content

Instantly share code, notes, and snippets.

@0187773933
Last active September 15, 2023 21:39
Show Gist options
  • Save 0187773933/845c0b0f7f0ec7977311d0d1654dd82f to your computer and use it in GitHub Desktop.
Save 0187773933/845c0b0f7f0ec7977311d0d1654dd82f to your computer and use it in GitHub Desktop.
Python Script to Download All Messages In All Categories in All Guilds the Bot Has Joined
#!/usr/bin/env python3
import requests
from pprint import pprint
from box import Box
from pathlib import Path
import shutil
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
from slugify import slugify # pip install python-slugify
from PIL import Image , ImageDraw , ImageFont
import textwrap
OVERWRITE = False
# OVERWRITE = True
def download_file( options ):
try:
if OVERWRITE == False:
if options[ 1 ].is_file() == True:
if options[ 1 ].stat().st_size > 1:
return True
r = requests.get( options[ 0 ] , stream=True )
total_size = int( r.headers.get( "content-length" , 0 ) )
block_size = 1024
t = tqdm( total=total_size , unit="iB" , unit_scale=True )
with open( str( options[ 1 ] ) , "wb" ) as f:
for data in r.iter_content( block_size ):
t.update( len( data ) )
f.write( data )
t.close()
if total_size != 0 and t.n != total_size:
print( "ERROR , something went wrong" )
except Exception as e:
print( e )
def write_json( file_path , python_object ):
with open( file_path , 'w', encoding='utf-8' ) as f:
json.dump( python_object , f , ensure_ascii=False , indent=4 )
def read_json( file_path ):
try:
with open( file_path ) as f:
return json.load( f )
except Exception as e:
return {}
def batch_process( options ):
batch_size = len( options[ "batch_list" ] )
with ThreadPoolExecutor() as executor:
result_pool = list( tqdm( executor.map( options[ "function_reference" ] , iter( options[ "batch_list" ] ) ) , total=batch_size ) )
return result_pool
def create_image(text, font_path='arial.ttf', font_size=20, text_color=(237, 230, 211)):
# Set the width of the image
page_width_in_inches = 6.5
dpi = 300
image_width = int(page_width_in_inches * dpi)
padding = 10 # padding on each side of the text
# Load the font
font = ImageFont.truetype(font_path, 20)
# Calculate line width in pixels
line_width = image_width - 2 * padding
line_height = font.getsize('A')[1]
# Split the text by line breaks and wrap each line individually
lines = []
for paragraph in text.split('\n'):
lines += textwrap.wrap(paragraph, width=int(line_width / font.getsize('A')[0]))
lines.append('') # Add an empty line to represent the line break
# Create the image
image_height = line_height * len(lines) + 2 * padding
image = Image.new('RGB', (image_width, image_height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
# Add each line of text to the image and calculate the max width
max_text_width = 0
for i, line in enumerate(lines):
# calculate text width
text_width, text_height = draw.textbbox((0, 0), line, font=font)[2:]
draw.text((padding, padding + line_height * i), line, font=font, fill=(0, 0, 0))
max_text_width = max(max_text_width, text_width)
# Resize the image to the max text width
resized_image_width = max_text_width + 2 * padding
image = image.crop((0, 0, resized_image_width, image_height))
return image
class DisordChannelArchiverBot:
def __init__( self , config={} ):
self.config = Box( config )
self.headers = headers = {
"accept": "application/json, text/plain, */*" ,
"Authorization": f"Bot {self.config.token}"
}
def enumerate_request( self , limit=100 , after=False ):
# https://discord.com/developers/docs/resources/channel#get-channel-messages
# https://discord.com/developers/docs/reference#snowflakes
pass
def get_guilds( self ):
# https://discord.com/developers/docs/resources/user#get-current-user-guilds
limit = 200
params = { "limit": limit }
url = f"https://discord.com/api/users/@me/guilds"
response = requests.get( url , headers=self.headers , params=params )
response.raise_for_status()
self.guilds = response.json()
def get_guild_channels( self , guild_id ):
# https://discord.com/developers/docs/resources/guild#get-guild-channels
params = {}
url = f"https://discord.com/api/guilds/{guild_id}/channels"
response = requests.get( url , headers=self.headers , params=params )
response.raise_for_status()
result = response.json()
return result
def get_channel_messages( self , channel_id ):
# https://discord.com/developers/docs/resources/channel#get-channel-messages
limit = 100
params = { "limit": limit }
url = f"https://discord.com/api/channels/{channel_id}/messages"
response = requests.get( url , headers=self.headers , params=params )
response.raise_for_status()
messages = response.json()
# arrives in reverse order , aka latest message = array[0] , first message = array[-1]
messages.reverse()
# pprint( messages )
if len( messages ) < limit:
return messages
finished = False
iterations = 1
while finished == False:
print( f"Gathering {limit} new messages , Round = {iterations} , Total = {len( messages )}" )
params[ "before" ] = messages[ 0 ][ "id" ]
response = requests.get( url , headers=self.headers , params=params )
response.raise_for_status()
new_messages = response.json()
new_messages.reverse()
messages = new_messages + messages
iterations += 1
if len( new_messages ) < limit:
finished = True
# print( len( messages ) )
return messages
def download_all_message_attachments( self , output_directory , messages ):
download_list = []
total_messages = len( messages )
# zfill_number = len( str( total_messages ) )
zfill_number = 3
item_total = 1
for message_index , message in enumerate( messages ):
# if "attachments" not in message:
# continue
if "content" in message:
if len( message[ "content" ] ) > 1:
output_path = output_directory.joinpath( f'{str(item_total).zfill(zfill_number)}.png' )
x = create_image( message[ 'content' ] , font_path=self.config.image_text_font_path , font_size=20 )
x.save( str( output_path ) )
item_total += 1
# for embed_index , embed in enumerate( message[ "embeds" ] ):
# if "thumbnail" not in embed:
# continue
# if "proxy_url" not in embed[ "thumbnail" ]:
# continue
# file_type = embed[ "thumbnail" ][ "url" ].split( "." )[ -1 ][ 0 : 3 ]
# if file_type == "jpe":
# file_type = "jpeg"
# download_list.append([
# embed[ "thumbnail" ][ "proxy_url" ] ,
# output_directory.joinpath( f'{str(item_total).zfill(zfill_number)}.{file_type}' )
# ])
# item_total += 1
for attachment_index , attachment in enumerate( message[ "attachments" ] ):
if "url" not in attachment:
# pprint( message )
continue
if "filename" not in attachment:
# pprint( message )
continue
download_list.append([
attachment[ "url" ] ,
output_directory.joinpath( f'{str(item_total).zfill(zfill_number)}{Path( attachment[ "filename" ] ).suffix}' )
])
item_total += 1
# pprint( download_list )
batch_process({
"max_workers": 10 ,
"batch_list": download_list ,
"function_reference": download_file
})
def get_channel( self , channel_id ):
params = {}
url = f"https://discord.com/api/channels/{channel_id}"
response = requests.get( url , headers=self.headers , params=params )
response.raise_for_status()
result = response.json()
# pprint( result )
return result
def archive_channel( self , channel_id , output_base_directory=False , save_json=True ):
channel = self.get_channel( channel_id )
if "name" not in channel:
return False
if output_base_directory == False:
output_base_directory = Path.cwd().joinpath( "downloads" , channel[ "name" ] )
# output_base_directory.mkdir( parents=True , exist_ok=True )
message_archive_save_path = output_base_directory.joinpath( f'{channel[ "name" ]}.json' )
attachment_base_directory = output_base_directory.joinpath( channel[ "name" ] )
# shutil.rmtree( str( attachment_base_directory ) , ignore_errors=True )
attachment_base_directory.mkdir( parents=True , exist_ok=True )
print( f"1.) Downloading Message Archive of {channel[ 'name' ]}" )
messages = self.get_channel_messages( channel_id )
if save_json == True:
write_json( str( message_archive_save_path ) , messages )
print( f"2.) Downloading Attachments from {channel[ 'name' ]}" )
self.download_all_message_attachments( attachment_base_directory , messages )
def archive_all( self ):
self.get_guilds()
total_guilds = len( self.guilds )
for guild_index , guild in enumerate( self.guilds ):
# 1.) Prep Download Folder For Each Guild
guild_name_slug = slugify( guild[ "name" ] )
guild_output_dir = self.config.output_dir.joinpath( guild_name_slug )
guild_output_dir.mkdir( parents=True , exist_ok=True )
guild_download_tracker_path = guild_output_dir.joinpath( "downloaded.json" )
guild_download_tracker = read_json( str( guild_download_tracker_path ) )
# 2.) Get all the channels in the guild
g_channels = self.get_guild_channels( guild[ "id" ] )
# 3.) Find and Sort Channels By Category
g_categories = { channel[ "id" ]: { "name": channel[ "name" ] , "channels": [] } for channel in g_channels if channel[ "type" ] == 4 }
for channel in g_channels:
if channel[ "type" ] == 4:
continue
if "parent_id" in channel:
if channel[ "parent_id" ] in g_categories:
g_categories[ channel[ "parent_id" ] ][ "channels" ].append( channel )
else:
g_categories[ channel[ "id" ] ] = { "name": channel[ "name" ] , "channels": [ channel ] }
# pprint( g_categories )
# 4.) Save JSON Structure of Guild
write_json( guild_output_dir.joinpath( f"{guild_name_slug}.json" ) , g_categories )
# 5.) Download All Channel Messages
total_categories = len( g_categories )
for category_index , category in enumerate( g_categories ):
total_channels = len( g_categories[ category ][ "channels" ] )
for channel_index , channel in enumerate( g_categories[ category ][ "channels" ] ):
print( f"Downloading Channel Messages === Guild [ {guild_index+1} ] of {total_guilds} || Category [ {category_index+1} ] of {total_categories} || Channel [ {channel_index+1} ] of {total_channels}" )
channel_output_dir = guild_output_dir.joinpath( channel[ "name" ] )
channel_output_dir.mkdir( parents=True , exist_ok=True )
channel[ "downloaded_messages" ] = self.get_channel_messages( channel[ "id" ] )
# channel_messages = [ x for x in channel_messages if x[ "id" ] not in ]
channel_messages_json_path = channel_output_dir.joinpath( f"{channel[ 'name' ]}.json" )
write_json( str( channel_messages_json_path ) , channel[ "downloaded_messages" ] )
# 6.) Download Attachments
for category_index , category in enumerate( g_categories ):
total_channels = len( g_categories[ category ][ "channels" ] )
for channel_index , channel in enumerate( g_categories[ category ][ "channels" ] ):
print( f"Downloading Channel Attachements === Guild [ {guild_index+1} ] of {total_guilds} || Category [ {category_index+1} ] of {total_categories} || Channel [ {channel_index+1} ] of {total_channels}" )
channel_output_dir = guild_output_dir.joinpath( channel[ "name" ] )
self.download_all_message_attachments( channel_output_dir , channel[ "downloaded_messages" ] )
if __name__ == "__main__":
bot = DisordChannelArchiverBot({
"token": "asdf" ,
"output_dir": Path.cwd().joinpath( "DOWNLOAD_ALL" ) ,
"image_text_font_path": "/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/comic_sans.ttf" ,
})
bot.archive_all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment