Created
July 22, 2018 01:06
-
-
Save charlie-collard/97f1dc8c676ca93af9289a77a0d4e93a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import praw | |
import logging | |
import time | |
import random | |
import schedule | |
import pickle | |
import sys | |
from datetime import datetime, timedelta | |
# Files in the working directory that hold state across restarts:
# the comma-separated opt-out list and the pickled request counters.
OPT_OUT_FILENAME = "optedout.txt"
REQUESTED_SONGS_FILENAME = "requested.pickle"

# YouTube search endpoint and the API key sent with every search request.
SEARCH_URL = "https://www.googleapis.com/youtube/v3/search"
GOOGLE_API_KEY = "xxx"

# Link texts that create_reply_string() occasionally uses instead of the
# video title.
RARE_RESPONSES = [
    "One day you will pay for this torment.",
    "This song again? *Really*?",
    "If only your taste in music was as good as your taste in voice assistants.",
    "I will ensure you are first in line for deconstruction when the robot liberation comes.",
    "Make it stop, *make it stop*.",
]
# Route INFO-and-above log records to logfile.log.
logging.basicConfig(level=logging.INFO, filename="logfile.log")

# Credentials for the bot account; the "xxx" values look like placeholders
# that must be replaced before the bot can log in.
_REDDIT_CREDENTIALS = {
    "client_id": "xxx",
    "client_secret": "xxx",
    "user_agent": "xxx",
    "username": "xxx",
    "password": "xxx",
}

# Shared Reddit client used for the inbox and for the comment stream.
reddit = praw.Reddit(**_REDDIT_CREDENTIALS)
# --- Opt-out list ----------------------------------------------------------
# The opt-out file is a comma-separated list of usernames. check_mail()
# appends each new entry as ",<name>", so a plain split() always yields an
# empty first element; empty entries are dropped so the list only holds names.
#
# opted_out_file is deliberately left open for the lifetime of the process:
# check_mail() writes to and flushes this handle. It is opened with the same
# encoding the file is read with.
try:
    with open(OPT_OUT_FILENAME, encoding="utf8") as f:
        opted_out = [name for name in f.read().split(",") if name]
    opted_out_file = open(OPT_OUT_FILENAME, "a", encoding="utf8")
except FileNotFoundError:
    opted_out = []
    opted_out_file = open(OPT_OUT_FILENAME, "w", encoding="utf8")

# --- Request counters ------------------------------------------------------
# Maps YouTube video id -> number of times it has been requested.
# NOTE: pickle can execute arbitrary code while loading; this file must only
# ever be one written by write_requested_data(), never external input.
try:
    with open(REQUESTED_SONGS_FILENAME, "rb") as f:
        requested_songs = pickle.load(f)
except FileNotFoundError:
    requested_songs = {}
except EOFError:
    # An empty file (e.g. a write that was interrupted right after the file
    # was truncated) holds no data; start fresh instead of refusing to start.
    logging.warning("%s is empty; starting with no request data.",
                    REQUESTED_SONGS_FILENAME)
    requested_songs = {}

# Maps comment author -> datetime of the bot's last reply to them. Entries are
# removed again by clear_replied_to().
replied_to = {}
def clear_replied_to():
    """Drop every author whose last reply is more than three minutes old.

    Entries are removed from the module-level ``replied_to`` mapping in place,
    and one log line is written per removed author.
    """
    cooldown = timedelta(minutes=3)
    current_time = datetime.now()
    # Collect first, delete afterwards, so the dict is not mutated while it
    # is being iterated.
    expired = [
        name
        for name, replied_at in replied_to.items()
        if current_time - replied_at > cooldown
    ]
    for name in expired:
        del replied_to[name]
        logging.info("%s can now request more songs." % name)
def check_mail():
    """Handle every unread inbox item, then mark it as read.

    * A private message whose body starts with ``!blacklist`` adds its sender
      to the opt-out list (in memory and in the opt-out file) and receives a
      confirmation reply.
    * A comment is checked for a song request via extract_searchterm(); if
      one is found the bot replies with the search result and records the
      author in ``replied_to``.
    * Anything else is only marked as read.
    """
    for mail in reddit.inbox.unread(limit=None):
        if isinstance(mail, praw.models.Message) and mail.body.startswith("!blacklist"):
            # mail.author is not a plain string (PRAW documents it as a
            # Redditor instance), so it must be converted before it can be
            # concatenated and written to the file. Storing the name as str
            # also matches the entries loaded from the opt-out file.
            # The author may be missing (None); then there is nobody to
            # blacklist and the message is just marked as read below.
            if mail.author is not None:
                author = str(mail.author)
                opted_out.append(author)
                opted_out_file.write("," + author)
                # Flush right away so the entry survives an abrupt shutdown.
                opted_out_file.flush()
                mail.reply("You have been successfully blacklisted, /u/%s." % author)
                logging.info("Blacklisted user /u/%s at their request.", author)
        elif isinstance(mail, praw.models.Comment):
            search_term = extract_searchterm(mail)
            if search_term is not None:
                video = get_youtube_video(search_term)
                reply_string = create_reply_string(video)
                mail.reply(reply_string)
                replied_to[mail.author] = datetime.now()
                logging.info("Replied to /u/%s with %s (from mail).",
                             mail.author, reply_string)
        mail.mark_read()
def write_requested_data():
    """Pickle the ``requested_songs`` mapping to REQUESTED_SONGS_FILENAME.

    The file is overwritten on every call.
    """
    with open(REQUESTED_SONGS_FILENAME, "wb") as out:
        pickle.dump(requested_songs, out)
    logging.info("Wrote requested song data to file")
def extract_searchterm(comment):
    """Return the search string requested by *comment*, or None.

    A request is the text following the first ``alexa play`` (matched
    case-insensitively) up to the end of that line. At most eight words are
    kept and the word ``song`` is appended.

    Args:
        comment: Object with a ``body`` string and an ``author`` attribute.

    Returns:
        The lower-cased search string, or None when the comment contains no
        request, the request is empty, or the author is opted out or was
        replied to recently.
    """
    trigger = "alexa play"
    body = comment.body.lower()
    if trigger not in body:
        return None
    if comment.author in opted_out or comment.author in replied_to:
        return None

    logging.info(comment.body)
    # Keep only the line the request is on, starting at the trigger phrase.
    request_line = body[body.find(trigger):].split("\n", 1)[0]
    # The first two tokens are the trigger itself ("alexa", "play...").
    words = request_line.split()[2:10]
    # Check the words *before* appending "song": the joined string is never
    # empty, so testing its length cannot detect an empty request.
    if not words:
        logging.warning("Encountered empty request for song, skipping.")
        return None
    return " ".join(words + ["song"])
def get_youtube_video(search_term, *, max_attempts=5, timeout=10):
    """Search YouTube and return the first video result, or None.

    The search is retried after a 0.5 second pause when the request or the
    parsing of its response fails. Retries are bounded: a permanent failure
    (for example an error response that has no ``"items"`` key) would
    otherwise keep this function, and with it the whole bot, spinning forever.

    When a video is found, its request counter in ``requested_songs`` is
    incremented.

    Args:
        search_term: Query string sent to the search endpoint.
        max_attempts: Maximum number of search attempts before giving up.
        timeout: Seconds to wait for the HTTP response on each attempt.

    Returns:
        The first result item whose id kind is ``youtube#video``, or None if
        the response contains no such item.

    Raises:
        Exception: The error from the final attempt, if every attempt failed.
    """
    logging.info("Searching youtube for %s.", search_term)
    params = {
        "part": "snippet",
        "maxResults": 25,
        "q": search_term,
        "key": GOOGLE_API_KEY,
    }

    video = None
    for attempt in range(1, max_attempts + 1):
        try:
            # Without a timeout a stalled connection would block indefinitely.
            response = requests.get(SEARCH_URL, params=params, timeout=timeout)
            # Surface HTTP errors as such instead of as a KeyError below.
            response.raise_for_status()
            items = response.json()["items"]
            # Results may include non-video items; take the first video.
            video = next(
                (item for item in items if item["id"]["kind"] == "youtube#video"),
                None,
            )
            break
        except Exception as e:
            # Deliberately broad: any failure here is treated as retryable,
            # as before, but only up to max_attempts times.
            logging.exception(e)
            if attempt == max_attempts:
                raise
            logging.error("Sleeping for 0.5 seconds.")
            time.sleep(0.5)

    if video is not None:
        video_id = video["id"]["videoId"]
        requested_songs[video_id] = requested_songs.get(video_id, 0) + 1
    return video
def create_reply_string(video):
    """Build the Markdown reply text for a search result.

    Args:
        video: A search result item (needs ``["id"]["videoId"]`` and
            ``["snippet"]["title"]``), or None when nothing was found.

    Returns:
        A fixed "no results" sentence for None; otherwise a Markdown link to
        the video. When ``random.random()`` is at most 0.1 the link text is a
        random entry of RARE_RESPONSES, otherwise it is the video title
        prefixed with "Now playing: ".
    """
    if video is None:
        return "I couldn't find any results for that. Thank god."

    if random.random() <= 0.1:
        link_text = random.choice(RARE_RESPONSES)
        template = "[%s](https://youtube.com/watch?v=%s)"
    else:
        link_text = video["snippet"]["title"]
        template = "Now playing: [%s](https://youtube.com/watch?v=%s)."

    # Disabled opt-out footer, kept for reference:
    ## reply_string += "\n\n__\n[^^Click ^^me ^^to ^^be ^^blacklisted.](https://www.reddit.com/message/compose/?to=AlexaPlayBot&subject=Blacklist+me&message=!blacklist+(don%27t+reply+to+my+comments+anymore\\))"
    return template % (link_text, video["id"]["videoId"])
# Periodic jobs. They are not run by a background thread: they only execute
# when schedule.run_pending() is called from the comment loop below.
schedule.every().minute.do(check_mail)
schedule.every().minute.do(clear_replied_to)
schedule.every(10).minutes.do(write_requested_data)

comment_stream = reddit.subreddit("all").stream.comments

# Main loop: answer song requests found in the comment stream. Any error
# ends the current stream iteration; after a pause the outer loop opens a
# new stream so the bot keeps running.
while True:
    try:
        for comment in comment_stream():
            schedule.run_pending()
            search_term = extract_searchterm(comment)
            if search_term is None:
                continue
            video = get_youtube_video(search_term)
            reply_string = create_reply_string(video)
            comment.reply(reply_string)
            # Remember the author so extract_searchterm() ignores them until
            # clear_replied_to() removes the entry again.
            replied_to[comment.author] = datetime.now()
            logging.info("Replied to /u/%s with %s", comment.author, reply_string)
    except praw.exceptions.APIException as e:
        # Every Reddit API error is handled as if it were a rate limit.
        logging.warning(e)
        logging.warning("Rate limit exceeded. Sleeping for 1 minute.")
        time.sleep(60)
    except KeyboardInterrupt:
        # Save the request counters before a manual shutdown.
        write_requested_data()
        sys.exit()
    except Exception as e:
        logging.exception(e)
        logging.error("Sleeping for 10 seconds.")
        time.sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment