Skip to content

Instantly share code, notes, and snippets.

@theabbie
Created January 30, 2025 09:45
Show Gist options
  • Save theabbie/6ba4e6070970271f5e35e680e9309fda to your computer and use it in GitHub Desktop.
Save theabbie/6ba4e6070970271f5e35e680e9309fda to your computer and use it in GitHub Desktop.
Cron-powered Repost bot for Instagram, Reddit and Imgur with Rate-limit Queue.
import os
import json
import re
import instaloader
import requests
from flask import Flask, request, jsonify
from urllib.parse import urlparse
import firebase_admin
from firebase_admin import credentials, firestore
import praw
reddit = praw.Reddit(
client_id=os.getenv("REDDIT_CLIENT_ID"),
client_secret=os.getenv("REDDIT_CLIENT_SECRET"),
user_agent="theabbie",
)
app = Flask(__name__)
cred_json = os.getenv('FIREBASE')
cred_dict = eval(cred_json)
cred = credentials.Certificate(cred_dict)
firebase_admin.initialize_app(cred)
db = firestore.client()
QUEUE_DOC = "reelspub"
def is_valid_url(url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except ValueError:
return False
def load_queue():
doc_ref = db.collection('queues').document(QUEUE_DOC)
doc = doc_ref.get()
if doc.exists:
return doc.to_dict().get('queue', [])
else:
return []
def save_queue(queue):
doc_ref = db.collection('queues').document(QUEUE_DOC)
doc_ref.set({'queue': queue})
def add_to_queue(task):
queue = load_queue()
queue.append(task)
save_queue(queue)
def clean_caption(caption):
caption = re.sub(r"#\S+", "", caption)
caption = re.sub(r"@\S+", "", caption)
caption = re.sub(r"http\S+", "", caption)
caption = re.sub(r"\s+", " ", caption)
caption = caption.strip()
cleaned_caption = ""
for char in caption:
if char.isalpha() or char in [",", "'", " "]:
cleaned_caption += char
else:
break
if len(cleaned_caption) > 100:
cleaned_caption = cleaned_caption[:100] + "..."
return cleaned_caption
def get_access_token():
refresh_token = os.getenv("IMGUR_REFRESH_TOKEN")
client_id = os.getenv("IMGUR_CLIENT_ID")
client_secret = os.getenv("IMGUR_CLIENT_SECRET")
data = {
"refresh_token": refresh_token,
"client_id": client_id,
"client_secret": client_secret,
"grant_type": "refresh_token",
}
response = requests.post("https://api.imgur.com/oauth2/token", data=data)
response.raise_for_status()
return response.json()["access_token"]
def upload_video_to_imgur(video_path, access_token, title="Funny Video"):
with open(video_path, "rb") as video_file:
data = {"type": "file", "title": title, "disable_audio": "0"}
files = {"video": video_file}
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.post("https://api.imgur.com/3/upload", data=data, files=files, headers=headers)
response.raise_for_status()
return response.json()["data"]["id"]
def upload_image_to_imgur(video_path, access_token, title="Funny Video"):
with open(video_path, "rb") as video_file:
data = {"type": "file", "title": title}
files = {"image": video_file}
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.post("https://api.imgur.com/3/upload", data=data, files=files, headers=headers)
response.raise_for_status()
return response.json()["data"]["id"]
def share_image(image_id, access_token, title="Funny Video"):
data = {
"title": title,
"topic": "Funny",
"terms": "1"
}
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.post(f"https://api.imgur.com/3/gallery/image/{image_id}", data=data, headers=headers)
response.raise_for_status()
return response.json()
def process_instagram_reel(reel_url):
loader = instaloader.Instaloader()
try:
shortcode = reel_url.split("/")[-2]
post = instaloader.Post.from_shortcode(loader.context, shortcode)
if not post.is_video:
raise Exception("The provided link is not a video")
title = clean_caption(post.caption) if post.caption else "Funny Video"
video_url = post.video_url
video_response = requests.get(video_url, stream=True)
video_response.raise_for_status()
video_path = "/tmp/downloaded_video.mp4"
with open(video_path, "wb") as video_file:
for chunk in video_response.iter_content(chunk_size=8192):
video_file.write(chunk)
access_token = get_access_token()
image_id = upload_video_to_imgur(video_path, access_token, title=title)
share_image(image_id, access_token, title=title)
os.remove(video_path)
return title
except Exception as e:
raise Exception(f"Failed to process reel: {str(e)}")
def process_reddit_post(reddit_url):
submission = reddit.submission(url=reddit_url)
title = submission.title if hasattr(submission, 'title') else "Cool"
if hasattr(submission, 'media') and submission.media and 'reddit_video' in submission.media:
video_url = submission.media['reddit_video']['fallback_url']
video_response = requests.get(video_url, stream=True)
video_response.raise_for_status()
video_path = "/tmp/reddit_video.mp4"
with open(video_path, "wb") as video_file:
for chunk in video_response.iter_content(chunk_size=8192):
video_file.write(chunk)
access_token = get_access_token()
image_id = upload_video_to_imgur(video_path, access_token, title=title)
share_image(image_id, access_token, title=title)
os.remove(video_path)
return f"Reddit video '{title}' uploaded"
elif hasattr(submission, 'url') and submission.url.lower().endswith(('jpg', 'jpeg', 'png')):
image_url = submission.url
file_extension = os.path.splitext(image_url)[1]
image_path = f"/tmp/reddit_image{file_extension}"
image_response = requests.get(image_url, stream=True)
image_response.raise_for_status()
with open(image_path, "wb") as image_file:
for chunk in image_response.iter_content(chunk_size=8192):
image_file.write(chunk)
access_token = get_access_token()
image_id = upload_image_to_imgur(image_path, access_token, title=title)
share_image(image_id, access_token, title=title)
os.remove(image_path)
return f"Reddit image '{title}' uploaded"
else:
return "Reddit post is not a video or image"
def process_post(url):
parsed_url = urlparse(url)
if "reddit.com" in parsed_url.netloc:
return process_reddit_post(url)
elif "instagram.com" in parsed_url.netloc:
return process_instagram_reel(url)
else:
raise ValueError("Unsupported URL")
@app.route("/instagram", methods=["POST"])
def instagram():
reel_url = request.form.get("description")
if not reel_url or not is_valid_url(reel_url):
return jsonify({"error": "Invalid or missing Instagram reel URL"}), 400
add_to_queue({"reel_url": reel_url, "retries": 3})
return jsonify({"message": "Reel added to queue"}), 200
def clean(queue):
filtered_queue = [task for task in queue if task["retries"] > 0]
seen = {}
for task in filtered_queue:
shortcode = task["reel_url"]
if shortcode not in seen or seen[shortcode]["retries"] < task["retries"]:
seen[shortcode] = task
return list(seen.values())
@app.route("/retry", methods=["GET"])
def retry():
queue = load_queue()
if queue:
task = queue.pop(0)
try:
process_post(task["reel_url"])
except Exception:
task["retries"] -= 1
if task["retries"] > 0:
queue.append(task)
queue = clean(queue)
save_queue(queue)
return jsonify({"status": "Retry completed", "remaining": len(queue)}), 200
@app.route("/view", methods=["GET"])
def view_queue():
queue = load_queue()
return jsonify({"queue": queue})
@app.route("/")
def home():
return "Welcome to the Flask app! Use the /instagram, /retry, or /view routes."
if __name__ == "__main__":
app.run(debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment