Last active
November 16, 2018 18:02
-
-
Save aheadley/7f55102ae4aa09badbc943cb7bfa8a6b to your computer and use it in GitHub Desktop.
A stupid script to read reddit threads out loud
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import hashlib | |
import tempfile | |
import os | |
import os.path | |
import urllib.parse | |
import re | |
import functools | |
import logging | |
import multiprocessing | |
import subprocess | |
import pprint | |
import time | |
import glob | |
import binascii | |
import signal | |
from xml.sax.saxutils import escape as xml_escape | |
import boto3 | |
# import praw | |
import requests | |
import langid | |
LOG_LEVEL = logging.INFO | |
TMP_FILE_PREFIX = 'reddit-reader-' | |
PLAY_CMD = 'mpv -vo null {}' | |
EXIT_SENTINEL = -1 | |
USER_AGENT = 'python:reddit-reader:1.0 (by /u/not_that_guy_either)' | |
VALID_AUDIO_FORMATS = ['mp3', 'ogg_vorbis', 'pcm'] | |
VALID_SAMPLE_RATES = ['8000', '16000', '22050'] | |
VALID_VOICE_PITCHES = ['low', 'medium', 'high'] | |
MESSAGE_TEMPLATE = """ | |
<speak><prosody rate="{rate}" pitch="{pitch}">{message}</prosody></speak> | |
""".strip() | |
PHONEME_TEMPLATE = '<phoneme ph="{}">{}</phoneme>' | |
EMPHASIS_TEMPLATE = '<emphasis level="{}">{}</emphasis>' | |
TOKEN_REPLACEMENT_MAP = { | |
r'lol': 'ph:l o l', | |
r'\<3': 'heart', | |
r'(>\s*)+': 'quote ', | |
r'\[(.*?)\]\((.*?)\)': r'\1', | |
r'&': r'&', | |
r'#x200B;': r'', | |
} | |
STARTUP_MESSAGE = 'reddit-reader v0.1' | |
def multiprocessify(func): | |
@functools.wraps(func) | |
def wrapper(*pargs, **kwargs): | |
return multiprocessing.Process(target=func, args=pargs, kwargs=kwargs) | |
return wrapper | |
def get_worker_logger(worker_name, level=LOG_LEVEL): | |
logging.basicConfig() | |
log = logging.getLogger('reddit-reader.{}-{:05d}'.format(worker_name, os.getpid())) | |
log.setLevel(level) | |
return log | |
def walk_replies(reply_listing): | |
try: | |
if reply_listing['kind'] != 'more': | |
for reply in reply_listing['data']['children']: | |
if reply['kind'] != 'more': | |
try: | |
if reply['data']['body'] != '[deleted]': | |
yield (reply['data']['author'], reply['data']['body']) | |
except KeyError as err: | |
logging.exception(err) | |
logging.warn(pprint.pformat(reply['data'])) | |
continue | |
if reply['data']['replies'] != '': | |
for reply in walk_replies(reply['data']['replies']): | |
yield reply | |
except TypeError as err: | |
logging.exception(err) | |
logging.warn(pprint.pformat(reply_listing)) | |
def walk_comments(thread): | |
if thread[0]['data']['children'][0]['data']['selftext']: | |
yield (thread[0]['data']['children'][0]['data']['author'], | |
thread[0]['data']['children'][0]['data']['selftext']) | |
for child in thread[1]['data']['children']: | |
if child['kind'] != 'more': | |
if child['data']['body'] != '[deleted]': | |
yield (child['data']['author'], child['data']['body']) | |
if child['data']['replies'] != '': | |
for reply in walk_replies(child['data']['replies']): | |
yield reply | |
@multiprocessify | |
def handle_messages(msg_queue, audio_queue, config): | |
log = get_worker_logger('processor', opts.log_level) | |
log.debug('Worker process starting') | |
signal.signal(signal.SIGINT, signal.SIG_IGN) | |
log.debug('Using temp file dir: %s', tempfile.gettempdir()) | |
for old_fn in glob.glob(os.path.join(tempfile.gettempdir(), TMP_FILE_PREFIX + '*.*')): | |
log.debug('Deleteing old temp file: %s', old_fn) | |
os.unlink(old_fn) | |
polly_client = boto3.client('polly', | |
aws_access_key_id=config.access_key, | |
aws_secret_access_key=config.secret_key, | |
region_name='us-east-1', | |
) | |
voices = sorted(polly_client.describe_voices()['Voices'], key=lambda v: v['Id']) | |
voice_langs = set(v['LanguageCode'].split('-')[0] for v in voices if len(v['LanguageCode'].split('-')[0]) == 2) | |
log.debug('Found %d voices from language families: %s', | |
len(voices), ', '.join(voice_langs)) | |
lang_classifier = langid.langid.LanguageIdentifier.from_modelstring( | |
langid.langid.model, norm_probs=True) | |
lang_classifier.set_languages(voice_langs) | |
# there's a large delay while langid loads the model on the first classify() | |
# call, so we do that now | |
lang_classifier.classify(STARTUP_MESSAGE) | |
log.debug('Ready to work') | |
qitem = msg_queue.get() | |
while qitem != EXIT_SENTINEL: | |
nick, orig_msg = qitem | |
msg = clean_message(orig_msg) | |
if not msg: | |
log.warning('Skipping garbage message: "%s"', msg) | |
# continue | |
msg = MESSAGE_TEMPLATE.format( | |
rate=config.speech_rate, | |
pitch=nick2bucket(nick, VALID_VOICE_PITCHES), | |
message=msg, | |
) | |
if config.force_lang: | |
msg_lang = config.default_lang | |
log.debug('Forcing message language to: %s', msg_lang) | |
else: | |
msg_lang, prob = lang_classifier.classify(orig_msg) | |
log.debug('Detected message language as: %s (p%0.5f)', msg_lang, prob) | |
if prob < config.confidence_threshold: | |
msg_lang = config.default_lang | |
log.debug('Detection confidence too low (<p%0.2f), defaulting to: %s', | |
config.confidence_threshold, msg_lang) | |
if msg_lang not in voice_langs: | |
msg_lang = config.default_lang | |
log.warning('Detected language is not an available voice, defaulting to: %s', msg_lang) | |
msg_voices = [v for v in voices if v['LanguageCode'].startswith(msg_lang + '-')] | |
voice = nick2bucket(nick, msg_voices) | |
log.debug('Synthesizing with "%s" [%s@%s]: "%s"', | |
voice['Id'], config.audio_format, config.sample_rate, msg) | |
try: | |
resp = polly_client.synthesize_speech( | |
Text=msg, | |
TextType='ssml', | |
VoiceId=voice['Id'], | |
OutputFormat=config.audio_format, | |
SampleRate=config.sample_rate, | |
) | |
except Exception as err: | |
log.warning(err) | |
log.exception(err) | |
else: | |
tmp_f, tmp_fn = tempfile.mkstemp( | |
suffix='.' + config.audio_format, | |
prefix=TMP_FILE_PREFIX, | |
) | |
os.close(tmp_f) | |
log.debug('Writing audio to: %s', tmp_fn) | |
with open(tmp_fn, 'wb') as tmp_f: | |
try: | |
tmp_f.write(resp['AudioStream'].read()) | |
except Exception as err: | |
log.error('Failed to write audio file (%s): %s', tmp_fn, err) | |
log.exception(err) | |
raise err | |
else: | |
audio_queue.put((tmp_fn, nick, orig_msg)) | |
log.debug('Queued audio file: %s', tmp_fn) | |
qitem = msg_queue.get() | |
log.debug('Worker process stopping') | |
@multiprocessify | |
def play_audio(audio_queue, play_cmd, opts): | |
log = get_worker_logger('player', opts.log_level) | |
log.debug('Worker process starting') | |
signal.signal(signal.SIGINT, signal.SIG_IGN) | |
with open('/dev/null', 'w+b') as dev_null: | |
log.debug('Ready to work') | |
qitem = audio_queue.get() | |
while qitem != EXIT_SENTINEL: | |
fn, nick, msg = qitem | |
log.debug('Playing audio file: %s', fn) | |
try: | |
log.info('/u/%s >>> %s', nick, msg) | |
subprocess.call(play_cmd.format(fn).split(), stdout=dev_null, stderr=dev_null) | |
except Exception as err: | |
log.error('Failed to play audio file (%s): %s', fn, err) | |
log.exception(err) | |
else: | |
os.unlink(fn) | |
log.debug('Deleted audio file: %s', fn) | |
time.sleep(0.3) | |
qitem = audio_queue.get() | |
log.debug('Worker process stopping') | |
def nick2bucket(nick, buckets): | |
return buckets[int(binascii.hexlify(nick.strip().lower().encode('utf-8')), 16) % len(buckets)] | |
def clean_message(msg): | |
return ' '.join(clean_token(t) for t in msg.split()).strip() | |
def _clean_url(token): | |
try: | |
url = urllib.parse.urlparse(token) | |
if url.scheme.startswith('http'): | |
domain_parts = url.netloc.split('.') | |
# if first part of the domain is 1-3 homogeneous characters, drop it | |
# catches things like (i.)imgur.com or (www.)example.com while skipping | |
# something like bbc.uk | |
if 3 >= len(domain_parts[0]) and 1 == len(set(domain_parts[0])): | |
token = '.'.join(domain_parts[1:]) | |
else: | |
token = url.netloc | |
token = EMPHASIS_TEMPLATE.format('strong', token) | |
except Exception: pass | |
return token | |
def _clean_replacements(token): | |
for rgx, ph in TOKEN_REPLACEMENT_MAP.items(): | |
if re.match(rgx, token, re.I): | |
if ph is None: | |
token = '' | |
elif ph.startswith('ph:'): | |
token = PHONEME_TEMPLATE.format(ph[3:], token) | |
else: | |
token = ph | |
return token | |
def _clean_duplicates(token): | |
# reduce characters repeated 4+ times in a row to 2 characters | |
token = re.sub(r'((.)\2)(\2{2,})', r'\1', token) | |
if not token.startswith('<phoneme'): | |
token = xml_escape(token.strip()) | |
if token.startswith('*') and token.endswith('*') and len(token) >= 3: | |
token = EMPHASIS_TEMPLATE.format('strong', token.strip('*')) | |
return token | |
def clean_token(token): | |
return _clean_duplicates(_clean_url(_clean_replacements(_clean_url(token)))) | |
def main(opts): | |
log = get_worker_logger('main', opts.log_level) | |
json_url = opts.url + '.json' | |
log.debug('Pulling JSON from reddit: %s', json_url) | |
thread = requests.get(json_url, headers={'User-Agent': USER_AGENT}).json() | |
audio_queue = multiprocessing.Queue(opts.queue_size) | |
text_queue = multiprocessing.Queue(opts.queue_size) | |
log.debug('Starting workers...') | |
proc_handle = handle_messages(text_queue, audio_queue, opts) | |
proc_handle.start() | |
player_handle = play_audio(audio_queue, PLAY_CMD, opts) | |
player_handle.start() | |
try: | |
for comment in walk_comments(thread): | |
if comment[0] in ['AutoModerator']: | |
continue | |
log.debug('Queueing comment: %s: %s', comment[0], comment[1]) | |
text_queue.put(comment) | |
except KeyboardInterrupt: | |
log.warning('Caught ^C, exiting...') | |
log.debug('Sending processor exit signals') | |
text_queue.put(EXIT_SENTINEL) | |
audio_queue.put(EXIT_SENTINEL) | |
try: | |
log.debug('Waiting for text processor to exit') | |
proc_handle.join() | |
log.debug('Waiting for audio processor to exit') | |
player_handle.join() | |
except KeyboardInterrupt: | |
log.warning('Not waiting for graceful exit, terminating workers') | |
if proc_handle.is_alive(): | |
proc_handle.terminate() | |
if player_handle.is_alive(): | |
player_handle.terminate() | |
if __name__ == '__main__': | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument('url', metavar='URL') | |
parser.add_argument('--default-lang', default='en') | |
parser.add_argument('--force-lang', action='store_true') | |
parser.add_argument('--confidence-threshold', default=0.4, type=float) | |
parser.add_argument('--audio-format', default='mp3', choices=VALID_AUDIO_FORMATS) | |
parser.add_argument('--sample-rate', default='22050', choices=VALID_SAMPLE_RATES) | |
parser.add_argument('--speech-rate', default=1.1, type=float) | |
parser.add_argument('--access-key', default=os.environ.get('AWS_ACCESS_KEY_ID')) | |
parser.add_argument('--secret-key', default=os.environ.get('AWS_SECRET_ACCESS_KEY')) | |
parser.add_argument('--queue-size', default=8, type=int) | |
parser.add_argument('-q', '--quiet', action='count', default=0) | |
parser.add_argument('-v', '--verbose', action='count', default=0) | |
opts = parser.parse_args() | |
opts.log_level = LOG_LEVEL + (10 * -opts.verbose) + (10 * opts.quiet) | |
main(opts) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
boto3==1.9.41 | |
botocore==1.12.41 | |
certifi==2018.10.15 | |
chardet==3.0.4 | |
docutils==0.14 | |
idna==2.7 | |
jmespath==0.9.3 | |
langid==1.1.6 | |
numpy==1.15.4 | |
praw==6.0.0 | |
prawcore==1.0.0 | |
python-dateutil==2.7.5 | |
requests==2.20.1 | |
s3transfer==0.1.13 | |
six==1.11.0 | |
update-checker==0.16 | |
urllib3==1.24.1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment