
@0187773933
Created November 6, 2025 19:31
Censors Profanity from Video
#!/usr/bin/env python3
import sys
import string
import subprocess
import json
import unicodedata
from pathlib import Path
from pprint import pprint
import torch
import re
from tqdm import tqdm
from pydub import AudioSegment
from faster_whisper import WhisperModel
from profanity_check import predict, predict_prob
from yt_dlp import YoutubeDL
from rapidfuzz import fuzz , process
from rapidfuzz.distance import Levenshtein
# ----------------------------- config -----------------------------
DEFAULT_MODEL = "distil-large-v3" # Distilled Whisper with great speed/quality
DEFAULT_COMPUTE = "float32" # use "int8" on CPU if RAM tight
DEFAULT_THRESHOLD = 0.92 # word-level probability to censor
DEFAULT_FUZZY_SCORE = 87 # fuzzy threshold (0-100)
DEFAULT_MERGE_MS = 140 # merge hits closer than this gap (ms)
DEFAULT_MIN_SEG_MS = 60 # ensure a minimum mute/beep duration (ms)
DEFAULT_BEEP_BASE_HZ = 1000 # base beep freq
DEFAULT_BEEP_RANGE_HZ = 400 # +/- freq by score
DEFAULT_PROFANITY_THRESHOLD = 0.5 # profanity probability threshold (0-1)
DEFAULT_LEVENSHTEIN_THRESHOLD = 0.92
DEFAULT_PADDING_MS = 80
DEFAULT_FADE_DUR = 0.04 # 40 ms fade
DEFAULT_WHITELIST = [
    "freak" , "freaking" , "penis" ,
    "tits" , "piss" , "pissing" , "pissed" , "butt" , "horny" , "bum" , "fool" , "hell" , "sex" ,
    "sucks" , "crap" , "fart" , "kill" , "stupid" , "wtf" , "nazi" , "assassin" , "motherboard" , "hellboy" , "shoot" ,
]
# ------------------------------------------------------------------
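# Assumed external input: compute_muted_segments() below reads a ./censor.txt
# word list from the working directory, one censored word or multi-word phrase
# per line; blank lines are ignored and entries are lower-cased before matching.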
def write_text( file_path , text_lines_list ):
    # with open( file_path , 'a', encoding='utf-8' ) as f:
    with open( file_path , "w" , encoding="utf-8" ) as f:
        f.writelines( text_lines_list )
def read_text( file_path ):
    with open( file_path ) as f:
        return f.read().splitlines()
def write_json( file_path , python_object ):
    with open( file_path , "w" , encoding="utf-8" ) as f:
        json.dump( python_object , f , ensure_ascii=False , indent=4 )
def read_json( file_path ):
    with open( file_path ) as f:
        return json.load( f )
def get_wav_duration( input_path ):
    audio = AudioSegment.from_wav( input_path )
    duration = len( audio ) / 1000.0
    return duration
def sanitize_filename(title: str) -> str:
    """
    Normalize title → filesystem-safe ASCII (underscores for spaces, remove symbols/emojis).
    """
    # Normalize unicode (NFKD) and remove non-ASCII
    title = unicodedata.normalize("NFKD", title)
    title = title.encode("ascii", "ignore").decode("ascii")
    # Replace invalid filename chars with underscores
    title = re.sub(r'[\\/*?:"<>|]', "_", title)
    # Replace whitespace and repeated underscores
    title = re.sub(r"\s+", "_", title)
    title = re.sub(r"_+", "_", title)
    return title.strip("_")
def download_youtube_video(url: str) -> Path:
    """
    Download a YouTube video at highest available quality into current working directory.
    Normalizes title for filesystem safety.
    Returns Path to downloaded MP4.
    """
    print(f"🎥 Downloading YouTube video → {url}")
    # Probe video title first (quietly)
    try:
        title_cmd = ["/opt/homebrew/bin/yt-dlp", "--get-title", url]
        title = subprocess.check_output(title_cmd, text=True).strip()
    except Exception:
        title = "video"
    safe_title = sanitize_filename(title)
    output_template = f"{safe_title}.%(ext)s"
    if Path(f"{safe_title}.mp4").exists():
        print(f"✅ Video already downloaded → {safe_title}.mp4")
        return Path(f"{safe_title}.mp4")
    cmd = [
        "/opt/homebrew/bin/yt-dlp",
        "-f", "bestvideo+bestaudio/best",
        "--merge-output-format", "mp4",
        "-o", output_template,
        url
    ]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    filename = None
    for line in process.stdout:
        line = line.strip()
        if line:
            print(line)
        # detect merger output
        if "[Merger]" in line and ".mp4" in line:
            match = re.search(r"‘(.+\.mp4)’|\"(.+\.mp4)\"", line)
            if match:
                filename = match.group(1) or match.group(2)
    process.wait()
    if process.returncode != 0:
        print("❌ yt-dlp download failed.")
        sys.exit(1)
    # fallback: find most recent mp4
    if not filename:
        files = sorted(Path.cwd().glob(f"{safe_title}*.mp4"), key=lambda f: f.stat().st_mtime, reverse=True)
        if not files:
            sys.exit("❌ No MP4 file found after download.")
        filename = str(files[0])
    print(f"✅ Download complete → {filename}")
    return Path(filename)
def is_whitelisted_word( word ):
    word = word.lower().strip()
    for w in DEFAULT_WHITELIST:
        w = w.lower().strip()
        if not w:
            continue
        # 1️⃣ exact match or simple morphological variants
        if re.fullmatch(rf"{re.escape(w)}(s|es|ed|ing)?", word):
            return True
        # 2️⃣ Levenshtein similarity check
        sim = Levenshtein.normalized_similarity(word, w)
        if sim >= DEFAULT_LEVENSHTEIN_THRESHOLD:
            return True
    return False
def create_16k_wav_audio( input_path , output_path ):
    try:
        cmd = [
            "ffmpeg" , "-y" ,
            "-i", input_path ,
            "-vn" , "-ac" , "1" , "-ar" , "16000" ,
            output_path
        ]
        subprocess.run( cmd , stdout=subprocess.DEVNULL , stderr=subprocess.DEVNULL , check=True )
        return True
    except Exception as e:
        print( e )
        return False
def transcribe_wav_audio( input_path , output_path ):
    try:
        duration = get_wav_duration( input_path )
        model = WhisperModel(
            DEFAULT_MODEL ,
            device="cuda" if torch.cuda.is_available() else "cpu" ,
            compute_type=DEFAULT_COMPUTE ,
        )
        segments_gen, info = model.transcribe(
            input_path ,
            beam_size=5 ,
            vad_filter=True ,
            word_timestamps=True ,
            condition_on_previous_text=False ,
        )
        segments = []
        with tqdm( total=duration , unit="s" , desc="Transcribing Audio" , ncols=100 , dynamic_ncols=True ) as pbar:
            last_progress = 0.0
            for seg in segments_gen:
                text = ( seg.text or "" ).strip()
                text_profanity_prob = float( predict_prob( [ text ] )[ 0 ] )
                if text:
                    tqdm.write( text )
                words = []
                if seg.words:
                    for w in seg.words:
                        if w.start is None or w.end is None:
                            continue
                        _word = w.word.strip()
                        _word_profanity_prob = float( predict_prob( [ _word ] )[ 0 ] )
                        words.append({
                            "word": _word ,
                            "start": float( w.start ) ,
                            "end": float( w.end ) ,
                            "profanity_prob": _word_profanity_prob
                        })
                segments.append({
                    "start": float( seg.start ) ,
                    "end": float( seg.end ) ,
                    "text": text ,
                    "profanity_prob": text_profanity_prob ,
                    "words": words
                })
                # save progressively so an interrupted run keeps the transcript so far
                write_json( output_path , segments )
                current_progress = min( seg.end , duration )
                pbar.update( current_progress - last_progress )
                last_progress = current_progress
        return segments
    except Exception as e:
        print( e )
        return False
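# For reference, each entry written to the transcription JSON has this shape
# (times are in seconds; the values shown here are illustrative, not real output):
# {
#     "start": 12.34 , "end": 14.02 ,
#     "text": "example segment text" ,
#     "profanity_prob": 0.03 ,
#     "words": [ { "word": "example" , "start": 12.34 , "end": 12.71 , "profanity_prob": 0.01 } , ... ]
# }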
def is_profane_word( word , censor_list ):
    for c in censor_list:
        c = c.lower().strip()
        if not c:
            continue
        # exact or morphological variants
        if re.fullmatch(rf"{re.escape(c)}(s|es|ed|ing)?", word):
            return True
        # skip innocent substrings buried inside larger clean words
        if c in word and not (word.startswith(c) or word.endswith(c)):
            continue
        # compound forms (e.g. "bullshit", "motherfucker") where the censored
        # word starts or ends the token
        if re.search(rf"{re.escape(c)}(er|ers|ing|ed|in|n|a|o|y)?", word):
            return True
    return False
def fuzzy_profane_score( word , censor_list ):
    if not isinstance( word , str ):
        return 0.0
    # return the best fuzzy ratio across the whole censor list
    best = 0.0
    for c in censor_list:
        if not isinstance( c , str ):
            continue
        best = max( best , fuzz.ratio( word , c ) )
    return best
def compute_muted_segments(transcription_json, muted_segments_json):
    transcription = read_json(transcription_json)
    censor_list = [c.strip().lower() for c in read_text("./censor.txt") if c.strip()]
    mute_spans = []
    def _norm(s):
        # txt = w.get("word", "").lower().strip().translate(str.maketrans('', '', string.punctuation))
        return re.sub(rf"[{re.escape(string.punctuation)}]", "", (s or "").lower().strip())
    for seg in transcription:
        seg_words = seg.get("words", [])
        if not seg_words:
            continue
        word_texts = [_norm(w.get("word", "")) for w in seg_words]
        # ─────────────────────────────────────────────
        # 🔹 Multi-word phrase detection (Levenshtein)
        # ─────────────────────────────────────────────
        for phrase in censor_list:
            if " " not in phrase:
                continue # skip single words here
            phrase_norm = _norm(phrase)
            phrase_tokens = [t for t in phrase_norm.split() if t]
            if len(phrase_tokens) < 2:
                continue
            window_size = len(phrase_tokens)
            for i in range(0, len(word_texts) - window_size + 1):
                window_text = " ".join(word_texts[i:i + window_size])
                ratio = Levenshtein.normalized_similarity(window_text, phrase_norm)
                if ratio >= DEFAULT_LEVENSHTEIN_THRESHOLD:
                    s = max(0.0, float(seg_words[i]["start"]) - DEFAULT_PADDING_MS / 1000.0)
                    e = float(seg_words[i + window_size - 1]["end"]) + DEFAULT_PADDING_MS / 1000.0
                    if e - s < DEFAULT_MIN_SEG_MS / 1000.0:
                        e = s + DEFAULT_MIN_SEG_MS / 1000.0
                    print(f"🔇 phrase match: '{phrase}' (lev={ratio:.2f}) at {s:.2f}-{e:.2f}s")
                    mute_spans.append((s, e))
                    break # stop after first hit per phrase per segment
        # ─────────────────────────────────────────────
        # 🔹 Single-word profanity detection (Levenshtein)
        # ─────────────────────────────────────────────
        for w in seg_words:
            txt = _norm(w.get("word", ""))
            p = w.get("profanity_prob", 0.0)
            # if txt in DEFAULT_WHITELIST or p < DEFAULT_PROFANITY_THRESHOLD:
            #     continue
            if is_whitelisted_word( txt ):
                continue
            if p < DEFAULT_PROFANITY_THRESHOLD:
                continue
            # morphological direct hit
            if is_profane_word(txt, censor_list):
                pass
            else:
                # Levenshtein check against all censored entries
                best_sim = max((Levenshtein.normalized_similarity(txt, c) for c in censor_list), default=0)
                if best_sim < DEFAULT_LEVENSHTEIN_THRESHOLD:
                    continue
            s = max(0.0, float(w["start"]) - DEFAULT_PADDING_MS / 1000.0)
            e = float(w["end"]) + DEFAULT_PADDING_MS / 1000.0
            if e - s < DEFAULT_MIN_SEG_MS / 1000.0:
                e = s + DEFAULT_MIN_SEG_MS / 1000.0
            print(f"🔇 word match: '{txt}' lev≥{DEFAULT_LEVENSHTEIN_THRESHOLD} p={p:.2f} , at {s:.2f}-{e:.2f}s")
            mute_spans.append((s, e))
    # ─────────────────────────────────────────────
    # 🔹 Merge overlapping / nearby intervals
    # ─────────────────────────────────────────────
    mute_spans.sort()
    merged = []
    for s, e in mute_spans:
        if not merged or s - merged[-1][1] > DEFAULT_MERGE_MS / 1000.0:
            merged.append([s, e])
        else:
            merged[-1][1] = max(merged[-1][1], e)
    write_json(muted_segments_json, merged)
    return merged
def mute_profanity_in_video( input_video , muted_segments , output_video ):
    muted_segments = read_json( muted_segments )
    # build one simple volume filter; fall back to a pass-through filter when
    # there is nothing to mute (an empty -filter:a expression would fail)
    filter_expr = ",".join(
        f"volume=enable='between(t,{s:.3f},{e:.3f})':volume=0"
        for s, e in muted_segments
    ) or "anull"
    # get duration
    try:
        dur = float(
            subprocess.run(
                ["ffprobe","-v","error","-show_entries","format=duration",
                 "-of","default=nk=1:nw=1",input_video],
                stdout=subprocess.PIPE, text=True, check=True
            ).stdout.strip()
        )
    except Exception:
        dur = 1.0
    cmd = [
        "ffmpeg","-y",
        "-i",input_video,
        "-filter:a",filter_expr,
        "-c:v","copy",
        "-c:a","aac",
        "-b:a", "192k",
        "-shortest",
        output_video
    ]
    print("🎬 Muting audio (progress follows)…")
    proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
    time_re = re.compile(r"time=(\d+):(\d+):([\d.]+)")
    last = 0
    for line in proc.stderr:
        m = time_re.search(line)
        if m:
            h, m_, s = map(float, m.groups())
            t = h*3600 + m_*60 + s
            if t - last >= 2:
                pct = min(t/dur*100, 100)
                print(f"[ {t:7.2f} / {dur:7.2f}s ] {pct:5.1f}%")
                last = t
    proc.wait()
    print(f"[ {dur:7.2f} / {dur:7.2f}s ] 100.0%")
    if proc.returncode == 0:
        print(f"✅ Created censored video → {output_video}")
        return True
    print("❌ ffmpeg failed")
    return False
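# Example of the audio filtergraph built above (illustrative spans, not real
# output): two mute windows at 12.300-13.100 s and 47.850-48.400 s produce
#   volume=enable='between(t,12.300,13.100)':volume=0,volume=enable='between(t,47.850,48.400)':volume=0
# Each enabled volume=0 window silences only that interval; the video stream
# is copied unchanged and the audio is re-encoded to AAC.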
if __name__ == "__main__":
arg = sys.argv[ 1 ]
if re.match( r"^https?://(www\.)?(youtube\.com|youtu\.be)/" , arg , re.I ):
input_file = download_youtube_video( arg )
else:
input_file = Path( arg )
wav_audio_file = input_file.with_suffix( ".wav" )
transcription_file = input_file.with_suffix( ".json" )
muted_segments_file = input_file.with_name( input_file.stem + "_muted_segments.json" )
final_video_file = input_file.with_name( input_file.stem + "_censored.mp4" )
if wav_audio_file.exists() == False:
print( "Creating 16K WAV Audio for Transcription" )
create_16k_wav_audio( str( input_file ) , str( wav_audio_file ) )
if transcription_file.exists() == False:
print( "Transcribing Audio" )
transcribe_wav_audio( str( wav_audio_file ) , str( transcription_file ) )
# if muted_segments_file.exists() == False:
# print( "Computing Mute Segments" )
# compute_muted_segments( str( transcription_file ) , str( muted_segments_file ) )
# if final_video_file.exists() == False:
# print( "Transcoding Video with Muted Audio" )
# mute_profanity_in_video( str( input_file ) , str( muted_segments_file ) , str( final_video_file ) )
compute_muted_segments( str( transcription_file ) , str( muted_segments_file ) )
mute_profanity_in_video( str( input_file ) , str( muted_segments_file ) , str( final_video_file ) )
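# Usage sketch (invocation assumed from the argument handling above; the script
# name is illustrative):
#   python3 censor_profanity.py "https://www.youtube.com/watch?v=..."
#   python3 censor_profanity.py ./some_local_video.mp4
# A ./censor.txt list must exist in the working directory. The 16 kHz WAV and
# the transcription JSON are reused on re-runs; the mute-segment computation
# and the final transcode always run.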