Skip to content

Instantly share code, notes, and snippets.

@izquiratops
Last active July 16, 2019 09:28
Show Gist options
  • Save izquiratops/d1e6ee6687e4ac8c236f682ebd4fa536 to your computer and use it in GitHub Desktop.
Save izquiratops/d1e6ee6687e4ac8c236f682ebd4fa536 to your computer and use it in GitHub Desktop.
Generador de videos para LSE
import os, glob, collections, datetime, subprocess
from dateutil.relativedelta import relativedelta
# Para un futuro uso es recomendable usar argparse o un fichero de configuración
analyzed_dir = '/media/ugiat/dd1/rtve/signantes/analyzed/'
timecodes_dir = '/media/ugiat/Drive/Signantes_Database/TCs/'
output_dir = '/media/ugiat/Drive/Signantes_Database/Output/'
# Diccionario de Delays - dateList yyyymmdd
# Se considera que todas las semanas incluyen como mínimo un signante para cada uno de los 7 días!!!
dateList = ['20190415','20190429','20190506','20190513','20190520','20190603','20190610','20190617','20190624']
delayList = [20,28,31,34,38,45,49,53,56]
# Utilties
def add_hh_mm_ss_to_date(current_date, hh, mm, ss):
if type(hh) != int:
hh = int(hh)
if type(mm) != int:
mm = int(mm)
if type(ss) != int:
ss = int(ss)
c = current_date + relativedelta(seconds=ss)
c = c + relativedelta(minutes=mm)
c = c + relativedelta(hours=hh)
return c
def revoke_delay(day, timecode, delay):
hh, mm, ss = timecode.rsplit(':')
timecode = add_hh_mm_ss_to_date(day, hh, mm, int(ss) - delay)
return str(timecode.hour).zfill(2) + ":" + str(timecode.minute).zfill(2) + ":" + str(timecode.second).zfill(2)
def get_month_to_int(month):
if month == 'enero':
return 1
elif month == 'febrero':
return 2
elif month == 'marzo':
return 3
elif month == 'abril':
return 4
elif month == 'mayo':
return 5
elif month == 'junio':
return 6
elif month == 'julio':
return 7
elif month == 'agosto':
return 8
elif month == 'septiembre':
return 9
elif month == 'octubre':
return 10
elif month == 'noviembre':
return 11
elif month == 'diciembre':
return 12
def parse_tc(tc):
splits_tc = tc.split('-')
return int(splits_tc[0]), int(splits_tc[1]), int(splits_tc[2])
def time_between(d1, d2):
dateTimeDifference = d2 - d1
return dateTimeDifference
def get_date_and_time_from_rtve_name(file_name_string):
splits_name = file_name_string.split('.')[0].split('_')
day = int(splits_name[2])
month = get_month_to_int(splits_name[3])
year = int(splits_name[4])
current_date = datetime.date(year, month, day)
hh, mm, ss = parse_tc(splits_name[6])
extra_hours = int(splits_name[-1])
actual_date = add_hh_mm_ss_to_date(current_date, (hh + extra_hours), mm, ss)
return actual_date
# Core Funcs
def getting_signantes_by_week(weeks, dateList, delayList):
for date, delay in zip(dateList, delayList):
# Append Delay
weeks[date].append(delay)
# Append Dates (in datetime)
year, month, day = int(date[0:4]), int(date[4:6]), int(date[6:8])
days_of_week = []
for i in range(7):
current_date = datetime.date(year, month, day)
current_date = add_hh_mm_ss_to_date(current_date, 24 * i, 0, 0)
days_of_week.append(current_date)
weeks[date].append(days_of_week)
# Append Dates (in string yyyy-mm-dd)
days_of_week_str = []
for i in range(7):
current_date = days_of_week[i]
current_date = str(current_date.year) + "-" + str(current_date.month).zfill(2) + "-" + str(current_date.day).zfill(2)
days_of_week_str.append(current_date)
weeks[date].append(days_of_week_str)
# Append Annots by day
timecodes_namefile = glob.glob(timecodes_dir + days_of_week_str[0] + '*.txt')
if len(timecodes_namefile) != 1:
print(f'ERROR buscando timecodes para ' + date)
pass
timecodes_file = open(timecodes_namefile[0], 'r')
# Extraer todos los timecodes de la semana
annotations = []
while True:
line = timecodes_file.readline()
if not line:
break
else:
line = line.rsplit(' ')
annotations.append(line)
# Para cada dia de la semana
tcs_week = []
for i in range(len(days_of_week_str)):
tcs_current = []
# Buscar entre todas las anotaciones
for x in range(len(annotations)):
# Y añadir a una lista las que pertenezcan a ese dia
if days_of_week_str[i] == annotations[x][0]:
tc_in = revoke_delay(days_of_week[i], annotations[x][1], delay)
tc_out = revoke_delay(days_of_week[i], annotations[x][2], delay)
tcs_current.append((tc_in, tc_out))
tcs_week.append(tcs_current)
weeks[date].append(tcs_week)
return weeks
def exec_ffmpeg(inputList, date_tc_in, date_tc_out, output_filename, vcodec = 'libx264', acodec = 'aac', muted = 'true'):
# Cropping video
if len(inputList) == 1:
timestamp_in = time_between(inputList[0][1],date_tc_in)
timestamp_out = time_between(inputList[0][1],date_tc_out)
video_duration = time_between(timestamp_in, timestamp_out)
if muted:
cmd = '/usr/local/bin/ffmpeg -hide_banner -loglevel panic -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format(timestamp_in,inputList[0][0],video_duration,vcodec,acodec,output_dir+output_filename)
else:
cmd = '/usr/local/bin/ffmpeg -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format(timestamp_in,inputList[0][0],video_duration,vcodec,acodec,output_dir+output_filename)
os.system(cmd)
# If the TC is made of many vids then we have to crop the start and end, then merging all together
elif len(inputList) > 1:
timestamp_in = time_between(inputList[0][1],date_tc_in)
timestamp_out = time_between(inputList[-1][1],date_tc_out)
cmd = 'ffprobe -i {} -show_entries format=duration -v quiet -of csv="p=0"'.format(inputList[0][0])
first_video_duration = float(subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT))
first_video_duration = datetime.timedelta(seconds=int(first_video_duration))
first_video_duration = time_between(timestamp_in, first_video_duration)
if muted:
cmd = '/usr/local/bin/ffmpeg -hide_banner -loglevel panic -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format(timestamp_in,inputList[0][0],first_video_duration,vcodec,acodec,'crop_in.mp4')
else:
cmd = '/usr/local/bin/ffmpeg -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format(timestamp_in,inputList[0][0],first_video_duration,vcodec,acodec,'crop_in.mp4')
os.system(cmd)
if muted:
cmd = '/usr/local/bin/ffmpeg -hide_banner -loglevel panic -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format('00:00:00',inputList[-1][0],timestamp_out,vcodec,acodec,'crop_out.mp4')
else:
cmd = '/usr/local/bin/ffmpeg -ss {} -i {} -t {} -vcodec {} -acodec {} {}'.format('00:00:00',inputList[-1][0],timestamp_out,vcodec,acodec,'crop_out.mp4')
os.system(cmd)
f = open('files_to_merge.txt', 'w+')
f.write('file crop_in.mp4\n')
for video in inputList[1:-1]:
f.write('file %s\n' % (video[0]))
f.write('file crop_out.mp4\n')
f.close()
cmd = '/usr/local/bin/ffmpeg -f concat -safe 0 -i %s -c copy %s' % ('files_to_merge.txt',output_dir+output_filename)
os.system(cmd)
cmd = 'rm -r crop_in.mp4 crop_out.mp4 files_to_merge.txt'
os.system(cmd)
def processing_videos(weeks, date, log_file):
print(f'Procesando nueva semana, desde ' + weeks[date][2][0] + ' hasta ' + weeks[date][2][6])
videoList = []
for i in range(7):
videoList.extend(sorted(glob.glob(analyzed_dir + weeks[date][2][i] + '/**/*.mp4')))
for i, video_path in enumerate(videoList):
video_date = get_date_and_time_from_rtve_name(video_path.split('/')[-1])
# IMPORTANTE usar 2 parentesis, sino realiza mal el sorting
videoList[i] = ((video_path,video_date))
del video_path, video_date
# videoList es una array de tuples: (Nombre del Video, Fecha en Datetime)
videoList = sorted(videoList, key=lambda x: x[1])
if os.path.isdir(output_dir + 'week_' + date) == False:
os.makedirs(output_dir + 'week_' + date)
# Para cada día de la semana
for i in range(7):
# Para cada timecode del día
for j in range(len(weeks[date][3][i])):
print(f'Día {i+1}/7 | Signante {j+1}/{len(weeks[date][3][i])}')
hh, mm, ss = weeks[date][3][i][j][0].split(':')
date_tc_in = add_hh_mm_ss_to_date(weeks[date][1][i], hh, mm, ss)
hh, mm, ss = weeks[date][3][i][j][1].split(':')
date_tc_out = add_hh_mm_ss_to_date(weeks[date][1][i], hh, mm, ss)
# Busca la cadena de videos que forman la anotación, se guarda en 'list_of_videos_to_merge'
for idx in range(len(videoList[:])):
if idx == 0 and videoList[0][1] <= date_tc_in < videoList[1][1]:
idx_In_Time = idx
break
if idx == len(videoList)-1 and videoList[-1][1] <= date_tc_in:
idx_In_Time = idx
break
if videoList[idx-1][1] <= date_tc_in < videoList[idx+1][1]:
idx_In_Time = idx
break
for idx in range(len(videoList[:])):
if idx == 0 and videoList[0][1] <= date_tc_out < videoList[1][1]:
idx_Out_Time = idx
break
if idx == len(videoList)-1 and videoList[-1][1] <= date_tc_out:
idx_Out_Time = idx
break
if videoList[idx-1][1] <= date_tc_out < videoList[idx+1][1]:
idx_Out_Time = idx
break
list_of_videos_to_merge = videoList[idx_In_Time:idx_Out_Time+1]
output_filename = 'week_' + date + '/' + weeks[date][2][i][-2:] + '_' + weeks[date][3][i][j][0] + '_' + weeks[date][3][i][j][1] + '.mp4'
# FFMPEG TIME!
if len(list_of_videos_to_merge) == 0:
log_file.write('{output_filename}\n')
else:
# vcodec libx264 | acodec aac)
exec_ffmpeg(list_of_videos_to_merge,date_tc_in,date_tc_out,output_filename)
return
# 'weeks' es una array de elementos que corresponden a cada semana:
# 1. Delay (segundos)
# 2. Día de la semana (Datetime)
# 3. Día de la semana (yyyy-mm-dd)
# 4. Array con los span de tiempo donde aparece signante
# ('03:20:50', '04:11:00'), ...
weeks = collections.defaultdict(list)
weeks = getting_signantes_by_week(weeks, dateList, delayList)
log_file = open('error_log.txt', 'w+')
log_file.truncate()
for date in dateList:
processing_videos(weeks, date, log_file)
log_file.close()
print(f'Done!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment