# Gist "pinyin" (Elfsong/62dd0addecbc911693deddb33a3c3b91), created March 4, 2019 12:13.
# Web-page residue from the gist viewer, kept here as comments:
#   Save Elfsong/62dd0addecbc911693deddb33a3c3b91 to your computer and use it in GitHub Desktop.
#   This file contains hidden or bidirectional Unicode text that may be interpreted or
#   compiled differently than what appears below. To review, open the file in an editor
#   that reveals hidden Unicode characters.
#   Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import ast | |
import clr | |
import json | |
import os | |
import re | |
from StoryTelling.tts import append_pronunciation | |
from StoryTelling.tts.tools import OnlineTts | |
from Speech.Cognitive.Services import TimeLine | |
# Tone-marked forms of each tone-bearing vowel, indexed by ``tone - 1``
# (tones 1-4, then the unmarked neutral tone at index 4).  ``v`` is the
# keyboard spelling of ``ü`` (as in "lv", "nv").
_PINYIN_TONE_MARKS = {
    'a': ["ā", "á", "ǎ", "à", "a"],
    'o': ["ō", "ó", "ǒ", "ò", "o"],
    'e': ["ē", "é", "ě", "è", "e"],
    'i': ["ī", "í", "ǐ", "ì", "i"],
    'u': ["ū", "ú", "ǔ", "ù", "u"],
    'v': ["ǖ", "ǘ", "ǚ", "ǜ", "ü"],
}

_NEUTRAL_TONE_INDEX = 4


def _mark_syllable(syllable, tone):
    """Return one numbered-pinyin syllable with its tone mark applied."""
    try:
        tone_index = int(tone) - 1
    except ValueError:
        tone_index = _NEUTRAL_TONE_INDEX
    if not 0 <= tone_index <= _NEUTRAL_TONE_INDEX:
        # Unknown tone numbers (e.g. 0) are rendered without a mark.
        tone_index = _NEUTRAL_TONE_INDEX

    # The mark goes on 'a', else 'o', else 'e' when one of them is present.
    position = -1
    for vowel in "aoe":
        position = syllable.find(vowel)
        if position != -1:
            break
    if position == -1:
        # Otherwise on whichever of 'i' / 'u' comes last ("liu" -> u,
        # "gui" -> i); a missing letter contributes -1 and so loses.
        position = max(syllable.find('i'), syllable.find('u'))
    if position == -1:
        position = syllable.find('v')
    if position == -1:
        # No tone-bearing vowel: leave the syllable untouched.
        return syllable

    marked = (
        syllable[:position]
        + _PINYIN_TONE_MARKS[syllable[position]][tone_index]
        + syllable[position + 1:]
    )
    # Any 'v' that did not receive the mark is still spelled 'ü' ("lve" -> "lüè").
    return marked.replace('v', 'ü')


def pinyin_transformer(pinyin):
    """Convert numbered pinyin into pinyin written with tone marks.

    :param pinyin: one or more syllables separated by ``" - "``; each
        syllable is a ``"<letters> <tone>"`` pair such as ``"hao 3"``.
        Tone ``5`` (or any number outside 1-4) means the neutral tone.
    :return: the tone-marked syllables joined by single spaces, e.g.
        ``"ni 3 - hao 3"`` -> ``"nǐ hǎo"``.  A single-syllable input yields
        just that syllable.  Items that are not a ``letters tone`` pair are
        passed through unchanged; an empty input yields ``""``.
    """
    converted = []
    for item in pinyin.split(" - "):
        parts = item.split()
        if not parts:
            continue
        if len(parts) != 2:
            # Not "<letters> <tone>": keep the text rather than guessing.
            converted.append(item.strip())
            continue
        syllable, tone = parts
        converted.append(_mark_syllable(syllable, tone))
    return " ".join(converted)
def get_pronunciation(
    sentence,
    ttsserver="https://sttruntime-customvoice-tts.cloudapp.net:443/synthesize/customvoice",
    subscription="575EA104-45C6-4264-831D-7B4FE35003D2",
):
    """Look up the pronunciation of every item in ``sentence``.

    The sentence is sent to the online TTS service, the reply is attached to
    a one-sentence story structure via ``append_pronunciation``, and each
    resulting entry is converted to tone-marked pinyin.

    :param sentence: text to get the pronunciation for.
    :param ttsserver: TTS endpoint URL; defaults to the endpoint this
        function used before it became configurable.
    :param subscription: subscription key passed to the TTS service.
        NOTE(review): the default is a credential embedded in source;
        consider supplying it from configuration instead.
    :return: a list of dicts with keys ``"item"``, ``"index"`` (a two-element
        list) and ``"pronunciation"``.
    """
    prnc = OnlineTts.pronunciation(sentence, ttsserver, subscription)
    sentence_structure = {"Sentences": [{"Text": sentence}]}
    pronunciation_story = append_pronunciation([sentence_structure], [sentence], [prnc])

    # Each entry is read positionally; presumably (text, start, end,
    # numbered pinyin) -- confirm against append_pronunciation.
    entries = pronunciation_story[0]["Sentences"][0]["Pronunciation"]
    return [
        {
            "item": entry[0],
            "index": [entry[1], entry[2]],
            "pronunciation": pinyin_transformer(entry[3]),
        }
        for entry in entries
    ]
def get_timeline_json(ssml, endpoint, subscription):
    """
    Fetch the timeline through the CLR bridge.
    :param ssml: script (SSML) the timeline is generated from
    :param endpoint: service endpoint
    :param subscription: subscription key
    :return: the timeline, parsed from the JSON text the call returns
    """
    # ``.Result`` is read from the object returned by the pipeline call and
    # holds the timeline as a JSON string.
    pending = TimeLine.PipeLineAsyncPython(ssml, endpoint, subscription)
    return json.loads(pending.Result)
def cut_sentence(sentence):
    """
    Punctuation filter.
    :param sentence: sentence to filter
    :return: the sentence with whitespace, digits and the listed ASCII / CJK
        punctuation characters removed
    """
    unwanted = re.compile(
        r"[\s+\.\!\/_,$%^*(+\"\')]+|[+——()?【】\[\]1234567890“”!,,.。…:?、~@#¥%&*();;]+"
    )
    return unwanted.sub("", sentence)
def lcs(X, Y):
    """
    Longest common subsequence, used to align the timeline with the script.
    :param X: first sequence (its positions are reported as ``timeline_index``)
    :param Y: second sequence (its positions are reported as ``script_index``)
    :return: list of dicts, one per matched element in left-to-right order,
        with keys ``"element"``, ``"timeline_index"`` and ``"script_index"``
    """
    rows, cols = len(X), len(Y)

    # table[i][j] = LCS length of X[:i] and Y[:j]; row/column 0 stay at 0.
    table = [[0] * (cols + 1) for _ in range(rows + 1)]
    for i in range(1, rows + 1):
        for j in range(1, cols + 1):
            if X[i - 1] == Y[j - 1]:
                table[i][j] = table[i - 1][j - 1] + 1
            else:
                table[i][j] = max(table[i - 1][j], table[i][j - 1])

    # Walk back from the bottom-right corner collecting the matched pairs.
    mapping_list = []
    i, j = rows, cols
    while i > 0 and j > 0:
        if X[i - 1] == Y[j - 1]:
            mapping_list.append(
                {"element": Y[j - 1], "timeline_index": i - 1, "script_index": j - 1}
            )
            i -= 1
            j -= 1
        elif table[i - 1][j] > table[i][j - 1]:
            i -= 1
        else:
            # Ties move left in Y.
            j -= 1

    mapping_list.reverse()
    return mapping_list
def is_chinese(uchar):
    """
    Tell whether a character is a Chinese character.
    :param uchar: character to test
    :return: True when it lies between U+4E00 and U+9FFF inclusive
    """
    return '\u4e00' <= uchar <= '\u9fff'
def get_location_mark_index(sentence):
    """
    Get the positions of the first and last Chinese character in a sentence.
    :param sentence: the sentence
    :return: ``(start_index, end_index)``; both are 0 when the sentence
        contains no Chinese character
    """
    start_index = 0
    end_index = 0
    found_first = False
    for index, character in enumerate(sentence):
        if not is_chinese(character):
            continue
        if not found_first:
            start_index = index
            found_first = True
        # Updated on every hit, including the first, so a sentence with a
        # single Chinese character yields end_index == start_index instead
        # of an end that precedes the start.
        end_index = index
    return start_index, end_index
def get_timeline_index(script_index, mapping_list):
    """
    Translate a script position into a timeline position.
    :param script_index: index into the script text
    :param mapping_list: timeline/script mapping entries
    :return: ``timeline_index`` of the first entry whose ``script_index`` is
        at or after the requested one; the last entry's when there is none
    """
    candidates = (
        entry["timeline_index"]
        for entry in mapping_list
        if entry["script_index"] >= script_index
    )
    try:
        return next(candidates)
    except StopIteration:
        return mapping_list[-1]["timeline_index"]
def get_appear_time(dialogue_list, name_list):
    """
    Get the time a character first appears; when no sentence mentions the
    character, it is treated as present from the start of the scene.
    :param dialogue_list: dialogue entries of the current scene
    :param name_list: the character's name plus its aliases
    :return: ``start`` of the first sentence whose text contains any of the
        names, else ``start`` of the first sentence
    """
    for line in dialogue_list:
        if any(name in line["text"] for name in name_list):
            return line["start"]
    return dialogue_list[0]["start"]
def merge(story_name, timejson, schema):
    """
    Align the timeline with the script text and merge both into one schema.
    :param story_name: name of the story (spoken in the opening scenario)
    :param timejson: timeline entries; each is read for ``element``,
        ``startTime`` and ``endTime``
    :param schema: script content as a JSON string (a list of scenarios)
    :return: list of scenario dicts: an opening scenario, one scenario per
        script scenario, and a closing scenario
    """
    scenes = json.loads(schema)

    # Flatten both sides to plain strings so they can be aligned with lcs().
    script_text = "".join(
        line["dialogue"] for scene in scenes for line in scene["content"]
    )
    timeline_text = "".join(entry["element"] for entry in timejson)
    # NOTE(review): the last 20 timeline elements are left out of the
    # alignment -- presumably the fixed closing lines; confirm.
    mapping_list = lcs(timeline_text[:-20], script_text)

    scenario_list = []

    # Opening scenario
    # NOTE(review): the opening ends 3 timeline entries before the first
    # aligned element; the reason for the offset is not visible here.
    opening_end = float(timejson[mapping_list[0]["timeline_index"] - 3]["endTime"])
    opening = {
        "scene_no": 0,
        "scene_background": "bedroom",
        "scene_weather": "default",
        "scene_time": "default",
        "start": 0.0,
        "end": opening_end,
        "element_list": [
            {
                "id": "xiaoice_1",
                "category": "xiaoice",
                "name": "xiaoice",
                "scale": [0.6, 0.6],
                "rotation": 0,
                "start": 0,
                "age": "child",
                "gender": 1,
                "role": "protagonist",
                "status": "normal",
                "type": "character",
                "end": opening_end,
            }
        ],
        "dialog_list": [
            {
                "text": "小朋友 你好呀",
                "pronunciation": get_pronunciation("小朋友 你好呀"),
                "speaker": "xiaoice_1",
                "start": 200,
                "end": 652.57,
                "type": "talk",
            },
            {
                "text": "我是你的好朋友小冰姐姐",
                "pronunciation": get_pronunciation("我是你的好朋友小冰姐姐"),
                "speaker": "xiaoice_1",
                "start": 652.57,
                "end": 1215.54,
                "type": "talk",
            },
            {
                "text": "今天我给你准备了一个有趣的故事",
                "pronunciation": get_pronunciation("今天我给你准备了一个有趣的故事"),
                "speaker": "xiaoice_1",
                "start": 1215.54,
                "end": 1938.98,
                "type": "talk",
            },
            {
                "text": "名字叫做" + story_name,
                "pronunciation": get_pronunciation("名字叫做" + story_name),
                "speaker": "xiaoice_1",
                "start": 1938.98,
                "end": opening_end,
                "type": "talk",
            },
        ],
    }
    scenario_list.append(opening)

    # Regular scenarios
    script_offset = 0  # position of the current sentence within script_text
    for scene_pos, scene in enumerate(scenes):
        # scenario content
        dialogues = []
        for line in scene["content"]:
            text = line["dialogue"]
            first, last = get_location_mark_index(text)
            timeline_first = get_timeline_index(script_offset + first, mapping_list)
            timeline_last = get_timeline_index(script_offset + last, mapping_list)
            script_offset += len(text)
            dialogues.append(
                {
                    "text": text,
                    "type": line["type"],
                    "start": timejson[timeline_first]["startTime"],
                    "end": timejson[timeline_last]["endTime"],
                    "speaker": line["speaker_id"],
                    "pronunciation": get_pronunciation(text),
                }
            )

        # scenario info
        regular = {
            "dialogue_list": dialogues,
            "scene_no": scene_pos + 1,
            "scene_background": scene["background"]["category"],
            "scene_weather": scene["weather"],
            "scene_time": scene["time"],
            "start": float(dialogues[0]["start"]),
            "end": float(dialogues[-1]["end"]),
        }

        # scenario characters; every second element is flipped horizontally
        regular["element_list"] = [
            {
                "id": element["id"],
                "category": element["name"],
                "age": element["age"],
                "priority": element["priority"],
                "role": element["role"],
                "status": element["status"],
                "type": element["type"],
                "alias": element["alias"],
                "gender": element["gender"],
                "file_name": element["name"],
                "scale": element["scale"],
                "flip": [slot % 2, 0],
                "rotation": element["rotation"],
                "start": get_appear_time(
                    dialogues, [element["name"]] + element["alias"]
                ),
                "end": regular["end"],
            }
            for slot, element in enumerate(scene["elements"])
        ]

        # Add scenario to list
        scenario_list.append(regular)

    # Closing scenario: starts 400 after the end of the last scenario
    closing_start = scenario_list[-1]["end"] + 400
    closing_end = float(timejson[-1]["endTime"])
    closing = {
        "scene_no": len(scenario_list),
        "scene_background": "bedroom",
        "scene_weather": "default",
        "scene_time": "default",
        "start": closing_start,
        "end": closing_end,
        "element_list": [
            {
                "id": "xiaoice_1",
                "category": "xiaoice",
                "name": "xiaoice",
                "scale": [0.6, 0.6],
                "rotation": 0,
                "start": closing_start,
                "age": "child",
                "gender": 1,
                "role": "protagonist",
                "status": "normal",
                "type": "character",
                "end": closing_end,
            }
        ],
        "dialog_list": [
            {
                "text": "小朋友 今天的故事 讲完了 你喜欢听吗",
                "speaker": "xiaoice_1",
                "start": closing_start,
                "end": float(closing_start) + 1072,
                "type": "talk",
                "pronunciation": get_pronunciation("小朋友 今天的故事 讲完了 你喜欢听吗"),
            },
            {
                "text": "小冰姐姐就陪你到这里啦 白白",
                "speaker": "xiaoice_1",
                "start": float(closing_start) + 1072,
                "end": float(closing_start) + 1965,
                "type": "talk",
                "pronunciation": get_pronunciation("小冰姐姐就陪你到这里啦 白白"),
            },
        ],
    }
    scenario_list.append(closing)

    return scenario_list
def process(story_name, ssml, schema, endpoint, subscription):
    """
    Build the merged schema for one story and write it to disk.
    :param story_name: story name; also used as the output file name
    :param ssml: SSML script passed to the timeline service
    :param schema: script content as a JSON string
    :param endpoint: timeline service endpoint
    :param subscription: subscription key for the timeline service
    :return: the merged schema, or ``""`` when any step raised (the error is
        printed, not re-raised)
    """
    try:
        timeline = get_timeline_json(ssml, endpoint, subscription)
        merged = merge(story_name, timeline, schema)
        target = "../../result-json/final_schema/" + story_name
        with open(target, "w", encoding="utf-8") as out:
            payload = json.dumps(merged, ensure_ascii=False)
            out.write(payload)
        print("Written Successfully!")
    except Exception as error:
        print(error)
        return ""
    return merged
if __name__ == "__main__":
    # Batch driver: for every file in the SSML folder, load the matching
    # script schema and SSML text, then build and print the merged schema.
    script_repos = r"D:\Project-ice\KidsStory\StoryTelling\result-json\scenario_schema"
    # ssml_repos = r"D:\Project-ice\KidsStory\StoryTelling\result-ssml"
    ssml_repos = r"C:\Users\t-midu\PycharmProjects\Scenario\playground\ssml"

    # NOTE(review): endpoint and subscription key are embedded in source.
    endpoint = "http://kidstory-tts-0.cloudapp.net:81/synthesize/customvoice"
    subscription = "575EA104-45C6-4264-831D-7B4FE35003D2"

    for story in os.listdir(ssml_repos):
        # The story name is everything before the first dot of the file name.
        story_name = story.partition(".")[0]
        print(story_name)
        if not story_name:
            continue

        script_path = script_repos + "\\" + story_name + ".txt"
        ssml_path = ssml_repos + "\\" + story_name + ".ssml.txt"
        try:
            with open(script_path, encoding="utf-8") as script_fd:
                schema = script_fd.read()
            with open(ssml_path, encoding="utf-8") as ssml_fd:
                ssml = ssml_fd.read()
        except FileNotFoundError:
            print("File not found!")
            continue
        except UnicodeDecodeError:
            print("Unicode decode error!")
            continue

        new_schema = process(story_name, ssml, schema, endpoint, subscription)
        print(new_schema)
# Web-page residue from the gist viewer, kept here as comments:
#   Sign up for free to join this conversation on GitHub.
#   Already have an account? Sign in to comment