Created
November 6, 2025 19:31
-
-
Save 0187773933/1e8ad4f0d3bc8edca9020da13ab52db6 to your computer and use it in GitHub Desktop.
Censors Profanity from Video
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import string | |
| import subprocess | |
| import json | |
| import unicodedata | |
| from pathlib import Path | |
| from pprint import pprint | |
| import torch | |
| import re | |
| from tqdm import tqdm | |
| from pydub import AudioSegment | |
| from faster_whisper import WhisperModel | |
| from profanity_check import predict, predict_prob | |
| from yt_dlp import YoutubeDL | |
| from rapidfuzz import fuzz , process | |
| from rapidfuzz.distance import Levenshtein | |
# ----------------------------- config -----------------------------
# Module-level defaults read by the functions below.
# NOTE(review): DEFAULT_THRESHOLD, DEFAULT_FUZZY_SCORE, DEFAULT_BEEP_BASE_HZ,
# DEFAULT_BEEP_RANGE_HZ and DEFAULT_FADE_DUR are not referenced by any function
# in the visible source — confirm they are unused before removing them.
DEFAULT_MODEL = "distil-large-v3" # Distilled Whisper with great speed/quality
DEFAULT_COMPUTE = "float32" # use "int8" on CPU if RAM tight
DEFAULT_THRESHOLD = 0.92 # word-level probability to censor
DEFAULT_FUZZY_SCORE = 87 # fuzzy threshold (0-100)
DEFAULT_MERGE_MS = 140 # merge hits closer than this gap (ms)
DEFAULT_MIN_SEG_MS = 60 # ensure a minimum mute/beep duration (ms)
DEFAULT_BEEP_BASE_HZ = 1000 # base beep freq
DEFAULT_BEEP_RANGE_HZ = 400 # +/- freq by score
DEFAULT_PROFANITY_THRESHOLD = 0.5 # profanity probability threshold (0-1)
# Minimum normalized Levenshtein similarity (0-1) used for whitelist matching,
# multi-word phrase matching and the single-word fallback match.
DEFAULT_LEVENSHTEIN_THRESHOLD = 0.92
# Padding (ms) added before the start and after the end of every muted span.
DEFAULT_PADDING_MS = 80
DEFAULT_FADE_DUR = 0.04 # 40 ms fade
# Words that are never censored; is_whitelisted_word() also accepts their
# s/es/ed/ing variants and near-identical spellings.
DEFAULT_WHITELIST = [
    "freak" , "freaking" , "penis" ,
    "tits" , "piss" , "pissing" , "pissed" , "butt" , "horny" , "bum" , "fool" , "hell" , "sex" ,
    "sucks" , "crap" , "fart" , "kill" , "stupid" , "wtf" , "nazi" , "assassin" , "motherboard" , "hellboy" , "shoot" ,
]
# ------------------------------------------------------------------
def write_text( file_path , text_lines_list ):
    """
    Overwrite `file_path` (UTF-8) with the strings in `text_lines_list`.
    The strings are written back to back; no line separators are added.
    """
    with open( file_path , mode="w" , encoding="utf-8" ) as handle:
        handle.writelines( text_lines_list )
def read_text( file_path ):
    """
    Read a UTF-8 text file and return its lines without line terminators.

    The encoding is given explicitly so the result does not depend on the
    platform's default locale encoding (write_text() in this file also
    writes UTF-8).
    """
    with open( file_path , encoding="utf-8" ) as f:
        return f.read().splitlines()
def write_json( file_path , python_object ):
    """
    Serialize `python_object` as JSON into `file_path`, replacing any
    existing file. Output is UTF-8, indented by 4 spaces, with non-ASCII
    characters written literally instead of as \\u escapes.
    """
    dump_options = { "ensure_ascii": False , "indent": 4 }
    with open( file_path , mode="w" , encoding="utf-8" ) as handle:
        json.dump( python_object , handle , **dump_options )
def read_json( file_path ):
    """
    Load and return the JSON document stored in `file_path`.

    The file is decoded as UTF-8 explicitly: write_json() in this file emits
    UTF-8 with ensure_ascii=False, so relying on the platform default
    encoding could mis-decode non-ASCII text.
    """
    with open( file_path , encoding="utf-8" ) as f:
        return json.load( f )
def get_wav_duration( input_path ):
    """
    Return the duration of the WAV file at `input_path` in seconds.
    The file is loaded with pydub, whose segment length is in milliseconds.
    """
    length_ms = len( AudioSegment.from_wav( input_path ) )
    return length_ms / 1000.0
def sanitize_filename(title: str) -> str:
    """
    Turn a title into a filesystem-friendly ASCII string.

    Non-ASCII characters are dropped after NFKD decomposition, characters
    that are invalid in filenames and runs of whitespace become underscores,
    repeated underscores are collapsed, and leading/trailing underscores are
    removed. The result may be empty (e.g. for a title with no ASCII content).
    """
    decomposed = unicodedata.normalize("NFKD", title)
    cleaned = decomposed.encode("ascii", "ignore").decode("ascii")
    # Applied in order: invalid filename characters, whitespace runs, underscore runs.
    substitutions = (
        (r'[\\/*?:"<>|]', "_"),
        (r"\s+", "_"),
        (r"_+", "_"),
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip("_")
def download_youtube_video(url: str) -> Path:
    """
    Download a YouTube video at highest available quality into current working directory.
    Normalizes title for filesystem safety.
    Returns Path to downloaded MP4.

    If `<sanitized title>.mp4` already exists in the working directory the
    download is skipped and that path is returned. Exits the process when
    yt-dlp fails or no MP4 can be found afterwards.
    """
    # Local imports: only this function needs them.
    import glob
    import shutil

    # Prefer the yt-dlp found on PATH; fall back to the Homebrew location the
    # script originally hard-coded.
    yt_dlp = shutil.which("yt-dlp") or "/opt/homebrew/bin/yt-dlp"

    print(f"🎥 Downloading YouTube video → {url}")
    # Probe video title first (best effort; any failure falls back to "video")
    try:
        title = subprocess.check_output([yt_dlp, "--get-title", url], text=True).strip()
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        title = "video"
    # A title with no ASCII content sanitizes to "", which would yield ".mp4".
    safe_title = sanitize_filename(title) or "video"

    existing = Path(f"{safe_title}.mp4")
    if existing.exists():
        print(f"✅ Video already downloaded → {safe_title}.mp4")
        return existing

    # yt-dlp treats '%' as the start of a template field; '%%' is a literal '%'.
    template_title = safe_title.replace("%", "%%")
    output_template = f"{template_title}.%(ext)s"
    cmd = [
        yt_dlp,
        "-f", "bestvideo+bestaudio/best",
        "--merge-output-format", "mp4",
        "-o", output_template,
        url
    ]
    filename = None
    # Named `proc` so it does not shadow the module-level `rapidfuzz.process` import.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
        for line in proc.stdout:
            line = line.strip()
            if not line:
                continue
            print(line)
            # detect merger output
            if "[Merger]" in line and ".mp4" in line:
                match = re.search(r"‘(.+\.mp4)’|\"(.+\.mp4)\"", line)
                if match:
                    filename = match.group(1) or match.group(2)
    if proc.returncode != 0:
        print("❌ yt-dlp download failed.")
        sys.exit(1)
    # fallback: find most recent mp4
    if not filename:
        # Escape glob metacharacters ('[', ']', ...) that may survive sanitizing.
        pattern = f"{glob.escape(safe_title)}*.mp4"
        files = sorted(Path.cwd().glob(pattern), key=lambda f: f.stat().st_mtime, reverse=True)
        if not files:
            sys.exit("❌ No MP4 file found after download.")
        filename = str(files[0])
    print(f"✅ Download complete → {filename}")
    return Path(filename)
def is_whitelisted_word( word ):
    """
    Return True when `word` should never be censored.

    A word is whitelisted if, after lower-casing and trimming, it equals a
    DEFAULT_WHITELIST entry optionally followed by s/es/ed/ing, or if its
    normalized Levenshtein similarity to an entry reaches
    DEFAULT_LEVENSHTEIN_THRESHOLD.
    """
    candidate = word.lower().strip()
    for entry in DEFAULT_WHITELIST:
        allowed = entry.lower().strip()
        if allowed == "":
            continue
        # Exact entry or a simple inflection of it.
        inflected = re.fullmatch(rf"{re.escape(allowed)}(s|es|ed|ing)?", candidate) is not None
        # Similarity is only computed when the inflection test fails.
        if inflected or Levenshtein.normalized_similarity(candidate, allowed) >= DEFAULT_LEVENSHTEIN_THRESHOLD:
            return True
    return False
def create_16k_wav_audio( input_path , output_path ):
    """
    Extract the audio of `input_path` into `output_path` with ffmpeg as
    mono, 16 kHz audio (video stream dropped, existing output overwritten).

    Returns True on success. Any failure (ffmpeg missing, non-zero exit, ...)
    is printed and reported as False instead of raising.
    """
    ffmpeg_args = [
        "ffmpeg" , "-y" ,
        "-i", input_path ,
        "-vn" , "-ac" , "1" , "-ar" , "16000" ,
        output_path
    ]
    try:
        subprocess.run(
            ffmpeg_args ,
            stdout=subprocess.DEVNULL ,
            stderr=subprocess.DEVNULL ,
            check=True ,
        )
    except Exception as err:
        print( err )
        return False
    return True
def transcribe_wav_audio( input_path , output_path ):
    """
    Transcribe the WAV file at `input_path` with faster-whisper and score
    every segment and word with profanity_check.

    The list of segments collected so far is rewritten to `output_path`
    (JSON) after every segment. Each segment holds start/end/text,
    a segment-level "profanity_prob" and a "words" list with per-word
    word/start/end/profanity_prob entries.

    Returns the list of segments, or False if any exception occurred
    (the exception is printed, not raised).
    """
    try:
        total_seconds = get_wav_duration( input_path )
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = WhisperModel( DEFAULT_MODEL , device=device , compute_type=DEFAULT_COMPUTE )
        segment_iter, _info = model.transcribe(
            input_path ,
            beam_size=5 ,
            vad_filter=True ,
            word_timestamps=True ,
            condition_on_previous_text=False ,
        )
        collected = []
        with tqdm( total=total_seconds , unit="s" , desc="Transcribing Audio" , ncols=100 , dynamic_ncols=True ) as bar:
            reported = 0.0
            for segment in segment_iter:
                segment_text = ( segment.text or "" ).strip()
                segment_prob = float( predict_prob( [ segment_text ] )[ 0 ] )
                if segment_text:
                    tqdm.write( segment_text )
                word_entries = []
                for item in ( segment.words or () ):
                    # Words without both timestamps cannot be muted; skip them.
                    if item.start is None or item.end is None:
                        continue
                    token = item.word.strip()
                    token_prob = float( predict_prob( [ token ] )[ 0 ] )
                    word_entries.append({
                        "word": token ,
                        "start": float( item.start ) ,
                        "end": float( item.end ) ,
                        "profanity_prob": token_prob
                    })
                collected.append({
                    "start": float( segment.start ) ,
                    "end": float( segment.end ) ,
                    "text": segment_text ,
                    "profanity_prob": segment_prob ,
                    "words": word_entries
                })
                # Persist everything transcribed so far after each segment.
                write_json( output_path , collected )
                # Advance the bar by the audio time covered since the last update.
                position = min( segment.end , total_seconds )
                bar.update( position - reported )
                reported = position
        return collected
    except Exception as err:
        print( err )
        return False
def is_profane_word( word , censor_list ):
    """
    Return True if `word` matches an entry of `censor_list`.

    Matching is case-insensitive. For each non-empty entry `c`:
      * the whole word equals `c` optionally followed by s/es/ed/ing, or
      * `c` optionally followed by er/ers/ing/ed/in/n/a/o/y appears in
        `word` delimited by word boundaries (so "damnin" or "oh damn it"
        match "damn").

    Because of the word-boundary anchors, an entry embedded inside a longer
    run of letters ("class" vs "ass", "bullshit" vs "shit") does NOT match;
    list such compounds explicitly.

    `word` may be None or empty, in which case the result is False.
    """
    # Entries are lower-cased below, so the word must be lower-cased as well.
    word = ( word or "" ).lower().strip()
    if not word:
        return False
    for c in censor_list:
        c = c.lower().strip()
        if not c:
            continue
        stem = re.escape( c )
        # exact or morphological variants
        if re.fullmatch( rf"{stem}(s|es|ed|ing)?" , word ):
            return True
        # stand-alone token with a slang/derivational suffix
        if re.search( rf"\b{stem}(er|ers|ing|ed|in|n|a|o|y)?\b" , word ):
            return True
    return False
def fuzzy_profane_score( word , censor_list ):
    """
    Return the best rapidfuzz `fuzz.ratio` score between `word` and any
    string entry of `censor_list`.

    Non-string entries are ignored. Returns 0.0 when `word` is not a string
    or the list contains no string entries.

    The previous implementation returned from inside the loop and therefore
    only ever scored the first string entry of the list.
    """
    if not isinstance( word , str ):
        return 0.0
    return max(
        ( fuzz.ratio( word , c ) for c in censor_list if isinstance( c , str ) ) ,
        default=0.0 ,
    )
def _padded_span( start , end ):
    """Pad a (start, end) span in seconds by DEFAULT_PADDING_MS on both sides, clamp the start at 0 and enforce DEFAULT_MIN_SEG_MS."""
    pad = DEFAULT_PADDING_MS / 1000.0
    min_len = DEFAULT_MIN_SEG_MS / 1000.0
    s = max( 0.0 , float( start ) - pad )
    e = float( end ) + pad
    if e - s < min_len:
        e = s + min_len
    return s , e


def _merge_spans( spans , max_gap_s ):
    """Sort (start, end) spans and merge those that overlap or lie at most `max_gap_s` seconds apart; returns a list of [start, end] lists."""
    merged = []
    for s , e in sorted( spans ):
        if not merged or s - merged[-1][1] > max_gap_s:
            merged.append( [ s , e ] )
        else:
            merged[-1][1] = max( merged[-1][1] , e )
    return merged


def compute_muted_segments(transcription_json, muted_segments_json, censor_list_path="./censor.txt"):
    """
    Decide which time spans of a transcription must be muted.

    Args:
        transcription_json: path of the JSON written by transcribe_wav_audio().
        muted_segments_json: path the merged [start, end] spans (seconds) are written to.
        censor_list_path: text file with one censored word or phrase per line
            (defaults to "./censor.txt", resolved against the working directory).

    Returns:
        The merged list of [start, end] spans, which is also written to
        `muted_segments_json`.

    Multi-word entries are matched against sliding windows of consecutive
    words using normalized Levenshtein similarity. Single words are muted
    when their stored "profanity_prob" reaches DEFAULT_PROFANITY_THRESHOLD,
    they are not whitelisted, and they either match an entry via
    is_profane_word() or are Levenshtein-similar to one.
    """
    transcription = read_json(transcription_json)
    censor_list = [c.strip().lower() for c in read_text(censor_list_path) if c.strip()]
    punctuation_re = re.compile(rf"[{re.escape(string.punctuation)}]")

    def _norm(s):
        # lower-case, trim, then drop ASCII punctuation
        return punctuation_re.sub("", (s or "").lower().strip())

    # Phrase normalization does not depend on the segment, so do it once.
    phrases = []
    for phrase in censor_list:
        if " " not in phrase:
            continue  # single words are handled per word below
        phrase_norm = _norm(phrase)
        window_size = len(phrase_norm.split())
        if window_size >= 2:
            phrases.append((phrase, phrase_norm, window_size))

    mute_spans = []
    for seg in transcription:
        seg_words = seg.get("words", [])
        if not seg_words:
            continue
        word_texts = [_norm(w.get("word", "")) for w in seg_words]

        # ─────────────────────────────────────────────
        # 🔹 Multi-word phrase detection (Levenshtein)
        # ─────────────────────────────────────────────
        for phrase, phrase_norm, window_size in phrases:
            for i in range(len(word_texts) - window_size + 1):
                window_text = " ".join(word_texts[i:i + window_size])
                ratio = Levenshtein.normalized_similarity(window_text, phrase_norm)
                if ratio < DEFAULT_LEVENSHTEIN_THRESHOLD:
                    continue
                s, e = _padded_span(seg_words[i]["start"], seg_words[i + window_size - 1]["end"])
                print(f"🔇 phrase match: '{phrase}' (lev={ratio:.2f}) at {s:.2f}-{e:.2f}s")
                mute_spans.append((s, e))
                # NOTE(review): only the first hit per phrase per segment is muted;
                # a phrase repeated within one segment is not caught again.
                break

        # ─────────────────────────────────────────────
        # 🔹 Single-word profanity detection (Levenshtein)
        # ─────────────────────────────────────────────
        for w, txt in zip(seg_words, word_texts):
            p = w.get("profanity_prob", 0.0)
            # Cheap probability gate first; the whitelist test runs Levenshtein.
            if p < DEFAULT_PROFANITY_THRESHOLD:
                continue
            if is_whitelisted_word( txt ):
                continue
            if not is_profane_word(txt, censor_list):
                # No direct/morphological hit: fall back to similarity against every entry.
                best_sim = max((Levenshtein.normalized_similarity(txt, c) for c in censor_list), default=0)
                if best_sim < DEFAULT_LEVENSHTEIN_THRESHOLD:
                    continue
            s, e = _padded_span(w["start"], w["end"])
            print(f"🔇 word match: '{txt}' lev≥{DEFAULT_LEVENSHTEIN_THRESHOLD} p={p:.2f} , at {s:.2f}-{e:.2f}s")
            mute_spans.append((s, e))

    # ─────────────────────────────────────────────
    # 🔹 Merge overlapping intervals
    # ─────────────────────────────────────────────
    merged = _merge_spans(mute_spans, DEFAULT_MERGE_MS / 1000.0)
    write_json(muted_segments_json, merged)
    return merged
def mute_profanity_in_video( input_video , muted_segments , output_video ):
    """
    Write a copy of `input_video` to `output_video` with the audio silenced
    during every span listed in the JSON file `muted_segments`.

    Args:
        input_video: path of the source video.
        muted_segments: path of a JSON file holding [start, end] pairs in seconds.
        output_video: path of the video to create (overwritten if present).

    Returns:
        True when ffmpeg exits with status 0, otherwise False.

    The video stream is copied; the audio is re-encoded as AAC at 192k.
    """
    muted_segments = read_json( muted_segments )
    # build one simple volume filter per span
    filter_expr = ",".join(
        f"volume=enable='between(t,{s:.3f},{e:.3f})':volume=0"
        for s, e in muted_segments
    )
    # get duration (only used for progress output; None when it cannot be determined)
    try:
        dur = float(
            subprocess.run(
                ["ffprobe","-v","error","-show_entries","format=duration",
                 "-of","default=nk=1:nw=1",input_video],
                stdout=subprocess.PIPE, text=True, check=True
            ).stdout.strip()
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        dur = None
    cmd = ["ffmpeg","-y","-i",input_video]
    # With nothing to mute the filter string is empty, and ffmpeg rejects an
    # empty -filter:a argument, so the option is only passed when needed.
    if filter_expr:
        cmd += ["-filter:a",filter_expr]
    cmd += [
        "-c:v","copy",
        "-c:a","aac",
        "-b:a", "192k",
        "-shortest",
        output_video
    ]
    print("🎬 Muting audio (progress follows)…")
    time_re = re.compile(r"time=(\d+):(\d+):([\d.]+)")
    last = 0.0
    last_line = ""
    with subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) as proc:
        for line in proc.stderr:
            if line.strip():
                last_line = line.strip()
            m = time_re.search(line)
            if not m:
                continue
            hours, minutes, seconds = map(float, m.groups())
            t = hours*3600 + minutes*60 + seconds
            # report at most once per 2 seconds of processed media
            if t - last >= 2:
                if dur:
                    pct = min(t/dur*100, 100)
                    print(f"[ {t:7.2f} / {dur:7.2f}s ] {pct:5.1f}%")
                else:
                    print(f"[ {t:7.2f}s ]")
                last = t
    if proc.returncode == 0:
        if dur:
            print(f"[ {dur:7.2f} / {dur:7.2f}s ] 100.0%")
        print(f"✅ Created censored video → {output_video}")
        return True
    print("❌ ffmpeg failed")
    if last_line:
        # Surface ffmpeg's last message so the failure can be diagnosed.
        print(last_line)
    return False
if __name__ == "__main__":
    # Usage: script.py <video file | YouTube URL>
    if len( sys.argv ) < 2:
        sys.exit( f"Usage: {sys.argv[ 0 ]} <video-file-or-youtube-url>" )
    arg = sys.argv[ 1 ]
    if re.match( r"^https?://(www\.)?(youtube\.com|youtu\.be)/" , arg , re.I ):
        input_file = download_youtube_video( arg )
    else:
        input_file = Path( arg )
        if not input_file.exists():
            sys.exit( f"❌ Input file not found → {input_file}" )

    # All intermediate and final files live next to the input video.
    wav_audio_file = input_file.with_suffix( ".wav" )
    transcription_file = input_file.with_suffix( ".json" )
    muted_segments_file = input_file.with_name( input_file.stem + "_muted_segments.json" )
    final_video_file = input_file.with_name( input_file.stem + "_censored.mp4" )

    # The WAV and the transcription are cached: they are only produced when missing.
    # On failure the partial output is removed so the next run retries the step
    # instead of treating an incomplete file as finished.
    if not wav_audio_file.exists():
        print( "Creating 16K WAV Audio for Transcription" )
        if not create_16k_wav_audio( str( input_file ) , str( wav_audio_file ) ):
            if wav_audio_file.exists():
                wav_audio_file.unlink()
            sys.exit( "❌ Could not extract audio" )
    if not transcription_file.exists():
        print( "Transcribing Audio" )
        # `is False` because a successful run may legitimately return an empty list.
        if transcribe_wav_audio( str( wav_audio_file ) , str( transcription_file ) ) is False:
            if transcription_file.exists():
                transcription_file.unlink()
            sys.exit( "❌ Transcription failed" )

    # Mute spans and the censored video are recomputed on every run
    # (presumably so edits to the censor list or thresholds take effect).
    compute_muted_segments( str( transcription_file ) , str( muted_segments_file ) )
    if not mute_profanity_in_video( str( input_file ) , str( muted_segments_file ) , str( final_video_file ) ):
        sys.exit( 1 )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment