Created
October 30, 2017 21:26
-
-
Save anarcie/2677f2a2244dbf4938b176c3d4aa1aa3 to your computer and use it in GitHub Desktop.
4Chan Radio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals | |
import requests,youtube_dl,os,shutil, isodate, random, re, string | |
import json | |
import time | |
import re, sys | |
import subprocess | |
import datetime | |
#Globals
# Time (seconds since the epoch) of the most recent 4chan request; sendGet
# compares against it to space requests out.
global_Last_Request = time.time()
# Minimum number of seconds that must pass between two 4chan requests.
global_Request_Min_Delay = 1.2
# YouTube video IDs that have already been looked up by getCatagory.
global_ignore_list = []
# Longest video, in minutes, that isMusic still accepts as a song.
global_max_song_length_min = 5
# Options handed to youtube_dl.YoutubeDL: fetch the best audio stream into
# ./songs and let the FFmpegExtractAudio post-processor turn it into a WAV.
global_ydl_opts = {
    'format': 'bestaudio/best',
    'quiet': True,
    'outtmpl': './songs/%(title)s.%(ext)s',
    'postprocessors': [
        {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        },
    ],
}
def clearSongs(folder='./songs'):
    """Delete every regular file that sits directly inside *folder*.

    Sub-directories (and anything inside them) are left alone.  When
    *folder* does not exist there is nothing to clear and the function
    returns quietly instead of raising.

    Args:
        folder: Directory to empty; defaults to the download folder.
    """
    if not os.path.isdir(folder):
        # First run, or the folder was removed: nothing to delete.
        return
    for the_file in os.listdir(folder):
        file_path = os.path.join(folder, the_file)
        if not os.path.isfile(file_path):
            continue
        try:
            os.unlink(file_path)
        except OSError as e:
            # Best effort: report the failure and keep deleting the rest.
            print(e)
def clearSongsWavs(folder='./songs'):
    """Delete the ``.wav`` files that sit directly inside *folder*.

    Only names that END in ``.wav`` are removed, so a file such as
    ``track.wav.mp3`` is kept.  Sub-directories are left alone and a
    missing *folder* is treated as already clean.

    Args:
        folder: Directory to scan; defaults to the download folder.
    """
    if not os.path.isdir(folder):
        return
    for the_file in os.listdir(folder):
        if not the_file.endswith('.wav'):
            continue
        file_path = os.path.join(folder, the_file)
        if not os.path.isfile(file_path):
            continue
        try:
            os.unlink(file_path)
        except OSError as e:
            # Best effort: report the failure and keep deleting the rest.
            print(e)
#Sends out a get request to 4chan
#Requests are rate limited to 1 request a second
#as per api docs
def sendGet(arg1):
    """Fetch ``http://a.4cdn.org/<arg1>`` while honouring the rate limit.

    Blocks until at least ``global_Request_Min_Delay`` seconds have passed
    since the previous request, then performs the GET.

    Args:
        arg1: Path below the API host, e.g. ``'mu/threads.json'``.

    Returns:
        The response body as text, or ``False`` when the request failed
        (connection problem, timeout, or an HTTP error status such as the
        404 returned for a thread that no longer exists).
    """
    global global_Last_Request
    #Dont exceed the request rate limit.
    #Sleep for whatever is left of the minimum delay.
    while True:
        remaining = global_Request_Min_Delay - (time.time() - global_Last_Request)
        if remaining <= 0:
            break
        time.sleep(remaining)
    print('Requesting {}'.format(arg1))
    try:
        r = requests.get('http://a.4cdn.org/{}'.format(arg1), timeout=2.00)
        # An error page is not JSON; report it as a failure so callers
        # never try to parse it.
        r.raise_for_status()
        return r.text
    except requests.RequestException as e:
        print('Request for {} failed: {}'.format(arg1, e))
        return False
    finally:
        #Update the last request time, whether or not the request worked,
        #so a failure is not followed by an immediate retry burst.
        global_Last_Request = time.time()
def isMusic(cat, tags, duration, max_length_min=None):
    """Decide whether a video looks like a song worth downloading.

    A video is rejected when it is longer than the allowed length.
    Otherwise it is accepted when its category id is ``10`` (the id this
    script treats as music) or when one of its tags is exactly ``music``
    or ``song``, ignoring case.

    Args:
        cat: Category id of the video (string or number).
        tags: Iterable of tag strings; ``None`` is treated as no tags.
        duration: Length of the video in seconds.
        max_length_min: Longest accepted length in minutes.  Defaults to
            ``global_max_song_length_min``.

    Returns:
        ``True`` when the video is considered music, ``False`` otherwise.
    """
    if max_length_min is None:
        max_length_min = global_max_song_length_min
    if duration > max_length_min * 60:
        print ('Song Too Long, Skipping: {} Seconds'.format(duration))
        return False
    if str(cat) == '10':
        print ('Music: Catagory')
        return True
    # `tags or ()` keeps a missing tag list from raising a TypeError.
    if any(tag.lower() in ('music', 'song') for tag in tags or ()):
        print ('Music: Tags')
        return True
    print ('Music: Nope')
    return False
def getCatagory(videoID):
    """Look a video up on the YouTube Data API and report whether it is music.

    Each video ID is looked up at most once successfully: IDs that were
    processed are remembered in ``global_ignore_list`` and skipped on
    later calls.  IDs whose lookup failed are not remembered and will be
    tried again the next time they are seen.

    The API key is read from the ``YOUTUBE_API_KEY`` environment variable
    when it is set; otherwise the placeholder below is used and must be
    replaced by a real key.

    Args:
        videoID: YouTube video ID.

    Returns:
        The result of ``isMusic`` for the video, or ``False`` when the
        video was already scanned or the lookup failed for any reason.
    """
    if videoID in global_ignore_list:
        print ('Skipping previously scanned video ID {}'.format(videoID))
        return False
    try:
        print ('Lookup up video ID {}'.format(videoID))
        apiKey = os.environ.get('YOUTUBE_API_KEY', 'INSERT YOUR OWN DAMN KEY')
        apiURL = 'https://www.googleapis.com/youtube/v3/videos?id={}&part=snippet,statistics,contentDetails&key={}'
        youtubeURL = apiURL.format(videoID, apiKey)
        # A timeout keeps one stalled lookup from hanging the whole run.
        r = requests.get(youtubeURL, timeout=5.00)
        info = json.loads(r.text)
        video = info['items'][0]
        snippet = video['snippet']
        catagory = snippet['categoryId']
        # The tags entry may be absent from the snippet; that must not
        # disqualify a video that is music by category.
        tags = snippet.get('tags', [])
        duration = isodate.parse_duration(video['contentDetails']['duration'])
        duration = duration.total_seconds()
        global_ignore_list.append(videoID)
        return isMusic(catagory, tags, duration)
    except Exception as e:
        # Best effort: any lookup problem just means "do not download".
        print ('Error processing video ID {}'.format(videoID))
        print (e)
        return False
def downloadAudio(videoURL):
    """Download the audio for *videoURL* with youtube_dl.

    The download is configured by ``global_ydl_opts``.  Any exception
    raised while downloading is printed and swallowed, so one bad video
    does not stop the run.

    Args:
        videoURL: Value handed to ``YoutubeDL.download``.
    """
    try:
        downloader = youtube_dl.YoutubeDL(global_ydl_opts)
        with downloader:
            downloader.download([videoURL])
    except Exception as err:
        print (err)
#Get list of threads on a given board
def getBoard(shortName):
    """Fetch the thread listing of board *shortName* and process it.

    Args:
        shortName: Board short name, e.g. ``'mu'``.
    """
    listing_path = '{}/threads.json'.format(shortName)
    listing = sendGet(listing_path)
    parseBoard(listing, shortName)
#Get the thread based on board and Thread ID
def getThread(board, threadNumber):
    """Fetch one thread and scan its posts.

    Args:
        board: Board short name the thread belongs to.
        threadNumber: Number of the thread to fetch.
    """
    thread_path = '{}/thread/{}.json'.format(board, threadNumber)
    body = sendGet(thread_path)
    parseThread(body)
#Parse board, getting all threads
def parseBoard(resp, shortName):
    """Walk a board's thread listing and process every thread in it.

    Args:
        resp: JSON text of the listing (a list of pages, each holding a
            ``threads`` list), or a falsy value when the request failed.
        shortName: Board short name the listing belongs to.
    """
    if not resp:
        return
    try:
        pages = json.loads(resp)
    except ValueError as e:
        # Not JSON (e.g. an error page): skip the board instead of crashing.
        print ('Could not parse thread list for {}: {}'.format(shortName, e))
        return
    # Flatten all pages first so the progress total covers the whole board.
    thread_numbers = [thread['no'] for page in pages for thread in page['threads']]
    total_threads = len(thread_numbers)
    for curr_thread, number in enumerate(thread_numbers, 1):
        print ('Processing Thread: {} of {}'.format(str(curr_thread), str(total_threads)))
        getThread(shortName, number)
#Parse a thread for Youtube links
def parseThread(resp):
    """Scan every post comment of a thread for YouTube links.

    Args:
        resp: JSON text of the thread (an object with a ``posts`` list),
            or a falsy value when the request failed.
    """
    if not resp:
        return
    try:
        thread = json.loads(resp)
    except ValueError as e:
        # Not JSON (e.g. an error page): skip the thread instead of
        # aborting the whole run.
        print ('Could not parse thread: {}'.format(e))
        return
    posts = thread.get('posts', []) if isinstance(thread, dict) else []
    for post in posts:
        # Posts without text carry no 'com' field.
        if 'com' in post:
            parseComment(post['com'])
# Matches youtube.com/watch?v=<id> and youtu.be/<id> links.  Group 1 is the
# video ID (at least one character); the optional tail swallows one trailing
# "&..." / "&amp;..." query parameter.
_YOUTUBE_LINK_RE = re.compile(
    r'http(?:s?)://(?:www\.)?youtu(?:be\.com/watch\?v=|\.be/)([\w-]+)(&(amp;)?[\w?=]*)?',
    re.IGNORECASE)


def parseComment(com):
    """Download the audio of every music video linked in a post comment.

    Both ``youtube.com/watch?v=`` and ``youtu.be/`` links are recognised.
    Each linked video is checked with ``getCatagory`` and, when accepted,
    downloaded with ``downloadAudio``.

    Args:
        com: Comment text of a post.
    """
    # <wbr> tags can appear in the middle of a link; remove them first.
    com = com.replace('<wbr>', '')
    for match in _YOUTUBE_LINK_RE.finditer(com):
        VideoID = match.group(1)
        VideoURL = match.group()
        if getCatagory(VideoID):
            print (VideoURL)
            # The bare ID is passed on rather than the matched text, which
            # may still carry an HTML-escaped "&amp;..." tail.
            downloadAudio(VideoID)
def normalizeName(name):
    """Return *name* with every character of code point 128 or above removed."""
    kept = []
    for ch in name:
        if ord(ch) < 128:
            kept.append(ch)
    return "".join(kept)
def NormalizeAll(folder='./songs'):
    """Rename every ``.mp3`` under *folder* to an ASCII-only file name.

    Only the file name is normalised (with ``normalizeName``); the
    directory part of the path is kept as it is, so the rename always
    targets the directory the file already lives in.  Files whose name is
    already ASCII-only are left alone.

    Args:
        folder: Directory tree to walk; defaults to the download folder.
    """
    for dirpath, _, filenames in os.walk(folder):
        for file in filenames:
            if not file.endswith('.mp3'):
                continue
            print ("Normalizing File: {}".format(file))
            normalName = normalizeName(file)
            if normalName == file:
                continue
            fullPath = os.path.abspath(os.path.join(dirpath, file))
            os.rename(fullPath, os.path.abspath(os.path.join(dirpath, normalName)))
def convertAudio(folder='./songs'):
    """Convert every ``.wav`` under *folder* to MP3 with ffmpeg.

    The MP3 is written next to its source, named after the source without
    its extension and with single quotes removed.  The source WAV is
    deleted after a successful conversion.

    ffmpeg is started with an argument list rather than a shell string:
    file names come from video titles and must not be interpreted by a
    shell.

    Args:
        folder: Directory tree to walk; defaults to the download folder.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started, or the WAV could not be
            deleted.
    """
    for dirpath, _, filenames in os.walk(folder):
        for file in filenames:
            if not file.endswith('.wav'):
                continue
            print ("Converting {} to MP3 format...".format(file))
            source = os.path.join(dirpath, file)
            stem = os.path.splitext(file)[0].replace("'", "")
            target = os.path.join(dirpath, stem + '.mp3')
            # -y: overwrite a leftover target instead of waiting on a prompt.
            command = ['ffmpeg', '-y', '-i', source, '-vn', '-ar', '44100',
                       '-ac', '2', '-ab', '192k', '-f', 'mp3', target]
            try:
                subprocess.check_output(command, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
            # Remove the file that was actually converted.
            os.unlink(source)
def buildRadioList():
    """Write ``radioList.txt``, the ffmpeg concat list of all songs.

    Every ``.mp3`` under ``./songs`` is considered, in shuffled order per
    directory.  The entry written is the absolute path with single quotes
    removed from the file name and non-ASCII characters removed by
    ``normalizeName``; it is only written when that path exists.
    """
    # The with-block guarantees the list is flushed and closed before
    # anything else reads it.
    with open('radioList.txt', 'w') as RadioList:
        for dirpath, _, filenames in os.walk('./songs'):
            random.shuffle(filenames)
            for file in filenames:
                if not file.endswith('.mp3'):
                    continue
                fullPath = os.path.abspath(os.path.join(dirpath, file.replace("'", "")))
                fullPath = normalizeName(fullPath)
                if not os.path.exists(fullPath):
                    continue
                # A single quote cannot appear inside a quoted entry; close
                # the quote, emit an escaped quote, and reopen it.
                RadioList.write("file '" + fullPath.replace("'", "'\\''") + "'\n")
def buildBroadcast(board):
    """Concatenate the songs listed in ``radioList.txt`` into one MP3.

    The result is written to ``./Broadcasts/4Chan Radio <board>.mp3``,
    overwriting an existing file.  The ``./Broadcasts`` directory is
    created when it is missing.

    Args:
        board: Board short name, used in the output file name.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started.
    """
    output_dir = './Broadcasts'
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)
    # Argument list instead of a shell string: no quoting pitfalls.
    command = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
               '-i', 'radioList.txt', '-c', 'copy',
               './Broadcasts/4Chan Radio {}.mp3'.format(board)]
    print(' '.join(command))
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
def processBoard(board):
    """Build the broadcast for one board, start to finish.

    Clears the previous downloads, scans the board and downloads the
    songs found, converts and renames them, writes the play list and
    finally joins everything into the broadcast file.

    Args:
        board: Board short name, e.g. ``'mu'``.
    """
    clearSongs()
    getBoard(board)
    # Post-processing steps take no arguments and run in this order.
    for step in (convertAudio, NormalizeAll, buildRadioList):
        step()
    buildBroadcast(board)
#MAIN
# Boards that are processed, in order.  Currently switched off:
# 'r9k', 'pol', 'bant', 'b'.
for board_name in ('mu', 'fit', 'a', 'c'):
    processBoard(board_name)
print ("Complete")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment