#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Jun Makii
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""A utility for Amazon Polly.
Examples
$ python aws_polly_utils.py cmd -c 'get_speech(**kwargs)' -t 'Hello World' --VoiceId Emma
$ python aws_polly_utils.py cmd -c 'get_speech(**kwargs)' -f /tmp/text.txt --VoiceId Mizuki
``example.py``::
aws_polly_utils.get_speech(**dict(
aws_polly_utils.parse_args(['cmd']),
output='out.mp3',
output_dir='/tmp',
text='Hello World'))
"""
import os
import sys
import itertools
import functools
import argparse
import csv
import subprocess
import tempfile
import boto3
import json
import re
import logging
import nltk.tokenize
import code
import pprint
import moviepy
import jmespath
try:
    import IPython
except ImportError:
    IPython = None
__author__ = "Jun Makii"
__copyright__ = "Copyright (C) 2017 Jun Makii"
__license__ = "GNU General Public License v3"
__version__ = "1.0"
ENCODING = 'utf-8'
NAME = os.path.splitext(os.path.basename(__file__))[0].decode(ENCODING)
COMMAND_OPTIONS = {
    "default": [
    ],
    "cmd": [
        {
            "args": ['-t', '--text']
        },
        {
            "args": ['-o', '--output'],
            "default": 'out.mp3',
        },
        {
            "args": ['-e', '--encoding'],
            "default": 'utf-8',
        },
        {
            "args": ['-f', '--text_file']
        },
        {
            "args": ["-c", "--command"],
            "default": [],
            "help": "_",
            "action": "append"
        },
        {
            "args": ['-d', '--debug'],
            "action": "store_true",
            "help": "_"
        },
        {
            "args": ['-i', '--input'],
            "action": "append",
            "default": [],
            "help": "_"
        },
        {
            "args": ['-s', '--shell'],
            "action": "store_true",
            "help": "_"
        },
        {
            "args": ["--aws_access_key_file"],
            "help": '_',
            "default": '/home/junmakii/workspace/texts/aws-accessKeys.csv',
        },
        {
            "args": ['-V', '--VoiceId'],
            "help": '_',
            "default": 'Emma',
        },
    ]
}
logger = logging.getLogger(
    NAME
)  # type: logging.Logger
def get_client(
        aws_access_key_id=None,
        aws_secret_access_key=None,
        aws_access_key_file=None,
        *args, **kwargs):
    """
    :rtype: botocore.client.Polly
    """
    if aws_access_key_file:
        # The CSV downloaded from the AWS console has a header row;
        # the credentials are on the second row.
        with open(aws_access_key_file) as fp:
            aws_access_key_id, aws_secret_access_key = list(csv.reader(fp))[1]
    return boto3.client(
        'polly',
        endpoint_url='https://polly.us-west-2.amazonaws.com',
        region_name='us-west-2',
        api_version='2016-06-10',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
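
# Example for get_client (a sketch, not executed here): build a client from a
# downloaded key file. The path below is hypothetical; any CSV with the AWS
# console's "Access key ID,Secret access key" layout should work.
#
#     client = get_client(aws_access_key_file='/path/to/aws-accessKeys.csv')
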
def split_to_sentences(text, max_length=20):
    """Group sentences so each chunk stays near ``max_length`` characters."""
    sentences = nltk.sent_tokenize(text)
    a = []
    b = []
    for sentence in sentences:
        length = len(functools.reduce(
            lambda i, m: i + m,
            a + [sentence]))
        if length > max_length and len(a):
            b.append(' '.join(a))
            a = []
        a.append(sentence)
    b.append(' '.join(a))
    return b
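
# Example for split_to_sentences (a sketch; exact grouping depends on how the
# NLTK sentence tokenizer splits the input):
#
#     split_to_sentences(u'One. Two. Three. Four. Five.', max_length=15)
#     # -> roughly [u'One. Two. Three.', u'Four. Five.']
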
def split_to_words(text, max_length=20):
    """Group word tokens so each chunk stays near ``max_length`` characters."""
    try:
        tokens = nltk.tokenize.word_tokenize(text)
    except Exception as err:
        print(text)
        raise err
    a = []
    b = []
    for token in tokens:
        length = len(functools.reduce(
            lambda i, m: i + ' ' + m,
            a + [token]))
        if length > max_length:
            b.append(' '.join(a))
            a = []
        a.append(token)
    b.append(' '.join(a))
    return b
def split_text(
        text,
        max_length=3000, *args, **kwargs):
    sentences = split_to_sentences(text, max_length=max_length)
    words = map(lambda i: split_to_words(i, max_length=max_length),
                sentences)
    split_sentences = list(itertools.chain.from_iterable(words))
    return split_sentences
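
# The default max_length of 3000 matches Polly's per-request text limit
# (3000 billed characters at the time of writing). A sketch of typical use:
#
#     chunks = split_text(open('/tmp/text.txt').read().decode('utf-8'))
#     # each item in ``chunks`` is intended to fit a single synthesize_speech call
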
def speech(
        client, Text, VoiceId='Emma',
        OutputFormat='mp3',  # ogg_vorbis, json, mp3, pcm
        *args, **kwargs):
    """
    :rtype: dict
    """
    return client.synthesize_speech(
        VoiceId=VoiceId,
        Text=Text,
        OutputFormat=OutputFormat)
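
# Example for speech (a sketch; assumes ``client`` comes from get_client() and
# valid AWS credentials). The boto3 response exposes the audio as a streaming
# body under 'AudioStream':
#
#     response = speech(client, Text=u'Hello World', VoiceId='Emma')
#     audio = response['AudioStream'].read()
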
def get_voice_by_sentences(
        client,
        sentences=[],
        output_dir='/tmp',
        encoding=ENCODING,
        *args, **kwargs):
    """
    :rtype: List[str]
    """
    if client is None:
        client = get_client()
    counter = itertools.count(1)
    paths = []
    for sentence in sentences:
        # Prefix each file with a zero-padded counter so the chunks
        # sort back into their original order.
        prefix = u"{0:08d}".format(next(counter)) + u'-'
        path = (
            prefix
            + sentence[:100]
            .replace(u"'", u"")
            .replace(u'/', u'~') + u'.mp3')
        path = os.path.join(
            output_dir,
            (path if path else 'None'))
        voice = speech(client, sentence, **kwargs)
        stream = voice['AudioStream']
        data = stream.read()
        with open(path.encode(ENCODING), 'wb') as fp:
            fp.write(data)
        stream.close()
        logger.info(path)
        paths.append(path)
    return paths
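
# Example for get_voice_by_sentences (a sketch; assumes default AWS credentials
# are configured when ``client`` is None). Each chunk becomes its own MP3 with
# a sortable counter prefix, e.g. /tmp/00000001-First sentence..mp3:
#
#     paths = get_voice_by_sentences(None, [u'First sentence.', u'Second sentence.'],
#                                    output_dir='/tmp', VoiceId='Emma')
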
def get_speech(text, output='out.mp3', output_dir='/tmp', *args, **kwargs):
    """Split ``text``, synthesize each chunk, and join the chunks into ``output``."""
    sentences = split_text(text)
    client = get_client(**kwargs)
    paths = [
        os.path.abspath(i)
        for i in get_voice_by_sentences(
            client, sentences, output_dir, **kwargs)
    ]
    concatenate_audio(paths, output)
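
# End-to-end example for get_speech (a sketch; requires network access and AWS
# credentials; the key file path is hypothetical):
#
#     get_speech(u'Hello World', output='hello.mp3', output_dir='/tmp',
#                VoiceId='Emma',
#                aws_access_key_file='/path/to/aws-accessKeys.csv')
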
def concatenate_audio(
        files, output='out.mp3',
        base_cmd=u'ffmpeg -f concat -safe 0 -i {} -c copy -loglevel quiet -y',
        *args, **kwargs):
    """
    :rtype: subprocess.Popen
    """
    # Build an ffmpeg concat list: one "file '/path/to/chunk.mp3'" entry per line.
    files = sorted([
        u"file '%s'" % os.path.abspath(
            file.replace("'", ''))
        for file in files
    ])
    file_str = '\n'.join(files).encode(ENCODING)
    temp_file = tempfile.mktemp()
    with open(temp_file, 'w') as fp:
        fp.write(file_str)
    cmd = (base_cmd
           .format(temp_file).split(' '))
    return subprocess.Popen(cmd + [output])
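
# The intermediate list file fed to ffmpeg looks like:
#
#     file '/tmp/00000001-Hello World.mp3'
#     file '/tmp/00000002-....mp3'
#
# and the resulting command is roughly:
#
#     ffmpeg -f concat -safe 0 -i <list file> -c copy -loglevel quiet -y out.mp3
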
def add_arguments(parser, arguments=[], group_name=NAME):
    """
    :rtype: argparse.ArgumentParser
    """
    for argument in arguments:
        # Copy so the option tables in COMMAND_OPTIONS are not mutated.
        argument = dict(argument)
        _args = argument.pop('args', [])
        parser.add_argument(*_args, **argument)
    return parser
def parse_args(
        argv,
        command_arguments=COMMAND_OPTIONS,
        parser=None,
        subparsers=None,
):
    """
    :rtype: dict
    """
    base_parser = (
        parser
        if parser
        else argparse.ArgumentParser(
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
            # formatter_class=argparse.RawDescriptionHelpFormatter,
        ))
    # base_parser.add_argument('arguments', nargs='*')
    subparsers = (
        subparsers
        if subparsers
        else base_parser.add_subparsers())
    for name, options in command_arguments.items():
        if name == 'default':
            add_arguments(base_parser, options)
        else:
            command_subparser = subparsers.add_parser(
                name,
                # parents=[base_parser],
                # add_help=False,
                formatter_class=argparse.ArgumentDefaultsHelpFormatter,
            )
            add_arguments(command_subparser, options)
    options, unknown = base_parser.parse_known_args(argv)
    # Pair up leftover "--key value" arguments; each value is parsed as JSON.
    split_args = itertools.izip(
        *[itertools.chain(
            unknown,
            itertools.repeat(None, 2 - 1))] * 2)
    values = dict(list(itertools.chain.from_iterable([
        vars(options).items(),
        map(lambda i: (re.sub('^-*', '', i[0]),
                       json.loads(i[1])),
            split_args),
        ({} if sys.stdin.isatty()
         else json.loads(sys.stdin.read())).items()
    ])))
    values = {k: (v.decode(ENCODING) if isinstance(v, str) else v)
              for k, v in values.items()}
    return values
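
# A sketch of the parsed result (exact contents vary with the defaults above
# and with anything piped in on stdin):
#
#     parse_args(['cmd', '-t', 'Hello World', '--VoiceId', 'Emma'])
#     # -> {'text': u'Hello World', 'VoiceId': u'Emma', 'output': u'out.mp3', ...}
#
# Unknown "--key value" pairs are also accepted; their values must be JSON,
# e.g. --OutputFormat '"ogg_vorbis"'.
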
def main(argv, *args, **kwargs):
    """
    :rtype: int
    """
    kwargs.update(parse_args(argv[1:]))
    if kwargs.get('text_file'):
        with open(kwargs.get('text_file')) as fp:
            kwargs['text'] = fp.read().decode(kwargs.get('encoding'))
    for file in kwargs.get('input', []):
        if file.endswith('.json'):
            with open(file) as fp:
                kwargs.update(json.load(fp))
    if kwargs.get('command'):
        for command in kwargs.get('command'):
            # Strings passed with -c are executed with ``kwargs`` in scope,
            # e.g. -c 'get_speech(**kwargs)'.
            exec(command)
    if kwargs.get('shell'):
        if IPython:
            IPython.embed()
        else:
            code.interact(local=locals())
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))