Last active
October 26, 2025 13:30
-
-
Save felipeadeildo/961587b1f5660a5db42102666e9884d0 to your computer and use it in GitHub Desktop.
Rocketseat Downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import os | |
| import pickle | |
| import queue | |
| import random | |
| import re | |
| import threading | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import parse_qs | |
| import m3u8 | |
| import requests | |
| from bs4 import BeautifulSoup | |
| BASE_API = "https://skylab-api.rocketseat.com.br" | |
| BASE_URL = "https://app.rocketseat.com.br" | |
| SESSION_PATH = Path(".session.pkl") | |
| def clear_screen(): | |
| os.system("cls" if os.name == "nt" else "clear") | |
| def sanitize_string(string: str): | |
| return re.sub(r'[@#$%&*/:^{}<>?"]', "", string).strip() | |
| class PandaVideo: | |
| def __init__(self, url: str, save_path: str, threads_count=10): | |
| def create_session(headers: dict = {}): | |
| session = requests.session() | |
| session.headers.update( | |
| { | |
| "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", | |
| **headers, | |
| } | |
| ) | |
| return session | |
| qs = parse_qs(url.split("?")[-1]) | |
| if "v" not in qs: | |
| raise Exception("ID do vídeo não encontrado na querystring") | |
| self.video_id = qs["v"][0] | |
| _match = re.search(r"https://([^/]+)/", url) | |
| if not _match: | |
| raise Exception("Domínio não encontrado na URL") | |
| self.domain = _match.group(1).replace("player", "b") | |
| self.session = create_session( | |
| {"referer": url, "origin": f"https://{self.domain}"} | |
| ) | |
| self.save_path = save_path | |
| self.threads_count = threads_count | |
| def _create_temp_folder(self): | |
| foldername = "".join(random.choices("ABCDEFGHIJKLMNOPQRSTUVWXYZ", k=5)) | |
| self.temp_folder = Path(".temp") / foldername | |
| if not os.path.exists(self.temp_folder): | |
| os.makedirs(self.temp_folder) | |
| return self.temp_folder | |
| def __convert_segments(self): | |
| ffmpeg_cmd = f'ffmpeg -hide_banner -loglevel error -stats -y -i "{self.temp_folder}/playlist.m3u8" -c copy -bsf:a aac_adtstoasc "{self.save_path}"' | |
| os.system(ffmpeg_cmd) | |
| for filename in os.listdir(self.temp_folder): # type: ignore [StrPath is str :D] | |
| os.remove(self.temp_folder / filename) | |
| os.removedirs(self.temp_folder) | |
| def __download_playlist(self, playlist_url: str): | |
| self._create_temp_folder() | |
| playlist_content = self.session.get(playlist_url).text | |
| # writing the local playlist | |
| with open(self.temp_folder / "playlist.m3u8", "w") as file: | |
| for line in playlist_content.splitlines(): | |
| line = line.split("/")[-1] if line.startswith("https") else line | |
| file.write(f"{line}\n") | |
| playlist = m3u8.loads(playlist_content) | |
| threads = [] | |
| segment_queue = queue.Queue() | |
| self.downloaded_segments = 0 | |
| self.total_segments = len(playlist.segments) | |
| for segment in playlist.segments: | |
| segment_queue.put(segment) | |
| def worker(): | |
| while not segment_queue.empty(): | |
| segment = segment_queue.get() | |
| filename = segment.uri.split("/")[-1] | |
| with open(self.temp_folder / filename, "wb") as file: | |
| file.write(self.session.get(segment.uri).content) | |
| segment_queue.task_done() | |
| self.downloaded_segments += 1 | |
| print( | |
| "\r\t\t\t\t\tBaixando segmento %d de %d" | |
| % (self.downloaded_segments, self.total_segments), | |
| end="", | |
| flush=True, | |
| ) | |
| for _ in range(self.threads_count): | |
| thread = threading.Thread(target=worker) | |
| thread.start() | |
| threads.append(thread) | |
| for thread in threads: | |
| thread.join() | |
| segment_queue.join() | |
| print("\n\t\t\t\t\tConcluído!", flush=True) | |
| self.__convert_segments() | |
| def download(self): | |
| if os.path.exists(self.save_path): | |
| print("\t\t\t\tArquivo já existe (PANDA)") | |
| return | |
| playlists_url = f"https://{self.domain}/{self.video_id}/playlist.m3u8" | |
| playlists_content = self.session.get(playlists_url).text | |
| playlists_loaded = m3u8.loads(playlists_content) | |
| best_playlist = max( | |
| playlists_loaded.playlists, | |
| key=lambda x: x.stream_info.resolution[0] * x.stream_info.resolution[1], | |
| ) | |
| best_playlist_url = ( | |
| (f"https://{self.domain}/{self.video_id}/{best_playlist.uri}") | |
| if not best_playlist.uri.startswith("http") | |
| else best_playlist.uri | |
| ) | |
| self.__download_playlist(best_playlist_url) | |
| class RocketseatProperties: | |
| specialization: dict | |
| module: dict | |
| levels: list | |
| level: dict | |
| groups: list | |
| group: dict = dict() | |
| lesson: dict | |
| @property | |
| def specialization_name(self) -> str: | |
| return self.specialization["title"] | |
| @property | |
| def specialization_uri(self) -> str: | |
| return self.specialization["uri"] | |
| @property | |
| def specialization_slug(self) -> str: | |
| return self.specialization["slug"] | |
| @property | |
| def modules(self) -> list[dict]: | |
| return self.level["contents"] | |
| @modules.setter | |
| def modules(self, value: list[dict]): | |
| self.level["contents"] = value | |
| @property | |
| def modules_count(self) -> int: | |
| return len(self.modules) | |
| @property | |
| def module_name(self) -> str: | |
| return self.module["title"] | |
| @property | |
| def module_index(self) -> int: | |
| return self.module["script_index"] | |
| @property | |
| def f_module_name(self) -> str: | |
| return f"{self.module_index} - {sanitize_string(self.module_name)}" | |
| @property | |
| def module_slug(self) -> str: | |
| return self.module["slug"] | |
| @property | |
| def levels_count(self) -> int: | |
| return len(self.levels) | |
| @property | |
| def level_name(self) -> str: | |
| return self.level["title"] | |
| @property | |
| def level_type(self) -> str: | |
| return self.level["type"] | |
| @property | |
| def level_index(self) -> int: | |
| return self.level["script_index"] | |
| @property | |
| def f_level_name(self) -> str: | |
| return f"{self.level_index} - {sanitize_string(self.level_name)}" | |
| @property | |
| def group_name(self) -> str: | |
| return self.group["title"] | |
| @property | |
| def group_index(self) -> int: | |
| return self.group["script_index"] | |
| @property | |
| def f_group_name(self) -> str: | |
| return f"{self.group_index} - {sanitize_string(self.group_name)}" | |
| @property | |
| def groups_count(self) -> int: | |
| return len(self.groups) | |
| @property | |
| def lessons(self) -> list: | |
| return self.group["lessons"] | |
| @lessons.setter | |
| def lessons(self, value: list): | |
| self.group["lessons"] = value | |
| @property | |
| def lessons_count(self) -> int: | |
| return len(self.lessons) | |
| @property | |
| def lesson_name(self) -> str: | |
| return self.lesson["last"]["title"] | |
| @property | |
| def lesson_index(self) -> int: | |
| return self.lesson["script_index"] | |
| @property | |
| def f_lesson_name(self) -> str: | |
| return f"{self.lesson_index} - {sanitize_string(self.lesson_name)}" | |
| @property | |
| def lesson_type(self) -> str: | |
| return self.lesson["type"] | |
| @property | |
| def lesson_video_id(self) -> str: | |
| return self.lesson["last"]["resource"] | |
| @property | |
| def lesson_description(self) -> Optional[str]: | |
| return self.lesson["last"].get("description") | |
| @property | |
| def lesson_attachments(self) -> list[dict]: | |
| return self.lesson["last"].get("downloads", []) or [] | |
| @property | |
| def save_path(self) -> Path: | |
| _path = Path("Cursos") / self.specialization_name / self.f_level_name | |
| if self.level_type != "lesson": | |
| _path /= self.f_module_name | |
| if self.modules_count == 1 or self.groups_count == 1: | |
| _path /= self.f_lesson_name | |
| else: | |
| _path /= self.f_group_name | |
| _path /= self.f_lesson_name | |
| _path.mkdir(exist_ok=True, parents=True) | |
| return _path | |
| class Rocketseat(RocketseatProperties): | |
| def __init__(self): | |
| self._session_exists = SESSION_PATH.exists() | |
| if self._session_exists: | |
| print( | |
| f"Carregando sessão salva (se ocorrer algum erro, considere apagar o arquivo {SESSION_PATH})..." | |
| ) | |
| self.session = pickle.load(SESSION_PATH.open("rb")) | |
| else: | |
| self.session = requests.session() | |
| self.session.headers["User-Agent"] = ( | |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36" | |
| ) | |
| self.session.headers["Referer"] = "https://app.rocketseat.com.br/" | |
| def _download_video(self): | |
| """essa disgraça vai baixar o video, vai basicamente pegar o módulo id e instanciar a classe de download passando como argumento o save_path""" | |
| PandaVideo( | |
| f"https://player-vz-762f4670-e04.tv.pandavideo.com.br/embed/?v={self.lesson_video_id}", | |
| str(self.save_path / "aulinha.mp4"), | |
| ).download() | |
| def _save_lesson_description(self): | |
| """Salva a descrição da aula se houver num aquivo txt""" | |
| if self.lesson_description: | |
| with (self.save_path / "descricao.txt").open("w") as file: | |
| file.write(self.lesson_description) | |
| def _download_lesson_attachments(self): | |
| """Baixa, se houver, anexos para a aula (só iterar sobre downloads e dar get em file_url)""" | |
| for attach in self.lesson_attachments: | |
| filename = f"{sanitize_string(attach['title'])} - {attach['file']}" | |
| file_path = self.save_path / filename | |
| if file_path.exists(): | |
| print(f"\t\t\t\tAnexo {filename} já existe. Pulando...") | |
| continue | |
| with (self.save_path / filename).open("wb") as f: | |
| f.write(self.session.get(attach["file_url"]).content) | |
| print(f"\t\t\t\tAnexo baixado: {filename}") | |
| def _download_lesson(self): | |
| """finalmente sapoha vai indentifcicar o tipo de conteúdo e chamar a função que trata do tipo específico da entrega de copnteúdo da lessson""" | |
| print( | |
| f"\t\t\t\tBaixando aula {self.lesson_index} de {self.lessons_count}: {self.lesson_name}" | |
| ) | |
| factory = { | |
| "video": self._download_video, | |
| } | |
| factory.get( | |
| self.lesson_type, | |
| lambda: print(f"Tipo de aula desconhecido: {self.lesson_type}"), | |
| )() | |
| self._save_lesson_description() | |
| self._download_lesson_attachments() | |
| def _download_group(self): | |
| """Só itera sobre as lessons de dentro do grupo e chama o downloader""" | |
| print( | |
| f"\t\t\tBaixando grupo {self.group_index} de {self.groups_count}: {self.group_name}" | |
| ) | |
| for i, self.lesson in enumerate(self.lessons, 1): | |
| self.lesson.update(script_index=i) | |
| self._download_lesson() | |
| def _download_module(self): | |
| """essa disgraça lista os grupos, itera sobre eles e chama o downloader group""" | |
| print( | |
| f"\t\tBaixando módulo {self.module_index} de {self.modules_count}: {self.module_name}" | |
| ) | |
| data = self.session.get(f"{BASE_API}/journey-nodes/{self.module_slug}").json() | |
| if data["cluster"] is None: | |
| self.groups = [data["group"]] | |
| else: | |
| self.groups = data["cluster"]["groups"] | |
| for i, self.group in enumerate(self.groups, 1): | |
| self.group.update(script_index=i) | |
| self._download_group() | |
| def _download_level(self): | |
| """Itera sobre os níveis e seus módulos e bota pra baixar topado""" | |
| print( | |
| f"\tBaixando level {self.level_index} de {self.levels_count}: {self.level_name}" | |
| ) | |
| if self.level_type == "lesson": | |
| self.lessons = [self.level["lesson"]] | |
| self.lesson = self.lessons[0] | |
| self.lesson.update(script_index=1) | |
| self._download_lesson() | |
| elif self.level_type == "cluster": | |
| self.modules = [self.level] | |
| self.module = self.modules[0] | |
| self._download_module() | |
| else: | |
| print( | |
| f"\tO tipo de level é {self.level_type} o qual não foi (talvez nem será) implementado neste script." | |
| ) | |
| def __load_nodes(self) -> list[dict]: | |
| """Esta função (agora) vai carregar o json vindo em embedding num javascript renderizado no html pelo servidor.""" | |
| res = self.session.get(f"{BASE_URL}/{self.specialization_uri}/contents") | |
| soup = BeautifulSoup(res.text, "html.parser") | |
| try: | |
| script_tag = next( | |
| script | |
| for script in soup.find_all("script") | |
| if "journeyId" in script.text | |
| ) | |
| except StopIteration: | |
| print("O script contendo as aulas não foi encontrado...") | |
| with open("server_response.html", "wb") as f: | |
| f.write(res.content) | |
| exit(1) | |
| script_text = script_tag.text.strip().replace('\\"', '"').replace("\\\\", "\\") | |
| start = script_text.index('{"journey') | |
| json_content = json.loads(script_text[start:].split(',"children')[0] + "}") | |
| with open("json_content.json", "w", encoding="utf-8") as f: | |
| json.dump(json_content, f, indent=4, ensure_ascii=False) | |
| return json_content["journey"]["nodes"] | |
| def _download_courses(self): | |
| """Lista levels (são nodes agora) e itera sobre elas chamando download modules""" | |
| clear_screen() | |
| print(f"Baixando especialização {self.specialization_name}") | |
| self.levels = self.__load_nodes() | |
| for i, self.level in enumerate(self.levels, 1): | |
| self.level.update(script_index=i) | |
| self._download_level() | |
| def login(self, username: str, password: str): | |
| """Faz login setando a sessão utilizando username e password fornecidos na instancialização""" | |
| payload = {"email": username, "password": password} | |
| res = self.session.post(f"{BASE_API}/sessions", json=payload).json() | |
| self.session.headers["Authorization"] = ( | |
| f"{res['type'].capitalize()} {res['token']}" | |
| ) | |
| self.session.cookies.update( | |
| { | |
| "skylab_next_access_token_v3": res["token"], | |
| "skylab_next_refresh_token_v3": res["refreshToken"], | |
| } | |
| ) | |
| account_infos = self.session.get(f"{BASE_API}/account").json() | |
| print("Welcome, {}!".format(account_infos["name"])) | |
| pickle.dump(self.session, SESSION_PATH.open("wb")) | |
| def select_specializations(self): | |
| """Permite seleconar uma ou todas as especializações para download dos cursos""" | |
| params = { | |
| "types[0]": "SPECIALIZATION", | |
| "limit": "12", | |
| "offset": "0", | |
| "page": "1", | |
| "sort_by": "relevance", | |
| } | |
| specializations = self.session.get( | |
| f"{BASE_API}/catalog/list", params=params | |
| ).json()["items"] | |
| clear_screen() | |
| print("Selecione uma formação ou 0 para selecionar todas:") | |
| for i, specialization in enumerate(specializations, 1): | |
| print(f"[{i}] - {specialization['title']}") | |
| choice = int(input(">> ")) | |
| if choice == 0: | |
| for self.specialization in specializations: | |
| self._download_courses() | |
| else: | |
| self.specialization = specializations[choice - 1] | |
| self._download_courses() | |
| def run(self): | |
| """Inicia a execução do script""" | |
| if not self._session_exists: | |
| self.login( | |
| username=input("Seu email Rocketseat: "), password=input("Sua senha: ") | |
| ) | |
| self.select_specializations() | |
| if __name__ == "__main__": | |
| agent = Rocketseat() | |
| agent.run() |
Acho que quebrou novamente, sempre estou recebendo a mesma mensagem ao tentar baixar um módulo:
...
Processando módulo: 'Fundamentos do Angular' (Tipo: cluster)
Módulo 'Fundamentos do Angular' não é um cluster de aulas ou não possui slug. Pulando.
Fiz algumas mudanças e agora está funcionando perfeitamente com todos os cursos da Rocketseat.
https://github.com/jplinharescosta/rocketseat-downloader
- Novidades
- Suporte a nós type=cluster e type=group (parsing de cluster.groups e group.lessons).
- Requisições com retries/backoff e timeout configurável.
- Execução do yt-dlp via subprocess com captura de logs e melhor tratamento de erro.
- Variáveis de ambiente para configurar credenciais, timeout, domínio da CDN e diretório da sessão.
- Logs de debug em logs/{slug}_cluster_details.json.
- Ajustes na seção de uso (python main.py, senha mascarada, sessão persistida).
Se você testar e estiver funcionando perfeitamente eu fico a disposição para fazer um PR e deletar o meu repositório. @Kazbonfim
@jplinharescosta ficou excelente! Fiz um teste em um curso que não baixava nem ferrando, e agora está funcional; mantenha em seu repositório, e pode abrir a PR no meu, pra aprovar, e unificamos - vai te servir como um projeto pequeno pro futuro também!
Obrigado pelas melhorias, bora que bora 🐦🔥
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Tive que mudar um pouco a lógica do Script; agora, ao invés de procurar por dados apenas no HTML, e em alguns dados retornados, ele vai procurar pelos respectivos .jsons que são baixados ao se acessar as páginas dos referidos cursos.
Não sei se vai funcionar EM TODOS, mas o curso de Banco de Dados pode ser baixado tranquilamente - se puderem testar em outros cursos, e reportar, agradeço.
https://github.com/Kazbonfim/rocketseat-downloader2/tree/Kazbonfim-patch-1
De nada