import subprocess
import random
import textwrap
from pathlib import Path

import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont

from podcast_animator.analysis.animation_frame_constructor import AnimationFrame

# from .image_generator import generate_image
# import itertools
# import time

FRAMES = 24
class Animator:
    """Generate an animation from a provided schema."""

    def __init__(self, background: Path, avatar_map: dict[str, Path], frames=24, **kwargs) -> None:
        self.bg_path = background
        self.avatar_map = avatar_map
        self.images = []
        self.frames = frames
        # default to Arial if no font name is supplied
        self.font = ImageFont.truetype(f'data/Fonts/{kwargs.get("font") or "arial"}.ttf', 10)
    def build_images(self, schema: dict[str, dict[str, list[dict[str, Path | str] | None]]], animation_frame_length: int):
        """Render one image per animation frame from the schema.

        Args:
            schema: mapping of frame index to per-speaker frame data
                (mouth image path and spoken word).
            animation_frame_length (int): total number of frames to render.
        """
        max_iter_range = min(animation_frame_length, 6000)
        try:
            # schema keys may be strings ("1", "2", ...) ...
            for i in range(1, max_iter_range):
                image = self._create_image(schema[str(i)])
                self.images.append(image)
        except KeyError:
            # ... or integers, depending on how the schema was built;
            # discard any frames appended before the key mismatch was hit
            self.images = []
            for i in range(1, max_iter_range):
                image = self._create_image(schema[i])
                self.images.append(image)
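    # A minimal sketch of the schema shape build_images expects, inferred from how
    # _create_image reads it below; the paths and word here are hypothetical:
    #
    # schema = {
    #     "1": {
    #         "speaker_0": [
    #             {"mouth": Path("avatars/speaker_0/mouths/open.png"), "word": "hello"},
    #         ],
    #         "speaker_1": [],  # silent speaker: closed mouth and default eyes are used
    #     },
    #     "2": {...},
    # }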
    def _create_image(self, frame_obj):
        background_image = Image.open(self.bg_path)
        background_image = background_image.convert(mode='RGBA')
        width, length = background_image.size
        canvas = Image.new(mode='RGBA', size=(width, length), color=(255, 255, 255))
        canvas.paste(im=background_image, box=(0, 0))
        offset = ''
        for speaker in frame_obj:
            # composite the speaker's base avatar onto the background
            base_image_path = self.avatar_map[speaker] / 'base.png'
            base_image = Image.open(base_image_path)
            base_image = base_image.convert('RGBA')
            canvas = Image.alpha_composite(canvas, base_image)
            if len(frame_obj[speaker]) == 0:
                # speaker is silent in this frame: closed mouth, default eyes, no subtitle
                mouth_path = self.avatar_map[speaker] / "mouths/closed.png"
                eye_path = self.avatar_map[speaker] / "eyes/default.png"
                speaker_word = None
            else:
                if len(frame_obj[speaker]) == 1:
                    obj = frame_obj[speaker][0]
                else:
                    # several candidate mouth shapes fall in this frame: pick one at random
                    obj = random.choice(frame_obj[speaker])
                mouth_path = obj["mouth"]
                # TODO 1. add eyes
                eye_path = self.avatar_map[speaker] / "eyes/default.png"
                speaker_word = obj["word"]
            mouth = Image.open(mouth_path)
            mouth = mouth.convert('RGBA')
            canvas = Image.alpha_composite(canvas, mouth)
            eye = Image.open(eye_path)
            eye = eye.convert('RGBA')
            canvas = Image.alpha_composite(canvas, eye)
            if speaker_word:
                self._draw_word(speaker_word, canvas, offset)
        numpy_img = np.array(canvas)
        # the canvas is RGBA, so drop the alpha channel while converting to BGR for OpenCV
        cv2_image = cv2.cvtColor(numpy_img, cv2.COLOR_RGBA2BGR)
        return cv2_image
    def _draw_word(self, speaker_word: str, image: Image, offset: int) -> None:
        """Draw a subtitle on a frame.

        Args:
            speaker_word (str): subtitle to be drawn
            image (Image): image to draw the subtitle on
            offset (int): position to draw the subtitle (currently unused)
        """
        W, H = image.size
        # wrap the caption to roughly 7% of the frame width, in characters
        wrapper = textwrap.TextWrapper(width=int(W * 0.07))
        word_list = wrapper.wrap(text=speaker_word)
        caption_new = '\n'.join(word_list)
        draw = ImageDraw.Draw(image)
        # Pillow < 10 API; newer Pillow versions replace textsize with textbbox
        w, h = draw.textsize(caption_new, font=self.font)
        # centre the caption horizontally, 10% above the bottom of the frame
        x, y = 0.5 * (W - w), 0.90 * H - h
        draw.text((x, y), caption_new, font=self.font)
    def build_video(self, build_path: Path):
        frame_one = self.images[0]
        height, width, _ = frame_one.shape
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # be sure to use lower case
        out = cv2.VideoWriter(str(build_path.absolute()), fourcc, 24.0, (width, height))
        for image in self.images:
            out.write(image)  # write out frame to video
            # cv2.imshow('video', image)
            if (cv2.waitKey(1) & 0xFF) == ord('q'):  # hit `q` to exit
                break
        # release everything once the job is finished
        out.release()
        cv2.destroyAllWindows()

        # Alternative implementation: pipe raw frames into ffmpeg via ffmpeg-python.
        # process = (
        #     ffmpeg
        #     .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
        #     .output(str(build_path), pix_fmt='yuv420p', vcodec='libx264', r=self.frames, format='mp4')
        #     .overwrite_output()
        #     .run_async(pipe_stdin=True)
        # )
        # for image in self.images:
        #     process.stdin.write(image.astype(np.uint8).tobytes())
        # process.stdin.close()
        # process.wait()

        # Alternative implementation: call the ffmpeg CLI per image via subprocess.
        # for i in self.images:
        #     subprocess.run(
        #         [f"ffmpeg -framerate 1 -i {i} -c:v libx264 -r {self.frames} {build_path}"],
        #         shell=True,
        #     )
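
# A minimal usage sketch (not part of the original gist). The background path,
# avatar directories, mouth asset name, and output path are hypothetical placeholders;
# they assume the per-avatar layout _create_image reads (base.png, mouths/, eyes/),
# and Animator also expects a font file at data/Fonts/arial.ttf for subtitles.
if __name__ == "__main__":
    avatar_map = {
        "speaker_0": Path("data/avatars/speaker_0"),
        "speaker_1": Path("data/avatars/speaker_1"),
    }
    # two frames: speaker_0 says "hello" in frame 1, everyone is silent in frame 2
    # ("open.png" is a hypothetical mouth-shape asset)
    schema = {
        "1": {
            "speaker_0": [{"mouth": avatar_map["speaker_0"] / "mouths/open.png", "word": "hello"}],
            "speaker_1": [],
        },
        "2": {"speaker_0": [], "speaker_1": []},
    }
    animator = Animator(Path("data/backgrounds/studio.png"), avatar_map)
    animator.build_images(schema, animation_frame_length=len(schema) + 1)
    animator.build_video(Path("build/episode.mp4"))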
import re
import time

import requests

# import itertools
def extract_wordtimestamps(sentence_dict):
    """Split an utterance into sentences and pair each with its word timestamps.

    Sentence boundaries are words ending in '.', '?' or ','; each resulting
    sentence maps to a ';'-separated list of 'start-end' timestamp ranges.
    """
    split_sentence = sentence_dict["text"].split(' ')
    words_obj = sentence_dict["words"]
    timestamp_list = [f"{wrd['start']}-{wrd['end']}" for wrd in words_obj]
    result = {}
    start = 0
    for index, wrd in enumerate(split_sentence):
        if re.search(r'\.$|\?$|,$', wrd):
            sentence = " ".join(split_sentence[start:index + 1])
            timestamps = ";".join(timestamp_list[start:index + 1])
            result[sentence] = timestamps
            start = index + 1
    return result
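
# A hypothetical example of the input extract_wordtimestamps works on (the text and
# timestamps below are made up, mirroring the shape of an AssemblyAI utterance):
#
# sentence_dict = {
#     "text": "Hello there. How are you?",
#     "words": [
#         {"text": "Hello", "start": 0, "end": 400},
#         {"text": "there.", "start": 400, "end": 800},
#         {"text": "How", "start": 900, "end": 1100},
#         {"text": "are", "start": 1100, "end": 1300},
#         {"text": "you?", "start": 1300, "end": 1600},
#     ],
# }
# extract_wordtimestamps(sentence_dict)
# # -> {"Hello there.": "0-400;400-800", "How are you?": "900-1100;1100-1300;1300-1600"}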
# def extract_timestamps(sentence_dict: dict[str, str | dict[str, str | int]]):
#     sentence = sentence_dict["text"]
#     split_sentence = list(filter(None, re.split(r'[.,?]', sentence)))
#     split_words = [phrase.split(' ') for phrase in split_sentence]
#     for phrase_list in split_words:
#         phrase_list[0], phrase_list[-1] = "##start##", "##end##"
#     all_words = itertools.chain.from_iterable(split_words)
#     word_obj = sentence_dict["words"]
#     word_list = [
#         (f'{word["start"]}-{word["end"]}', word["text"]) for word in word_obj
#     ]
#     counter = 0
#     for word, time_stamp in zip(all_words, word_list):
#         if split_words[counter][0] != "##start##" and split_words[counter][-1] != "##end##":
#             counter += 1
#         if word == "##start##":
#             split_words[counter][0] = time_stamp[0].split('-')[0]
#         elif word == "##end##":
#             split_words[counter][-1] = time_stamp[0].split('-')[1]
#     result = {}
#     for index, sentence in enumerate(split_sentence):
#         result[f"{split_words[index][0]}-{split_words[index][-1]}"] = sentence
#     return result
def diarize_audio(audio, api):
    """Submit an audio URL to AssemblyAI and return its diarized transcription.

    @author : cchimdindu

    Args:
        audio (string): url of the audio file to transcribe
        api (string): AssemblyAI API key

    Returns:
        dict: transcription id, audio duration, full text, and per-utterance
        speech data with sentence-level timestamps
    """
    url = "https://api.assemblyai.com/v2/transcript"
    post_body = {
        "audio_url": audio,
        "speaker_labels": True,  # include speaker labels
        # "disfluencies": True   # transcribe filler words
    }
    headers = {
        "authorization": api,
        "content-type": "application/json",
    }
    transcription_response = requests.post(url, json=post_body, headers=headers)
    transcription_id = transcription_response.json()["id"]

    ## remove content-type from request headers
    del headers["content-type"]

    transcription_status = False
    while not transcription_status:
        ## id of transcribed audio used for testing:
        ## "rxym41rlo2-2606-4354-ae8f-095ccdf58181"
        response = requests.get(f'{url}/{transcription_id}', headers=headers)
        request_data = response.json()
        status = request_data["status"]
        if status != "completed":
            print(f"Processing Audio, Status: [{status}]")
            time.sleep(20)
        else:
            transcription_status = True

    remove_list = ["confidence"]

    def utterance_schema(utterance: dict) -> dict:
        result = {}
        for key, value in utterance.items():
            if key == "words":
                result["sentences"] = extract_wordtimestamps(utterance)
            elif key not in remove_list:
                result[key] = value
        return result

    transcription = {
        "transcription_id": transcription_id,
        "audio_duration_seconds": request_data["audio_duration"],
        "text": request_data["text"],
        "speech": [utterance_schema(utterance) for utterance in request_data["utterances"]],
    }
    return transcription
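
# A minimal usage sketch (not part of the original gist); the audio URL is a
# placeholder and the API key is assumed to live in an environment variable.
if __name__ == "__main__":
    import os

    transcription = diarize_audio(
        "https://example.com/podcast-episode.mp3",  # hypothetical hosted audio file
        os.environ["ASSEMBLYAI_API_KEY"],           # hypothetical env var holding the key
    )
    print(transcription["audio_duration_seconds"], "seconds of audio")
    for utterance in transcription["speech"]:
        # each utterance keeps AssemblyAI's fields minus "confidence"/"words",
        # plus the sentence -> timestamps mapping built by extract_wordtimestamps
        print(utterance["speaker"], list(utterance["sentences"].items())[:1])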