import time
from typing import List, Tuple

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE

model = whisper.load_model("tiny")
def load_audio(file: str, seek: float, sr: int = SAMPLE_RATE):
    """
    Open an audio file and read it as a mono waveform, resampling as necessary.

    Parameters
    ----------
    file: str
        The audio file to open
    seek: float
        Offset (in seconds) at which decoding starts
    sr: int
        The sample rate to resample the audio to, if necessary

    Returns
    -------
    A NumPy array containing the audio waveform, in float32 dtype.
    """
    # Alternative kept for reference: pipe raw bytes through ffmpeg's stdin
    # instead of reading from a file path.
    #try:
    #    # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
    #    # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
    #    process = (
    #        ffmpeg.input("pipe:", threads=1)
    #        .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
    #        .run_async(cmd=["ffmpeg", "-nostdin"], pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
    #    )
    #    out, _ = process.communicate(input=buffer)
    #except ffmpeg.Error as e:
    #    raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
    #return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
    try:
        # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
        # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
        out, _ = (
            ffmpeg.input(file, threads=0, ss=seek)
            .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
            .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
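
# Usage sketch for load_audio (illustrative only): "call.ogg" is a hypothetical
# path, and seek=30 assumes we want the window starting 30 seconds into the file.
#
#   samples = load_audio("call.ogg", seek=30)
#   print(samples.dtype, samples.shape)  # float32, (n_samples,)
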
def _user_transcriber(user):
    """Build a per-user transcriber closure and a history accessor."""
    current_h = []   # segments of the 30-second window currently being refined
    global_h = []    # segments of windows that are already finalized
    chain_start_ts = int(time.time())

    def internal(path, seek):
        nonlocal current_h, global_h, chain_start_ts
        # Every time `seek` crosses a 30-second boundary, the previous window is
        # considered final and moved into the global history.
        if seek % 30 == 0:
            global_h.extend(current_h)
        offset = (seek // 30) * 30
        audio = load_audio(path, offset)
        if audio.shape[0] == 0:
            current_h = []
        else:
            # Shift segment timestamps to absolute (wall-clock) time.
            ts = chain_start_ts + offset
            current_h = [
                (s['text'], user, s['start'] + ts, s['end'] + ts)
                for s in model.transcribe(audio, language='ru', fp16=False)['segments']
            ]
        return global_h + current_h

    def history():
        return global_h + current_h

    return internal, history
def _merge_history(transcribers):
    """Merge all users' histories and sort them by absolute start time."""
    history = []
    for _, hist in transcribers.values():
        history.extend(hist())
    return sorted(history, key=lambda t: t[2])
transcribers = {}

def clear():
    transcribers.clear()

def history() -> List[Tuple]:
    # Each item is a (text, user, start, end) tuple with absolute timestamps.
    return _merge_history(transcribers)
def add2hist_transcribed(path: str, user, seek):
    handler = transcribers.get(user)
    if handler is None:
        trnscrb, hist = _user_transcriber(user)
        transcribers[user] = (trnscrb, hist)
    else:
        trnscrb, hist = handler
    trnscrb(path, seek)
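
# Minimal end-to-end sketch of the intended call pattern, assuming one audio file
# per speaker that keeps growing while the call is recorded; "alice.ogg", "bob.ogg"
# and the 30-second polling step are hypothetical, not part of the original gist.
if __name__ == "__main__":
    for seek in (0, 30, 60):
        add2hist_transcribed("alice.ogg", "alice", seek)
        add2hist_transcribed("bob.ogg", "bob", seek)
    # Merged, time-ordered transcript across both speakers.
    for text, user, start, end in history():
        print(f"[{start:.1f}-{end:.1f}] {user}: {text}")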