Last active
June 6, 2022 08:59
-
-
Save Paul-Michaud/03b9802f8ae0ab0200270f9323c37220 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Script to download deezer playlist from spotify - multiprocess""" | |
from __future__ import unicode_literals | |
import os | |
import sys | |
import re | |
import urllib.parse | |
import concurrent.futures | |
import requests | |
import youtube_dl | |
def download_youtube(t_name, l_name, lc_name):
    """Find one track on youtube and download it as an mp3.

    Args:
        t_name (str): Name of the track to search on youtube (underscores stand in for spaces)
        l_name (str): Name of the list (For saving in the right directory)
        lc_name (str): Name of the list creator (For saving in the right directory)

    Returns:
        1: fail (no search result)
        0: success
    """
    tn_encoded = urllib.parse.quote_plus(t_name.replace("_", " "))
    print(f"Search for {tn_encoded}")
    res = requests.get(f"https://www.youtube.com/results?search_query={tn_encoded}", timeout=5)
    # Get the first result. re.search returns None on a miss, unlike the
    # original next(...finditer(...)) which raised StopIteration and made the
    # "Not found" branch unreachable.
    match = re.search(r'/watch\?v=([^\"]+)', res.content.decode("utf-8"), re.I | re.M | re.U)
    if match is None:
        print("Not found")
        return 1
    video_id = match.group(1)
    video_url = f"http://www.youtube.com/watch?v={video_id}"
    print(f"video url: {video_url}")
    ydl_opts = {
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
            "nopostoverwrites": False,
        }],
        # Single "%" is the youtube-dl template placeholder; the original
        # "%%(ext)s" is the escape for a LITERAL "%(ext)s" and would leave
        # files named "<track>.%(ext)s".
        "outtmpl": f"{lc_name}/{l_name}/{t_name}.%(ext)s",
        "format": "bestaudio/best",
        "ignoreerrors": True,
        "socket_timeout": 5
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        # https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/YoutubeDL.py#L2056
        ydl.download([video_url])
    # Remove the finished track from the shared list file. The original opened
    # open(list_title, ...) -- the bare title, not the file the script wrote at
    # <creator>/<title>/list_tracks.txt -- which raises FileNotFoundError.
    # NOTE(review): concurrent workers rewriting this file can still race each
    # other, as the script body itself acknowledges.
    with open(f"{lc_name}/{l_name}/list_tracks.txt", "r+", encoding='utf8') as f:
        track_lines = f.read().splitlines()
        f.seek(0)
        for line in track_lines:
            if line != t_name:
                f.write(line + "\n")
        f.truncate()
    return 0
DEEZER_API_URL = "https://api.deezer.com"
# playlist or album URL
LIST_URL = "https://www.deezer.com/fr/album/108175412"

# A deezer list URL ends in .../<type>/<id>, e.g. .../album/108175412
list_type, list_id = LIST_URL.strip().split("/")[-2:]
list_metadata_url = f"{DEEZER_API_URL}/{list_type}/{list_id}"
# timeout added: a bare requests.get can hang forever; the rest of the file
# already uses timeout=5.
list_metadata_res = requests.get(list_metadata_url, timeout=5)
list_metadata = list_metadata_res.json()
# For the output directory <creator>/<title>: spaces -> underscores, and "%"
# stripped so names are safe in the youtube-dl output template.
if list_type == "playlist":
    list_title = list_metadata["title"].replace(" ", "_").replace("%", "")
    list_creator_name = list_metadata["creator"]["name"].replace(" ", "_").replace("%", "")
elif list_type == "album":
    list_title = list_metadata["title"].replace(" ", "_").replace("%", "")
    list_creator_name = list_metadata["artist"]["name"].replace(" ", "_").replace("%", "")
else:
    print(f"{list_type} not supported")
    sys.exit(1)

tracks_url = f"{DEEZER_API_URL}/{list_type}/{list_id}/tracks?index=0&limit=10000"
tracks_res = requests.get(tracks_url, timeout=5)
tracks_data = tracks_res.json()
if "error" in tracks_data:
    print(tracks_data["error"])
    sys.exit(1)
if "next" in tracks_data:
    # Deezer paginates: a "next" key means the limit did not cover every track.
    print("Increase limit")
    sys.exit(1)

# exist_ok avoids the check-then-create race of the original exists()/makedirs pair
os.makedirs(f"{list_creator_name}/{list_title}", exist_ok=True)
path_list_tracks = f"{list_creator_name}/{list_title}/list_tracks.txt"
# Create list of tracks without already downloaded ones
if os.path.isfile(path_list_tracks):
    os.remove(path_list_tracks)
with open(path_list_tracks, "a", encoding='utf8') as f:
    for track in tracks_data["data"]:
        track_name = f"{track['artist']['name']}_{track['title']}".replace(" ", "_")
        if not os.path.isfile(f"{list_creator_name}/{list_title}/{track_name}.mp3"):
            f.write(track_name + "\n")
with open(path_list_tracks, "r", encoding='utf8') as f:
    track_list = f.read().splitlines()

# Download each track 10 by 10; the context manager guarantees the pool is
# shut down and all workers joined.
# Probably race condition if 2 process try to open the list of track at the
# same time in download_youtube
with concurrent.futures.ProcessPoolExecutor(10) as executor:
    futures = [
        executor.submit(download_youtube, track, list_title, list_creator_name)
        for track in track_list
    ]
    concurrent.futures.wait(futures)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment