Created
January 30, 2025 09:45
-
-
Save theabbie/6ba4e6070970271f5e35e680e9309fda to your computer and use it in GitHub Desktop.
Cron-powered Repost bot for Instagram, Reddit and Imgur with Rate-limit Queue.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import re | |
import instaloader | |
import requests | |
from flask import Flask, request, jsonify | |
from urllib.parse import urlparse | |
import firebase_admin | |
from firebase_admin import credentials, firestore | |
import praw | |
# Read-only Reddit API client; credentials come from the environment.
reddit = praw.Reddit(
    client_id=os.getenv("REDDIT_CLIENT_ID"),
    client_secret=os.getenv("REDDIT_CLIENT_SECRET"),
    user_agent="theabbie",
)

app = Flask(__name__)

# Firebase service-account credentials arrive as a JSON string in the
# FIREBASE env var.  Parse with json.loads, NOT eval(): eval() would
# execute arbitrary code placed in the environment variable, and also
# chokes on JSON literals like true/false/null.
cred_json = os.getenv('FIREBASE')
cred_dict = json.loads(cred_json)
cred = credentials.Certificate(cred_dict)
firebase_admin.initialize_app(cred)
db = firestore.client()

# Firestore document (under the 'queues' collection) backing the repost queue.
QUEUE_DOC = "reelspub"
def is_valid_url(url):
    """Return True when *url* parses with both a scheme and a network host."""
    try:
        parts = urlparse(url)
    except ValueError:
        return False
    return bool(parts.scheme) and bool(parts.netloc)
def load_queue():
    """Fetch the persisted task list from Firestore; empty list if absent."""
    snapshot = db.collection('queues').document(QUEUE_DOC).get()
    if not snapshot.exists:
        return []
    return snapshot.to_dict().get('queue', [])
def save_queue(queue):
    """Overwrite the persisted task list in Firestore with *queue*."""
    db.collection('queues').document(QUEUE_DOC).set({'queue': queue})
def add_to_queue(task):
    """Append *task* to the persisted queue (read-modify-write)."""
    # NOTE(review): not atomic — concurrent writers could lose a task;
    # acceptable for a single cron-driven worker.
    save_queue(load_queue() + [task])
def clean_caption(caption):
    """Reduce an Instagram caption to a short plain-text title.

    Removes hashtags, mentions and links, collapses whitespace, then
    keeps only the leading run of letters, commas, apostrophes and
    spaces (stopping at the first other character).  Results longer
    than 100 characters are truncated with an ellipsis.
    """
    for pattern in (r"#\S+", r"@\S+", r"http\S+"):
        caption = re.sub(pattern, "", caption)
    caption = re.sub(r"\s+", " ", caption).strip()

    allowed_punct = {",", "'", " "}
    kept = []
    for ch in caption:
        if not (ch.isalpha() or ch in allowed_punct):
            break
        kept.append(ch)
    result = "".join(kept)

    if len(result) > 100:
        result = result[:100] + "..."
    return result
def get_access_token():
    """Exchange the stored Imgur refresh token for a fresh access token.

    Credentials are read from the IMGUR_* environment variables; raises
    requests.HTTPError on a non-2xx response from the token endpoint.
    """
    payload = {
        "refresh_token": os.getenv("IMGUR_REFRESH_TOKEN"),
        "client_id": os.getenv("IMGUR_CLIENT_ID"),
        "client_secret": os.getenv("IMGUR_CLIENT_SECRET"),
        "grant_type": "refresh_token",
    }
    resp = requests.post("https://api.imgur.com/oauth2/token", data=payload)
    resp.raise_for_status()
    return resp.json()["access_token"]
def upload_video_to_imgur(video_path, access_token, title="Funny Video"):
    """Upload the video at *video_path* to Imgur and return its media id.

    Raises requests.HTTPError if the upload is rejected.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    form = {"type": "file", "title": title, "disable_audio": "0"}
    with open(video_path, "rb") as fh:
        resp = requests.post(
            "https://api.imgur.com/3/upload",
            data=form,
            files={"video": fh},
            headers=headers,
        )
    resp.raise_for_status()
    return resp.json()["data"]["id"]
def upload_image_to_imgur(video_path, access_token, title="Funny Video"):
    """Upload the image at *video_path* to Imgur and return its media id.

    The parameter is named `video_path` for historical/caller
    compatibility, but it should point at an image file.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    form = {"type": "file", "title": title}
    with open(video_path, "rb") as fh:
        resp = requests.post(
            "https://api.imgur.com/3/upload",
            data=form,
            files={"image": fh},
            headers=headers,
        )
    resp.raise_for_status()
    return resp.json()["data"]["id"]
def share_image(image_id, access_token, title="Funny Video"):
    """Publish an already-uploaded Imgur media item to the public gallery.

    Returns the decoded JSON response; raises requests.HTTPError on failure.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    form = {
        "title": title,
        "topic": "Funny",
        "terms": "1",
    }
    resp = requests.post(
        f"https://api.imgur.com/3/gallery/image/{image_id}",
        data=form,
        headers=headers,
    )
    resp.raise_for_status()
    return resp.json()
def process_instagram_reel(reel_url):
    """Download an Instagram reel and repost it to the Imgur gallery.

    Returns the cleaned caption that was used as the post title.

    Raises:
        Exception: wrapping any download/upload failure with context.
    """
    loader = instaloader.Instaloader()
    video_path = "/tmp/downloaded_video.mp4"
    try:
        # Drop any query string and trailing slash first so the final
        # path segment is the shortcode whether or not the URL ends
        # with "/" (the old split("/")[-2] returned "reel" for
        # slash-less URLs).
        shortcode = reel_url.split("?")[0].rstrip("/").split("/")[-1]
        post = instaloader.Post.from_shortcode(loader.context, shortcode)
        if not post.is_video:
            raise Exception("The provided link is not a video")
        title = clean_caption(post.caption) if post.caption else "Funny Video"
        video_response = requests.get(post.video_url, stream=True)
        video_response.raise_for_status()
        with open(video_path, "wb") as video_file:
            for chunk in video_response.iter_content(chunk_size=8192):
                video_file.write(chunk)
        access_token = get_access_token()
        image_id = upload_video_to_imgur(video_path, access_token, title=title)
        share_image(image_id, access_token, title=title)
        return title
    except Exception as e:
        raise Exception(f"Failed to process reel: {str(e)}")
    finally:
        # Always remove the temp file — the original leaked it whenever
        # any step after the download raised.
        if os.path.exists(video_path):
            os.remove(video_path)
def _download_to_path(url, path):
    """Stream *url* to the local file *path* in 8 KiB chunks."""
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(path, "wb") as out:
        for chunk in response.iter_content(chunk_size=8192):
            out.write(chunk)

def process_reddit_post(reddit_url):
    """Mirror a Reddit video or image post to the Imgur gallery.

    Returns a human-readable status string; posts that are neither a
    hosted video nor a jpg/jpeg/png image are skipped.
    """
    submission = reddit.submission(url=reddit_url)
    title = submission.title if hasattr(submission, 'title') else "Cool"
    if hasattr(submission, 'media') and submission.media and 'reddit_video' in submission.media:
        # NOTE(review): fallback_url reportedly serves the video stream
        # without the separate audio track — confirm this is acceptable.
        video_path = "/tmp/reddit_video.mp4"
        try:
            _download_to_path(submission.media['reddit_video']['fallback_url'], video_path)
            access_token = get_access_token()
            image_id = upload_video_to_imgur(video_path, access_token, title=title)
            share_image(image_id, access_token, title=title)
        finally:
            # Clean the temp file even when an upload step raises
            # (the original leaked it on any exception).
            if os.path.exists(video_path):
                os.remove(video_path)
        return f"Reddit video '{title}' uploaded"
    elif hasattr(submission, 'url') and submission.url.lower().endswith(('jpg', 'jpeg', 'png')):
        image_url = submission.url
        file_extension = os.path.splitext(image_url)[1]
        image_path = f"/tmp/reddit_image{file_extension}"
        try:
            _download_to_path(image_url, image_path)
            access_token = get_access_token()
            image_id = upload_image_to_imgur(image_path, access_token, title=title)
            share_image(image_id, access_token, title=title)
        finally:
            if os.path.exists(image_path):
                os.remove(image_path)
        return f"Reddit image '{title}' uploaded"
    else:
        return "Reddit post is not a video or image"
def process_post(url):
    """Dispatch *url* to the handler matching its host.

    Raises ValueError for hosts other than reddit.com / instagram.com.
    """
    host = urlparse(url).netloc
    if "reddit.com" in host:
        return process_reddit_post(url)
    if "instagram.com" in host:
        return process_instagram_reel(url)
    raise ValueError("Unsupported URL")
@app.route("/instagram", methods=["POST"])
def instagram():
    """Accept a reel URL in the form field 'description' and enqueue it
    with 3 retry attempts."""
    reel_url = request.form.get("description")
    if reel_url and is_valid_url(reel_url):
        add_to_queue({"reel_url": reel_url, "retries": 3})
        return jsonify({"message": "Reel added to queue"}), 200
    return jsonify({"error": "Invalid or missing Instagram reel URL"}), 400
def clean(queue):
    """Drop exhausted tasks and collapse duplicate URLs.

    When the same reel_url appears more than once, the copy with the
    most retries remaining wins; insertion order is preserved.
    """
    best = {}
    for task in queue:
        if task["retries"] <= 0:
            continue
        key = task["reel_url"]
        existing = best.get(key)
        if existing is None or existing["retries"] < task["retries"]:
            best[key] = task
    return list(best.values())
@app.route("/retry", methods=["GET"])
def retry():
    """Pop and process the head of the queue (cron-driven worker tick).

    A failed task is re-queued at the back with one fewer retry; the
    queue is de-duplicated and persisted before responding.
    """
    pending = load_queue()
    if pending:
        task = pending.pop(0)
        try:
            process_post(task["reel_url"])
        except Exception:
            task["retries"] -= 1
            if task["retries"] > 0:
                pending.append(task)
        pending = clean(pending)
        save_queue(pending)
    return jsonify({"status": "Retry completed", "remaining": len(pending)}), 200
@app.route("/view", methods=["GET"])
def view_queue():
    """Expose the current queue contents (debug/inspection endpoint)."""
    return jsonify({"queue": load_queue()})
@app.route("/")
def home():
    """Plain-text landing page listing the available routes."""
    greeting = "Welcome to the Flask app! Use the /instagram, /retry, or /view routes."
    return greeting
if __name__ == "__main__":
    # Development entry point only: debug=True enables the reloader and
    # the interactive debugger — never expose this mode publicly.
    app.run(debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment