@nwizugbesamson
Last active November 24, 2022 08:13
import cv2
import os
import sys
from pathlib import Path
from uuid import uuid4
from .image_generator import generate_image
import itertools
import time
FRAMES = 24
# def _generate_state_sequence(img_path: Path, state: str) -> list[Path]:
# """load the path to 24 images that make up a one second
# animation or default state for selected avatar
# @author: anonnoone
# Args:
# img_path (Path): pathlib.Path object to selected avatar directory
# Returns:
# list[Path]: sorted list of animation or default sequence paths
# >>>>: _generate_state_sequence(path/to/avatar_01, state="speech")
# >>>>: [path/to/avatar_01/state_01, ..., path/to/avatar_01/state_24]
# "em": [1, 2, 3, 4, 5, 6, 7] -> 100 msecs
# "em": 500msecs -> [01,02, 3, 3, 4, 4, 5, 5 , 06, 07]
# """
# if state == "speech":
# dir_files = [str(file.path) for file in os.scandir(img_path / "animation")]
# return sorted(dir_files, key= lambda x: x.split('_')[1])
# elif state == "silence":
# return [img_path / "default.png" for _ in range(FRAMES)]
def generate_animation(
data: dict[str, list[str]],
bg_path: Path, num_speakers: int,
avatar_dict: dict[str, str],
data_dir: Path) -> Path:
"""
create animation using provided avatars and background
using sequence generated from audio file
@author: anonnoone
Args:
data (dict[str, list[str]]): speakers in the audio and
their mouth action/state per frame
bg_path (Path): pathlib.Path object path to animation background
num_speakers (int): number of speakers in audio file
avatar_dict (dict[str, str]): speaker to selected avatar map
data_dir (Path): pathlib.Path object Path to application data
Returns:
Path: path to generated animation
>>>>: generate_animation(
{"A": ["speech", "sequence"...], "B": ["speech", "sequence"...]},
path/to/background_08
2,
{"A": path/to/avatar_01, "B":[path/to/avatar_05]}
)
>>>>: DATA_DIR/temp/73hr- df44-ctr4-ct4t.mp4
"""
images = []
img_paths = []
output = data_dir / f'temp/{str(uuid4())}.mp4'
##
for speaker in data:
avatar_path = avatar_dict[speaker]
anm_seq = [avatar_path / f"mouths/{state}" for state in data[speaker][:600]]
img_paths.append(anm_seq)
# ##
# for speaker in data:
# avatar_path = avatar_dict[speaker]
# anm_seq = [_generate_state_sequence(avatar_path, state=state) for state in data[speaker][:600]]
# img_paths.append(list(itertools.chain.from_iterable(anm_seq)))
print("Start Image Build")
start_path = time.time()
if num_speakers in (2, 3, 4):
    avatar_images = list(avatar_dict.values())
    for state_images in zip(*img_paths):
        images.append(
            generate_image(list(state_images), avatar_images, bg_path)
        )
print(f"IMage Build: [{time.time()-start_path}]")
print(sys.getsizeof(images), sys.getsizeof(images[0]))
frame_one = images[0]
# return
print(len(images))
height, width, _ = frame_one.shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Be sure to use lower case
out = cv2.VideoWriter(str(output.absolute()), fourcc, 24.0, (width, height))
print("VIDEO WRITER START")
start_write = time.time()
for image in images:
out.write(image) # Write out frame to video
# cv2.imshow('video',frame)
if (cv2.waitKey(1) & 0xFF) == ord('q'): # Hit `q` to exit
break
print(f"END WRITING: [{time.time() - start_write}]")
# Release everything if job is finished
out.release()
cv2.destroyAllWindows()
return output
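## Hedged usage sketch for generate_animation (the avatar ids, file names and sequence
## values below are illustrative, not taken from project data); it mirrors the docstring:
#
# from pathlib import Path
# sequence = {"A": ["ah", "mbp", "closed"], "B": ["closed", "fv", "ah"]}
# avatars = {"A": Path("data/Image/avatars/avatar_01"), "B": Path("data/Image/avatars/avatar_05")}
# video_path = generate_animation(
#     sequence,
#     Path("data/Image/backgrounds/background_08.png"),
#     2,
#     avatars,
#     Path("data"),
# )
# # -> data/temp/<uuid>.mp4, written at 24 fps with one frame per state entry
#
## The imports that follow appear to begin a separate module (the AssemblyAI analysis script).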
import os
import requests
import time
from itertools import islice
import collections
API_KEY = "22a6f3ca548c414f819ba4de3247feac" #str(os.getenv("ASSEMBLYAI"))
# print(API_KEY)
def diarize_audio(audio):
"""
using assembly ai we send an audio url and obtain json data for the audio file
@author : cchimdindu
Args:
audio (string): url of video
Returns:
lis: json file is sorted out and returned in list format
"""
endpoint1 = "https://api.assemblyai.com/v2/transcript"
# json1 = {
# "audio_url": audio,
# "speaker_labels": True,
# "sentiment_analysis": True,
# "disfluencies": True #transcribe filler words
# }
# headers1 = {
# "authorization": API_KEY,
# "content-type": "application/json",
# }
# response1 = requests.post(endpoint1, json=json1, headers=headers1)
# first = response1.json()
# print(first["id"])
# second = first["id"]
# rxym41rlo2-2606-4354-ae8f-095ccdf58181
# endpoint_result = "https://api.assemblyai.com/v2/transcript/" + second
endpoint_result = "https://api.assemblyai.com/v2/transcript/" + "rxym41rlo2-2606-4354-ae8f-095ccdf58181"
headers2 = {
"authorization": API_KEY,
}
# print(first)
# response2= requests.get(endpoint_result, headers=headers2)
# a = response2.json()
process_done = False
while not process_done:
response2= requests.get(endpoint_result, headers=headers2)
a = response2.json()
status = a["status"]
if status != "completed":
print(f"Processing Audio, Status: [{status}]")
time.sleep(20)
else:
process_done = True
#maps words to timestamp only
# endpointVTT = "https://api.assemblyai.com/v2/transcript/" + second + "/vtt"
listout = {
"text":a["text"],
"utterances": a["utterances"],
"words": a["words"],
"audio_duration": int(a["audio_duration"])
}
# ress = requests.get(endpointVTT, headers=headers2)
# b = ress.text #just time and speech
return listout
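## Sketch of the dict diarize_audio returns once AssemblyAI reports "completed"
## (keys follow the fields selected above; the values shown are illustrative only):
#
# {
#     "text": "full transcript ...",
#     "utterances": [...],  # speaker-labelled utterances
#     "words": [{"speaker": "A", "start": 0, "end": 480, "text": "hello", ...}, ...],
#     "audio_duration": 1260,
# }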
def checking(audio1):
"""taking data from audio transcript and turns into dictionary
puts all utterances into a list for easy adapataion
@author: cchimdindu
Args:
audio1 (string): url used to call diariza_audio function
Returns:
List, int: list of utterances formatted each time person speaks, length of video
"""
audiotexts = []
dataneed = diarize_audio(audio1)
print(dataneed)
transcription = dataneed["text"]
diarization = dataneed["words"]
audiolength = int(dataneed["audio_duration"]/1000)
#time start and end in milliseconds -> convert to seconds
speakersvale = []
count = 0
for a in diarization:
data = a
speaker = data["speaker"]
starttime = int(data["start"]/1000) #convert to seconds
endtime = int(data["end"]/1000) #convert to seconds
speech = data["text"]
if starttime == endtime:
duration = [starttime]
else:
duration = list(range(starttime, endtime+1))
text = {
"speaker" : speaker,
"duration" : duration,
"speech" : speech,
"index" : count,
}
print(speaker, duration)
count += 1
audiotexts.append(text)
speakersvale.append(speaker)
return audiotexts, audiolength
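## Sketch of what checking() builds (values illustrative): one dict per transcribed word,
## pairing the speaker with every whole second the word spans:
#
# audiotexts = [
#     {"speaker": "A", "duration": [0, 1], "speech": "hello", "index": 0},
#     {"speaker": "B", "duration": [2], "speech": "hi", "index": 1},
# ]
# audiolength = 3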
def chunkgeneratory2(iterable, chunk_size):
"""takes an iterable and makes it a nested list of size needed
@author: cchimdindu
Args:
iterable (list, tuple, str): any sequence that supports slicing
chunk_size (int): size you want iterable split into
Returns:
nested list: [[..],...,[..]]
"""
return [iterable[x:x + chunk_size] for x in range(0, len(iterable), chunk_size)]
def chunkgeneratory1(iterable, chunk_size):
"""takes an iterable and makes it a nested list of size needed
@author: cchimdindu
Args:
iterable (list,tuple,sets,dict): anything that can be looped
chunk_size (int): size you want iterable split into
Yields:
list: the next chunk of up to chunk_size items, yielded each time the generator is advanced
"""
imagesList = iter(iterable)
chunk = list(islice(imagesList, chunk_size)) #n is steps iterable is sliced
while chunk:
yield chunk
chunk = list(islice(imagesList, chunk_size))
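## Both chunkers split a sequence into pieces of chunk_size; chunkgeneratory2 builds the
## nested list at once, chunkgeneratory1 yields one chunk at a time:
#
# chunkgeneratory2([1, 2, 3, 4, 5, 6, 7], 3)        # -> [[1, 2, 3], [4, 5, 6], [7]]
# list(chunkgeneratory1([1, 2, 3, 4, 5, 6, 7], 3))  # -> [[1, 2, 3], [4, 5, 6], [7]]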
def whoistalking(audiotexts, timetotal):
"""checks who is speaking when
@author: cchimdindu
Args:
audiotexts (list): contains each instance a person talks, in turns
timetotal (int): length of audio
Yields:
string, string: speaker label, "speech" or "silence" for that chunk of time
"""
# it = chunkgeneratory1(timetotal, 1)
for dict_item in audiotexts:
newdur = dict_item["duration"]
it = chunkgeneratory1(timetotal, 1)
it2 = chunkgeneratory1(newdur, 1)
# for yawn, noyawn in zip(it, it2):
while True:  # the generator object is always truthy; the loop exits on StopIteration below
try:
dur = next(it)
dur2 = next(it2)
except StopIteration:
break
if collections.Counter(dur2) == collections.Counter(dur):
valt = "speech"
else:
valt = "silence"
yield dict_item["speaker"],valt
def convertdict(audiotexts, audiolength): #returns speaker sequence
"""converts tuple to dict
@author: cchimdindu
Args:
audiotexts (list): list of each persons turn in speaking
audiolength (int): length of audio file
Returns:
dict: [
{'A':'speech','silent'},
{'B':'silent','speech'}
]
"""
isIT = {}
speech_sequence = []
timetotal = list(range(0, audiolength+1))
donemaybe = whoistalking(audiotexts, timetotal)
while True:
try:
letter = next(donemaybe)
except StopIteration:
break
speech_sequence.append(letter)
for x, y in speech_sequence:
isIT.setdefault(x, []).append(y)
print(isIT) #for cchimdindu testing
return isIT
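## Intended output shape for convertdict, per the docstrings above (this module appears to be
## the earlier per-second parser; main.py builds its sequence via components.parser_two instead):
#
# convertdict(audiotexts, audiolength)
# # -> {"A": ["speech", "speech", "silence", ...],
# #     "B": ["silence", "silence", "speech", ...]}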
DATA_SCHEMA = {
"closed": [""],
"sczshch": ["JH", "Z", "ZH", "CH", "SH", "S", "T", "N", "NX", "NG"],
"k": ["K"],
"ah" : ["AE", "AW", "AX", "AXR", "AA" ],
"i" : ["Y", "EY", "AY"],
"ee" : ["IH", "IX", "IY"],
"enwh" : ["ER", "EH", "EN", "WH"],
"mbp" : ["M", "B", "P"],
"fv": ["F", "V"],
"oh" : ["UH", "OY", "H", "AH", "AO"],
"uoo" : ["UW", "OW", "UH", "UX"],
"wr" : ["W", "H", "HH"],
"lth" : ["TH", "DH", "L", "D", "DX", "R"]
}
STATE_MAP = {
"JH": "sczshch","Z": "sczshch","ZH": "sczshch","CH": "sczshch",
"SH": "sczshch","S": "sczshch","T": "sczshch","N": "sczshch","NX": "sczshch","NG": "sczshch",
"K": "k",
"AE": "ah", "AW": "ah", "AX": "ah", "AXR": "ah", "AA": "ah",
"Y": "i", "EY": "i", "AY": "i",
"IH": "ee", "IX": "ee", "IY": "ee",
"ER": "enwh", "EH": "enwh", "EN": "enwh", "WH": "enwh",
"M": "mbp", "B": "mbp", "P": "mbp",
"F": "fv", "V": "fv",
"UH": "oh", "OY": "oh", "H": "oh", "AH": "oh", "AO": "oh",
"UW": "uoo", "OW": "uoo", "UH": "uoo", "UX": "uoo",
"W": "wr", "H": "wr", "HH": "wr",
"TH": "lth", "DH": "lth", "L": "lth", "D": "lth", "DX": "lth", "R": "lth"
}
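## STATE_MAP inverts DATA_SCHEMA: it maps an ARPAbet phoneme (stress digits stripped) to the
## mouth-shape group used for the avatar's "mouths/<state>" images, e.g.
#
# STATE_MAP["AE"]  # -> "ah"
# STATE_MAP["M"]   # -> "mbp"
# STATE_MAP["JH"]  # -> "sczshch"
#
## Note: "UH" and "H" are listed under two groups in DATA_SCHEMA, so in the dict literal
## above the later entry wins ("UH" -> "uoo", "H" -> "wr").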
from pathlib import Path
from PIL import Image #, ImageDraw, ImageFont
import cv2 as cv
import numpy as np
def generate_image(state_images: list, avatar_images:list, bg_path: Path) -> np.ndarray:
"""uses images in a list and background provided to generate a video sequence with help of pillow
@author : samson6398
Args:
images (list): avatar images
bg_path (Path): background image
Returns:
array: sequence of images in an array
"""
background_image = Image.open(bg_path)
background_image = background_image.convert(mode='RGBA')
width, length = background_image.size
canvas = Image.new(mode='RGBA', size=(width, length), color=(255, 255, 255))
canvas.paste(im=background_image, box=(0,0))
for state_path, avatar_path in zip(state_images, avatar_images):
speaker_avatar = Image.open(avatar_path)
speaker_avatar = speaker_avatar.convert('RGBA')
canvas = Image.alpha_composite(canvas, speaker_avatar)
speaker_state = Image.open(state_path)
speaker_state = speaker_state.convert('RGBA')
canvas = Image.alpha_composite(canvas, speaker_state)
numpy_img = np.array(canvas)
cv2_image = cv.cvtColor(numpy_img, cv.COLOR_RGBA2BGR)  # drop alpha and convert to BGR for OpenCV
return cv2_image
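## Hedged usage sketch for generate_image (file names are hypothetical): one frame is built by
## layering the background, then each speaker's avatar, then their current mouth image:
#
# frame = generate_image(
#     [Path("avatar_01/mouths/ah"), Path("avatar_05/mouths/closed")],  # hypothetical state paths
#     [Path("avatar_01/avatar.png"), Path("avatar_05/avatar.png")],    # hypothetical avatar paths
#     Path("backgrounds/background_08.png"),
# )
# frame.shape  # (height, width, 3) BGR array, ready for cv2.VideoWriter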
""" Audio animator main script,
This project serves as a schema for the hng9
TEAM CLUTCH podcast animator projects
this script contains the program workflow
"""
## DATA IMPORTS
import os
from sys import argv
import json
from pathlib import Path
from uuid import uuid4
from dotenv import load_dotenv
import time
from components.parser_two import generate_sequence
from components.animator import generate_animation
from moviepy.editor import VideoFileClip, AudioFileClip
## move all paths to config.py in package root directory
## APPLICATION ROOT DIRECTORY
ROOT_DIR = Path(__file__).parent.parent.parent.parent.resolve()
DATA_DIR = ROOT_DIR / "data"
AVATAR_DIR = DATA_DIR / "Image/avatars"
BG_DIR = DATA_DIR / "Image/backgrounds"
DOTENV_PATH = ROOT_DIR / "src/podcast_animator/env/.env"
## load environment variable
# print(DOTENV_PATH)
if DOTENV_PATH.exists():
load_dotenv(DOTENV_PATH)
def get_path(directory: Path, _id: str, is_folder: bool=False):
""" generate posix path object from project data directory
of images and backgrounds
@author: anonnoone
Args:
directory (Path): pathlib.Path object of directory to locate file
or directory within
e.g: data/Image/avatars -> path
_id (str): id of chosen avatar
is_folder (bool, optional): locate dir path or file path. Defaults to False.
Returns:
Path: directory path or file path matching _id
"""
for file in os.scandir(directory):
if is_folder:
if file.is_dir() and str(file.name).endswith(_id):
return directory / f"{file.name}"
else:
name, ext = str(file.name).rsplit('.', 1)  # split on the last dot so names containing dots still work
if file.is_file() and str(name).endswith(_id):
return directory / f"{name}.{ext}"
def animate( metadata_path :str) -> None:
"""
generate animated video from audio, using input metadata
@author: anonnoone
Args:
metadata_path (str): path to json file containing all information
required for animation
Returns:
None: the finished video is written to DATA_DIR / "Result"
"""
## create unique output name
output_path = DATA_DIR / f"Result/{str(uuid4())}.mp4"
## load metadata json provided
with open(metadata_path) as data_file:
metadata_obj = json.load(data_file)
audio_url: str = metadata_obj["audio_url"]
audio_path: str = metadata_obj["audio_path"]
avatar_map: dict = metadata_obj["avatar_map"]
bg_id: str = metadata_obj["bg_path"]
num_speakers = len(avatar_map)
bg_path = get_path(BG_DIR, bg_id, is_folder=False)
avatar_paths = {avatar: get_path(AVATAR_DIR, value, is_folder=True
) for avatar, value in avatar_map.items()}
## generate animation sequence
animation_sequence = generate_sequence(audio_url)
# print(animation_sequence)
## animate to return path to animation
animation_path = generate_animation(
animation_sequence,
bg_path,
num_speakers, avatar_paths, DATA_DIR)
# return
## add audio to generated animation
videoclip = VideoFileClip(str(animation_path))
audioclip = AudioFileClip(str(audio_path))
print("About to set audio clip")
video = videoclip.set_audio(audioclip)
print("Audio clip set")
video.write_videofile(str(output_path))
print(f'YOUR VIDEO HAS BEEN SAVED TO: [{output_path}]')
## delete temporary animation
os.remove(animation_path)
if __name__=='__main__':
start = time.time()
metadata_path = str(argv[1])
animate(metadata_path)
print(f'RUNTIME: [{time.time() - start}]')
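## Hedged invocation sketch: the script expects the path to a metadata json as argv[1],
## with the keys read in animate(); the values below are placeholders:
#
#   python <this script> path/to/metadata.json
#
# where metadata.json looks roughly like:
# {
#     "audio_url": "https://example.com/episode.mp3",
#     "audio_path": "data/episode.mp3",
#     "avatar_map": {"A": "01", "B": "05"},
#     "bg_path": "08"
# }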
import re
from itertools import chain
import pronouncing
from g2p_en import G2p
import numpy as np
from .speech import Speech
from .data_schemer import DATA_SCHEMA, STATE_MAP
from podcast_animator.analysis.assembly_analyser import diarize_audio
def split(duration, chunks):
k, m = divmod(len(duration), chunks)
return (duration[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(chunks))
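## split() divides a duration list into `chunks` near-equal consecutive slices,
## front-loading any remainder, e.g.:
#
# list(split(list(range(10)), 3))  # -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]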
g2p = G2p()
def generate_sequence(url: str):
"""generates mapped dictionary of speaker's state
@author: JustAkiniyi
Args:
url (str): http url to a downloadable audio file
Returns:
dict[str, list]:
dictionary containing the action/state of every speaker per frame (42 ms steps)
speakers are labeled alphabetically, A - Z
e.g
>>>> generate_sequence('http://bit.ly/1e4')
>>>> {
"A": ['speech', 'speech', 'silence'...],
"B": ['speech', 'silence', 'silence'...],
...
}
"""
dataneed = diarize_audio(url)
# transcription = dataneed["text"]
diarization = dataneed["words"]
audiolength = int(dataneed["audio_duration"])
audio_data = []
for data in diarization:
phrase = Speech(
speaker=data["speaker"],
start = data["start"],
stop = data["end"],
text=data["text"],
index = diarization.index(data)
)
audio_data.append(phrase)
sequence = _speakers_sequence(audio_data, audiolength)
return sequence
def _speakers_sequence(
audio_data: list[Speech], audiolength: int
) -> dict[str, list]:
""" Parse list speech objects from audio diarization into
dictionary mapping speakers to action/state by seconds
@author: JustAkiniyi
Args:
audio_data (list[Speech]): list comprised of Speech objects from speaker diarization
[Speech(), Speech() ...]
audiolength (int): length of diarized audio(secs)
Returns:
dict[str, list]:
>>>> _speaker_sequence(audio_data, audiolength)
>>>> {
"A": ['speech', 'speech', 'silence'...],
"B": ['speech', 'silence', 'silence'...],
...
}
"""
## split speech list into individual speakers using a dictionary
import time
start = time.time()
speaker_sequence = {}
for data in audio_data:
if data.speaker not in speaker_sequence:
if len(data.text.split(' ')) >1:
wrds = data.text.split(' ')
durs = split(data.duration, len(wrds))
res = []
for wrd, dur in zip(wrds, durs):
res.extend(extract_phoneme(wrd, dur))
speaker_sequence[data.speaker] = [res]  # keep the per-word lists nested so chain.from_iterable yields (timestamp, phoneme) pairs
else:
speaker_sequence[data.speaker]=[extract_phoneme(data.text, data.duration)]
else:
if len(data.text.split(' ')) >1:
wrds = data.text.split(' ')
durs = split(data.duration, len(wrds))
# print(wrds, list(durs))
res = []
for wrd, dur in zip(wrds, durs):
res.extend(extract_phoneme(wrd.strip(), dur))
speaker_sequence[data.speaker].append(res)
else:
speaker_sequence[data.speaker].append(list(extract_phoneme(data.text, data.duration)))
print(time.time() - start)
speaking_moments = {}
for each_speaker in speaker_sequence:
# print(speaker_sequence[each_speaker])
flattened_list = list(chain.from_iterable(speaker_sequence[each_speaker]))
print(flattened_list[:50])
result = []
for i in range(0, (audiolength + 1) * 1000 , 42):
# print(i)
stamp_index = next((index for index, wrd in enumerate(flattened_list) if wrd[0] == i), None)
if stamp_index is not None and flattened_list[stamp_index][1] in STATE_MAP:
result.append(STATE_MAP[flattened_list[stamp_index][1]])
# for state in DATA_SCHEMA:
# if flattened_list[stamp_index][1] in DATA_SCHEMA[state]:
# result.append(state)
# break
else:
# no phoneme starts at this frame (or it lacks a STATE_MAP entry): mouth closed
result.append("closed")
print(result[:50])
speaking_moments[each_speaker] = result
return speaking_moments
def extract_phoneme(word: str, duration: list[int], g2p: G2p = g2p) -> list[str]:
""" generate a timestamp mapped list of phone present in a word
Args:
word (str): word from the English language
timestamp (list[int]): ordered timestamps in multiples of 42
representing length of time assigned to word
e.g [0, 42, 84 ...]
[1(24anim), 1(24default)]
[]
24 frmez -> 1000 msecs
1 frmze -> 41.67 msecs
Returns:
list[str]: mouth shape for each frame in video
[hhw2, fo41, fmwo0, ]
{
hhw: 2,
fo4, 1,
fmwo: 0,
}
"""
# phonemes = pronouncing.phones_for_word(word)
try:
phonemes = pronouncing.phones_for_word(word)[0].split(' ')
except IndexError:
phonemes = g2p(word)
phonemes = [phnm for phnm in phonemes if phnm != ' ']
# print(phonemes)
phoneme_dict = {}
for ph in phonemes:
# phoneme_dict[re.sub(r'\d+', '', ph)] = int(re.findall(r'\d+', ph)[0]) + 1 if re.match(r'\d+', ph) else 0
if re.search(r'\d+', ph):
phoneme_dict[re.sub(r'\d+', '', ph)] = int(re.findall(r'\d+', ph)[0]) + 1
else:
phoneme_dict[re.sub(r'\d+', '', ph)] = 0
## sort weighted dictionary on value
sorted_phoneme_dict = dict(sorted(phoneme_dict.items(), key=lambda item: item[1], reverse=True))
result_list = [re.sub(r'\d+', '', phn) for phn in phonemes]
while len(result_list) < len(duration):
for phn in sorted_phoneme_dict:
if len(result_list) == len(duration):
break
else:
phn_index = result_list.index(phn)
result_list.insert(phn_index, phn)
# print(phn_index, phn, result_list)
# print(sorted_phoneme_dict)
# print(word, phonemes, sorted_phoneme_dict, result_list, dict(zip(duration, result_list)))
return tuple(zip(duration, result_list))
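## Hedged example (assumes the CMU pronouncing dictionary resolves "cat" to "K AE1 T"):
#
# extract_phoneme("cat", [0, 42, 84])
# # -> ((0, 'K'), (42, 'AE'), (84, 'T'))
#
## When a word has fewer phonemes than frames, the stress-weighted phonemes are repeated
## until every timestamp in `duration` gets a mouth shape.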
import re
import inflect
import logging
logging.basicConfig(filename=f"{__file__}.log")  # log to a .log file next to this module, not the module itself
class Speech:
"""
a single diarized utterance: speaker label, frame-aligned duration (42 ms steps),
normalized text (digits spelled out via inflect), and its index in the transcript
"""
def __init__(self, speaker: str, start: int, stop: int, text: str, index) -> None:
self.speaker = speaker
self.duration = self.generate_duration(start, stop)
self._text = text
self.index = index
self.p = inflect.engine()
@property
def text(self):
result: str = self._text.strip('.?,')
# result = result
if re.search(r'\d+', result):
digits = re.findall(r'\d+', result)[0]
transcription: str = self.p.number_to_words(int(digits))
transcription = transcription.replace('-', ' ')
result = result.replace(digits, f'{transcription}')
return result
def generate_duration(self, start: int, stop: int) -> list[int]:
"""_summary_
Args:
start (int): _description_
stop (int): _description_
Returns:
list[int]: _description_
[1, 2, 3....2000]secs
["speech", "speech", "silence"]
[42, 84, 126, ..... 20000]msecs
["mbf", "mbf", "mbf",..... "fv" ]
"""
if start == stop:
res = []
res.append(self.round_to_multiple(start, 42))
return res
return list(map(self.round_to_multiple, list(range(start,stop, 42))))
# round a number to the nearest multiple (default 42 ms, roughly one frame at 24 fps)
@staticmethod
def round_to_multiple(number, multiple=42):
return multiple * round(number / multiple)
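## Small illustration of the 42 ms frame rounding (roughly one frame at 24 fps), plus a
## hedged Speech example with illustrative millisecond timestamps:
#
# Speech.round_to_multiple(100)  # -> 84
# Speech.round_to_multiple(130)  # -> 126
#
# s = Speech(speaker="A", start=0, stop=200, text="hello world", index=0)
# s.duration  # -> [0, 42, 84, 126, 168]
# s.text      # -> "hello world"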