-
-
Save glader/39c0f08e53f8091faa435a2d4c1d6c88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__version__ = (2, 0, 0) | |
# meta developer: @cofob | |
# requires: git+https://github.com/MarshalX/yandex-music-api@dev eyed3 | |
# meta desc: Module for downloading music from Yandex Music | |
import logging | |
import os | |
import eyed3 | |
from eyed3.id3.frames import ImageFrame | |
from asyncio import sleep | |
from yandex_music import ClientAsync, TracksList, Track | |
from telethon import TelegramClient | |
from telethon.errors.rpcerrorlist import FloodWaitError | |
import traceback | |
from .. import loader | |
# Module-level logger used by every log call in this file.
logger = logging.getLogger(__name__)
# Keep records emitted by the "yandex_music" logger from propagating to the
# handlers of its ancestor loggers.
logging.getLogger("yandex_music").propagate = False
@loader.tds
class YaDownMod(loader.Module):
    """Module for downloading favorite music from Yandex Music"""

    strings = {
        "name": "YaDown",
        "no_token": "<b>🚫 Specify a token in config!</b>",
        "no_channel": "<b>🚫 Specify a channel in config!</b>",
        "_cfg_yandexmusictoken": "Yandex Music token",
        "_cfg_postchannel": "Channel for posting music",
    }

    strings_ru = {
        "no_token": "<b>🚫 Укажи токен в конфиге!</b>",
        "no_channel": "<b>🚫 Укажи канал в конфиге!</b>",
        "_cfg_yandexmusictoken": "Токен Yandex Music",
        "_cfg_postchannel": "Канал для постинга музыки",
    }

    # Every character listed here becomes "_" when an artist name is turned
    # into a hashtag for the caption.
    _HASHTAG_TABLE = str.maketrans(dict.fromkeys(" .!?-,()", "_"))

    def __init__(self):
        """Declare the two config values: the Yandex Music token and the target channel."""
        self.config = loader.ModuleConfig(
            loader.ConfigValue(
                "YandexMusicToken",
                None,
                lambda: self.strings["_cfg_yandexmusictoken"],
                validator=loader.validators.Hidden(),
            ),
            loader.ConfigValue(
                "PostChannel",
                None,
                lambda: self.strings["_cfg_postchannel"],
            ),
        )

    async def client_ready(self, client: TelegramClient, db):
        """Store the client and database, and start the download loop when fully configured.

        The loop is started only if both ``YandexMusicToken`` and
        ``PostChannel`` are set at this point.
        """
        self.client = client
        self.db = db
        if self.config["YandexMusicToken"] and self.config["PostChannel"]:
            logger.info("Starting downloader")
            self.downloader.start()

    @loader.loop(interval=300)
    async def downloader(self):
        """Post every liked Yandex Music track that has not been posted yet.

        Registered through ``loader.loop`` with ``interval=300`` (presumably
        seconds -- confirm against the loader). Tracks are walked in the
        reverse of the order the API returns them. A failure on one track is
        logged and does not stop the remaining tracks.
        """
        if not self.config["YandexMusicToken"]:
            logger.error(self.strings["no_token"])
            return
        if not self.config["PostChannel"]:
            logger.error(self.strings["no_channel"])
            return

        client = ClientAsync(self.config["YandexMusicToken"])
        await client.init()
        likes = await client.users_likes_tracks()
        if likes is None:
            # Guard against an empty API answer; slicing None would raise TypeError.
            logger.warning("Yandex Music returned no liked tracks list")
            return

        for short_track in likes[::-1]:
            track_id = short_track.track_id
            logger.debug(f"Processing track {track_id}")
            try:
                await self._process_track(short_track)
            except FloodWaitError as e:
                # Telegram asked us to slow down: wait, then go on with the next
                # track. The interrupted track is not recorded as downloaded, so
                # a later run picks it up again.
                logger.warning(f"Sleeping for {e.seconds} seconds")
                await sleep(e.seconds)
            except Exception:
                # Loop boundary: log with traceback and keep going.
                logger.exception(f"Failed to process track {track_id}")

    async def _process_track(self, short_track):
        """Download, tag, upload and record one liked track, skipping it if already posted or unusable."""
        track_id = short_track.track_id
        downloaded = self.get("downloaded", {})
        if isinstance(downloaded, list):
            # A list is not the mapping used below (presumably a legacy storage
            # format -- confirm); start over with an empty mapping.
            downloaded = {}
        if track_id in downloaded:
            return

        full_track: Track = await short_track.fetch_track_async()
        if not (full_track.available and full_track.artists and full_track.title):
            return

        name = f"{full_track.artists[0].name} - {full_track.title}"
        # "/" would be read as a directory separator in the file path.
        filename = name.replace("/", "-")
        path = f"{filename}.mp3"
        caption = self._build_caption(full_track, name)

        logger.info(f"Downloading {name}")
        try:
            await full_track.download_async(path)
            await self._tag_file(full_track, path, name)
            logger.info(f"Uploading {name}")
            m = await self.client.send_file(
                self.config["PostChannel"],
                path,
                caption=caption,
            )
        finally:
            # Always clean up, so a failed tag/upload (or a flood wait) does not
            # leave the audio file behind on disk.
            logger.info(f"Removing {name}")
            try:
                os.remove(path)
            except FileNotFoundError:
                # The download failed before the file was created.
                pass

        downloaded[track_id] = {
            "message_id": m.id,
            "version": 2,
        }
        self.set("downloaded", downloaded)

    def _build_caption(self, full_track, name):
        """Return the HTML caption: title line, one hashtag per artist, and track links."""
        hashtags = " ".join(
            f"#{artist.name.translate(self._HASHTAG_TABLE)}"
            for artist in full_track.artists
        )
        text = f"<emoji document_id=5188621441926438751>🎵</emoji> <b>{name}</b>"
        text += "\n<emoji document_id=5373012449597335010>👤</emoji> "
        text += hashtags
        text = text.strip()
        text += f'\n<emoji document_id=5445284980978621387>🚀</emoji> <a href="https://music.yandex.ru/track/{full_track.id}">ya.music</a> | <a href="https://song.link/ya/{full_track.id}">Other</a>'
        return text

    async def _tag_file(self, full_track, path, name):
        """Write cover, artist, title and (when present) album ID3 tags into the file at ``path``.

        Raises:
            RuntimeError: if eyed3 cannot load the file.
        """
        audiofile = eyed3.load(path)
        if audiofile is None:
            # eyed3.load returns None for files it does not recognise.
            raise RuntimeError(f"eyed3 could not load {path}")
        if audiofile.tag is None:
            audiofile.initTag()

        logger.info(f"Adding cover for {name}")
        try:
            cover = await full_track.download_cover_bytes_async("1000x1000")
            audiofile.tag.images.set(ImageFrame.FRONT_COVER, cover, "image/jpeg")
        except AttributeError:
            # Best effort: the track is still posted without a cover.
            logger.error(f"Failed to add cover for {name}, cover cant be fetched")

        # The caller has already verified that artists and title are non-empty.
        audiofile.tag.artist = full_track.artists[0].name
        audiofile.tag.title = full_track.title
        if full_track.albums:
            audiofile.tag.album = full_track.albums[0].title

        logger.info(f"Saving tags for {name}")
        audiofile.tag.save()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment