Last active
April 7, 2020 02:30
-
-
Save arnesacnussem/436f0a0190fadd3a802becf0c4aef0d6 to your computer and use it in GitHub Desktop.
网易云下载脚本,需要网易云 API(https://github.com/Binaryify/NeteaseCloudMusicApi)。--playlist 可以下载歌单,--album 可以下载专辑,--artist 可以下载指定艺术家的全部专辑。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import json | |
import os | |
import requests | |
import sys | |
import time | |
import datetime | |
import traceback | |
from PIL import Image | |
from mutagen.easyid3 import EasyID3 | |
from mutagen.flac import FLAC, Picture | |
from mutagen.id3 import ID3, PictureType, APIC, Encoding | |
from _ast import arg | |
import socket | |
import requests.packages.urllib3.util.connection as urllib3_cn | |
def allowed_gai_family():
    """Return the single address family name resolution may use: IPv4."""
    ipv4_only = socket.AF_INET
    return ipv4_only


# Replace urllib3's own ``allowed_gai_family`` hook with the IPv4-only version
# above, so connections made through requests/urllib3 resolve to IPv4.
urllib3_cn.allowed_gai_family = allowed_gai_family
def download_and_tag(id):
    """Download one song and write its tags into the downloaded file.

    The song is stored in the current directory as ``<id>.<type>``, where
    ``type`` comes from the URL info returned by ``get_song_url_info``.
    When that info carries no URL, the id is appended to ``err_list`` and
    nothing is downloaded.

    Args:
        id: Song id understood by the API.
    """
    print("[MAIN]: id={},inf".format(id), end='\r')
    url_info = get_song_url_info(id)
    if not url_info['url']:
        print("Can't download: {}".format(id))
        err_list.append(id)
        return

    # Metadata (including the cover) is fetched before the audio itself.
    print("[MAIN]: id={},tag".format(id), end='\r')
    song_tag = getTag(id)

    print("[MAIN]: id={},src".format(id), end='\r')
    extension = url_info['type']
    target = '{}.{}'.format(id, extension)
    download_file(url_info['url'], target)
    song_tag.set_to_file(target, extension)
    print("[MAIN]: id={},done".format(id))
class Tag:
    """Metadata of one track together with the raw bytes of its cover image."""

    def __init__(self, title, artist, album, pub_time, pic_raw, cache_key):
        # pic_raw: encoded cover image bytes (empty when there is no cover).
        # cache_key: key under which resized covers are cached.
        self.title, self.artist, self.album = title, artist, album
        self.pub_time = pub_time
        self.pic_raw = pic_raw
        self.cache_key = cache_key

    def __str__(self):
        """Return a multi-line summary; the cover is shown as its byte length."""
        template = "\n title: %s\n artist: %s\n album: %s\n pub_time=%s\n pics: %s\n cache_key: %s"
        values = (self.title, self.artist, self.album,
                  self.pub_time, len(self.pic_raw), self.cache_key)
        return template % values

    def set_to_file(self, file, ext):
        """Embed the cover (if any) and the text tags into an audio file.

        Args:
            file: Path of the audio file to tag.
            ext: File type; anything containing "flac" (case-insensitive)
                is handled as FLAC, everything else as an ID3-tagged file.
        """
        if "FLAC" in str(ext).upper():
            if len(self.pic_raw) > 0:
                # First pass: attach a front cover and a smaller file icon.
                flac_file = FLAC(file)
                cover = img_auto_resize(self.pic_raw, self.cache_key)
                flac_file.add_picture(
                    getPicture(cover, PictureType.COVER_FRONT))
                icon = img_auto_resize(self.pic_raw, self.cache_key, True)
                flac_file.add_picture(
                    getPicture(icon, PictureType.FILE_ICON))
                flac_file.save()
            # Re-open the file for the text tags written below.
            audio = FLAC(file)
        else:
            if len(self.pic_raw) > 0:
                id3_file = ID3(file)
                cover = img_auto_resize(self.pic_raw, self.cache_key)
                id3_file.add(getAPIC(cover, PictureType.COVER_FRONT))
                icon = img_auto_resize(self.pic_raw, self.cache_key, True)
                id3_file.add(getAPIC(icon, PictureType.FILE_ICON))
                id3_file.save()
            audio = EasyID3(file)

        text_tags = (
            ("title", self.title),
            ("album", self.album),
            ("artist", self.artist),
            ("date", str(self.pub_time)),
        )
        for key, value in text_tags:
            audio[key] = value
        audio.save()
def getTag(id):
    """Build a ``Tag`` for a song from the API's ``/song/detail`` response.

    The cover is downloaded from the album's ``picUrl``; when that is
    ``None`` the tag gets an empty picture payload instead.

    Args:
        id: Song id understood by the API.

    Returns:
        Tag: Title, artist names, album name, formatted publish time, raw
        cover bytes and the album's ``pic`` value (used as image cache key).
    """
    dbg_print('[tag]: id={}'.format(id))
    detail_url = "{}/song/detail?ids={}".format(api, id)
    song = json.loads(getRawHttp(detail_url))['songs'][0]

    dbg_print('[tag]: id={},img'.format(id))
    cover_url = song['al']['picUrl']
    pic_raw = [] if cover_url is None else getRawHttp(cover_url, "raw")
    dbg_print('[tag]: id={},img done'.format(id))

    tag = Tag(
        song["name"],
        r_artist(song['ar']),
        song['al']['name'],
        r_time(song["publishTime"]),
        pic_raw,
        song['al']['pic'],
    )
    dbg_print('[tag]: id={},tag={}'.format(id, tag))
    return tag
def r_artist(json_artists):
    """Return the ``name`` of every artist entry, in the order given."""
    return [entry['name'] for entry in json_artists]
def r_time(ts):
    """Format a millisecond timestamp as local time ``YYYY-MM-DD-HH:MM:SS``."""
    seconds = ts / 1000
    return time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(seconds))
def getPicture(raw, pic_type):
    """Wrap encoded image bytes in a ``Picture`` for embedding in a FLAC file.

    Args:
        raw: Encoded image bytes; decoded only to read width and height.
        pic_type: Picture type value stored on the block.

    Returns:
        Picture: Block carrying ``raw`` as data plus its dimensions.
    """
    picture = Picture()
    width, height = Image.open(io.BytesIO(raw)).size[:2]
    # NOTE(review): mime and depth are fixed values, not derived from the
    # decoded image - confirm they match the data callers pass in.
    picture.mime = u'image/jpeg'
    picture.depth = 16
    picture.type = pic_type
    picture.width = width
    picture.height = height
    picture.data = raw
    return picture
def getAPIC(raw, pic_type):
    """Wrap encoded image bytes in an ``APIC`` frame for an ID3 tag.

    Args:
        raw: Encoded image bytes stored as the frame data.
        pic_type: Picture type value stored on the frame.

    Returns:
        APIC: Frame with Latin-1 text encoding and ``image/jpeg`` mime type.
    """
    frame = APIC()
    frame.encoding = Encoding.LATIN1
    frame.mime = 'image/jpeg'
    # The description is the picture type's string form; presumably this
    # keeps the cover and icon frames distinguishable - TODO confirm.
    frame.desc = str(pic_type)
    frame.type = pic_type
    frame.data = raw
    return frame
def compress(orig_image, max_size):
    """Downscale an image so its encoded size is below ``max_size`` bytes.

    A binary search over the scale factors 0.001 .. 1.000 looks for the
    largest scale whose encoding (using the module-level ``img_options``)
    is smaller than ``max_size``.  If even the smallest scale is too big,
    that smallest scale is used anyway.

    Args:
        orig_image: PIL image to shrink; it is not modified.
        max_size: Exclusive upper bound, in bytes, for the encoded image.

    Returns:
        A new RGB PIL image resized with the chosen scale.  (The original
        implementation computed this image and then discarded it.)
    """
    width, height = orig_image.size
    scales = [step / 1000 for step in range(1, 1001)]
    rgb_image = orig_image.convert("RGB")

    def resized_at(scale):
        # Clamp to one pixel so tiny scales never request a 0-sized image.
        size = (max(1, int(width * scale)), max(1, int(height * scale)))
        # LANCZOS is the same filter as the older ANTIALIAS alias.
        return rgb_image.resize(size, Image.LANCZOS)

    lo, hi = 0, len(scales)
    while lo < hi:
        mid = (lo + hi) // 2
        encoded = io.BytesIO()
        resized_at(scales[mid]).save(encoded, **img_options)
        size = encoded.tell()
        dbg_print("%s , %s " % (size, scales[mid]))
        if size < max_size:
            lo = mid + 1
        else:
            hi = mid

    # ``lo`` is the first scale that no longer fits, so step back by one.
    scale = scales[max(0, lo - 1)]
    dbg_print("[tag][img][resize]scale:{}".format(scale))
    return resized_at(scale)


def img_auto_resize(raw, cache_key, icon=False):
    """Return cover bytes that respect the size limit for their use.

    Covers are limited to 2**23 bytes and icons to 2**16 bytes.  Data that
    is already below the limit is returned untouched; otherwise the image
    is shrunk with ``compress`` and the encoded result is cached in
    ``img_cache`` under ``cache_key``.

    Args:
        raw: Encoded image bytes.
        cache_key: Key identifying the image in ``img_cache``.
        icon: When true, apply the (smaller) icon limit and icon cache.

    Returns:
        bytes: ``raw`` itself, or the re-encoded, downscaled image.
    """
    kind, max_size = ('ico', 2 ** 16) if icon else ('art', 2 ** 23)
    if len(raw) < max_size:
        return raw

    cache = img_cache[kind]
    if cache_key not in cache:
        # Encode the image returned by compress(); encoding the freshly
        # opened original here would skip the downscaling altogether.
        shrunk = compress(Image.open(io.BytesIO(raw)), max_size)
        cache[cache_key] = pil_to_raw(shrunk)
    return cache[cache_key]
def pil_to_raw(pil_img):
    """Encode a PIL image to bytes using the module-level ``img_options``.

    The image is converted to RGB before it is saved.
    """
    buffer = io.BytesIO()
    pil_img.convert("RGB").save(buffer, **img_options)
    return buffer.getvalue()
def download_file(file_url, file_name):
    """Stream ``file_url`` into ``file_name``, replacing any existing file.

    Progress is reported through ``ProgressBar`` when the server announces
    a positive ``Content-Length``; without one the download still runs,
    just without progress output.

    Args:
        file_url: URL to download.
        file_name: Destination path.

    Raises:
        requests.HTTPError: If the server answers with an error status, so
            an error page is never saved as if it were audio data.
    """
    file_path = file_name
    # The context manager releases the connection even if writing fails.
    with requests.get(file_url, stream=True) as response:
        response.raise_for_status()
        try:
            length = int(response.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            length = 0

        if os.path.exists(file_path):
            os.remove(file_path)
        # ProgressBar divides by the total, so only use it for known sizes.
        progress = ProgressBar(file_name, length) if length > 0 else None
        with open(file_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
                    if progress is not None:
                        progress.refresh(len(chunk))
    return None
class ProgressBar(object):
    """Single-line console progress indicator for a download.

    Output ends with a carriage return so each update overwrites the
    previous one.
    """

    def __init__(self, file_name, total):
        """Create a bar for ``file_name`` expecting ``total`` bytes."""
        super().__init__()
        self.file_name = file_name
        self.count = 0          # bytes received so far
        self.prev_count = 0     # value of ``count`` at the last printed update
        self.total = total
        self.end_str = '\r'

    def __get_info(self):
        """Return the progress line: percentage, total size in KB, file name."""
        # An unknown or zero total would otherwise raise ZeroDivisionError.
        percent = self.count / self.total * 100 if self.total else 0.0
        return 'Progress: {:6.2f}%, {:8.2f}KB, [{:.30}]'\
            .format(percent, self.total / 1024, self.file_name)

    def refresh(self, count):
        """Record ``count`` newly received bytes and redraw when needed."""
        self.count += count
        # Only redraw after more than 10 KB of new data.
        if (self.count - self.prev_count) > 10240:
            self.prev_count = self.count
            print(self.__get_info(), end=self.end_str)
        # Finished: blank out the progress line.
        if self.count >= self.total:
            print(' ' * len(self.__get_info()), end=self.end_str)
def getRawHttp(url, type="text"):
    """GET ``url`` through the shared ``session`` and return the body.

    Args:
        url: Address to request.
        type: ``'raw'`` returns the body as bytes; any other value returns
            the decoded text.

    Returns:
        bytes or str, depending on ``type``.
    """
    dbg_print("[requests]: url={}".format(url))
    response = session.get(url)
    body = response.content if type == 'raw' else response.text
    dbg_print("[requests]: url={},len={},done".format(url, len(body)))
    return body
def getTracksInPlaylist(id):
    """Return the song ids listed in a playlist's ``trackIds``.

    Args:
        id: Playlist id passed to the API's ``/playlist/detail`` endpoint.
    """
    dbg_print("[playlist]: id={}".format(id))
    detail_url = '{}/playlist/detail?id={}'.format(api, id)
    track_refs = json.loads(getRawHttp(detail_url))['playlist']['trackIds']
    dbg_print("[playlist]: id={},len={}".format(id, len(track_refs)))
    return [ref['id'] for ref in track_refs]
def getTracksInAlbum(id):
    """Return the ids of the songs in an album.

    Args:
        id: Album id passed to the API's ``/album`` endpoint.
    """
    dbg_print("[album]: id={}".format(id))
    album_url = '{}/album?id={}'.format(api, id)
    songs = json.loads(getRawHttp(album_url))['songs']
    dbg_print("[album]: id={},len={}".format(id, len(songs)))
    return [song['id'] for song in songs]
def getArtistAllAlbum(id):
    """Return the ids of the albums listed under an artist's ``hotAlbums``.

    Args:
        id: Artist id passed to the API's ``/artist/album`` endpoint.
    """
    dbg_print("[artist]: id={}".format(id))
    # The very large limit asks the API for all albums in a single response.
    albums_url = '{}/artist/album?id={}&limit=9999999'.format(api, id)
    hot_albums = json.loads(getRawHttp(albums_url))['hotAlbums']
    return [album['id'] for album in hot_albums]
def batchGetTracksInAlbum(album_ids):
    """Return the song ids of several albums as one flat list.

    Albums are queried one after another, so the result keeps the order of
    ``album_ids`` and, within each album, the order the API returned.
    """
    dbg_print("[batch][batchGetTracksInAlbum]: ids={}".format(album_ids))
    all_songs = []
    for album_id in album_ids:
        all_songs.extend(getTracksInAlbum(album_id))
    return all_songs
def get_song_url_info(id):
    """Return the first ``data`` entry of the API's ``/song/url`` response."""
    response_text = getRawHttp('{}/song/url?id={}'.format(api, id))
    return json.loads(response_text)['data'][0]
def login():
    """Log in to the API with the credentials from ``settings``.

    Prints a success message, or prints an error message and terminates
    the program when the API does not answer with code 200.

    Raises:
        SystemExit: If the login is rejected.
    """
    # Local import: urllib.parse is not part of the file's import block.
    from urllib.parse import quote, urlencode

    # Percent-encode the credentials so characters such as '&', '#', '+'
    # or '%' in a password cannot break or truncate the query string.
    query = urlencode(
        {'email': settings['username'], 'password': settings['password']},
        quote_via=quote)
    resp = json.loads(getRawHttp("{}/login?{}".format(api, query)))
    if resp['code'] != 200:
        print('账号密码错误')
        # sys.exit() instead of the site-provided exit(), which is meant
        # for interactive use and is not guaranteed to exist.
        sys.exit()
    print("登陆成功")
def dbg_print(msg):
    """Print ``msg`` only while the module-level ``debug`` flag is truthy."""
    if not debug:
        return
    print(msg)
def load_progress(id):
    """Load a previously saved progress list from ``<id>.json``.

    Used for the per-collection list of finished songs and for the error
    list.

    Args:
        id: Base name of the progress file (without the ``.json`` suffix).

    Returns:
        The decoded JSON content, or an empty list when the file does not
        exist.
    """
    file_name = '{}.json'.format(id)
    try:
        # ``with`` closes the handle; the original left it open.
        with open(file_name, encoding='utf-8') as progress_file:
            progress = json.load(progress_file)
    except FileNotFoundError:
        return []
    print("{}个项目已加载,文件名: {}".format(len(progress), file_name))
    dbg_print("[LOAD]: {}".format(json.dumps(progress)))
    return progress
def save_progress(id, progress):
    """Write ``progress`` as JSON to ``<id>.json``, replacing older content.

    Args:
        id: Base name of the progress file (without the ``.json`` suffix).
        progress: JSON-serialisable list of ids to persist.
    """
    file_name = '{}.json'.format(id)
    payload = json.dumps(progress)
    # ``with`` guarantees the data is flushed and the handle closed before
    # the function returns; the original never closed the file explicitly.
    with open(file_name, 'w', encoding='utf-8') as progress_file:
        progress_file.write(payload)
    print("{}个项目已保存,文件名: {}".format(len(progress), file_name))
    dbg_print("[SAVE]: {}".format(payload))
# Names shared between this script section and the functions of this file.
# ``global`` has no effect at module scope; the statements are kept purely
# as a list of that shared state.
global session
global api
global settings
global debug
global err_list
global img_options
global img_cache
if __name__ == "__main__":
    # Usage:
    #   script.py <song id>
    #   script.py --playlist <playlist id>
    #   script.py --album <album id>
    #   script.py --artist <artist id>
    debug = False
    argv = sys.argv
    err_list = load_progress("error_list")
    img_options = {'optimize': True, 'quality': 100, 'format': 'jpeg'}
    img_cache = dict(art=dict(), ico=dict())
    print('launch with {}'.format(argv))
    with open('settings.json') as settings_file:
        settings = json.load(settings_file)
    api = settings['api']
    debug = settings['debug']

    # Validate the command line up front instead of failing with IndexError.
    collection_modes = ('--playlist', '--album', '--artist')
    mode = argv[1] if len(argv) > 1 and argv[1] in collection_modes else None
    if len(argv) < (3 if mode else 2):
        print('usage: {} [--playlist|--album|--artist] <id>'.format(argv[0]))
        sys.exit(1)
    arg = argv[2] if mode else argv[1]

    session = requests.session()
    login()
    if mode is not None:
        if mode == '--playlist':
            tracks = getTracksInPlaylist(arg)
        elif mode == '--album':
            tracks = getTracksInAlbum(arg)
        else:
            tracks = batchGetTracksInAlbum(getArtistAllAlbum(arg))
        # Songs finished in an earlier run of the same collection are skipped.
        progress = load_progress(arg)
        try:
            for counter, song in enumerate(tracks, start=1):
                print("[MAIN]: {}/{}".format(counter, len(tracks)), end='\r')
                if song not in progress:
                    print(
                        "[MAIN]: {}/{},id={}".format(counter, len(tracks), song))
                    download_and_tag(song)
                    progress.append(song)
                else:
                    print(
                        "[MAIN]: {}/{},id={} (skip)".format(counter, len(tracks), song))
        except KeyboardInterrupt:
            # Ctrl-C: persist what has been done under the collection id
            # (the original passed the builtin ``id`` here) and stop.
            print("Error List:" + json.dumps(err_list))
            save_progress(arg, progress)
            save_progress('error_list', err_list)
            sys.exit()
        except Exception as e:
            # Any failure ends the run, but never loses the progress so far.
            save_progress(arg, progress)
            save_progress('error_list', err_list)
            if debug:
                traceback.print_exc(file=sys.stdout)
            else:
                print(e)
            sys.exit()
        save_progress(arg, progress)
    else:
        download_and_tag(arg)
    print("All DONE!")
    print("Error List:" + json.dumps(err_list))
    save_progress('error_list', err_list)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
certifi==2019.11.28 | |
chardet==3.0.4 | |
idna==2.8 | |
mutagen==1.43.0 | |
Pillow==6.2.1 | |
requests==2.22.0 | |
urllib3==1.25.7 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"api":"http://localhost:3000", | |
"username":"12345", | |
"password":"12345", | |
"debug":false, | |
"threads":1 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment