Skip to content

Instantly share code, notes, and snippets.

@twobob
Created July 3, 2022 16:57
Show Gist options
  • Save twobob/c54da13bbfb3e06a801d0703ccb55c62 to your computer and use it in GitHub Desktop.
Save twobob/c54da13bbfb3e06a801d0703ccb55c62 to your computer and use it in GitHub Desktop.
Determine the spoken words-per-minute (WPM) of a YouTube video more accurately.
import youtube_dl
import time
import re
import requests
import os
import sys
import matplotlib.pyplot as plt
import subprocess
import re
import logging
import os
import numpy as np
import scipy.io
import scipy.io.wavfile
from pathlib import Path
# Configure the root logger so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
# NOTE(review): the returned logger is discarded, so this call has no visible
# effect in the code shown here.
logging.getLogger()
def convert_to_wav(filename):
    """Convert an audio file to a mono, 22050 Hz, 16-bit PCM WAV via ffmpeg.

    Spaces in ``filename`` are replaced with underscores, and the file on disk
    is renamed to match before the conversion runs.

    Args:
        filename: Path of the audio file to convert.

    Returns:
        Path of the WAV output: the space-free input path with its extension
        replaced by ``_converted.wav``. The path is returned even when ffmpeg
        fails; the failure is logged.
    """
    safe_name = filename.replace(" ", "_")
    if safe_name != filename:
        # Only touch the file system when the name actually changes.
        os.rename(filename, safe_name)
    new_name = f"{os.path.splitext(safe_name)[0]}_converted.wav"
    # Pass an argument list instead of splitting a formatted string, so a path
    # containing tabs or other whitespace still reaches ffmpeg as one argument.
    command = [
        "ffmpeg", "-i", safe_name,
        "-f", "wav", "-acodec", "pcm_s16le", "-ar", "22050", "-ac", "1",
        new_name, "-y",
    ]
    result = subprocess.run(command)
    if result.returncode != 0:
        # Surface the failure instead of silently returning a path that may
        # not exist; callers keep the original best-effort behaviour.
        logging.error("ffmpeg exited with status %s while converting %s",
                      result.returncode, safe_name)
    return new_name
def get_chunk_times(in_filename, silence_threshold, silence_duration=1):
    """Measure the silent stretches of an audio file with ffmpeg's silencedetect.

    Args:
        in_filename: Path of the audio file to analyse.
        silence_threshold: Positive number; it is negated and passed to the
            filter as ``n=-<value>dB``.
        silence_duration: Value passed to the filter's ``d=`` option.

    Returns:
        A ``(ts, chunks)`` tuple. ``ts`` is the sum of all reported silence
        durations in seconds (``0`` when none are reported). ``chunks`` holds
        one ``[start, end, duration]`` list per reported silence.
    """
    # Accept both "12.5" and "12": raw strings avoid invalid-escape warnings
    # and the decimal point is matched literally and optionally.
    number = r'(\d+(?:\.\d+)?)'
    silence_duration_re = re.compile(r'silence_duration: ' + number)
    silence_end_re = re.compile(r'silence_end: ' + number)
    # Argument list (not a split string) so paths with whitespace stay intact.
    command = [
        "ffmpeg", "-i", in_filename,
        "-af", f"silencedetect=n=-{silence_threshold}dB:d={silence_duration}",
        "-f", "null", "-",
    ]
    out = subprocess.run(command, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True, errors="replace")
    ts = 0
    chunks = []
    for line in out.stdout.splitlines():
        match = silence_duration_re.search(line)
        if not match:
            continue
        chunk_time = float(match.group(1))
        ts = ts + chunk_time
        end = silence_end_re.search(line)
        if end:
            t_end = float(end.group(1))
            t_start = t_end - chunk_time
            # The third slot carries the duration; the previous code appended
            # the ``chunks`` list to itself here, creating a circular reference.
            chunks.append([t_start, t_end, chunk_time])
    logging.info("TS audio %s = %s", os.path.basename(in_filename), ts)
    return ts, chunks
def get_audio_plot(filename, chunks):
    """Plot a WAV file's waveform with the given time spans shaded in red.

    Args:
        filename: Path of the WAV file to read.
        chunks: Iterable of sequences whose first two items are the start and
            end time (in seconds) of a span to shade.

    Returns:
        The matplotlib figure containing the plot.
    """
    fig, ax = plt.subplots()
    fig.set_size_inches(18.5, 10.5)
    sample_rate, audio_buffer = scipy.io.wavfile.read(filename)
    # Build the time axis from the sample index so it always has exactly one
    # entry per sample; np.arange with a fractional step can come out one
    # element off, which makes ax.plot fail on mismatched lengths.
    timeline = np.arange(len(audio_buffer)) / sample_rate
    ax.plot(timeline, audio_buffer)
    # Vectorised extrema instead of the builtin min/max, which iterate the
    # array element by element in Python.
    y1 = audio_buffer.min()
    y2 = audio_buffer.max()
    for c in chunks:
        ax.fill_between(c[0:2], y1, y2, color='red', alpha=0.3)
    ax.set_xlabel('Time [s]')
    ax.set_ylabel('Amplitude')
    ax.set_title("Cumulatively considered silences")
    return fig
def get_audio_info(audio, threshold):
    """Convert ``audio`` to WAV, measure its silences and plot them.

    Args:
        audio: Path of the audio file to analyse.
        threshold: Silence threshold forwarded to ``get_chunk_times``.

    Returns:
        A ``(total_silence, figure)`` tuple: the summed silence duration as a
        string, and the object returned by ``get_audio_plot``.
    """
    wav_path = convert_to_wav(audio)
    silence_info = get_chunk_times(wav_path, threshold, 1)
    total_silence = silence_info[0]
    silent_spans = silence_info[1]
    figure = get_audio_plot(wav_path, silent_spans)
    return str(total_silence), figure
# Command-line arguments without the script name; read by main().
argv = sys.argv[1:]
# NOTE(review): not referenced anywhere in the visible code — possibly a leftover.
Url = ""
def _find_vtt_url(tracks, language):
    """Return the URL of the first VTT entry for ``language`` in ``tracks``, else None."""
    if not tracks:
        return None
    for fmt in tracks.get(language) or ():
        if fmt['ext'] == 'vtt':
            return fmt['url']
    return None


def down_sub(video_url, language):
    """Look up the download URL of a video's VTT subtitles.

    Uploaded subtitles are preferred; automatic captions are the fallback.

    Args:
        video_url: URL of the video. For YouTube URLs, everything from
            ``&list=`` onwards is dropped before the lookup.
        language: Subtitle language key to look for (e.g. ``'en'``).

    Returns:
        ``[subtitle_url, video_name]`` where ``video_name`` is the title with
        the characters ``\\ / * ? : " < > |`` removed, or ``None`` when the
        metadata could not be fetched or no VTT track exists for ``language``.
    """
    # Check for a YouTube link and remove the playlist ID from the URL if present.
    _temp = video_url.lower()
    if "youtube.com" in _temp or "youtu.be" in _temp:
        if '&list=' in video_url:
            video_url = video_url.split('&list=')[0].strip()
    ydl_opts = {'dump-json': True,
                'writesubtitles': True,
                'writeautomaticsub': True,
                'quiet': True,
                "no_warnings": True,
                "logtostderr": True,
                'youtube_include_dash_manifest': False}
    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(video_url, download=False)
            if not info_dict['formats']:
                print("Status : Something went wrong retry or video is unavailable")
                return None
    except Exception:
        # Deliberately broad (any extraction failure is reported the same
        # way), but no longer swallows KeyboardInterrupt / SystemExit as the
        # previous bare ``except:`` did.
        print('Error : Check your Internet Connection or Url.')
        return None
    video_title = info_dict['title']
    # Strip characters that are not allowed in file names on common platforms.
    video_name = re.sub(r'[\\/*?:"<>|]', "", video_title)
    for tracks in (info_dict.get('subtitles'),
                   info_dict.get('automatic_captions')):
        sub_dlink = _find_vtt_url(tracks, language)
        if sub_dlink is not None:
            return [sub_dlink, video_name]
    return None
def main():
    """Estimate the spoken words-per-minute of a video, excluding silences.

    The video URL is taken from the first command-line argument (or prompted
    for when none is given); an optional second argument is the integer
    silence threshold forwarded to ``get_audio_info`` (default 15).

    The audio is downloaded as ``TEMP_AUDIO.wav``, its silences are measured
    and plotted, the English subtitles are fetched, and the word count is
    divided by the non-silent duration.

    Raises:
        SystemExit: always; the exit status is 2 (see the note at the end).
    """
    usage = 'usage python wpm.py URL integer[0-50]'
    timing_re = re.compile(
        r'\d{2}\W\d{2}\W\d{2}\W\d{3}\s\W{3}\s\d{2}\W\d{2}\W\d{2}\W\d{3}')
    sensitivity = 15
    if len(argv) == 0:
        url = input('Please input a video URL: ')
    else:
        url = argv[0]
    if len(argv) == 2:
        # The threshold is the SECOND argument; the first one is the URL
        # (previously int(argv[0]) tried to parse the URL and always failed).
        try:
            sensitivity = int(argv[1])
        except ValueError:
            print(usage)
            sys.exit(2)
    if len(argv) > 2:
        print(usage)
    ydl_opts = {
        'writesubtitles': True,
        'format': 'm4a/none',
        'writeautomaticsub': True,
        'subtitlesformat': 'vtt',
        'subtitleslangs': ['en'],
        'noplaylist': True,
        'outtmpl': 'TEMP_AUDIO.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '22050',
        }],
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=False)
        ydl.download([url])
    # Honour the requested threshold instead of a hard-coded 15. The returned
    # figure is not needed here: plt.show() below displays it.
    silence_duration, _ = get_audio_info('./TEMP_AUDIO.wav', sensitivity)
    print("silence length of", silence_duration)
    silence_seconds = float(silence_duration)
    duration = info_dict.get('duration') - silence_seconds
    plt.show(block=True)
    print('The spoken duration of the video is:', str(info_dict.get('duration')), 'seconds -', silence_duration, 'of silence')
    subtitle = down_sub(url, 'en')
    if subtitle:  # None when no subtitles could be located.
        sub_url, video_name = subtitle
        r = requests.get(sub_url)
        parsed_subs = r.content.decode("utf-8")
        # Subtitles without cue alignment settings are counted from memory;
        # the others are written to disk and counted from the file.
        puresub = "align:start" not in parsed_subs
        # Make sure the target directory exists before opening the file.
        os.makedirs('subtitles', exist_ok=True)
        sub_path = os.path.join('subtitles', f"{video_name}.vtt")
        with open(sub_path, 'wb') as f:
            if puresub:
                print('Pure Subs')
            else:
                f.write(r.content)
                print('\n Subtitle Downloaded Successfully.')
        if not puresub:
            # Read back with the encoding it was written in, and close the
            # handle when done.
            with open(sub_path, encoding="utf-8") as file:
                content = file.readlines()
            # NOTE(review): only the seventh line of the file is counted here;
            # confirm that this is the intended transcript line.
            subs = content[6]
            new = timing_re.sub('', subs)
            # split() without arguments ignores empty strings produced by
            # leading, trailing or repeated whitespace.
            totalcount = len(new.split())
        else:
            new = timing_re.sub('', parsed_subs)
            flat = ' '.join(new.split())
            flat = flat.replace('WEBVTT Kind: captions Language: en', '')
            print(flat)
            # Removing the header leaves a leading space, which split(' ')
            # would count as an extra (empty) word.
            totalcount = len(flat.split())
        if duration <= 0:
            # Avoid a division by zero (or a negative rate) when the detected
            # silence covers the whole video.
            print('No spoken time left after removing silence; cannot compute wpm')
            sys.exit(2)
        convert = time.strftime("%H:%M:%S", time.gmtime(duration))
        convertsilence = time.strftime("%H:%M:%S", time.gmtime(silence_seconds))
        total_time = time.strftime("%H:%M:%S", time.gmtime(duration + silence_seconds))
        print(totalcount, 'total words spoken in HMS:', convert, "of total time:",
              total_time, "excluding", convertsilence, "silence")
        wpm = int(totalcount / (duration / 60))
        print(wpm, 'de-silenced wpm')
    # NOTE(review): status 2 is kept from the original even on success —
    # confirm whether 0 was intended.
    sys.exit(2)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment