Reddit Downloader with search
# Before running:
# Install the 'praw' dependency (pip3 install praw).
# Register a Reddit API application and an Imgur API application (Imgur is only needed for downloading albums).
# Replace the IDs and secrets in the config below.
# In the config:
#   desired_files: file extensions you want to download
#   wanted_links: domains whose links will be saved to an output file instead of downloaded
# When run from the command line, the script takes two arguments: a search term and an optional limit (default 24).
# Example: python3 gist_reddit_downloader_search.py 'Boku No Hero Season 5' 12
# Because this script was written for anime episode discussions, the subreddit and flair are fixed in the script.
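# Note (usage sketch): to target a different subreddit or flair, edit the defaults of
# search_posts() below (subreddit='anime', flair='Episode') or pass them explicitly, e.g.:
#   posts = search_posts(reddit, 'Boku No Hero Season 5', subreddit='anime', flair='Episode', l=12)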
import os
import json
import praw
import re
import requests
import string
import sys
import time
from urllib import parse
config = {
    "client_id": "REDDIT_CLIENT_ID",
    "client_secret": "REDDIT_CLIENT_SECRET",
    "imgur_client_id": "IMGUR_CLIENT_ID",
    "desired_files": (
        ".gif",
        ".gifv",
        ".webm",
        ".jpeg",
        ".jpg",
        ".png",
        ".mp4"
    ),
    "wanted_links": (
        "gyazo",
        "youtube",
        "myanimelist",
        "wikipedia",
        "youtu.be",
        "twitter"
    )
}
ignored = {}
exceptions = {}
errors_file = None
saved_links = None
print_output = None
post = {}
user_agent = "Reddit Downloader 0.1"
def my_print(text):
    print_output.write("{}\n".format(text))
def parse_url(url):
    return parse.urlparse(url)
def valid_filename(filename):
    # Keep only characters that are safe in filenames.
    valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
    return ''.join(c for c in filename if c in valid_chars)
def format_filename(folder, title, file, extension='', separator=' - '):
    # Build "<folder><truncated title><separator><file><extension>" with unsafe characters stripped.
    return folder + valid_filename(title[:100] + separator + file + extension)
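# For example (illustrative values), format_filename("downloaded/", "Great scene!!", "abc.gif")
# returns "downloaded/Great scene - abc.gif": the title is truncated to 100 characters and
# the '!' characters are stripped by valid_filename().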
def download_file(filename, url, service, id, title=None):
    try:
        # Only download files that have not been saved yet.
        if not os.path.isfile(filename):
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            r = requests.get(url, timeout=30)
            with open(filename, 'wb') as outfile:
                outfile.write(r.content)
    except Exception as e:
        errors_file.write("{} | {} | {}\n".format(url, filename, e))
def save_link(folder, link):
    text = "{}-{} | {} - {}\n"
    saved_links.write(text.format(post["id"], post["title"], link["title"], link["url"]))
def save_exception(link):
    text = "Exception: {}-{} | {} - {}\n"
    saved_links.write(text.format(post["id"], post["title"], link["title"], link["url"]))
def ignore_link(folder, link):
    # Group ignored links by domain so they can be inspected later.
    domain = parse_url(link["url"]).netloc
    if domain not in ignored:
        ignored[domain] = []
    ignored[domain].append(link)
def get_service_and_id(link):
    parts = link.split('/')
    service = parts[2]
    id = parts[-1]
    return service, id
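# For example (illustrative URL), "https://gfycat.com/SomeClip".split('/') gives
# ['https:', '', 'gfycat.com', 'SomeClip'], so service='gfycat.com' and id='SomeClip'.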
def download_link(folder, link, service=None, id=None):
    if not service:
        service, id = get_service_and_id(link["url"])
    url = link["url"]
    fn = format_filename(folder, link["title"], url[url.rfind('/')+1:])
    download_file(fn, url, service, id, link["title"])
def special_imgur_album(folder, link):
    url = link["url"]
    rf = url.rfind('#')
    if rf > 0:
        url = url[:rf]
    albumname = url[url.rfind('/')+1:]
    down_url = 'https://api.imgur.com/3/album/{}/images?client_id={}'.format(albumname, config["imgur_client_id"])
    r = requests.get(down_url)
    try:
        api_data = json.loads(r.content.decode("utf-8"))
        data = api_data["data"]
        df = format_filename(folder, link["title"], albumname)
        df = df + "/"
        i = 1
        if 'error' in data:
            errors_file.write("{} | {} | {}\n".format("special_imgur_album", link["url"], data['error']))
            return
        for img in data:
            ttl = str(i) + " - " + (img.get("title") or img["id"])
            download_link(df, {'url': img["link"], 'title': ttl}, 'imgur', img["id"])
            i = i + 1
    except Exception as inst:
        print("ERROR::: ", albumname)
        print(inst)
def special_imgur_image(folder, link):
    url = link["url"]
    fn = url[url.rfind('/')+1:]
    url = url[:url.rfind('/')+1] + "download/" + fn
    # TODO: change to download_file()
    r = requests.get(url)
    if "Content-Type" in r.headers:
        try:
            ft = r.headers["Content-Type"]
            fn = fn + "." + ft[ft.rfind('/')+1:]
            fn = format_filename(folder, link["title"], fn)
            if not os.path.isfile(fn):
                os.makedirs(os.path.dirname(fn), exist_ok=True)
                with open(fn, 'wb') as outfile:
                    outfile.write(r.content)
        except Exception as e:
            errors_file.write("{} | {}\n".format(url, e))
    else:
        errors_file.write("{} | {}\n".format("IGNORED: ", url))
        ignore_link(folder, link)
def special_imgur_gifv(folder, link):
    # Imgur .gifv links are not downloaded; the corresponding mp4 URL is logged and the link is saved.
    url = link["url"].replace("gifv", "mp4")
    my_print(" SKIPPING IMGUR GIFV {} {}".format(url[url.rfind('/')+1:], link["url"]))
    save_link(folder, link)
def special_gfycat(folder, link):
    gfycat_api_url = 'https://api.gfycat.com/v1/gfycats/{}'
    name = link["url"]
    name = name[name.rfind('/')+1:]
    api_request = requests.get(gfycat_api_url.format(name))
    api_data = json.loads(api_request.content.decode("utf-8"))
    if api_data and "gfyItem" in api_data:
        url = api_data["gfyItem"]["webmUrl"]
        fn = format_filename(folder, link["title"], name, '.webm')
        # TODO: change to download_link
        download_file(fn, url, 'gfycat', name, link["title"])
    else:
        errors_file.write("{} | {} | {}\n".format("GFYCAT ", link, api_data))
def special_streamable(folder, link):
    apiurl = 'https://api.streamable.com/videos/{}'
    name = link["url"]
    name = name[name.rfind('/')+1:]
    api_request = requests.get(apiurl.format(name))
    try:
        my_print(link["url"])
        api_data = json.loads(api_request.content.decode("utf-8"))
        if api_data and "files" in api_data:
            data = api_data["files"]
            if "mp4" in data:
                url = data["mp4"]["url"]
                url = "https://" + url[2:]  # removes '//' at beginning
                fn = format_filename(folder, link["title"], name, '.mp4')
                download_file(fn, url, 'streamable', name, link["title"])
        else:
            errors_file.write("{} | {}\n".format("streamable", api_data))
    except Exception:
        errors_file.write("{} | {}\n".format("streamable", api_request.content))
actions = {
    "download": download_link,
    "special_imgur_album": special_imgur_album,
    "special_imgur_image": special_imgur_image,
    "special_imgur_gifv": special_imgur_gifv,
    "special_gfycat": special_gfycat,
    "special_streamable": special_streamable,
    "save": save_link,
    "ignore": ignore_link,
}
def get_links(comment):
    # Links without a title are not processed for now :(
    # Anything that isn't a closing square bracket
    name_regex = "[^]]+"
    # http:// or https:// followed by anything but a closing parenthesis
    url_regex = "http[s]?://[^)]+"
    markup_regex = r"\[({0})]\(\s*({1})\s*\)".format(name_regex, url_regex)
    ret = re.findall(markup_regex, comment.body)
    links = []
    for itm in ret:
        links.append({"title": itm[0], "url": itm[1]})
    return links
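# For example (illustrative comment body), "[OP screenshot](https://i.imgur.com/abc.png) was great"
# yields [{"title": "OP screenshot", "url": "https://i.imgur.com/abc.png"}].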
def check_link_action(url):
    if url.endswith(".gifv"):
        return "special_imgur_gifv"
    # elif any(x in url for x in is_file):
    elif url.endswith(config["desired_files"]):
        return "download"
    elif "imgur.com/a/" in url:
        return "special_imgur_album"
    elif "imgur.com/gallery/" in url:
        return "special_imgur_album"
    elif "imgur" in url:
        return "special_imgur_image"
    elif "gfycat" in url:
        return "special_gfycat"
    elif "streamable" in url:
        return "special_streamable"
    elif any(x in url for x in config["wanted_links"]):
        return "save"
    else:
        return "ignore"
def download_links(folder, links):
    for link in links:
        link_action = check_link_action(link["url"])
        my_print(link["url"])
        actions[link_action](folder, link)
def format_comment_dict(c):
    return {
        "author": c.author.name if c.author else "",
        "body": c.body,
        "controversiality": c.controversiality,
        "depth": c.depth,
        "gilded": c.gilded,
        "id": c.id,
        "permalink": "https://reddit.com" + c.permalink,
        "score": c.score
    }
def download_posts_media(reddit, submission_list):
    global ignored, exceptions, errors_file, saved_links, print_output, post
    folder = "downloaded/"
    if not os.path.exists(folder):
        os.makedirs(folder)
    errors_file = open("{}error_logs.txt".format(folder), "a+", encoding="utf-8")
    saved_links = open("{}saved_links.txt".format(folder), "a+", encoding="utf-8")
    print_output = open("{}output.txt".format(folder), "a+", encoding="utf-8")
    if not isinstance(submission_list, list):
        submission_list = [submission_list]
    for idx, submission_id in enumerate(submission_list, start=1):
        print('Downloading post {}'.format(idx))
        comments = []
        ignored = {}
        exceptions = {}
        submission = reddit.submission(id=submission_id)
        my_print('Getting submission {} {}'.format(submission_id, submission.title))
        download_folder = "{}{}/".format(folder, valid_filename(submission.subreddit.display_name))
        comments_folder = "{}[{}]-{}/".format(download_folder, submission.id, valid_filename(submission.title))
        post = {"url": submission.url, "title": submission.title, "id": submission.id}
        if not os.path.exists(comments_folder):
            os.makedirs(comments_folder)
        submission.comments.replace_more(limit=None)
        comment_queue = submission.comments[:]  # Seed with top-level comments
        i = 0
        while comment_queue:
            i = i + 1
            comment = comment_queue.pop(0)
            comments.append(format_comment_dict(comment))
            url_list = get_links(comment)
            download_links(comments_folder, url_list)
            comment_queue.extend(comment.replies)
        errors_file.flush()
        saved_links.flush()
        print_output.flush()
        os.fsync(errors_file.fileno())
        os.fsync(saved_links.fileno())
        os.fsync(print_output.fileno())
        time.sleep(0.1)
    errors_file.close()
    saved_links.close()
    print_output.close()
def search_posts(reddit, query='', subreddit='anime', sort='new', t='year', flair='Episode', l=None):
    if flair:
        query = ' '.join((query, 'flair_name:"{}"'.format(flair)))
    sr = reddit.subreddit(subreddit)
    post_list = sr.search(query, sort=sort, syntax='lucene',
                          time_filter=t, limit=l)
    posts = [post.id for post in post_list]
    return posts
reddit = praw.Reddit(
    user_agent=user_agent, client_id=config["client_id"], client_secret=config["client_secret"])
if len(sys.argv) < 2:
    sys.exit("Usage: python3 gist_reddit_downloader_search.py '<search term>' [limit]")
search_term = sys.argv[1]
limit = int(sys.argv[2]) if len(sys.argv) >= 3 else 24
posts = search_posts(reddit, search_term, l=limit)
print('{} posts found'.format(len(posts)))
download_posts_media(reddit, posts)