Created
July 19, 2017 08:47
-
-
Save junmakii/864aa7d9207923061e5a614ac9cb5f9e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # !/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| # Copyright (C) 2017 Jun Makii | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # You should have received a copy of the GNU General Public License | |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| """A utility for Amazon Polly. | |
| Examples | |
| $ python aws_polly_utils.py cmd -c 'get_speech(**kwargs)' -t 'Hello World' --VoiceId Emma | |
| $ python aws_polly_utils.py cmd -c 'get_speech(**kwargs)' -f /tmp/text.txt --VoiceId Mizuki | |
| ``example.py``:: | |
| aws_polly_utils.get_speech(**dict( | |
| aws_polly_utils.parse_args(['cmd']), | |
| output='out.mp3', | |
| output_dir='/tmp', | |
| text='Hello World')) | |
| """ | |
| import os | |
| import sys | |
| import itertools | |
| import functools | |
| import argparse | |
| import csv | |
| import subprocess | |
| import tempfile | |
| import boto3 | |
| import json | |
| import re | |
| import logging | |
| import nltk.tokenize | |
| import code | |
| import pprint | |
| import moviepy | |
| import logging | |
| import jmespath | |
# Bare module name (file name without directory or extension).  It names the
# module logger and is exported, as text, through NAME.
_MODULE_NAME = os.path.splitext(os.path.basename(__file__))[0]

logger = logging.getLogger(_MODULE_NAME)

# IPython is optional: it is only used for the interactive --shell mode.
# Only a failed import is tolerated; any other error is a real problem and
# must not be hidden.
try:
    import IPython
except ImportError:
    IPython = None

__author__ = "Jun Makii"
__copyright__ = "Copyright (C) 2017 Jun Makii"
__license__ = "GNU General Public License v3"
__version__ = "1.0"

ENCODING = 'utf-8'

# On Python 2 the module name is a byte string and has to be decoded; on
# Python 3 it is already text and has no ``decode`` method.
NAME = (
    _MODULE_NAME.decode(ENCODING)
    if isinstance(_MODULE_NAME, bytes)
    else _MODULE_NAME)
# Command line layout.  Each key is a sub-command name, except "default",
# whose options are attached to the top-level parser.  Every entry holds the
# option flags under "args"; the remaining items are passed to
# ``ArgumentParser.add_argument`` as keyword arguments.
COMMAND_OPTIONS = {
    'default': [],
    'cmd': [
        # Text given directly on the command line.
        dict(args=['-t', '--text']),
        # Output file name.
        dict(args=['-o', '--output'], default='out.mp3'),
        # Encoding used to decode the file given with --text_file.
        dict(args=['-e', '--encoding'], default='utf-8'),
        # File to read the text from.
        dict(args=['-f', '--text_file']),
        # Python statements executed by ``main``; repeatable.
        dict(args=['-c', '--command'], default=[], help='_', action='append'),
        dict(args=['-d', '--debug'], action='store_true', help='_'),
        # Input files; ``main`` merges the ones ending in .json; repeatable.
        dict(args=['-i', '--input'], action='append', default=[], help='_'),
        # Open an interactive shell once the commands have run.
        dict(args=['-s', '--shell'], action='store_true', help='_'),
        dict(
            args=['--aws_access_key_file'],
            help='_',
            default='/home/junmakii/workspace/texts/aws-accessKeys.csv',
        ),
        dict(args=['-V', '--VoiceId'], help='_', default='Emma'),
    ],
}
# Rebind the module logger, this time looked up under NAME.
logger = logging.getLogger(NAME)  # type: logging.Logger
def get_client(
        aws_access_key_id=None,
        aws_secret_access_key=None,
        aws_access_key_file=None,
        *args, **kwargs):
    """Create a boto3 client for Amazon Polly.

    :param aws_access_key_id: access key id handed to boto3.
    :param aws_secret_access_key: secret access key handed to boto3.
    :param aws_access_key_file: path of a CSV file whose first row is a
        header and whose second row is ``access key id,secret access key``.
        When given, the pair read from the file replaces the two explicit
        arguments.
    :param kwargs: ``region_name`` selects the AWS region (default
        ``us-west-2``); the endpoint URL is derived from it.  Other keys and
        extra positional arguments are ignored.
    :raises ValueError: if the key file has no usable credential row.
    :rtype: botocore.client.Polly
    """
    if aws_access_key_file:
        with open(aws_access_key_file) as fp:
            # Drop blank lines so that they cannot shift the row index.
            rows = [row for row in csv.reader(fp) if row]
        if len(rows) < 2 or len(rows[1]) != 2:
            raise ValueError(
                'expected a header row followed by an '
                '"access key id,secret access key" row in %r'
                % (aws_access_key_file,))
        aws_access_key_id, aws_secret_access_key = [
            cell.strip() for cell in rows[1]]
    region_name = kwargs.get('region_name') or 'us-west-2'
    return boto3.client(
        'polly',
        endpoint_url='https://polly.{0}.amazonaws.com'.format(region_name),
        region_name=region_name,
        api_version='2016-06-10',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
def split_to_sentences(text, max_length=20):
    """Split *text* into sentences and pack them into chunks.

    The text is tokenized with ``nltk.tokenize.sent_tokenize``.  Consecutive
    sentences are joined with a single space for as long as the joined chunk
    is at most ``max_length`` characters long.  A single sentence longer
    than ``max_length`` is kept whole, in a chunk of its own.

    :param text: the text to split.
    :param int max_length: upper bound on the length of a packed chunk.
    :returns: the chunks in reading order; empty when the tokenizer finds
        no sentence.
    :rtype: list
    """
    chunks = []
    current = []
    current_length = 0
    for sentence in nltk.tokenize.sent_tokenize(text):
        # Measure the chunk exactly as it will be emitted, i.e. including
        # the space that separates the sentence from the previous one.
        joined_length = current_length + len(sentence) + (1 if current else 0)
        if current and joined_length > max_length:
            chunks.append(' '.join(current))
            current = []
            joined_length = len(sentence)
        current.append(sentence)
        current_length = joined_length
    if current:
        chunks.append(' '.join(current))
    return chunks
def split_to_words(text, max_length=20):
    """Split *text* into word tokens and pack them into chunks.

    The text is tokenized with ``nltk.tokenize.word_tokenize``.  Consecutive
    tokens are joined with a single space for as long as the joined chunk is
    at most ``max_length`` characters long.  A single token longer than
    ``max_length`` is kept whole, in a chunk of its own.

    :param text: the text to split.
    :param int max_length: upper bound on the length of a packed chunk.
    :returns: the chunks in reading order; never contains an empty chunk.
    :rtype: list
    """
    try:
        tokens = nltk.tokenize.word_tokenize(text)
    except Exception:
        # Report the offending text, then re-raise with the original
        # traceback intact.
        logger.error('could not tokenize text: %r', text)
        raise
    chunks = []
    current = []
    current_length = 0
    for token in tokens:
        joined_length = current_length + len(token) + (1 if current else 0)
        # Only flush a non-empty chunk: an over-long first token must not
        # produce an empty string in the result.
        if current and joined_length > max_length:
            chunks.append(' '.join(current))
            current = []
            joined_length = len(token)
        current.append(token)
        current_length = joined_length
    if current:
        chunks.append(' '.join(current))
    return chunks
def split_text(
        text,
        max_length=3000, *args, **kwargs):
    """Split *text* into pieces using the sentence and word splitters.

    The text first goes through ``split_to_sentences``; every resulting
    chunk then goes through ``split_to_words``.  Both are called with the
    same ``max_length``.  Extra positional and keyword arguments are
    ignored.

    :rtype: list
    """
    pieces = []
    for sentence_chunk in split_to_sentences(text, max_length=max_length):
        pieces.extend(split_to_words(sentence_chunk, max_length=max_length))
    return pieces
def speech(
        client, Text, VoiceId='Emma',
        OutputFormat='mp3',  # ogg_vorbis, json, mp3, pcm
        *args, **kwargs):
    """Call ``client.synthesize_speech`` for one piece of text.

    Only ``VoiceId``, ``Text`` and ``OutputFormat`` are forwarded; extra
    positional and keyword arguments are ignored.

    :returns: whatever ``client.synthesize_speech`` returns.
    :rtype: dict
    """
    request = {
        'VoiceId': VoiceId,
        'Text': Text,
        'OutputFormat': OutputFormat,
    }
    return client.synthesize_speech(**request)
def get_voice_by_sentences(
        client,
        sentences=None,
        output_dir='/tmp',
        encoding=ENCODING,
        *args, **kwargs):
    """Synthesize every sentence and save each result as its own mp3 file.

    Files are named ``<8-digit counter>-<start of the sentence>.mp3``.
    Single quotes are removed from the name and ``/`` is replaced by ``~``.

    :param client: client passed to ``speech`` for every sentence.
    :param sentences: iterable of text pieces; ``None`` means none.
    :param output_dir: directory the files are written to.
    :param encoding: accepted for compatibility; not used.
    :param kwargs: forwarded to ``speech`` (e.g. ``VoiceId``).
    :returns: the paths of the written files, in sentence order.
    :rtype: list
    """
    # Most file systems limit a single file name to 255 bytes, so the
    # sentence part is trimmed by encoded size and not only by characters.
    max_name_bytes = 255
    suffix = u'.mp3'
    paths = []
    for number, sentence in enumerate(sentences or (), 1):
        prefix = u'{0:08d}-'.format(number)
        stem = sentence[:100].replace(u"'", u'').replace(u'/', u'~')
        budget = max_name_bytes - len((prefix + suffix).encode(ENCODING))
        # 'ignore' drops a multi-byte character cut in half by the slice.
        stem = stem.encode(ENCODING)[:budget].decode(ENCODING, 'ignore')
        path = os.path.join(output_dir, prefix + stem + suffix)
        stream = speech(client, sentence, **kwargs)['AudioStream']
        try:
            data = stream.read()
        finally:
            # Release the response stream even when reading fails.
            stream.close()
        with open(path.encode(ENCODING), 'wb') as fp:
            fp.write(data)
        logger.info(path)
        paths.append(path)
    return paths
def get_speech(text, output='out.mp3', output_dir='/tmp', *args, **kwargs):
    """Synthesize *text* and concatenate the pieces into one audio file.

    The text is split with ``split_text``, every piece is synthesized into
    ``output_dir`` by ``get_voice_by_sentences`` and the resulting files
    are handed to ``concatenate_audio``.

    :param text: the text to speak.
    :param output: file name passed to ``concatenate_audio``.
    :param output_dir: directory for the per-piece audio files.
    :param kwargs: ``aws_access_key_id``, ``aws_secret_access_key`` and
        ``aws_access_key_file`` are passed to ``get_client``; all keys are
        forwarded to ``get_voice_by_sentences``.
    :raises ValueError: if ``text`` is ``None``.
    :returns: whatever ``concatenate_audio`` returns.
    """
    if text is None:
        raise ValueError('no text to synthesize')
    credentials = {}
    for key in ('aws_access_key_id', 'aws_secret_access_key'):
        if kwargs.get(key):
            credentials[key] = kwargs[key]
    key_file = kwargs.get('aws_access_key_file')
    if key_file:
        # The command line supplies a default path that may not exist on
        # this machine; fall back to boto3's own credential lookup then.
        if os.path.isfile(key_file):
            credentials['aws_access_key_file'] = key_file
        else:
            logger.info('access key file %r not found; ignoring it', key_file)
    client = get_client(**credentials)
    sentences = split_text(text)
    paths = [
        os.path.abspath(path)
        for path in get_voice_by_sentences(
            client, sentences, output_dir, **kwargs)
    ]
    return concatenate_audio(paths, output)
def concatenate_audio(
        files, output='out.mp3',
        base_cmd=u'ffmpeg -f concat -safe 0 -i {} -c copy -loglevel quiet -y',
        *args, **kwargs):
    """Start a process that concatenates *files* into *output*.

    The absolute paths of *files* are sorted and written, one
    ``file '<path>'`` line each, to a temporary list file.  ``base_cmd`` is
    split on spaces, ``{}`` is replaced by the path of that list file and
    ``output`` is appended as the last argument.

    The process is started but not waited for, and the list file is not
    removed here because the process may still be reading it.

    :param files: paths of the audio files to concatenate.
    :param output: path of the resulting file.
    :param base_cmd: command template containing ``{}`` for the list file.
    :rtype: subprocess.Popen
    """
    paths = sorted(os.path.abspath(name) for name in files)
    # Inside a single-quoted entry a quote is written by closing the quote,
    # adding an escaped quote and re-opening: ' -> '\''
    entries = [
        u"file '%s'" % path.replace(u"'", u"'\\''")
        for path in paths
    ]
    listing = u'\n'.join(entries).encode(ENCODING)
    # mkstemp creates the file atomically; mktemp only returns a name that
    # another process could claim first.
    descriptor, list_path = tempfile.mkstemp()
    with os.fdopen(descriptor, 'wb') as fp:
        fp.write(listing)
    # Split before substituting so that a list-file path containing spaces
    # stays a single argument.
    cmd = [part.format(list_path) for part in base_cmd.split(' ')]
    return subprocess.Popen(cmd + [output])
def add_arguments(parser, arguments=None, group_name=NAME):
    """Register option specifications on *parser*.

    Each specification is a dict whose ``args`` item lists the option
    flags; the remaining items are passed to ``parser.add_argument`` as
    keyword arguments.  The specifications are not modified, so the same
    list can be registered again on another parser.

    :param parser: object providing ``add_argument``.
    :param arguments: iterable of specification dicts; ``None`` means none.
    :param group_name: accepted for compatibility; not used.
    :returns: *parser*, to allow chaining.
    """
    for argument in arguments or ():
        # Work on a copy: removing "args" from the caller's dict would
        # break every later registration of the same specification.
        options = dict(argument)
        flags = options.pop('args', [])
        parser.add_argument(*flags, **options)
    return parser
def parse_args(
        argv,
        command_arguments=COMMAND_OPTIONS,
        parser=None,
        subparsers=None):
    """Parse *argv* into a dict of option values.

    Values come from three sources; later ones override earlier ones:

    1. options declared in ``command_arguments``;
    2. undeclared ``--name value`` pairs, where the leading dashes are
       removed from the name and the value is decoded as JSON when
       possible and kept as a plain string otherwise;
    3. a JSON object read from standard input when it is not a terminal
       (empty input is ignored).

    :param argv: argument list without the program name.
    :param command_arguments: mapping of sub-command name to a list of
        option specifications; ``"default"`` targets the top-level parser.
    :param parser: existing ``argparse.ArgumentParser`` to extend.
    :param subparsers: existing sub-parsers action to extend.
    :raises ValueError: if an undeclared option has no value.
    :rtype: dict
    """
    base_parser = parser
    if not base_parser:
        base_parser = argparse.ArgumentParser(
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if not subparsers:
        subparsers = base_parser.add_subparsers()
    for name, specs in command_arguments.items():
        # Hand over copies so the shared specifications stay intact and
        # this function can be called more than once.
        specs = [dict(spec) for spec in specs]
        if name == 'default':
            target = base_parser
        else:
            target = subparsers.add_parser(
                name,
                formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        add_arguments(target, specs)
    namespace, unknown = base_parser.parse_known_args(argv)
    values = dict(vars(namespace))
    # Pair the undeclared arguments up as (name, value); the padding turns
    # a trailing name without value into (name, None).
    padded = list(unknown) + [None]
    for flag, raw in zip(padded[0::2], padded[1::2]):
        if raw is None:
            raise ValueError('option %r is missing a value' % (flag,))
        try:
            value = json.loads(raw)
        except ValueError:
            value = raw
        values[re.sub('^-*', '', flag)] = value
    if not sys.stdin.isatty():
        piped = sys.stdin.read()
        if piped.strip():
            values.update(json.loads(piped))
    # Python 2 delivers byte strings; normalise every one of them to text.
    result = {}
    for key, value in values.items():
        if isinstance(value, bytes):
            value = value.decode(ENCODING)
        result[key] = value
    return result
def main(argv, *args, **kwargs):
    """Command line entry point.

    Parses ``argv[1:]`` with ``parse_args`` and merges the result into
    ``kwargs``, then:

    * turns on debug logging when ``debug`` is set;
    * reads ``text`` from ``text_file`` (decoded with ``encoding``);
    * merges every ``input`` file whose name ends in ``.json``;
    * executes every ``command`` as Python code;
    * opens an interactive shell when ``shell`` is set.

    :param argv: full argument vector, program name first.
    :returns: the process exit status.
    :rtype: int
    """
    kwargs.update(parse_args(argv[1:]))
    if kwargs.get('debug'):
        logging.basicConfig(level=logging.DEBUG)
    text_file = kwargs.get('text_file')
    if text_file:
        # Read bytes and decode explicitly; this behaves the same on
        # Python 2 and Python 3.
        with open(text_file, 'rb') as fp:
            kwargs['text'] = fp.read().decode(
                kwargs.get('encoding') or ENCODING)
    for input_path in kwargs.get('input') or ():
        if input_path.endswith('.json'):
            with open(input_path) as fp:
                kwargs.update(json.load(fp))
    for command in kwargs.get('command') or ():
        # SECURITY: this runs arbitrary Python code taken from the command
        # line, standard input or an input file.  Never feed it untrusted
        # input.
        exec(command)
    if kwargs.get('shell'):
        # Prefer IPython; fall back to the plain interpreter when it is
        # missing or fails to start.  KeyboardInterrupt and SystemExit are
        # deliberately not intercepted.
        try:
            import IPython
            IPython.embed()
        except Exception:
            code.interact(local=locals())
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment