Skip to content

Instantly share code, notes, and snippets.

@me-suzy
Created April 20, 2025 19:06
Show Gist options
  • Save me-suzy/ecc1292acac2ac7d7d139b45d7f00dee to your computer and use it in GitHub Desktop.
Save me-suzy/ecc1292acac2ac7d7d139b45d7f00dee to your computer and use it in GitHub Desktop.
asfasfa video combina si comprima
#!/usr/bin/env python3
import os
import sys
import subprocess
from moviepy.editor import VideoFileClip, ImageClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips
import re
import gc
import shutil
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_video_info(clip):
    """Summarise a clip as a dict of display-ready strings.

    Reads ``clip.duration``, ``clip.filename``, ``clip.size`` and ``clip.fps``.
    The on-disk size of ``clip.filename`` is queried, so that file must exist.

    Returns:
        dict with the keys 'durată' (seconds, two decimals), 'dimensiune'
        (file size in MB, one decimal), 'rezoluție' ("<width>x<height>")
        and 'fps' (the fps value formatted as a string).
    """
    duration_text = f"{clip.duration:.2f} secunde"

    bytes_per_mb = 1024 * 1024
    size_mb = os.path.getsize(clip.filename) / bytes_per_mb
    size_text = f"{size_mb:.1f} MB"

    width, height = clip.size[0], clip.size[1]
    resolution_text = f"{width}x{height}"

    summary = {}
    summary['durată'] = duration_text
    summary['dimensiune'] = size_text
    summary['rezoluție'] = resolution_text
    summary['fps'] = f"{clip.fps}"
    return summary
def process_video_compression(video_path, output_file, target_bitrate, original_size):
    """Re-encode *video_path* into *output_file* by calling ffmpeg directly.

    The video stream is encoded with libx264 (CRF 23, preset "faster",
    High profile, level 4.1, yuv420p) and the audio with AAC at 192k.

    Args:
        video_path: Input file handed to ffmpeg.
        output_file: Destination path; ffmpeg is run with ``-y`` so an
            existing file is overwritten.
        target_bitrate: Video bitrate in bits per second. It is passed as
            ``-b:v``; ``-maxrate`` is 1.5x and ``-bufsize`` 2x this value
            (all converted to whole kilobits).
        original_size: Baseline size in bytes used for the printed
            compression ratio.

    Returns:
        True when ffmpeg exits with status 0, False when ffmpeg fails or
        any exception is raised (the error is printed, not propagated).
    """
    try:
        # ffmpeg expects rate values such as "5000k".
        max_rate = f"{int(target_bitrate*1.5/1000)}k"
        buf_size = f"{int(target_bitrate*2/1000)}k"
        bitrate = f"{int(target_bitrate/1000)}k"

        cmd = [
            "ffmpeg", "-y", "-i", video_path,
            "-c:v", "libx264", "-preset", "faster", "-crf", "23",
            "-maxrate", max_rate, "-bufsize", buf_size, "-b:v", bitrate,
            "-profile:v", "high", "-level", "4.1", "-pix_fmt", "yuv420p",
            "-tune", "film", "-c:a", "aac", "-b:a", "192k",
            "-movflags", "+faststart", output_file
        ]

        # stderr is captured so it can be shown on failure; stdout is unused.
        completed = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        if completed.returncode != 0:
            print(f"Eroare ffmpeg: {completed.stderr.decode('utf-8', errors='ignore')}")
            return False

        final_size = os.path.getsize(output_file)
        print("\n=== Rezultate Compresie ===")
        print(f"Dimensiune originală: {original_size/(1024*1024):.1f} MB")
        print(f"Dimensiune după compresie: {final_size/(1024*1024):.1f} MB")
        # A zero baseline would raise ZeroDivisionError and turn a successful
        # encode into a reported failure, so the ratio is skipped in that case.
        if original_size > 0:
            compression_ratio = (1 - final_size/original_size) * 100
            print(f"Rata de compresie: {compression_ratio:.1f}%")
        return True
    except Exception as e:
        print(f"\n❌ Eroare la compresie: {str(e)}")
        return False
    finally:
        gc.collect()
def get_media_info(file_path):
    """Return width, height and frame rate for an image or video file.

    Files ending in .jpg/.jpeg/.png are opened with Pillow and reported with
    a fixed frame rate of 24. Every other file is inspected with ffprobe
    (first video stream only).

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        dict with the keys "width", "height" and "fps". If anything goes
        wrong (missing tool, unreadable file, unparsable output) the error
        is printed and the defaults 1920x1080 @ 24 are returned instead.
    """
    try:
        if file_path.lower().endswith(('.jpg', '.jpeg', '.png')):
            from PIL import Image
            # The context manager closes the file even if reading .size fails.
            with Image.open(file_path) as img:
                width, height = img.size
            return {"width": width, "height": height, "fps": 24}

        cmd = [
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=width,height,r_frame_rate",
            "-of", "csv=p=0", file_path
        ]
        probe_text = subprocess.check_output(cmd).decode('utf-8', errors='replace')
        # Only the first line is parsed: splitting the whole output on commas
        # would merge any additional line into the frame-rate field.
        first_line = probe_text.strip().splitlines()[0]
        fields = first_line.strip().split(',')
        width, height = int(fields[0]), int(fields[1])
        # r_frame_rate is a ratio such as "24/1".
        numerator, slash, denominator = fields[2].partition('/')
        fps = float(numerator) / float(denominator) if slash else float(numerator)
        return {"width": width, "height": height, "fps": fps}
    except Exception as e:
        print(f"Eroare la obținerea informațiilor media: {e}")
        # Fall back to defaults so the caller can keep going.
        return {"width": 1920, "height": 1080, "fps": 24}
def process_file_individually(args):
    """Normalise one media file into an H.264 clip by calling ffmpeg.

    The input is scaled to fit the target frame (aspect ratio kept) and
    padded with black to exactly the target size. Files ending in ".mp4" are
    re-encoded with AAC audio at 128k; any other file is treated as a still
    image and looped for a fixed duration.

    Args:
        args: Tuple ``(file_path, output_dir, index, target_size, target_fps)``
            with an optional sixth element ``image_duration`` (seconds a still
            image is shown; defaults to 5). A single tuple is used so the
            function can be submitted to an executor with one argument.

    Returns:
        Path of the written clip (``temp_processed_<index>.mp4`` inside
        *output_dir*), or None if processing failed (the error is printed).
    """
    file_path, output_dir, index, target_size, target_fps, *extra = args
    image_duration = extra[0] if extra else 5
    temp_output = os.path.join(output_dir, f"temp_processed_{index}.mp4")
    try:
        # 4:2:0 H.264 cannot encode odd frame sizes, so round up to even.
        width = int(target_size[0]) + int(target_size[0]) % 2
        height = int(target_size[1]) + int(target_size[1]) % 2
        fit_filter = (
            f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:color=black"
        )
        # One pixel format for every clip: the clips are later concatenated
        # with stream copy, which needs matching stream parameters.
        video_codec = ["-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                       "-pix_fmt", "yuv420p"]

        if file_path.lower().endswith('.mp4'):
            cmd = [
                "ffmpeg", "-y", "-i", file_path,
                "-vf", fit_filter,
                *video_codec,
                "-c:a", "aac", "-b:a", "128k", "-r", str(target_fps),
                temp_output
            ]
        else:
            # Still image: loop the single frame for image_duration seconds.
            cmd = [
                "ffmpeg", "-y", "-loop", "1", "-i", file_path, "-t", str(image_duration),
                "-vf", fit_filter,
                *video_codec,
                "-r", str(target_fps),
                temp_output
            ]
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return temp_output
    except Exception as e:
        print(f"\n❌ Eroare la procesarea fișierului {os.path.basename(file_path)}: {str(e)}")
        # CalledProcessError carries ffmpeg's stderr; show its tail so the
        # actual cause is visible instead of only the exit status.
        stderr = getattr(e, "stderr", None)
        if stderr:
            print(stderr.decode('utf-8', errors='ignore')[-2000:])
        return None
def _natural_sort_key(name):
    """Return a sort key that compares embedded digit runs numerically."""
    return [int(tok) if tok.isdigit() else tok.lower() for tok in re.split('([0-9]+)', name)]


def _same_path(first, second):
    """Return True when both paths name the same location after normalisation."""
    return (os.path.normcase(os.path.abspath(first))
            == os.path.normcase(os.path.abspath(second)))


def _run_quiet(cmd):
    """Run *cmd* with captured output; raise CalledProcessError on failure."""
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def _probe_duration(path):
    """Return the duration of *path* in seconds as reported by ffprobe."""
    cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
           "-of", "csv=p=0", path]
    return float(subprocess.check_output(cmd).decode('utf-8').strip())


def _write_concat_list(list_path, paths):
    """Write an ffmpeg concat-demuxer list file naming *paths* in order."""
    # UTF-8 is written explicitly instead of the locale default, and a single
    # quote inside a path is escaped as '\'' so it cannot end the quoted string.
    with open(list_path, 'w', encoding='utf-8') as handle:
        for path in paths:
            escaped = os.path.abspath(path).replace("'", "'\\''")
            handle.write(f"file '{escaped}'\n")


def _loop_audio_command(source, destination, video_duration, audio_duration):
    """Build the ffmpeg command that repeats *source* to cover *video_duration*."""
    loops = int(video_duration / audio_duration) + 1
    return [
        "ffmpeg", "-y", "-stream_loop", str(loops),
        "-i", source, "-t", str(video_duration), "-c:a", "aac", "-b:a", "192k",
        destination
    ]


def _add_background_audio(temp_combined, valid_audio, temp_dir):
    """Replace the audio of *temp_combined* with the given track(s), in place."""
    video_duration = _probe_duration(temp_combined)
    temp_audio = os.path.join(temp_dir, "combined_audio.m4a")

    if len(valid_audio) == 1:
        audio_path = valid_audio[0]
        audio_duration = _probe_duration(audio_path)
        if audio_duration < video_duration:
            # Too short: repeat the track until it covers the video.
            cmd = _loop_audio_command(audio_path, temp_audio, video_duration, audio_duration)
        else:
            # Long enough: cut it at the video duration.
            cmd = [
                "ffmpeg", "-y", "-i", audio_path, "-t", str(video_duration),
                "-c:a", "aac", "-b:a", "192k", temp_audio
            ]
        _run_quiet(cmd)
    else:
        # Several tracks: join them first, then repeat the result if needed.
        audio_list_file = os.path.join(temp_dir, "audio_list.txt")
        _write_concat_list(audio_list_file, valid_audio)
        _run_quiet([
            'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', audio_list_file,
            '-c:a', 'aac', '-b:a', '192k', temp_audio
        ])
        audio_duration = _probe_duration(temp_audio)
        if audio_duration < video_duration:
            temp_audio_repeated = os.path.join(temp_dir, "repeated_audio.m4a")
            _run_quiet(_loop_audio_command(temp_audio, temp_audio_repeated,
                                           video_duration, audio_duration))
            os.replace(temp_audio_repeated, temp_audio)

    # Mux: video stream copied from the combined clip, audio from the track.
    video_with_audio = os.path.join(temp_dir, "video_with_audio.mp4")
    _run_quiet([
        "ffmpeg", "-y", "-i", temp_combined, "-i", temp_audio,
        "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0",
        "-shortest", video_with_audio
    ])
    if os.path.exists(video_with_audio):
        os.replace(video_with_audio, temp_combined)
        print("✓ Audio adăugat cu succes")
    else:
        print("⚠️ Avertisment: Nu s-a putut adăuga audio, se continuă fără audio")


def combine_and_compress_media(input_directory, output_file, audio_files=None, image_duration=5):
    """Join every video/image in a folder into one compressed MP4.

    Steps: list the .mp4/.jpg/.jpeg/.png files of *input_directory* in
    natural (numeric-aware) order, normalise each to the largest width,
    height and frame rate found, concatenate the clips in that order,
    optionally replace the audio with the given background track(s), and
    finally compress the result into *output_file*. Intermediate files are
    written to a "Temp" sub-directory of *input_directory*.

    Args:
        input_directory: Folder containing the media files.
        output_file: Path of the final video. If it lies inside
            *input_directory* it is not treated as an input.
        audio_files: Optional list of audio paths; empty or missing entries
            are ignored. Tracks are repeated if shorter than the video.
        image_duration: Accepted for interface compatibility; this function
            does not forward it to the per-file worker.

    Returns:
        The result of the final compression step (True on success), or False
        if no media was found or any step failed. Errors are printed rather
        than raised.
    """
    temp_dir = None
    preexisting = set()
    try:
        media_files = [
            f for f in os.listdir(input_directory)
            if f.lower().endswith(('.mp4', '.jpg', '.jpeg', '.png'))
            # A previous result stored in the same folder must not be fed
            # back in as source material on the next run.
            and not _same_path(os.path.join(input_directory, f), output_file)
        ]
        if not media_files:
            print("❌ Nu s-au găsit fișiere media în director!")
            return False
        media_files.sort(key=_natural_sort_key)
        print("\nFișiere găsite:", ", ".join(media_files))

        temp_dir = os.path.join(input_directory, "Temp")
        os.makedirs(temp_dir, exist_ok=True)
        # Remember what was already there so cleanup removes only our files.
        preexisting = set(os.listdir(temp_dir))
        print(f"Director temporar: {temp_dir}")

        print("\nColectez informații despre fișiere...")
        source_paths = [os.path.join(input_directory, name) for name in media_files]
        total_size = sum(os.path.getsize(path) for path in source_paths)
        infos = [get_media_info(path) for path in source_paths]
        target_size = (
            max(info["width"] for info in infos),
            max(info["height"] for info in infos),
        )
        target_fps = max(info["fps"] for info in infos)
        print(f"\nRezoluție țintă: {target_size}, FPS țintă: {target_fps}")

        print("\nProcesez fișierele în paralel...")
        # Half of the available CPUs, but at least one worker.
        num_workers = max(multiprocessing.cpu_count() // 2, 1)
        print(f"Folosesc {num_workers} thread-uri pentru procesare paralelă")
        results_by_index = {}
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            future_to_index = {
                executor.submit(
                    process_file_individually,
                    (path, temp_dir, idx, target_size, target_fps)
                ): idx
                for idx, path in enumerate(source_paths)
            }
            for done, future in enumerate(as_completed(future_to_index), start=1):
                result = future.result()
                if result:
                    results_by_index[future_to_index[future]] = result
                    print(f"✓ Procesat {done}/{len(media_files)}: {os.path.basename(result)}")
                else:
                    print(f"✗ Eroare la procesarea {done}/{len(media_files)}")
        # Futures finish in arbitrary order; restore the sorted input order
        # so the clips are concatenated in the sequence the user expects.
        processed_files = [results_by_index[idx] for idx in sorted(results_by_index)]
        if not processed_files:
            print("Nu s-au putut procesa fișierele media!")
            return False

        print("\nCombinez fișierele procesate...")
        concat_file = os.path.join(temp_dir, "concat_list.txt")
        _write_concat_list(concat_file, processed_files)
        temp_combined = os.path.join(temp_dir, "combined_video.mp4")
        _run_quiet(['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file,
                    '-c', 'copy', temp_combined])
        if not os.path.exists(temp_combined):
            print("❌ Nu s-a putut crea fișierul combinat!")
            return False

        if audio_files and any(audio_files):
            valid_audio = [a for a in audio_files if a and os.path.exists(a)]
            if valid_audio:
                print("🎵 Adaug fișiere audio pe fundal...")
                _add_background_audio(temp_combined, valid_audio, temp_dir)
            else:
                print("⚠️ Nu s-au găsit fișiere audio valide")

        print("📦 Comprim video-ul final...")
        return process_video_compression(temp_combined, output_file, 5000*1000, total_size)
    except Exception as e:
        print(f"❌ Eroare: {str(e)}")
        # CalledProcessError carries the tool's stderr; show its tail.
        stderr = getattr(e, "stderr", None)
        if stderr:
            print(stderr.decode('utf-8', errors='ignore')[-2000:])
        return False
    finally:
        # Best-effort cleanup: delete the files this run created, keep the
        # directory and anything that was in it before the run started.
        if temp_dir is not None:
            try:
                leftovers = [n for n in os.listdir(temp_dir) if n not in preexisting]
            except OSError:
                leftovers = []
            for name in leftovers:
                leftover_path = os.path.join(temp_dir, name)
                if os.path.isfile(leftover_path):
                    try:
                        os.remove(leftover_path)
                    except OSError:
                        pass  # e.g. still locked; leaving it behind is harmless
        gc.collect()
def main():
    """Run the pipeline on a hard-coded folder and background track.

    The result is written as "FISIER-SALVAT.mp4" inside the input folder and
    a success or failure message is printed.
    """
    source_dir = r"g:\Dubai Nou\METRO DUBAI"
    # The same track is listed twice.
    playlist = [
        r"G:\Muzica\Andorra - A Song For Harmony (Original Mix) (320kbps).mp3",
        r"G:\Muzica\Andorra - A Song For Harmony (Original Mix) (320kbps).mp3"
    ]
    # To render without background audio, replace the entries above with
    # empty strings; empty entries are dropped on the next line.
    playlist = list(filter(None, playlist))

    destination = os.path.join(source_dir, "FISIER-SALVAT.mp4")
    if combine_and_compress_media(source_dir, destination, playlist, image_duration=5):
        print("\n✅ Procesul s-a finalizat cu succes!")
    else:
        print("\n❌ Procesul a eșuat!")
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Run a garbage-collection pass before starting the pipeline.
    gc.collect()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment