Last active
February 27, 2020 20:33
-
-
Save tanimislam/12f9c5af4f56f60c0449a44a4fc2334d to your computer and use it in GitHub Desktop.
This converts an MP4 movie, with SRT file, and name and year into an MKV file with English subtitles
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# don't upload the database | |
# app.db | |
*.core | |
# Byte-compiled / optimized / DLL files | |
__pycache__/ | |
*.py[cod] | |
# MS word and latex extensions | |
*.bak | |
*.aux | |
*.log | |
*.lng | |
*.out | |
# C extensions | |
*.so | |
# Distribution / packaging | |
.Python | |
env/ | |
bin/ | |
build/ | |
develop-eggs/ | |
dist/ | |
eggs/ | |
lib/ | |
lib64/ | |
parts/ | |
sdist/ | |
var/ | |
*.egg-info/ | |
.installed.cfg | |
*.egg | |
# Installer logs | |
pip-log.txt | |
pip-delete-this-directory.txt | |
# Unit test / coverage reports | |
htmlcov/ | |
.tox/ | |
.coverage | |
.cache | |
nosetests.xml | |
coverage.xml | |
# Translations | |
*.mo | |
# Mr Developer | |
.mr.developer.cfg | |
.project | |
.pydevproject | |
# Rope | |
.ropeproject | |
# Django stuff: | |
*.log | |
*.pot | |
# Sphinx documentation | |
docs/_build/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This converts an MP4 movie, with SRT file, and name and year into an MKV file with English subtitles. | |
20-10-2019: now can use HandBrakeCLI to convert the MP4 movie file to an MKV file that is smaller | |
Requires titlecase, mutagen | |
Requires executables: ffmpeg, mkvmerge, HandBrakeCLI | |
""" | |
import mutagen.mp4, time, os, sys, titlecase | |
import uuid, logging, subprocess | |
from distutils.spawn import find_executable | |
from optparse import OptionParser | |
ffmpeg_exec = find_executable( 'ffmpeg' ) | |
mkvmerge_exec = find_executable( 'mkvmerge' ) | |
hcli_exec = find_executable( 'HandBrakeCLI' ) | |
assert( all(map(lambda exec_f: exec_f is not None, | |
( ffmpeg_exec, mkvmerge_exec, hcli_exec ) ) ) ) | |
def convert_mp4_movie( mp4movie, name, year, quality = 28, | |
srtfile = None, | |
delete_files = False ): | |
time0 = time.time( ) | |
assert( os.path.isfile( mp4movie ) ) | |
assert( os.path.basename( mp4movie ).lower( ).endswith( '.mp4' ) ) | |
assert( (quality >= 20 ) and (quality <= 30) ) | |
if srtfile is not None: | |
assert( os.path.isfile( srtfile ) ) | |
assert( os.path.basename( srtfile ).lower( ).endswith('.srt' ) ) | |
# | |
## now create the movie | |
newfile = '%s (%d).mkv' % ( titlecase.titlecase( name ), year ) | |
put_info_mp4movie( mp4movie, name, year ) | |
proc = subprocess.Popen( | |
[ hcli_exec, '-i', mp4movie, '-e', 'x264', '-q', '%d' % quality, | |
'-B', '160', '-o', newfile ], stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
# | |
if srtfile is not None: | |
tmpmkv = '%s.mkv' % '-'.join( str( uuid.uuid4( ) ).split('-')[:2] ) | |
proc = subprocess.Popen( | |
[ | |
mkvmerge_exec, '-o', tmpmkv, newfile, | |
'--language', '0:eng', | |
'--track-name', '0:English', srtfile ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
os.rename( tmpmkv, newfile ) | |
# | |
os.chmod( newfile, 0o644 ) | |
if delete_files: | |
os.remove( mp4movie ) | |
try: os.remove( srtfile ) | |
except: pass | |
logging.info( 'created %s in %0.3f seconds.' % ( | |
newfile, time.time( ) - time0 ) ) | |
def put_info_mp4movie( mp4movie, name, year ): | |
time0 = time.time( ) | |
assert( os.path.isfile( mp4movie ) ) | |
assert( os.path.basename( mp4movie ).lower( ).endswith('.mp4' ) ) | |
mp4tags = mutagen.mp4.MP4( mp4movie ) | |
mp4tags[ '\xa9nam' ] = [ name, ] | |
mp4tags[ '\xa9day' ] = [ '%d' % year, ] | |
mp4tags.save( ) | |
logging.info( 'took %0.3f seconds to add metadata to %s.' % ( | |
time.time( ) - time0, mp4movie ) ) | |
def create_mkv_file( mp4movie, name, year, | |
srtfile = None, | |
delete_files = False ): | |
time0 = time.time( ) | |
assert( os.path.isfile( mp4movie ) ) | |
assert( os.path.basename( mp4movie ).lower( ).endswith('.mp4' ) ) | |
if srtfile is not None: | |
assert( os.path.isfile( srtfile ) ) | |
assert( os.path.basename( srtfile ).lower( ).endswith('.srt' ) ) | |
newfile = '%s (%d).mkv' % ( titlecase.titlecase( name ), year ) | |
put_info_mp4movie( mp4movie, name, year ) | |
proc = subprocess.Popen( | |
[ | |
ffmpeg_exec, '-y', '-i', mp4movie, | |
'-codec', 'copy', "file:%s" % newfile ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
# | |
if srtfile is not None: | |
tmpmkv = '%s.mkv' % '-'.join( str( uuid.uuid4( ) ).split('-')[:2] ) | |
proc = subprocess.Popen( | |
[ | |
mkvmerge_exec, '-o', tmpmkv, newfile, | |
'--language', '0:eng', | |
'--track-name', '0:English', srtfile ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
os.rename( tmpmkv, newfile ) | |
# | |
os.chmod( newfile, 0o644 ) | |
if delete_files: | |
os.remove( mp4movie ) | |
try: os.remove( srtfile ) | |
except: pass | |
logging.info( 'created %s in %0.3f seconds.' % ( newfile, time.time( ) - time0 ) ) | |
if __name__ == '__main__': | |
parser = OptionParser( ) | |
parser.add_option( '--mp4', dest='mp4', type=str, action='store', | |
help = 'Name of the MP4 movie file name.' ) | |
parser.add_option( '--srt', dest='srt', type=str, action='store', | |
help = 'Name of the SRT subtitle file associated with the movie.' ) | |
parser.add_option( '-n', '--name', dest='name', type=str, action='store', | |
help = 'Name of the movie.' ) | |
parser.add_option( '-y', '--year', type=int, action='store', | |
help = 'Year in which the movie was aired.' ) | |
# | |
parser.add_option( '--transform', dest='do_transform', action='store_true', | |
default = False, | |
help = 'If chosen, then use HandBrakeCLI to transform movie to different quality MKV movie.' ) | |
parser.add_option( '-q', '--quality', dest='quality', type=int, action='store', default = 26, | |
help = 'When choosing --transform, the quality of the conversion that HandBrakeCLI uses. Default is 26.' ) | |
# | |
parser.add_option( '--keep', dest='do_delete', action='store_false', default = True, | |
help = 'If chosen, then KEEP the MP4 and SRT files.' ) | |
parser.add_option( '--noinfo', dest='do_info', action='store_false', default = True, | |
help = 'If chosen, then run with NO INFO logging (less debugging).' ) | |
opts, args = parser.parse_args( ) | |
# | |
## error checking | |
assert(not any(map(lambda tok: tok is None, ( opts.mp4, opts.name, opts.year ) ) ) ) | |
assert( os.path.isfile( opts.mp4 ) ) | |
assert( os.path.basename( opts.mp4 ).lower( ).endswith('.mp4' ) ) | |
if opts.srt is not None: | |
assert( os.path.isfile( opts.srt ) ) | |
assert( os.path.basename( opts.srt ).lower( ).endswith('.srt' ) ) | |
# | |
## | |
logger = logging.getLogger( ) | |
if opts.do_info: logger.setLevel( logging.INFO ) | |
if not opts.do_transform: | |
create_mkv_file( | |
opts.mp4, opts.name, opts.year, | |
delete_files = opts.do_delete, | |
srtfile = opts.srt ) | |
else: | |
convert_mp4_movie( | |
opts.mp4, opts.name, opts.year, | |
delete_files = opts.do_delete, | |
srtfile = opts.srt ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, sys, numpy, requests | |
from plexapi.server import PlexServer | |
sys.path.append( '/mnt/software/sources/pythonics' ) | |
from plexstuff.plexcore import plexcore, get_maximum_matchval | |
def get_plex_server( ): | |
session = requests.Session( ) | |
session.verify = False | |
fullURL, token = plexcore.checkServerCredentials( doLocal = False ) | |
plex = PlexServer( fullURL, token, session = session ) | |
return plex | |
def get_song_tuples( ): | |
alltracks = sorted(filter(lambda line: line.rstrip( ), open( | |
'google_stereolabish_playlist.txt', mode='r', encoding='utf-8').readlines())) | |
all_songs = [ ] | |
for trackLine in alltracks: | |
artist = trackLine.split(' - ')[0].rstrip( ) | |
song = ' - '.join( trackLine.split(' - ')[2:] ).rstrip( ) | |
album = trackLine.split(' - ')[1].rstrip( ) | |
all_songs.append( ( artist, album, song ) ) | |
return set( all_songs ) | |
def fillout_all_tracks( plex ): | |
# | |
## first get the music library | |
musicSection = max(filter(lambda sec: sec.type == 'artist' and | |
'Music' in sec.title, plex.library.sections())) | |
assert( musicSection is not None ) | |
# | |
## go through artist, then album, then track -- filling out media info | |
all_media_found = { } | |
for artist in musicSection.all( ): | |
artistTitle = artist.title | |
for album in artist.albums( ): | |
albumTitle = album.title | |
for track in album.tracks( ): | |
songTitle = track.title | |
all_media_found[ ( artistTitle, albumTitle, songTitle ) ] = track | |
return all_media_found | |
def find_best_matches( song_tuples, all_media_found ): | |
song_tuples_nothere = set( song_tuples ) - set( all_media_found ) | |
assert( len( song_tuples_nothere ) != 0 ) | |
all_media_found_sdict = dict(map( | |
lambda key: ( ' - '.join( key ), key ), all_media_found ) ) | |
songs_nothere = set(map( | |
lambda key: ' - '.join( key ), song_tuples_nothere ) ) | |
bestmatch_dict = { } | |
for song in songs_nothere: | |
bmatch = max(all_media_found_sdict.keys( ), | |
key = lambda song_media: get_maximum_matchval( song_media, song ) ) | |
score = get_maximum_matchval( song, bmatch ) | |
bestmatch_dict[ song ] = ( score, all_media_found_sdict[ bmatch ] ) | |
# | |
return bestmatch_dict | |
def find_matching_media( song_tuples, all_media_found ): | |
song_tuples_nothere = set( song_tuples ) - set( all_media_found ) | |
song_mapping = dict(map(lambda song: ( song, all_media_found[ song ] ), | |
set( song_tuples ) & set( all_media_found ) ) ) | |
bestmatch_dict = find_best_matches( | |
song_tuples_nothere, all_media_found ) | |
for song in bestmatch_dict: | |
bmatch = bestmatch_dict[ song ][ -1 ] | |
song_mapping[ song ] = all_media_found[ bmatch ] | |
return song_mapping |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import reencode_hb, os | |
from optparse import OptionParser | |
parser = OptionParser() | |
parser.add_option('--inputs', dest='inputs', type=str, action='store', | |
help = 'Names of the input mp4 files to join.') | |
parser.add_option('--output', dest='output', type=str, action='store', | |
help = 'Name of the output mp4 file to join.') | |
parser.add_option('--verbose', dest='do_verbose', action='store_true', default=False, | |
help = 'If chosen, verbose output.') | |
parser.add_option('--deleteorig', dest='delete_orig', action='store_true', | |
default = False, help = 'If chosen, delete original mp4 files.') | |
opts, args = parser.parse_args() | |
assert(all([ tok is not None for tok in ( opts.inputs, opts.output ) ])) | |
inputfiles = list(map(lambda fname: os.path.expanduser(fname.strip()), opts.inputs.split(','))) | |
outputfile = os.path.expanduser( opts.output ) | |
print( inputfiles, outputfile ) | |
reencode_hb.join_episodes(inputfiles, outputfile, verbose = opts.do_verbose, | |
deleteOrig = opts.delete_orig) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, time, filecmp, json, logging | |
import datetime, subprocess, titlecase, uuid | |
from dateutil.relativedelta import relativedelta | |
from optparse import OptionParser | |
from distutils.spawn import find_executable | |
_dt0 = datetime.datetime.fromtimestamp(0.0) | |
def _find_necessary_executables(): | |
exec_dict = {} | |
# | |
## first ffmpeg/avconv exec | |
ffmpeg_exec = None | |
for ffmpeg_exc in ('avconv', 'ffmpeg'): | |
ffmpeg_exec = find_executable(ffmpeg_exc) | |
if ffmpeg_exec is not None: break | |
if ffmpeg_exec is None: return None | |
exec_dict['avconv'] = ffmpeg_exec | |
# | |
## second ffprobe/avprobe | |
ffprobe_exec = None | |
for ffprobe_exc in ( 'avprobe', 'ffprobe' ): | |
ffprobe_exec = find_executable( ffprobe_exc ) | |
if ffprobe_exec is not None: break | |
if ffprobe_exec is None: return None | |
exec_dict[ 'avprobe' ] = ffprobe_exec | |
# | |
## third handbrake exec | |
handbrake_exec = find_executable('HandBrakeCLI') | |
if handbrake_exec is None: return None | |
exec_dict['handbrake'] = handbrake_exec | |
# | |
## fourth mkvmerge exec | |
mkvmerge_exec = find_executable('mkvmerge') | |
if mkvmerge_exec is None: return None | |
exec_dict[ 'mkvmerge' ] = mkvmerge_exec | |
# | |
## all four execs found -- return dict! | |
return exec_dict | |
def get_status_ffprobe( mp4file ): | |
""" | |
Will only return FFPROBE info for MP4 files that have a SINGLE video and SINGLE audio stream. | |
""" | |
assert( os.path.isfile( mp4file ) ) | |
assert( os.path.basename( mp4file ).lower( ).endswith( '.mp4' ) ) | |
exec_dict = _find_necessary_executables( ) | |
assert( exec_dict is not None ) | |
ffprobe_exec = exec_dict[ 'avprobe' ] | |
# | |
## follow recipe documented in https://tanimislamblog.wordpress.com/2018/09/12/ffprobe-to-get-output-in-json-format | |
## /usr/bin/ffprobe -v quiet -show_streams -show_format -print_format json FILENAME | |
proc = subprocess.Popen( | |
[ ffprobe_exec, '-v', 'quiet', '-show_streams', '-show_format', '-print_format', 'json', mp4file ], | |
stdout = subprocess.PIPE, stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
try: | |
data = json.loads( stdout_val ) | |
data_out = { } | |
# | |
## first get video stream. MUST JUST HAVE ONE! | |
video_streams = list( filter( | |
lambda stream: stream[ 'codec_type' ] == 'video', data[ 'streams' ] ) ) | |
if len( video_streams ) != 1: | |
raise ValueError("Error, found %d video streams. Exiting.." % len( video_streams ) ) | |
video_stream = video_streams[ 0 ] | |
data_out[ 'video' ] = { 'width' : video_stream[ 'width' ], 'height' : video_stream[ 'height' ] } | |
# | |
## now get audio stream. MUST JUST HAVE ONE! | |
audio_streams = list( filter( | |
lambda stream: stream[ 'codec_type' ] == 'audio', data[ 'streams' ] ) ) | |
if len( audio_streams ) != 1: | |
raise ValueError("Error, found %d audio streams. Exiting.." % len( audio_streams ) ) | |
audio_stream = audio_streams[ 0 ] | |
data_out[ 'audio' ] = { 'channels' : audio_stream[ 'channels' ] } | |
# | |
return data_out | |
except TypeError: | |
logging.info( 'could not JSONIFY %s. Error message? %s' % ( stdout_val, stderr_val ) ) | |
return None | |
except ValueError as ve: | |
logging.info( 'Error message: %s.' % str( ve ) ) | |
return None | |
def join_episodes_reencode(old_movie_files, new_mp4_file, verbose = False, | |
deleteOrig = True, newSyntax = False): | |
# | |
## now assert that any file in old_movie_files does not end in mp4 | |
#assert not any([ fname.lower().endswith('.mp4') for fname in | |
# old_movie_files ]) | |
exec_dict = _find_necessary_executables() | |
assert(exec_dict is not None) | |
handbrake_exec = exec_dict['handbrake'] | |
old_mp4_files = [] | |
time0 = time.time() | |
for filename in old_movie_files: | |
newfile = os.path.join( | |
os.path.dirname( filename ), | |
'.'.join([ '.'.join( os.path.basename( filename ).split('.')[:-1]), | |
'mp4' ]) ) | |
proc = subprocess.Popen( | |
[ handbrake_exec, '-i', filename, | |
'-o', newfile, '-e', 'x264' ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
if verbose: | |
print( 'processed %s in %0.3f seconds.' % ( | |
filename, time.time() - time0 ) ) | |
old_mp4_files.append( newfile ) | |
if deleteOrig: os.remove( filename ) | |
if os.path.isfile( new_mp4_file ): | |
assert not any([ filecmp.cmp(new_mp4_file, mp4file) for mp4file in old_mp4_files ]) | |
join_episodes( old_mp4_files, new_mp4_file, verbose = verbose, | |
deleteOrig = True, newSyntax = newSyntax ) | |
def join_episodes_usemkv( old_mp4_files, new_mp4_file, deleteOrig = False ): | |
exec_dict = _find_necessary_executables() | |
assert( exec_dict is not None ) | |
if os.path.isfile( new_mp4_file ): | |
assert not any(map(lambda mp4file: filecmp.cmp( new_mp4_file, mp4file ), | |
old_mp4_files ) ) | |
assert( all(map(lambda fname: fname.lower( ).endswith( '.mp4' ), | |
old_mp4_files + [ new_mp4_file, ] ) ) ) | |
prefix = str( uuid.uuid4( ) ) | |
mkvfiles = list(map(lambda idx: '%s_%02d.mkv' % ( prefix, idx ), | |
range( len( old_mp4_files ) ) ) ) | |
probe_info = list(filter(None, map( get_status_ffprobe, old_mp4_files ) ) ) | |
assert( len( probe_info ) == len( old_mp4_files ) ) | |
# | |
## widths and heights are all the same | |
assert( len( set(map(lambda info: info[ 'video' ][ 'width' ], probe_info ) ) ) == 1 ) | |
assert( len( set(map(lambda info: info[ 'video' ][ 'height' ], probe_info ) ) ) == 1 ) | |
assert( len( set(map(lambda info: info[ 'audio' ][ 'channels' ], probe_info ) ) ) == 1 ) | |
# | |
## now actually perform the copying to mkv | |
ffmpeg_exec = exec_dict[ 'avconv' ] | |
for mp4file, mkvfile in zip( old_mp4_files, mkvfiles ): | |
proc = subprocess.Popen( | |
[ ffmpeg_exec, '-y', '-i', mp4file, '-codec', 'copy', mkvfile ], | |
stdout = subprocess.PIPE, stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
# | |
## now create the joined mkvfile from the created ones | |
mkvmerge_exec = exec_dict[ 'mkvmerge' ] | |
exec_concat = [ mkvfiles[ 0 ], ] + list(map(lambda mkvfile: '+%s' % mkvfile, mkvfiles[1:] ) ) | |
exec_final = [ mkvmerge_exec, '-o', '%s.mkv' % prefix, ] + exec_concat | |
proc = subprocess.Popen( exec_final, stdout = subprocess.PIPE, stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
# | |
## now copy the final temp mkvfile to new_mp4_file | |
proc = subprocess.Popen([ ffmpeg_exec, '-y', '-i', '%s.mkv' % prefix, '-codec', 'copy', new_mp4_file ], | |
stdout = subprocess.PIPE, stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
os.chmod( new_mp4_file, 0o644 ) | |
os.remove( '%s.mkv' % prefix ) | |
list(map(lambda mkvfile: os.remove( mkvfile ), mkvfiles ) ) | |
# | |
## if deleting originals | |
if deleteOrig: list(map(lambda fname: os.remove( fname ), old_mp4_files ) ) | |
def join_episodes(old_mp4_files, new_mp4_file, verbose = False, | |
deleteOrig = False, newSyntax = False): | |
if os.path.isfile( new_mp4_file ): | |
assert not any([ filecmp.cmp(new_mp4_file, mp4file) for mp4file in | |
old_mp4_files ]) | |
assert all([ fname.lower().endswith('.mp4') for fname in | |
old_mp4_files + [ new_mp4_file, ] ]) | |
prefix = str( uuid.uuid4() ) | |
tsfiles = [ '%s.%d.ts' % ( prefix, idx ) for | |
idx in range(len(old_mp4_files)) ] | |
exec_dict = _find_necessary_executables() | |
assert(exec_dict is not None) | |
avconv_exec = exec_dict['avconv'] | |
time0 = time.time() | |
for mp4file, tsfile in zip(old_mp4_files, tsfiles): | |
proc = subprocess.Popen( | |
[ | |
avconv_exec, '-i', mp4file, | |
'-c', 'copy', '-bsf:v', | |
'h264_mp4toannexb', '-f', 'mpegts', | |
tsfile ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate( ) | |
if verbose: | |
print('processed %s in %0.3f seconds.' % ( | |
mp4file, time.time() - time0) ) | |
# | |
## now join together | |
if not newSyntax: | |
proc = subprocess.Popen([ avconv_exec, '-f', 'mpegts', '-i', | |
'concat:%s' % ( '|'.join(tsfiles) ), | |
'-c', 'copy', '-absf', 'aac_adtstoasc', | |
new_mp4_file ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
else: | |
proc = subprocess.Popen([ avconv_exec, '-f', 'mpegts', '-i', | |
'concat:%s' % ( '|'.join(tsfiles) ), | |
'-c', 'copy', '-bsf:a', 'aac_adtstoasc', | |
new_mp4_file ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate() | |
list(map(lambda tsfile: os.remove( tsfile ), tsfiles ) ) | |
if deleteOrig: | |
list(map(lambda mp4file: os.remove( mp4file ), old_mp4_files ) ) | |
if verbose: | |
print('processed all files to %s in %0.3f seconds.' % ( | |
new_mp4_file, time.time() - time0 ) ) | |
def reencode_new_mp4( oldmp4file, newmp4file, shouldDelete = True ): | |
assert( oldmp4file.lower( ).endswith( '.mp4' ) ) | |
assert( newmp4file.lower( ).endswith( '.mp4' ) ) | |
assert( os.path.isfile( oldmp4file ) ) | |
try: | |
assert( not filecmp.cmp( oldmp4file, newmp4file ) ) | |
except: | |
pass | |
exec_dict = _find_necessary_executables() | |
assert(exec_dict is not None) | |
handbrake_exec = exec_dict['handbrake'] | |
time0 = time.time( ) | |
proc = subprocess.Popen([ handbrake_exec, '-i', | |
oldmp4file, '-o', newmp4file, '-e', 'x264', | |
'-b', '1200', '-B', '160' ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate() | |
if shouldDelete: os.remove( oldmp4file ) | |
print('processed file %s in %s.' % ( | |
oldmp4file, _format_time( time.time() - time0 ) ) ) | |
def reencode_new_mkv( oldmp4file, newmkvfile, shouldDelete = True ): | |
assert( oldmp4file.lower( ).endswith( '.mp4' ) ) | |
assert( newmkvfile.lower( ).endswith( '.mkv' ) ) | |
assert( os.path.isfile( oldmp4file ) ) | |
try: | |
assert( not filecmp.cmp( oldmp4file, newmkvfile ) ) | |
except: pass | |
exec_dict = _find_necessary_executables() | |
assert(exec_dict is not None) | |
handbrake_exec = exec_dict['handbrake'] | |
time0 = time.time( ) | |
proc = subprocess.Popen( | |
[ handbrake_exec, '-i', | |
oldmp4file, '-o', newmkvfile, '-e', 'x264', | |
'-b', '1000', '-B', '160' ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate() | |
if shouldDelete: os.remove( oldmp4file ) | |
print( 'processed file %s in %s.' % ( | |
oldmp4file, _format_time( time.time() - time0 ) ) ) | |
def reencode_episodes(dirname, format = 'avi', shouldDelete = True): | |
assert(format.lower() != 'mp4') | |
filenames = [] | |
for root, dirs, names in os.walk(dirname): | |
for name in filter(lambda name: name.endswith('.%s' % format), names): | |
filenames.append( os.path.join( root, name ) ) | |
filenames = sorted(filenames) | |
exec_dict = _find_necessary_executables() | |
assert(exec_dict is not None) | |
handbrake_exec = exec_dict['handbrake'] | |
tstart = time.time() | |
for idx, filename in enumerate(filenames): | |
time0 = time.time() | |
dirname = os.path.dirname( filename ) | |
basename = os.path.basename( filename ) | |
basename = '.'.join([ basename.split('.')[0], | |
titlecase.titlecase( | |
'.'.join(basename.split('.')[1:-1] ) ), | |
'mp4' ]) | |
newfile = os.path.join(dirname, basename) | |
proc = subprocess.Popen( | |
[ handbrake_exec, '-i', | |
filename, '-o', newfile, '-e', 'x264' ], | |
stdout = subprocess.PIPE, | |
stderr = subprocess.STDOUT ) | |
stdout_val, stderr_val = proc.communicate() | |
if shouldDelete: os.remove(filename) | |
print('processed file %s in %s.' % ( | |
filename, _format_time( time.time() - time0 ) ) ) | |
def _format_time(tstamp): | |
dt = datetime.datetime.fromtimestamp(tstamp) | |
rd = relativedelta(dt, _dt0) | |
secs = rd.seconds + 1e-6 * rd.microseconds | |
def print_hour(hr): | |
if hr is None: | |
return '' | |
elif hr == 1: | |
return '1 hour' | |
else: | |
return '%d hours' % hr | |
def print_minute(minutes): | |
if minutes is None: | |
return '' | |
elif minutes == 1: | |
return '1 minute' | |
else: | |
return '%d minutes' % minutes | |
def print_day(day): | |
if day is None: | |
return '' | |
if day == 1: | |
return '1 day' | |
else: | |
return '%d days' % day | |
def print_sub(secs, minute = None, hour = None, day = None): | |
mystr = ', '.join( filter(lambda tok: len(tok) != 0, | |
[ print_day(day), print_hour(hour), print_minute(minute) ]) + | |
[ '%0.3f seconds' % secs, ]) | |
return mystr | |
if rd.days == 0: | |
day = None | |
else: | |
day = rd.days | |
if rd.hours == 0: | |
hour = None | |
else: | |
hour = rd.hours | |
if rd.minutes == 0: | |
minute = None | |
else: | |
minute = rd.minutes | |
return print_sub( secs, minute = minute, hour = hour, day = day ) | |
if __name__=='__main__': | |
parser = OptionParser() | |
parser.add_option('--dirname', dest='dirname', type=str, | |
action = 'store', help = 'Name of directory to search.') | |
parser.add_option('--format', dest='format', type=str, default = 'avi', | |
action = 'store', | |
help = 'Format of video files to convert. Default is avi.') | |
parser.add_option('--noDelete', dest='do_noDelete', action='store_true', default = False, | |
help = 'If chosen, do NOT delete the original file.') | |
opts, args = parser.parse_args() | |
assert( opts.dirname is not None) | |
dirname = os.path.expanduser( opts.dirname ) | |
assert( os.path.isdir( dirname ) ) | |
reencode_episodes( dirname, format = opts.format, | |
shouldDelete = not opts.do_noDelete ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pyqt5 | |
qdarkstyle | |
numpy | |
titlecase | |
mutagen | |
matplotlib |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import signal, sys | |
def signal_handler( signal, frame ): | |
print( "You pressed Ctrl+C. Exiting...") | |
sys.exit( 0 ) | |
signal.signal( signal.SIGINT, signal_handler ) | |
# | |
from functools import reduce | |
import logging, os, qdarkstyle, numpy, random | |
mainDir = reduce(lambda x,y: os.path.dirname( x ), range(2), | |
os.path.abspath( __file__ ) ) | |
sys.path.append( mainDir ) | |
# | |
from PyQt5.QtWidgets import * | |
from PyQt5.QtGui import * | |
from matplotlib.figure import Figure | |
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas | |
from matplotlib.patches import Rectangle, Ellipse | |
from optparse import OptionParser | |
class PlotCanvas( FigureCanvas ): | |
def __init__( self, colnums, parent = None, width = 5, height = 3, dpi = 100 ): | |
assert( len( colnums ) >= 1 ) | |
assert(all(map(lambda nums: sum(nums) > 0, colnums))) | |
fig = Figure( figsize = ( width, height ), dpi = dpi ) | |
self.axes = fig.add_subplot( 111 ) | |
fig.set_facecolor( 'black' ) | |
fig.subplots_adjust( hspace=0.01, wspace=0.01 ) | |
self.axes.set_facecolor( 'black' ) | |
# | |
super( PlotCanvas, self ).__init__( fig ) | |
self.setParent( parent ) | |
# | |
# FigureCanvas.setSizePolicy( | |
# self, | |
# QSizePolicy.Expanding, | |
# QSizePolicy.Expanding ) | |
# FigureCanvas.updateGeometry( self ) | |
self.plot( colnums ) | |
def plot( self, colnums ): | |
self.axes.axis( 'off' ) | |
self.axes.set_xlim([ 0, 1 ]) | |
self.axes.set_ylim([ 0, 1 ]) | |
txt = self.axes.set_title( 'PyQt5 Matplotlib Example' ) | |
txt.set_color( 'white' ) | |
totRows = len( colnums ) | |
divRow = 0.98 / totRows | |
deltaY = 0.95 * divRow | |
for idx, nums in enumerate( colnums[::-1] ): | |
arr = numpy.array( nums, dtype=int ) | |
cumarray = numpy.cumsum(numpy.concatenate([ | |
[ 0 ], arr[:-1] ])) | |
totCols = sum( nums ) | |
divCol = 0.98 / totCols | |
for jdx, wdth in enumerate( nums ): | |
if jdx % 2 == 0: color = 'purple' | |
else: color = '#0C4B4F' | |
startX = 0.01 + cumarray[ jdx ] * divCol | |
startY = 0.01 + idx * divRow | |
deltaX = wdth * divCol * 0.95 | |
self.axes.add_patch( | |
Rectangle(( startX, startY ), deltaX, deltaY, | |
linewidth = 0, facecolor = color, edgecolor = None, alpha = 1.0 ) ) | |
self.draw( ) | |
class TestGLWidget( QDialog ): | |
def screenGrab( self ): | |
fname, _ = QFileDialog.getSaveFileName( | |
self, 'Save Pixmap', os.path.expanduser( '~' ), | |
filter = '*.png' ) | |
if len( os.path.basename( fname.strip( ) ) ) == 0: return | |
if not fname.lower( ).endswith( '.png' ): | |
fname = '%s.png' % fname | |
qpm = self.grab( ) | |
qpm.save( fname ) | |
def __init__( self, inputArrTot = '1' ): | |
super( TestGLWidget, self ).__init__( ) | |
self.setModal( True ) | |
# | |
printAction = QAction( self ) | |
printAction.setShortcuts( [ 'Shift+Ctrl+P' ] ) | |
printAction.triggered.connect( self.screenGrab ) | |
self.addAction( printAction ) | |
quitAction = QAction( self ) | |
quitAction.setShortcuts( [ 'Ctrl+Q', 'Esc' ] ) | |
quitAction.triggered.connect( sys.exit ) | |
self.addAction( quitAction ) | |
# | |
self.setStyleSheet(""" | |
QWidget { | |
font-family: Consolas; | |
font-size: 20px; | |
}""" ) | |
self.setFixedWidth( 800 ) | |
# | |
mainLayout = QVBoxLayout( ) | |
self.setLayout( mainLayout ) | |
# | |
colnums = [ ] | |
logging.info( 'inputArrTot = %s,' % inputArrTot ) | |
for inputArr in inputArrTot.strip( ).split(';'): | |
logging.info( 'inputArr = %s.' % inputArr ) | |
try: | |
nums = list(map(lambda tok: int( tok.strip( ) ), | |
inputArr.strip( ).split(','))) | |
assert(all(map(lambda num: num > 0, nums))) | |
except: nums = [ 1 ] | |
colnums.append( nums ) | |
logging.info( 'colnums = %s.' % colnums ) | |
# | |
topWidget = QWidget( self ) | |
topLayout = QHBoxLayout( ) | |
topWidget.setLayout( topLayout ) | |
totstr = '; '.join(map(lambda nums: ', '.join( map(lambda tok: '%d' % tok, nums ) ), | |
colnums ) ) | |
topLayout.addWidget( QLabel( 'QGRID SPACING: %s.' % totstr ) ) | |
mainLayout.addWidget( topWidget ) | |
# | |
testGridLayoutWidget = QWidget( self ) | |
testGL = QGridLayout( ) | |
testGridLayoutWidget.setLayout( testGL ) | |
for rowno, nums in enumerate( colnums ): | |
arr = numpy.array( nums, dtype=int ) | |
cumarray = numpy.cumsum(numpy.concatenate([ | |
[ 0 ], arr[:-1] ])) | |
#cumarray = numpy.concatenate([ | |
# [ 0 ], numpy.cumsum( arr[1:] ) - 1 + arr[0] ]) | |
for idx, wdth in enumerate( nums ): | |
bWidg = QLabel( | |
'%s = %d' % ( chr( idx + 97 ).upper( ), wdth ) ) | |
if idx % 2 == 0: | |
bWidg.setStyleSheet('background-color:purple') | |
else: bWidg.setStyleSheet('background-color:#0C4B4F') | |
testGL.addWidget( bWidg, rowno, cumarray[ idx ], 1, wdth ) | |
#testGL.addWidget( bWidg, rowno, idx, 1, wdth ) | |
mainLayout.addWidget( testGridLayoutWidget ) | |
# | |
## now what it should look like | |
mainLayout.addWidget( PlotCanvas( colnums, self ) ) | |
self.setFixedSize( self.size( ) ) | |
self.show( ) | |
if __name__ == '__main__': | |
parser = OptionParser( ) | |
parser.add_option('--spacing', dest='spacing', action='store', type=str, default = '1', | |
help = 'The spacing for widgets in a QGridLayout. Default is 1.' ) | |
parser.add_option('--debug', dest='do_debug', action='store_true', | |
default = False, help = 'Run debug mode if chosen.') | |
opts, args = parser.parse_args( ) | |
logger = logging.getLogger( ) | |
if opts.do_debug: logger.setLevel( logging.INFO ) | |
app = QApplication([ ]) | |
QFontDatabase.addApplicationFont( 'Consolas.ttf' ) | |
app.setStyleSheet( qdarkstyle.load_stylesheet_pyqt5( ) ) | |
testGLWidget = TestGLWidget( inputArrTot = opts.spacing ) | |
result = app.exec_( ) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment