Created
December 31, 2015 22:48
-
-
Save TheEnigmaBlade/24205c62280b056fde3d to your computer and use it in GitHub Desktop.
MAL soulmate finder. For posts in /r/anime or similar subreddits.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3

##########
# Config #
##########

# Dependencies: requests, praw 3, beautiful soup 4
# Required config info:
#  - A Reddit post
#  - Reddit username and password
#  - A set of Reddit oauth keys, which you can get here: https://www.reddit.com/prefs/apps/
#  - Your logged-in MAL session ID cookie (MALSESSIONID), which you can find with browser dev tools

# Reddit
post_url = "https://www.reddit.com/r/anime/comments/3yqco3/find_your_mal_soulmate/"  # thread to scrape
output_file = "scores.csv"  # sorted results are written here as CSV
username = ""       # your Reddit account name
password = ""       # your Reddit account password
oauth_key = ""      # Reddit app client id
oauth_secret = ""   # Reddit app client secret
reddit_useragent = "script:Comment and flair scraper for /r/anime:v1.0 (run by /u/"+username+")"

# MAL
mal_username = ""        # your MAL account name (used only in the user agent)
mal_session_cookie = ""  # value of the logged-in MALSESSIONID cookie
mal_useragent = "script:Compatibility score scraper:v1.0 (run by "+mal_username+")"

##############################
# The stuff that does stuff #
# DON'T EDIT BELOW THIS LINE #
##############################

import praw, requests
from requests.auth import HTTPBasicAuth

# OAuth scopes requested from Reddit: "identity" for login, "read" for comments.
_oauth_scopes = {"identity", "read"}
def init_reddit_session():
    """Log in to Reddit via the OAuth2 password grant and return a praw session.

    Returns:
        An authenticated praw.Reddit instance, or None when credentials are
        missing or the token request is rejected.
    Raises:
        Whatever requests/praw raise on connection failure (re-raised after
        printing a diagnostic).
    """
    try:
        print("Connecting to reddit...", end=" ", flush=True)
        r = praw.Reddit(user_agent=reddit_useragent)
        print("logging in...", end=" ", flush=True)

        # Bail out on missing credentials.  The config defaults are empty
        # strings, so test truthiness — a comparison against None never fires.
        if not username or not password:
            return None

        # Manual OAuth2 "password" grant: exchange user credentials for a token.
        client_auth = HTTPBasicAuth(oauth_key, oauth_secret)
        headers = {"User-Agent": reddit_useragent}
        data = {"grant_type": "password", "username": username, "password": password}
        response = requests.post("https://www.reddit.com/api/v1/access_token",
                                 auth=client_auth, headers=headers, data=data)
        response_content = response.json()
        if "error" in response_content and response_content["error"] != 200:
            print("failed!\nResponse code = {}".format(response_content["error"]))
            return None
        # Validate the token type before indexing "access_token" so an
        # unexpected response doesn't raise KeyError.
        if response_content.get("token_type") != "bearer":
            return None
        token = response_content["access_token"]

        # Hand the token to praw; the redirect URI is unused for this grant.
        r.set_oauth_app_info(oauth_key, oauth_secret, "http://example.com/unused/redirect/uri")
        r.set_access_credentials(_oauth_scopes, access_token=token)
        r.config.api_request_delay = 1

        print("done!", flush=True)
        return r
    except Exception as e:
        print("failed! Couldn't connect: {}".format(e))
        raise
def destroy_reddit_session(r):
    """Drop the OAuth credentials held by the given praw session."""
    r.clear_authentication()
def get_post_comments(r, post_url):
    """Collect (text, author name) pairs from the post's comment tree.

    Top-level replies contribute their body (which may contain a MAL link);
    deeper replies contribute the author's flair text instead.  Each author
    is collected at most once.
    """
    submission = r.get_submission(url=post_url)

    def walk(nodes, collected, seen_authors):
        for node in nodes:
            # Expand "load more comments" stubs and recurse into them.
            if isinstance(node, praw.objects.MoreComments):
                walk(node.comments(), collected, seen_authors)
                continue
            # Skip deleted authors and anyone already collected.
            if node.author is None or node.author.name in seen_authors:
                continue
            author = node.author.name
            if node.parent_id.startswith("t3_") and node.body is not None:
                # Top-level comment: the body may state a MAL profile.
                collected.append((node.body, author))
                seen_authors.add(author)
            elif node.author_flair_text is not None:
                # Nested comment: fall back to the author's flair text.
                collected.append((node.author_flair_text, author))
                seen_authors.add(author)
        return collected

    return walk(submission.comments, [], set())
# MAL | |
import re | |
from time import time, sleep | |
from bs4 import BeautifulSoup | |
# Matches MAL profile/animelist URLs and captures the username.  Raw string:
# "\." in a plain literal is an invalid escape sequence (DeprecationWarning
# since Python 3.6).
_mal_url_regex = re.compile(r"myanimelist\.net/(?:profile|animelist)/([a-z0-9_-]+)", re.I)

def find_mal_names(comment):
    """Yield every MAL username linked in the given comment text."""
    for match in _mal_url_regex.finditer(comment):
        yield match.group(1)
# https: the logged-in session cookie must never be sent over plain http.
_mal_url_base = "https://myanimelist.net/profile/{name}"
_mal_rate_limit = 2.0  # minimum seconds between MAL requests
_mal_last = 0          # time() of the previous MAL request

def get_mal_compatibility_score(name):
    """Fetch a MAL profile page and return its compatibility score, or None.

    Requires a valid logged-in session cookie (mal_session_cookie).  Requests
    are throttled to at most one per _mal_rate_limit seconds.
    """
    # Rate limit: sleep for the time *remaining* in the window.  The original
    # slept for the elapsed time instead, which waited longest right after a
    # request and barely at all just before the window closed.
    global _mal_last
    elapsed = time() - _mal_last
    if elapsed < _mal_rate_limit:
        sleep(_mal_rate_limit - elapsed)

    # Request the profile as a logged-in user.
    headers = {"User-Agent": mal_useragent}
    cookies = {"MALSESSIONID": mal_session_cookie, "is_logged_in": "1"}
    resp = requests.get(_mal_url_base.format(name=name), headers=headers, cookies=cookies)
    _mal_last = time()

    if not resp.ok:
        print("MAL request failed, {}: {}".format(resp.status_code, resp.reason))
        return None

    # I wish I didn't need to do this... (scrape the HTML; no API for scores)
    html = BeautifulSoup(resp.text, 'html.parser')
    score = extract_score(html)
    if score:
        return score
    print("MAL score not found on page")
    return None
def extract_score(html):
    """Pull the compatibility percentage out of a parsed MAL profile page.

    Checks the negative bar first, then the positive one.  Returns a float,
    or None when the graph is absent or the score text is exactly "0"
    (treated as no score).
    """
    # "compatability" [sic] — this is the class name MAL itself uses.
    graph = html.find(class_="user-compatability-graph")
    if not graph:
        return None

    def parse_bar(spans):
        # The first span holds text like "42%"; "--" is collapsed to a single
        # minus sign (presumably a doubled sign in the markup — unverified).
        if not spans:
            return None
        text = spans[0].string.strip().strip("%").replace("--", "-")
        return float(text) if text != "0" else None

    value = parse_bar(graph.select(".bar-outer-negative span"))
    if value:
        return value
    return parse_bar(graph.select(".bar-outer-positive span"))
# Main | |
def main():
    """Scrape MAL links from the post, score each one, and write a sorted CSV."""
    # Gather (MAL name, reddit name) pairs from the thread's comments.
    print("Getting comments... ", end="", flush=True)
    session = init_reddit_session()
    comments = get_post_comments(session, post_url)
    found_names = {(mal_name, reddit_name)
                   for text, reddit_name in comments
                   for mal_name in find_mal_names(text)}
    destroy_reddit_session(session)
    print("done!", flush=True)

    # Look up the compatibility score for each discovered profile.
    print("Getting scores... ", end="", flush=True)
    scores = []
    for mal_name, reddit_name in found_names:
        score = get_mal_compatibility_score(mal_name)
        if score:
            print("Score for /u/{} ({}): {}".format(reddit_name, mal_name, score))
            scores.append((reddit_name, mal_name, score))
    print("done!", flush=True)

    # Highest compatibility first.
    print("Sorting scores... ", end="", flush=True)
    scores_sorted = sorted(scores, key=lambda entry: entry[2], reverse=True)
    print("done!", flush=True)

    # Dump everything to CSV.
    print("Writing results... ", end="", flush=True)
    with open(output_file, "w") as out:
        out.write("Reddit,MAL,Score\n")
        for entry in scores_sorted:
            out.write("{},{},{}\n".format(*entry))
    print("done!", flush=True)

    # Show the top of the leaderboard.
    print("\nHighest 10 scores:")
    for rank, entry in enumerate(scores_sorted[:10], start=1):
        print("{}. /u/{} ({}): {}".format(rank, *entry))

if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment