Last active
December 2, 2024 05:24
-
-
Save ColeMurray/0491bfeef78d6e1abb8c46d84eebc0de to your computer and use it in GitHub Desktop.
Video Transcription with Deepgram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional

import httpx
from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    FileSource,
    PrerecordedOptions,
)
from dotenv import load_dotenv
from moviepy.editor import VideoFileClip
class VideoTranscriber:
    """Extract the audio track from a video and transcribe it with Deepgram.

    Typical use is ``VideoTranscriber().transcribe_video("clip.mp4")``, which
    chains :meth:`extract_audio` and :meth:`transcribe_audio`.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create a transcriber and its Deepgram client.

        Args:
            api_key: Deepgram API key. When omitted (or empty), the value of
                the ``DEEPGRAM_API_KEY`` environment variable is used; a
                ``.env`` file is loaded first via ``load_dotenv()``.

        Raises:
            ValueError: If no API key is supplied and none is found in the
                environment.
        """
        load_dotenv()
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError("Deepgram API key not found. Set DEEPGRAM_API_KEY environment variable or pass it to the constructor.")
        self.deepgram = DeepgramClient(self.api_key)

    def extract_audio(self, video_path: str, output_dir: str = "temp") -> str:
        """Extract the audio track of a video and save it as a WAV file.

        Args:
            video_path: Path of the video file to read.
            output_dir: Directory for the WAV file; created if missing.

        Returns:
            Path of the written file, ``<output_dir>/<video stem>_audio.wav``.

        Raises:
            ValueError: If the video has no audio track.
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        video_filename = Path(video_path).stem
        audio_path = str(Path(output_dir) / f"{video_filename}_audio.wav")

        logging.info(f"Extracting audio from {video_path}")
        video = VideoFileClip(video_path)
        try:
            # A clip without an audio stream exposes ``audio`` as None; fail
            # with a clear message instead of an AttributeError.
            if video.audio is None:
                raise ValueError(f"Video file has no audio track: {video_path}")
            video.audio.write_audiofile(audio_path)
        finally:
            # Always release the clip's reader resources, even when
            # extraction fails part-way through.
            video.close()
        return audio_path

    def transcribe_audio(self, audio_path: str, **kwargs) -> dict:
        """Transcribe an audio file using the Deepgram prerecorded API.

        Args:
            audio_path: Path of the audio file to upload.
            **kwargs: Extra ``PrerecordedOptions`` fields. They are merged
                over the defaults below, so a caller may also override a
                default (e.g. ``model=...`` or ``diarize=False``).

        Returns:
            The response returned by the Deepgram SDK's ``transcribe_file``.
            NOTE(review): the annotation says ``dict``, but ``main`` in this
            file calls ``.to_json()`` on the result, so it is presumably an
            SDK response object - confirm against the SDK.
        """
        logging.info(f"Transcribing audio file: {audio_path}")

        # The whole file is read into memory and sent as a single buffer.
        with open(audio_path, "rb") as file:
            buffer_data = file.read()

        payload: FileSource = {
            "buffer": buffer_data,
        }

        # Defaults first, caller overrides last. Passing these as literal
        # keywords next to **kwargs would raise TypeError on any duplicate.
        option_values = {
            "model": "nova-2",
            "smart_format": True,
            "utterances": True,
            "punctuate": True,
            "diarize": True,
        }
        option_values.update(kwargs)
        options = PrerecordedOptions(**option_values)

        # 300 s default for each timeout phase, 10 s to establish the
        # connection.
        response = self.deepgram.listen.rest.v("1").transcribe_file(
            payload,
            options,
            timeout=httpx.Timeout(300.0, connect=10.0)
        )
        return response

    def transcribe_video(self, video_path: str, cleanup: bool = True, **kwargs) -> dict:
        """Extract audio from a video and transcribe it.

        Args:
            video_path: Path of the video file.
            cleanup: When true, delete the temporary audio file afterwards,
                whether or not transcription succeeded.
            **kwargs: Forwarded to :meth:`transcribe_audio`.

        Returns:
            Whatever :meth:`transcribe_audio` returns.

        Raises:
            Exception: Any error from extraction or transcription is logged
                and re-raised unchanged.
        """
        start_time = datetime.now()
        audio_path = None
        try:
            audio_path = self.extract_audio(video_path)
            response = self.transcribe_audio(audio_path, **kwargs)
        except Exception as e:
            logging.error(f"Error during video transcription: {str(e)}")
            raise
        finally:
            # Runs on success and on failure, so a failed transcription does
            # not leave the temporary WAV behind.
            if cleanup and audio_path is not None and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                    logging.info(f"Cleaned up temporary audio file: {audio_path}")
                except OSError as cleanup_error:
                    # Best effort: a cleanup failure must not discard the
                    # result or mask the original error.
                    logging.warning(
                        f"Could not remove temporary audio file {audio_path}: {cleanup_error}"
                    )

        # total_seconds() covers the whole interval; .seconds truncates and
        # drops the days component.
        duration = datetime.now() - start_time
        logging.info(f"Total processing time: {duration.total_seconds():.1f} seconds")
        return response
def main():
    """Command-line entry point: transcribe one video and print the result.

    The video path is the first command-line argument, falling back to
    "video.mp4". The response is printed as indented JSON. Any failure is
    logged and the process exits with status 1.
    """
    import sys

    logging.basicConfig(level=logging.INFO)

    # First positional argument, if any, names the video to process.
    cli_args = sys.argv[1:]
    video_path = cli_args[0] if cli_args else "video.mp4"

    try:
        result = VideoTranscriber().transcribe_video(
            video_path,
            cleanup=True,  # drop the temporary audio file afterwards
            language="en-US",  # further Deepgram options may be added here
        )

        print("\nTranscription Results:")
        print(result.to_json(indent=4))
    except Exception as err:
        logging.error(f"Error: {str(err)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment