Last active
January 13, 2020 22:35
-
-
Save hutattedonmyarm/83609559921d48c4ec02f9c24929cf7a to your computer and use it in GitHub Desktop.
Splits audiobooks (M4B, a single MP3, or one MP3 per chapter) at chapter boundaries into files that each run for at least 90 minutes where possible, and uploads them to Overcast.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import glob | |
import sys | |
import json | |
import re | |
import requests | |
from bs4 import BeautifulSoup | |
from colorama import Fore | |
from colorama import Style | |
import colorama | |
def login_overcast():
    """
    Sign in to Overcast and fetch the uploads page.

    The module-level ``OVERCAST_EMAIL`` and ``OVERCAST_PASSWORD`` values are
    posted to the login URL through a new ``requests`` session; the uploads
    page is then requested with that same session.

    Returns:
        A ``(session, response)`` tuple holding the session that performed
        the login and the response to the GET request for the uploads page.
    """
    login_url = 'https://overcast.fm/login'
    uploads_url = 'https://overcast.fm/uploads'
    credentials = {
        'email': OVERCAST_EMAIL,
        'password': OVERCAST_PASSWORD,
        'then': 'podcasts'
    }
    overcast_session = requests.session()
    # The response to the login POST is not inspected here; callers only
    # receive the follow-up uploads page.
    overcast_session.post(login_url, data=credentials)
    uploads_page = overcast_session.get(uploads_url)
    return (overcast_session, uploads_page)
def upload_file(path, data, overcast_session, prefix):
    """
    Uploads a file to overcast

    The file is posted as multipart form data to the URL stored in the
    module-level ``upload_to`` variable (set by the script section from the
    upload form's ``action``). Afterwards the ``upload_succeeded`` endpoint
    is called with the storage key so the upload gets registered.

    Args:
        path: Path of the audio file to upload.
        data: Form fields to send along with the file.
        overcast_session: Logged-in ``requests`` session.
        prefix: Key prefix; the file's basename is appended to build the key.
    """
    s3_key = prefix + os.path.basename(path)
    # Use a context manager so the file handle is closed once the request
    # has finished (it used to stay open for the rest of the run).
    with open(path, 'rb') as audio_handle:
        print(f'{Fore.GREEN}Uploading {path}{Style.RESET_ALL}')
        ul_response = overcast_session.post(
            upload_to,
            files={'file': audio_handle},
            data=data)
    print(f'{Fore.GREEN}Upload response: {ul_response.status_code}{Style.RESET_ALL}')
    final_response = overcast_session.post(
        'https://overcast.fm/podcasts/upload_succeeded',
        data={'key': s3_key})
    print(f'{Fore.GREEN}Final response: {final_response.status_code}{Style.RESET_ALL}')
def calc_mp3_split(audio_files):
    """
    Reads duration and size from mp3 files
    and calculates where splits are needed

    Files are processed in sorted order (the list is sorted in place). A
    group is closed once its accumulated duration exceeds 90 minutes, and a
    new group is started early when adding a file would push the group past
    the size cap.

    Args:
        audio_files: List of paths to the mp3 files; sorted in place.

    Returns:
        A list of groups, each a non-empty list of file paths that should be
        concatenated into one output file.
    """
    max_group_bytes = 1000000000
    min_group_seconds = 5400

    audio_files.sort()
    groups = [[]]
    dur = 0
    size = 0
    for audio_file in audio_files:
        cmd = [
            'ffprobe',
            '-i',
            '{}'.format(audio_file),
            '-loglevel',
            'quiet',
            '-print_format',
            'json',
            '-show_format'
        ]
        print(f'{Fore.GREEN}')
        print(' '.join(cmd))
        print(f'{Style.RESET_ALL}')
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = process.communicate()
        if err:
            print('Error probing file {}: {}'.format(audio_file, err), file=sys.stderr)
        file_format = json.loads(out)['format']
        file_seconds = float(file_format['duration'])
        file_bytes = float(file_format['size'])

        # Start a new group first if this file would not fit into the
        # current one any more.
        forced_break = size + file_bytes > max_group_bytes
        if forced_break:
            groups.append([])
            dur = 0
            size = 0
        groups[-1].append(audio_file)
        # Count the file towards the group it actually ended up in, so a
        # file moved by a forced break still contributes its duration.
        dur += file_seconds
        size += file_bytes
        if dur > min_group_seconds and not forced_break:
            groups.append([])
            dur = 0
            size = 0
    # Drop empty groups (e.g. the one opened after the last file, or the
    # initial one when the very first file exceeds the size cap); they would
    # otherwise be concatenated from zero inputs.
    chapter_data = [group for group in groups if group]
    print(f'{Fore.GREEN}')
    print('MP3 split: ', json.dumps(chapter_data))
    print(f'{Style.RESET_ALL}')
    return chapter_data
def concat_mp3_files(chapter_data, directory):
    """
    Joins each group of mp3 files into a single file ready for upload.

    The results are written into a newly created ``Overcast`` sub-folder of
    ``directory``; if that name is taken, ``Overcast1``, ``Overcast2``, ...
    are tried until an unused folder name is found.

    Args:
        chapter_data: List of groups, each a list of mp3 paths to join.
        directory: Audiobook folder; its name is used in the output names.

    Returns:
        The list of paths of the written files, in group order.
    """
    basename = os.path.basename(os.path.abspath(directory))
    print(f'{Fore.BLUE}Basename: {basename}{Style.RESET_ALL}')

    # Look for the first folder name that does not exist yet.
    attempt = 0
    folderpath = os.path.join(directory, 'Overcast')
    while os.path.isdir(folderpath):
        print(f'{Fore.BLUE}Folder: {folderpath} already exists. Trying different name{Style.RESET_ALL}')
        attempt += 1
        folderpath = os.path.join(directory, 'Overcast' + str(attempt))
    os.mkdir(folderpath)

    written = []
    for number, parts in enumerate(chapter_data, start=1):
        file_name = os.path.join(folderpath, '{:02d} - {}.mp3'.format(number, basename))
        # ffmpeg's concat protocol takes the inputs separated by '|'.
        command = ['ffmpeg', '-i', 'concat:' + '|'.join(parts), '-c', 'copy', file_name]
        print(f'{Fore.GREEN}')
        print(' '.join(command))
        print(f'{Style.RESET_ALL}')
        subprocess.Popen(command).wait()
        written.append(file_name)
        print(f'{Fore.GREEN}Written {file_name}{Style.RESET_ALL}')
    return written
def get_chapters_from_silence(audio_file):
    """
    Guesses the chapterization by silence in the audio file

    Runs ffmpeg's ``silencedetect`` filter and treats every reported silence
    as a chapter boundary: a chapter ends where a silence starts and the
    next chapter begins where that silence ends.

    Args:
        audio_file: Path of the audio file to analyse.

    Returns:
        A list of dicts with float ``start_time`` and ``end_time`` values
        (as reported by ffmpeg), in order of appearance.
    """
    # Detect silence with a length of >= 2sec (d=2)
    command = [
        'ffmpeg',
        '-i',
        '{}'.format(audio_file),
        '-af',
        'silencedetect=n=-50dB:d=2',
        '-f',
        'null',
        '-'
    ]
    print(f'{Fore.GREEN}')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    # ffmpeg writes to stderr, read from there
    _, err = process.communicate()
    # The log also contains text taken from the file (e.g. tags), which is
    # not guaranteed to be valid UTF-8; replace undecodable bytes rather
    # than aborting, since only the ASCII silencedetect lines matter here.
    log_lines = err.decode('utf-8', errors='replace').splitlines()
    pattern_start = re.compile(r'silence_start: (\d+(?:\.\d+)?)$')
    pattern_end = re.compile(r'silence_end: (\d+(?:\.\d+)?) \|.*?$')
    chapters = []
    chapter_start = 0.0
    for line in log_lines:
        if not line.startswith('[silencedetect'):
            continue
        split_start = pattern_start.search(line)
        # Silence has started => Chapter is over
        if split_start:
            chapters.append({
                'start_time': chapter_start,
                'end_time': float(split_start[1])})
            continue
        # Silence end: Chapter start
        split_end = pattern_end.search(line)
        if split_end:
            chapter_start = float(split_end[1])
    # NOTE(review): a chapter is only recorded when a silence starts, so any
    # audio after the last detected silence is not part of the returned
    # chapters - confirm whether the tail should be appended as well.
    return chapters
def probe_file_info(audio_file):
    """
    Queries ffprobe for the chapter list and the bitrate of a file.

    When the probe result contains no chapters, they are guessed from
    silence via ``get_chapters_from_silence``.

    Args:
        audio_file: Path of the audio file to inspect.

    Returns:
        A ``(chapters, bitrate)`` tuple. ``bitrate`` is the reported
        ``bit_rate`` divided by 8 (presumably bytes per second, assuming
        ffprobe reports bits per second).
    """
    command = ['ffprobe', '-i', str(audio_file)]
    command += ['-loglevel', 'quiet']
    command += ['-print_format', 'json']
    command += ['-show_chapters', '-show_format']
    print(f'{Fore.GREEN}')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    probe = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = probe.communicate()
    if err:
        print(f'{Fore.GREEN}Error probing file: {err}{Style.RESET_ALL}', file=sys.stderr)
    info = json.loads(out)
    chapters = info.get('chapters', [])
    if not chapters:
        # Nothing usable in the metadata: fall back to silence detection.
        print(f'{Fore.GREEN}No chapters in metadata. '
              f'Trying to detect them via silence{Style.RESET_ALL}')
        chapters = get_chapters_from_silence(audio_file)
    print(f'{Fore.GREEN}')
    print('chapters: ', chapters)
    print(f'{Style.RESET_ALL}')
    bitrate = float(info['format']['bit_rate']) / 8
    return (chapters, bitrate)
def remove_video_track_from_m4b(m4b_file):
    """
    Sometimes there's an empty video track in m4b files.
    Remove that and write the audio track into an m4a file

    Args:
        m4b_file: Path of the input file.

    Returns:
        Path of the audio-only file: the input path with its extension
        replaced by ``.m4a``. If the input already has that exact extension
        it is returned untouched.
    """
    # Only swap the extension. A plain str.replace('m4b', 'm4a') would also
    # rewrite directory or title parts containing 'm4b', and would leave the
    # name unchanged for e.g. '.M4B'.
    root, _ = os.path.splitext(m4b_file)
    audio_only_file_name = root + '.m4a'
    if audio_only_file_name == m4b_file:
        # Input and output would be the same file; nothing sensible to do.
        return m4b_file
    # -y: Override
    # -vn: No video
    command = [
        'ffmpeg',
        '-y',
        '-i',
        '{}'.format(m4b_file),
        '-c',
        'copy',
        '-vn',
        '{}'.format(audio_only_file_name)
    ]
    print(f'{Fore.GREEN}')
    print('Removing potential empty video track from m4b file')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    process = subprocess.Popen(command)
    process.wait()
    return audio_only_file_name
def generate_splits_from_chapters(chapters, bitrate):
    """
    Generates the splits from chapter data

    Consecutive chapters are merged into one split until the split is longer
    than 90 minutes. A new split is started early when adding a chapter would
    push the estimated size of the current split past the size cap.

    Args:
        chapters: Iterable of mappings with ``start_time`` and ``end_time``
            entries (anything ``float()`` accepts).
        bitrate: Multiplier applied to a chapter's duration to estimate its
            size in bytes.

    Returns:
        A list of ``(start, end)`` float tuples. When the last chapter
        closes a split, the list ends with a zero-length ``(end, end)``
        placeholder for the split that would have followed; callers are
        expected to drop it.
    """
    max_split_bytes = 1000000000
    min_split_seconds = 5400

    splits = []
    dur = 0.0
    size = 0.0
    # True while the split at the end of ``splits`` (if any) does not
    # contain a chapter yet, i.e. it is only a placeholder.
    open_split_empty = True
    for chapter in chapters:
        start = float(chapter['start_time'])
        end = float(chapter['end_time'])
        duration = end - start
        chapter_bytes = duration * bitrate

        # Only break for size when there is something to break away from;
        # a single oversized chapter simply becomes its own split.
        forced_break = not open_split_empty and size + chapter_bytes > max_split_bytes
        if forced_break or not splits:
            splits.append((start, end))
        else:
            splits[-1] = (splits[-1][0], end)

        if forced_break:
            # The new split contains exactly this chapter.
            dur = duration
            size = chapter_bytes
        else:
            dur += duration
            size += chapter_bytes
        open_split_empty = False

        if dur > min_split_seconds and not forced_break:
            # Long enough: close the split and open a placeholder starting
            # where this chapter ended.
            splits.append((end, end))
            dur = 0.0
            size = 0.0
            open_split_empty = True
    return splits
def convert_to_mp3(audio_file, mp3_file):
    """
    Transcodes an audio file to mp3 with ffmpeg (libmp3lame encoder).

    Args:
        audio_file: Path of the input file.
        mp3_file: Path of the mp3 file to write.
    """
    command = ['ffmpeg', '-i', str(audio_file), '-acodec', 'libmp3lame', str(mp3_file)]
    print(f'{Fore.GREEN}')
    print(' '.join(command))
    print(f'{Style.RESET_ALL}')
    # Block until ffmpeg has finished; its exit status is not evaluated.
    subprocess.Popen(command).wait()
def handle_m4b(audio_file, is_mp3=False):
    """
    Processes Single file audiobooks

    Reads the chapter data of the file, groups the chapters into splits,
    converts the file to mp3 if necessary and cuts one mp3 per split.

    Args:
        audio_file: Path of the audiobook file.
        is_mp3: True when ``audio_file`` is already an mp3, in which case no
            conversion takes place.

    Returns:
        The list of paths of the files to upload. If no chapters could be
        determined, this is the unsplit mp3 file.
    """
    print(f'{Fore.GREEN}')
    print('Checking M4B info. Is MP3?', is_mp3)
    print(f'{Style.RESET_ALL}')
    # Derive names from the function's own argument (not from a global) and
    # only touch the extension, never other parts of the path.
    root, ext = os.path.splitext(audio_file)
    mp3_file = audio_file if is_mp3 else root + '.mp3'
    #read chapter data
    if not is_mp3 and ext != '.m4a':
        audio_file = remove_video_track_from_m4b(audio_file)
    chapters, bitrate = probe_file_info(audio_file)
    splits = generate_splits_from_chapters(chapters, bitrate)
    print('Splits:', splits)
    # Drop the zero-length placeholder a closed final split leaves behind.
    if splits and splits[-1][0] == splits[-1][1]:
        splits = splits[:-1]
    print(f'{Fore.GREEN}Splitting single file into {len(splits)} files{Style.RESET_ALL}')
    #convert m4b to one big mp3
    if not is_mp3:
        print(f'{Fore.GREEN}Converting to MP3 file{Style.RESET_ALL}')
        convert_to_mp3(audio_file, mp3_file)
    else:
        print(f'{Fore.GREEN}Already MP3{Style.RESET_ALL}')
    if not splits:
        # No chapter information at all: there is nothing to cut, so hand
        # back the whole file instead of failing.
        print(f'{Fore.YELLOW}No chapters found, using the unsplit file{Style.RESET_ALL}')
        return [mp3_file]
    #split mp3 file
    output_root = os.path.splitext(mp3_file)[0]
    overcast_files = []
    for idx, split in enumerate(splits, start=1):
        oc_file = '{}_{:02d}.mp3'.format(output_root, idx)
        command = [
            'ffmpeg',
            '-i',
            '{}'.format(mp3_file),
            '-acodec',
            'copy',
            '-ss',
            '{}'.format(split[0]),
            '-to',
            '{}'.format(split[1]),
            '{}'.format(oc_file)
        ]
        print(f'{Fore.GREEN}')
        print(' '.join(command))
        print(f'{Style.RESET_ALL}')
        process = subprocess.Popen(command)
        process.wait()
        overcast_files.append(oc_file)
    return overcast_files
# Script entry: split the audiobook found in the given folder and upload the
# resulting files. Kept at module level on purpose: upload_file() reads the
# module-level name ``upload_to`` that is assigned below.
if len(sys.argv) < 2:
    # argv[0] is the script name; argv[1] does not exist in this branch.
    print(f'Usage: {sys.argv[0]} /path/to/folder/with/audiobook/')
    sys.exit(0)
ab_dir = sys.argv[1]
glob_filter = os.path.join(ab_dir, '*.mp3')
oc_files = []
colorama.init()
if len(sys.argv) > 2 and sys.argv[2] == "-u":
    # -u: upload the mp3 files in the folder as they are, without splitting
    oc_files = glob.glob(glob_filter)
else:
    files = glob.glob(glob_filter)
    print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
    if not files:
        # No mp3 files: look for m4b/m4a audiobooks instead
        glob_filter = os.path.join(ab_dir, '*.m4b')
        print(f'{Fore.GREEN}{glob_filter}{Style.RESET_ALL}')
        files = glob.glob(glob_filter)
        print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
        glob_filter = os.path.join(ab_dir, '*.m4a')
        print(f'{Fore.GREEN}{glob_filter}{Style.RESET_ALL}')
        files.extend(glob.glob(glob_filter))
        print(f'{Fore.GREEN}{files}{Style.RESET_ALL}')
        for f in files:
            oc_files.extend(handle_m4b(f, False))
    else:
        if len(files) == 1:
            # A single mp3 holds the whole book: split it by chapters
            oc_files.extend(handle_m4b(files[0], True))
        elif len(files) < 10:
            # Only a few files: upload them unchanged
            oc_files = files
        else:
            # Many (per-chapter) files: merge them into longer ones
            print(f'{Fore.GREEN}Splitting MP3 files{Style.RESET_ALL}')
            to_merge = calc_mp3_split(files)
            oc_files = concat_mp3_files(to_merge, ab_dir)
if not oc_files:
    print(f'{Fore.YELLOW}Less than 10 mp3 files and no m4b files, so nothing to do{Style.RESET_ALL}')
    sys.exit()
print('Done splitting, starting upload of {} files'.format(len(oc_files)))
print(f'{Fore.GREEN}Logging in...{Style.RESET_ALL}')
session, response = login_overcast()
html = response.text
soup = BeautifulSoup(html, 'html.parser')
ul_form = soup.find(id='upload_form')
if ul_form is None:
    # Without the form there is no upload target or form fields to send.
    print(f'{Fore.RED}Upload form not found, login probably failed{Style.RESET_ALL}',
          file=sys.stderr)
    sys.exit(1)
ul_inputs = ul_form.find_all('input')
upload_to = ul_form.get('action')
# The form's input fields are sent along with every uploaded file
ul_data = {ul_input.get('name'): ul_input.get('value') for ul_input in ul_inputs}
print(f'{Fore.GREEN}Log in done{Style.RESET_ALL}')
for file in oc_files:
    upload_file(file, ul_data, session, ul_form.get('data-key-prefix'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment