Created
August 2, 2020 21:17
-
-
Save jedypod/448ddf8bd2fecb8c195b27240f67727e to your computer and use it in GitHub Desktop.
opusencdir is a Python tool to recursively encode contents of source directory into Opus audio files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os, sys, re, shutil | |
import logging | |
import threading | |
import concurrent.futures | |
import multiprocessing | |
import argparse, shlex | |
import subprocess | |
""" | |
opusencdir is a Python tool that recursively encodes the contents of a source directory into Opus audio files.
Supports multithreading.
""" | |
# Root of the music library. This path will be split off of the source
# directory when creating the subdirectories in the output path.
MUSICDB = "/cave/music/db"

# Default number of worker threads: half of the CPUs reported by the OS, but
# never fewer than one (a single-CPU machine would otherwise yield 0, which
# ThreadPoolExecutor rejects with ValueError).
THREADS = max(1, multiprocessing.cpu_count() // 2)

# File extensions (without the dot) that are picked up for encoding.
SUPPORTED_FORMATS = ['flac']

# Output root used when no output directory is given on the command line.
DEFAULT_OUTPUT = '/cave/music/db-opus'

logging.basicConfig(
    format="%(asctime)s: %(message)s",
    level=logging.INFO,
    datefmt="%H:%M:%S"
)
log = logging.getLogger(__name__)
def which(program):
    """
    Locate an executable, similar to the shell's ``which``.

    :param program: (String) either a path containing a directory component,
        which is checked directly, or a bare command name, which is looked up
        in every directory listed in the PATH environment variable.
    :return: (String) path of the first match that is a regular file with
        execute permission, or None if there is none.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _fname = os.path.split(program)
    if fpath:
        # An explicit path is never searched for in PATH.
        return program if is_exe(program) else None

    # PATH may be missing from the environment (e.g. cron or a stripped-down
    # service); fall back to the platform default instead of raising KeyError.
    search_path = os.environ.get("PATH", os.defpath)
    for path in search_path.split(os.pathsep):
        if not path:
            # Skip empty entries so a bare name is not resolved against the
            # current working directory by accident.
            continue
        exe_file = os.path.join(path, program)
        if is_exe(exe_file):
            return exe_file
    return None
class OpusEnc():
    """
    Recursively encode the audio files found under one or more directories
    into Opus files by running the external ``opusenc`` program.

    Constructing the object parses the command line (``sys.argv``);
    ``start()`` then gathers the matching files and encodes them on a pool
    of worker threads.
    """

    def __init__(self):
        """
        Initial setup: get and validate arguments
        """
        # Give every attribute a value up front so that the early returns
        # below (no arguments, opusenc missing) still leave a usable object.
        self.input_dirs = list()
        self.audio_files = list()
        self.filter_patterns = list()
        self.supported_formats = SUPPORTED_FORMATS + [r.upper() for r in SUPPORTED_FORMATS]
        self.overwrite = False
        self.dryrun = False
        self.dst = None
        self.opusenc_path = None
        self.jobs = max(1, THREADS)

        parser = self._build_parser()

        # Show help if no args.
        if len(sys.argv) == 1:
            parser.print_help()
            return

        # Gather and validate arguments
        args = parser.parse_args()

        # Set verbose
        if args.verbose:
            log.setLevel(logging.DEBUG)

        self.filter_patterns = self._parse_filters(args.filter)
        self.overwrite = bool(args.overwrite)
        self.dryrun = bool(args.dryrun)

        # Honour -j/--jobs; a pool needs at least one worker.
        if args.jobs:
            self.jobs = max(1, args.jobs)

        # Check for needed binaries
        self.opusenc_path = which('opusenc')
        if self.opusenc_path is None:
            log.error('Error: opusenc not found. Exiting...')
            return
        log.debug('opusenc path is: {0}'.format(self.opusenc_path))

        # Input paths
        for input_path in args.input_paths:
            # Support ~ and relative paths
            input_path = os.path.realpath(os.path.expanduser(input_path))
            if os.path.isdir(input_path):
                self.input_dirs.append(input_path)
            else:
                log.warning('Skipping input that is not a directory: \n\t{0}'.format(input_path))

        # Output directory
        if args.output:
            self.dst = os.path.realpath(os.path.expanduser(args.output))
            # A dry run must not touch the filesystem.
            if not self.dryrun:
                os.makedirs(self.dst, exist_ok=True)
        return

    @staticmethod
    def _build_parser():
        """Build the command line parser."""
        parser = argparse.ArgumentParser(description='Encode directory structure to Opus.')
        parser.add_argument("input_paths",
                            help="Source(s) to process. Can be one or more directories.",
                            type=str,
                            action='store',
                            nargs='+')
        parser.add_argument("-f", "--filter",
                            help="Regex to filter files. \n"
                                 "Can be comma separated list.",
                            required=False)
        parser.add_argument("-j", "--jobs",
                            help="Number of simultaneous jobs. (Threads / 2 if not specified).",
                            type=int,
                            required=False)
        parser.add_argument("--dryrun",
                            help="Don't actually do anything, just print what would be done.",
                            action="store_true",
                            required=False)
        parser.add_argument("-w", "--overwrite",
                            help="If output file exists, overwite it.",
                            action="store_true",
                            required=False)
        parser.add_argument("-o", "--output",
                            help="Output directory. If not specified, the default output directory will be used.",
                            required=False)
        parser.add_argument("-v", "--verbose",
                            help="Verbose output.",
                            action="store_true",
                            required=False)
        return parser

    @staticmethod
    def _parse_filters(filter_arg):
        """
        Turn the value of --filter into a list of compiled regexes.

        :param filter_arg: (String or None) one regex, or several separated by commas.
        :return: (list) compiled patterns; empty when no filter was given.
        """
        if not filter_arg:
            return list()
        # Blank items (e.g. from "a,,b") are dropped: an empty regex would
        # match every path and silently disable the filter.
        items = (item.strip() for item in filter_arg.split(','))
        return [re.compile(item) for item in items if item]

    @staticmethod
    def _build_command(opusenc_path, audio_file, output_file):
        """
        Build the opusenc invocation as an argument list, so that no shell is
        involved and quotes, ``$`` or backticks in file names are harmless.
        """
        return [opusenc_path, '--music', '--bitrate', '128', audio_file, output_file]

    def _is_supported(self, filename):
        """Return True if filename has one of the supported extensions (any letter case)."""
        _base, dot, extension = filename.rpartition('.')
        if not dot:
            return False
        return extension.lower() in {fmt.lower() for fmt in self.supported_formats}

    def _passes_filter(self, fpath):
        """Return True if no filter is set or at least one filter pattern is found in fpath."""
        if not self.filter_patterns:
            return True
        log.debug("Checking for pattern in {0}".format(fpath))
        for pattern in self.filter_patterns:
            match = pattern.search(fpath)
            if match:
                log.debug("Match: {0}".format(match.group()))
                return True
        return False

    def gather_audio_files(self):
        """
        Find and validate all files in basedir. Populate self.audio_files
        :param self.input_dirs: (list) List of input directories to recursively search
        """
        # Set for O(1) duplicate checks; the list keeps the result.
        seen = set(self.audio_files)
        # Find all files in input dirs
        for input_dir in self.input_dirs:
            for root, _directories, filenames in os.walk(input_dir):
                for filename in filenames:
                    fpath = os.path.join(root, filename)
                    if fpath in seen or not os.path.isfile(fpath):
                        continue
                    if not self._is_supported(filename):
                        continue
                    if not self._passes_filter(fpath):
                        continue
                    seen.add(fpath)
                    self.audio_files.append(fpath)
        # sort by name
        self.audio_files.sort()
        log.debug("\n".join(self.audio_files))
        return

    def process(self, audio_file):
        """
        Encode audio file into opus
        :param audio_file: (String) full path to raw file to convert.
        """
        dirname, filename = os.path.split(audio_file)
        filename_base, filename_ext = os.path.splitext(filename)
        # add opus file extension
        new_filename = filename_base + ".opus"

        # Get destination directory
        destdir = self.dst if self.dst else DEFAULT_OUTPUT
        log.debug('dirname: {0}'.format(dirname))

        # construct intermediate directories: whatever follows MUSICDB in the
        # source directory is recreated below the destination directory
        if MUSICDB in dirname:
            album_dir = dirname.split(MUSICDB)[-1]
            if not album_dir.startswith(os.path.sep):
                album_dir = os.path.sep + album_dir
        else:
            log.warning('MUSICDB not in album dir: \n\t{0}\n\t{1}'.format(MUSICDB, dirname))
            album_dir = ''
        log.debug('album dir:\n\t{0}'.format(album_dir))
        destdir = destdir + album_dir
        log.debug('new output dir: \n\t{0}'.format(destdir))

        if not self.dryrun:
            # exist_ok: several worker threads may create the same album
            # directory at the same time.
            os.makedirs(destdir, exist_ok=True)

        # full output filepath
        output_file = os.path.join(destdir, new_filename)
        log.debug("{0}\n\t{1}\n{2} {3}".format(dirname, filename, filename_base, filename_ext))
        log.info("Processing: \n\t{0}\n-->\t{1}".format(audio_file, output_file))

        if self.dryrun:
            return

        # Skip existing
        if os.path.isfile(output_file) and not self.overwrite:
            log.info('Output file exists - skipping: \n\t{0}'.format(output_file))
            return

        # Process audio file
        opus_cmd = self._build_command(self.opusenc_path, audio_file, output_file)
        log.debug(' '.join(shlex.quote(arg) for arg in opus_cmd))
        return_code = subprocess.call(opus_cmd)
        if return_code != 0:
            log.error("Error! RT:\t{0}".format(return_code))
        if not os.path.isfile(output_file):
            log.error("Error: Output file did not generate:\n\t{0}".format(output_file))
        return

    def start(self):
        """
        Start doing work
        """
        if not self.input_dirs and not self.audio_files:
            log.error("No inputs found.")
            return
        self.gather_audio_files()

        # Multithreaded processing
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.jobs) as executor:
            futures = {
                executor.submit(self.process, audio_file): audio_file
                for audio_file in self.audio_files
            }
            # Inspect every future: an exception raised in a worker is
            # otherwise stored on the future and never reported.
            for future in concurrent.futures.as_completed(futures):
                error = future.exception()
                if error is not None:
                    log.error("Error: processing failed for:\n\t{0}\n\t{1}".format(futures[future], error))
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the encode.
    encoder = OpusEnc()
    encoder.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment