-
-
Save st4ycool/b15fdafdd571ecf00289c27ff5cae58f to your computer and use it in GitHub Desktop.
Python: vkontakte.ru (vk.com) audio music downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
"""
Script for downloading music from vkontakte.ru (vk.com).

Usage:
    python vkcom_audio_download.py

How it works:
The script checks for a saved access_token. If there is none, or it has
expired, a browser page opens asking for access to your account.
After you confirm, you are redirected to https://oauth.vk.com/blank.html#... .
Copy the whole URL you were redirected to and paste it into the script's
console.
After that all of your audio tracks are downloaded. A track that already
exists on disk is not downloaded again.
Your data will be requested by the application with app_id = 3358129.
You can create your own standalone application with audio access here:
http://vk.com/editapp?act=create
and replace APP_ID with your own.
"""
import webbrowser | |
import pickle | |
import json | |
import urllib | |
import requests | |
import html.parser | |
import re | |
import os | |
import urllib.parse | |
from datetime import datetime, timedelta | |
import asyncio | |
from contextlib import closing | |
import aiohttp | |
# ID of the vk.com application that has access to audio.
APP_ID = '3358129'
# Folder where mp3 files are saved; if None, the current folder is used.
MUSIC_FOLDER = 'music'
# File where the auth data is saved.
AUTH_FILE = '.auth_data'
# Number of simultaneous requests.
REQUESTS_COUNT = 10
# Characters to exclude from file names.  Written as a raw string because
# "\?" inside a normal string literal is an invalid escape sequence that
# newer Python versions warn about; the resulting characters are unchanged.
FORBIDDEN_CHARS = r'/\\?%*:|"<>!'
def get_saved_auth_params(auth_file=None):
    """Load a previously saved access token that has not expired yet.

    Args:
        auth_file: Path of the pickle file to read.  Defaults to the
            module-level ``AUTH_FILE``.

    Returns:
        An ``(access_token, user_id)`` tuple, or ``(None, None)`` when the
        file is missing, unreadable, empty/truncated/corrupt, or the stored
        expiry time has already passed.
    """
    if auth_file is None:
        auth_file = AUTH_FILE
    try:
        # NOTE: unpickling can execute arbitrary code.  Only point this at a
        # file this script wrote itself, never at untrusted data.
        with open(auth_file, 'rb') as pkl_file:
            # Three consecutive pickles: token, expiry datetime, user id.
            token = pickle.load(pkl_file)
            expires = pickle.load(pkl_file)
            uid = pickle.load(pkl_file)
    except (OSError, EOFError, pickle.UnpicklingError):
        # A missing or damaged file simply means "authorize again".
        return None, None
    if datetime.now() < expires:
        return token, uid
    return None, None
def save_auth_params(access_token, expires_in, user_id, auth_file=None):
    """Persist the access token, its expiry time and the user id.

    The three values are written as three consecutive pickles, in the order
    token, expiry datetime, user id.

    Args:
        access_token: Token string to store.
        expires_in: Token lifetime in seconds (anything ``int()`` accepts);
            it is converted to an absolute expiry time before saving.
        user_id: User id to store.
        auth_file: Path of the file to write.  Defaults to the module-level
            ``AUTH_FILE``.
    """
    if auth_file is None:
        auth_file = AUTH_FILE
    expires = datetime.now() + timedelta(seconds=int(expires_in))
    with open(auth_file, 'wb') as output:
        for value in (access_token, expires, user_id):
            pickle.dump(value, output)
def get_auth_params():
    """Run the interactive authorization flow and return fresh credentials.

    Opens the authorization page in a new browser tab, asks the user to
    paste the URL they were redirected to, reads the token data from that
    URL's fragment, saves it with ``save_auth_params`` and returns it.

    Returns:
        An ``(access_token, user_id)`` tuple of strings.

    Raises:
        KeyError: If the pasted URL carries no ``access_token``,
            ``expires_in`` or ``user_id`` value.
    """
    auth_url = ("https://oauth.vk.com/authorize?client_id={app_id}"
                "&scope=audio&redirect_uri=http://oauth.vk.com/blank.html"
                "&display=page&response_type=token".format(app_id=APP_ID))
    webbrowser.open_new_tab(auth_url)
    redirected_url = input("Paste here url you were redirected:\n").strip()
    # The token data lives after the '#'.  Parsing only the fragment avoids
    # depending on the exact scheme/host/path in front of it (the redirect
    # may come back as http:// or https://).
    fragment = urllib.parse.urlparse(redirected_url).fragment
    aup = urllib.parse.parse_qs(fragment)
    required = ('access_token', 'expires_in', 'user_id')
    missing = [key for key in required if key not in aup]
    if missing:
        raise KeyError(
            "redirect url has no " + ", ".join(missing) + " parameter")
    access_token = aup['access_token'][0]
    user_id = aup['user_id'][0]
    save_auth_params(access_token, aup['expires_in'][0], user_id)
    return access_token, user_id
def get_tracks_metadata(access_token, user_id):
    """Request the user's audio list from the VK API.

    Args:
        access_token: Access token sent as the ``access_token`` parameter.
        user_id: User id sent as the ``uid`` parameter.

    Returns:
        The value stored under the ``'response'`` key of the JSON reply.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        KeyError: If the JSON reply has no ``'response'`` key.
    """
    reply = requests.get(
        "https://api.vk.com/method/audio.get.json",
        # Let requests build the query string so the values are URL-encoded.
        params={'uid': user_id, 'access_token': access_token},
        # Without a timeout a stalled connection would hang the script.
        timeout=30,
    )
    reply.raise_for_status()
    payload = reply.json()
    if 'response' not in payload:
        raise KeyError(
            "no 'response' in API reply: {0!r}".format(payload))
    return payload['response']
def get_track_full_name(t_data):
    """Build the file name ``<artist>_<title>.mp3`` for one track.

    Args:
        t_data: Mapping with string values under ``'artist'`` and
            ``'title'``.

    Returns:
        The file name: each field cut to 100 characters, HTML entities
        decoded, surrounding whitespace stripped, characters listed in
        ``FORBIDDEN_CHARS`` removed and runs of spaces collapsed to one.
    """
    # html.unescape replaces HTMLParser().unescape, which no longer exists
    # on current Python versions.  The cut to 100 characters still happens
    # before decoding so generated names do not change.
    artist = html.unescape(t_data['artist'][:100]).strip()
    title = html.unescape(t_data['title'][:100]).strip()
    full_name = u"{0}_{1}".format(artist, title)
    # re.escape keeps the character class valid whatever the constant holds.
    full_name = re.sub('[' + re.escape(FORBIDDEN_CHARS) + ']', "", full_name)
    full_name = re.sub(' +', ' ', full_name)
    return full_name + ".mp3"
async def download(t_url, t_name, session, semaphore):
    """Download one track into ``MUSIC_FOLDER`` unless it is already there.

    Args:
        t_url: URL of the file to fetch.
        t_name: File name to store the track under.
        session: aiohttp client session used for the request.
        semaphore: Semaphore that limits how many downloads run at once.

    Returns:
        ``t_name`` after a successful download, or ``None`` when the file
        already existed or the server answered with a non-200 status.
    """
    t_path = os.path.join(MUSIC_FOLDER or "", t_name)
    if os.path.exists(t_path):
        return None
    # Stream into a temporary name first: an interrupted download must not
    # leave a truncated file that the existence check above would later
    # mistake for a finished track.
    part_path = t_path + ".part"
    chunk_size = 64 * 1024
    # "with (await semaphore)" is not supported by current Python versions.
    async with semaphore:
        print('Downloading', t_name)
        async with session.get(t_url) as response:
            if response.status != 200:
                # Do not store an error page under an .mp3 name.
                print('Failed to download', t_name,
                      '- HTTP status', response.status)
                return None
            try:
                with open(part_path, 'wb') as file:
                    while True:
                        # Bounded reads keep memory use independent of
                        # the track size.
                        chunk = await response.content.read(chunk_size)
                        if not chunk:
                            break
                        file.write(chunk)
            except BaseException:
                # Drop the partial file, then let the error propagate.
                try:
                    os.remove(part_path)
                except OSError:
                    pass
                raise
        os.replace(part_path, t_path)
        print('Saved file', t_name)
    return t_name
async def _download_all(t_info):
    """Download every ``(url, name)`` pair in *t_info* concurrently."""
    semaphore = asyncio.Semaphore(REQUESTS_COUNT)
    # The session is created and closed inside a coroutine; its close() is
    # a coroutine, which contextlib.closing would call without awaiting.
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(
            download(url, name, session, semaphore) for url, name in t_info
        ))


def main():
    """Authorize if needed, fetch the track list and download new tracks.

    Reuses saved credentials when they are still valid, otherwise runs the
    interactive authorization.  Creates ``MUSIC_FOLDER`` when it is set and
    missing, then downloads all tracks.
    """
    access_token, user_id = get_saved_auth_params()
    if not access_token or not user_id:
        access_token, user_id = get_auth_params()
    tracks = get_tracks_metadata(access_token, user_id)
    if MUSIC_FOLDER:
        os.makedirs(MUSIC_FOLDER, exist_ok=True)
    # Keep one URL per file name: two tracks that map to the same name
    # would otherwise be written to the same path by two tasks at once.
    url_by_name = {}
    for t in tracks:
        url_by_name.setdefault(get_track_full_name(t), t['url'])
    t_info = [(url, name) for name, url in url_by_name.items()]
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_download_all(t_info))
    finally:
        loop.close()
    print("All music is up to date")
# Run the downloader only when the file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment