import subprocess
import random
from pathlib import Path
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import textwrap
from podcast_animator.analysis.animation_frame_constructor import AnimationFrame
# from .image_generator import generate_image
# import itertools
# import time
FRAMES = 24
class Animator:
    """Generate an animation from the provided frame schema."""
    def __init__(self, background: Path, avatar_map: dict[str, Path], frames=24, **kwargs) -> None:
        self.bg_path = background
        self.avatar_map = avatar_map
        self.images = []
        self.frames = frames
        self.font = ImageFont.truetype(f'data/Fonts/{kwargs.get("font") or "arial"}.ttf', 10)
    def build_images(self, schema: dict[str, dict[str, list[dict[str, Path | str] | None]]], animation_frame_length: int):
        """Render one image per animation frame described by the schema.

        Args:
            schema: mapping of frame index to per-speaker mouth/word data
            animation_frame_length (int): total number of frames to render
        """
        max_iter_range = min(animation_frame_length, 6000)
        try:
            # schema produced upstream may be keyed by string frame indices
            for i in range(1, max_iter_range):
                image = self._create_image(schema[str(i)])
                self.images.append(image)
        except KeyError:
            # fall back to integer frame keys
            for i in range(1, max_iter_range):
                image = self._create_image(schema[i])
                self.images.append(image)
    def _create_image(self, frame_obj):
        """Compose a single frame: background, then each speaker's base avatar,
        mouth, eyes and (optionally) the word being spoken."""
        background_image = Image.open(self.bg_path)
        background_image = background_image.convert(mode='RGBA')
        width, length = background_image.size
        canvas = Image.new(mode='RGBA', size=(width, length), color=(255, 255, 255))
        canvas.paste(im=background_image, box=(0, 0))
        offset = 0
        for speaker in frame_obj:
            base_image_path = self.avatar_map[speaker] / 'base.png'
            base_image = Image.open(base_image_path)
            base_image = base_image.convert('RGBA')
            canvas = Image.alpha_composite(canvas, base_image)
            if len(frame_obj[speaker]) == 0:
                # speaker is silent in this frame: closed mouth, default eyes, no subtitle
                mouth_path = self.avatar_map[speaker] / "mouths/closed.png"
                eye_path = self.avatar_map[speaker] / "eyes/default.png"
                speaker_word = None
            else:
                if len(frame_obj[speaker]) == 1:
                    obj = frame_obj[speaker][0]
                else:
                    obj = random.choice(frame_obj[speaker])
                mouth_path = obj["mouth"]
                # TODO 1. add eyes
                eye_path = self.avatar_map[speaker] / "eyes/default.png"
                speaker_word = obj["word"]
            mouth = Image.open(mouth_path)
            mouth = mouth.convert('RGBA')
            canvas = Image.alpha_composite(canvas, mouth)
            eye = Image.open(eye_path)
            eye = eye.convert('RGBA')
            canvas = Image.alpha_composite(canvas, eye)
            if speaker_word:
                self._draw_word(speaker_word, canvas, offset)
        numpy_img = np.array(canvas)
        # the canvas is RGBA, so convert RGBA -> BGR for OpenCV
        cv2_image = cv2.cvtColor(numpy_img, cv2.COLOR_RGBA2BGR)
        return cv2_image
    def _draw_word(self, speaker_word: str, image: Image.Image, offset: int) -> None:
        """Draw a subtitle onto a frame.

        Args:
            speaker_word (str): subtitle to be drawn
            image (Image.Image): image to draw the subtitle on
            offset (int): position at which to draw the subtitle
        """
        W, H = image.size
        wrapper = textwrap.TextWrapper(width=int(W * 0.07))
        word_list = wrapper.wrap(text=speaker_word)
        caption_new = ''
        for line in word_list[:-1]:
            caption_new = caption_new + line + '\n'
        caption_new += word_list[-1]
        draw = ImageDraw.Draw(image)
        w, h = draw.textsize(caption_new, font=self.font)
        x, y = 0.5 * (W - w), 0.90 * H - h
        draw.text((x, y), caption_new, font=self.font)
    def build_video(self, build_path: Path):
        """Write the rendered frames to an mp4 file at build_path."""
        frame_one = self.images[0]
        height, width, _ = frame_one.shape
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # be sure to use lower case
        out = cv2.VideoWriter(str(build_path.absolute()), fourcc, 24.0, (width, height))
        # print("VIDEO WRITER START")
        # start_write = time.time()
        for image in self.images:
            # subprocess.run(
            #     [f"ffmpeg -framerate 1 -i {image} -c:v libx264 -r 24 {str(output.absolute())}"],
            #     shell=True)
            out.write(image)  # write out frame to video
            # cv2.imshow('video', frame)
            if (cv2.waitKey(1) & 0xFF) == ord('q'):  # hit `q` to exit
                break
        # print(f"END WRITING: [{time.time() - start_write}]")
        # release everything once the job is finished
        out.release()
        cv2.destroyAllWindows()
        # return output
        # frame_one = self.images[0]
        # height, width, _ = frame_one.shape
        # process = (
        #     ffmpeg
        #     .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
        #     .output(str(build_path), pix_fmt='yuv420p', vcodec='libx264', r=self.frames, format='mp4')
        #     .overwrite_output()
        #     .run_async(pipe_stdin=True)
        # )
        # for image in self.images:
        #     process.stdin.write(
        #         image
        #         .astype(np.uint8)
        #         .tobytes()
        #     )
        # process.stdin.close()
        # process.wait()
        # for i in self.images:
        #     subprocess.run(
        #         [f"ffmpeg -framerate 1 -i {i} -c:v libx264 -r {self.frames} {build_path}"],
        #         shell=True
        #     )
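

# --- Usage sketch (illustrative, not part of the original gist) ---
# A minimal example of how the Animator class above might be driven, assuming a
# schema of the shape build_images expects: frame index -> {speaker: [{"mouth": Path,
# "word": str}, ...]}. All paths, speaker names and file names are placeholders.
#
# avatar_map = {
#     "SPEAKER_A": Path("data/avatars/speaker_a"),
#     "SPEAKER_B": Path("data/avatars/speaker_b"),
# }
# animator = Animator(Path("data/backgrounds/studio.png"), avatar_map)
# schema = {
#     "1": {"SPEAKER_A": [{"mouth": avatar_map["SPEAKER_A"] / "mouths/open.png", "word": "Hello"}],
#           "SPEAKER_B": []},
#     "2": {"SPEAKER_A": [], "SPEAKER_B": []},
#     "3": {"SPEAKER_A": [], "SPEAKER_B": []},
# }
# animator.build_images(schema, animation_frame_length=len(schema))
# animator.build_video(Path("output/episode.mp4"))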
import requests
import time
# import itertools
import re
def extract_wordtimestamps(sentence_dict):
    """Split an utterance's text into sentences and map each sentence to the
    start-end timestamps of its words."""
    split_sentence = sentence_dict["text"].split(' ')
    words_obj = sentence_dict["words"]
    timestamp_list = [f"{wrd['start']}-{wrd['end']}" for wrd in words_obj]
    result = {}
    start = 0
    for index, wrd in enumerate(split_sentence):
        # a word ending in '.', '?' or ',' closes the current sentence
        if re.search(r'\.$|\?$|\,$', wrd):
            sentence = " ".join(split_sentence[start: index + 1])
            timestamps = ";".join(timestamp_list[start: index + 1])
            result[sentence] = timestamps
            start = index + 1
    return result
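

# Illustrative example (added, not part of the original gist) of the input shape
# extract_wordtimestamps assumes, i.e. an AssemblyAI-style utterance with "text" plus
# a "words" list carrying start/end timestamps, and the sentence -> timestamps mapping
# it returns. All values below are made up.
#
# _example_utterance = {
#     "text": "Hello there. How are you?",
#     "words": [
#         {"text": "Hello", "start": 0, "end": 400},
#         {"text": "there.", "start": 400, "end": 800},
#         {"text": "How", "start": 900, "end": 1100},
#         {"text": "are", "start": 1100, "end": 1300},
#         {"text": "you?", "start": 1300, "end": 1600},
#     ],
# }
# extract_wordtimestamps(_example_utterance)
# # -> {'Hello there.': '0-400;400-800', 'How are you?': '900-1100;1100-1300;1300-1600'}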
# def extract_timestamps(sentence_dict: dict[str, str | dict[str, str | int]]):
# sentence = sentence_dict["text"]
# split_sentence = list(filter(None, re.split(r'[.,?]', sentence)))
# split_words = [phrase.split(' ') for phrase in split_sentence ]
# for phrase_list in split_words:
# phrase_list[0], phrase_list[-1] = "##start##", "##end##"
# all_words = itertools.chain.from_iterable(split_words)
# word_obj = sentence_dict["words"]
# word_list = [
# ( f'{word["start"]}-{word["end"]}', word["text"]) for word in word_obj
# ]
# counter = 0
# for word, time_stamp in zip(all_words, word_list):
# if split_words[counter][0] != "##start##" and split_words[counter][-1] != "##end##":
# counter += 1
# if word == "##start##":
# split_words[counter][0] = time_stamp[0].split('-')[0]
# elif word == "##end##":
# split_words[counter][-1] = time_stamp[0].split('-')[1]
# result = {}
# for index, sentence in enumerate(split_sentence):
# result[f"{split_words[index][0]}-{split_words[index][-1]}"] = sentence
# return result
def diarize_audio(audio, api):
    """
    Send an audio URL to the AssemblyAI transcription API and return the
    speaker-labelled transcription as a dictionary.
    @author : cchimdindu
    Args:
        audio (str): URL of the audio file
        api (str): AssemblyAI API key
    Returns:
        dict: transcription id, audio duration, full text and per-utterance sentences
    """
    url = "https://api.assemblyai.com/v2/transcript"
    post_body = {
        "audio_url": audio,
        "speaker_labels": True,  # include speaker labels
        # "disfluencies": True  # transcribe filler words
    }
    headers = {
        "authorization": api,
        "content-type": "application/json",
    }
    transcription_response = requests.post(url, json=post_body, headers=headers)
    transcription_id = transcription_response.json()["id"]
    # remove content-type from request headers
    del headers["content-type"]
    transcription_status = False
    while not transcription_status:
        # id of a previously transcribed audio file, used for testing:
        # "rxym41rlo2-2606-4354-ae8f-095ccdf58181"
        # response = requests.get(f'{url}/rxym41rlo2-2606-4354-ae8f-095ccdf58181', headers=headers)
        response = requests.get(f'{url}/{transcription_id}', headers=headers)
        request_data = response.json()
        status = request_data["status"]
        if status != "completed":
            print(f"Processing Audio, Status: [{status}]")
            time.sleep(20)
        else:
            transcription_status = True
    remove_list = ["confidence"]
    # utterances = [
    #     {key: value for key, value in utterance.items() if key not in remove_list}
    #     for utterance in request_data["utterances"]
    # ]

    def utterance_schema(utterance: dict) -> dict:
        result = {}
        for key, value in utterance.items():
            if key == "words":
                result["sentences"] = extract_wordtimestamps(utterance)
            elif key not in remove_list:
                result[key] = value
        return result

    transcription = {
        "transcription_id": transcription_id,
        "audio_duration_seconds": request_data["audio_duration"],
        "text": request_data["text"],
        "speech": [utterance_schema(utterance) for utterance in request_data["utterances"]],
        # "words": [
        #     {key: value for key, value in word.items() if key not in remove_list}
        #     for word in request_data["words"]
        # ]
    }
    return transcription
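

# Usage sketch (illustrative, not part of the original gist): how diarize_audio
# might be called. The audio URL and API key are placeholders, and AssemblyAI
# utterances returned with speaker_labels=True are assumed to carry a "speaker" field.
#
# AUDIO_URL = "https://example.com/podcast-episode.mp3"      # placeholder
# ASSEMBLYAI_API_KEY = "your-assemblyai-api-key"             # placeholder
# transcript = diarize_audio(AUDIO_URL, ASSEMBLYAI_API_KEY)
# print(transcript["audio_duration_seconds"])
# for utterance in transcript["speech"]:
#     print(utterance.get("speaker"), list(utterance["sentences"].keys()))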