Last active
July 16, 2024 12:40
-
-
Save danaspiegel/c33004e52ffacb60c24215abf8301680 to your computer and use it in GitHub Desktop.
Zoom Recording Downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import requests | |
import itertools | |
from dateutil.parser import parse | |
# Zoom API credentials. Each value can be supplied through the environment so
# that secrets do not have to be written into this file; an unset variable
# falls back to the empty string.
API_KEY = os.environ.get('ZOOM_API_KEY', '')
API_SECRET = os.environ.get('ZOOM_API_SECRET', '')
ACCESS_TOKEN = os.environ.get('ZOOM_ACCESS_TOKEN', '')

# Zoom REST endpoints queried by this script.
API_ENDPOINT_USER_LIST = 'https://api.zoom.us/v1/user/list'
API_ENDPOINT_RECORDING_LIST = 'https://api.zoom.us/v1/recording/list'

# Root directory for downloaded files.
DOWNLOAD_DIRECTORY = 'downloads'

# Text file holding one completed meeting UUID per line.
COMPLETED_MEETING_IDS_LOG = 'completed_downloads.txt'
# In-memory copy of the UUIDs recorded in COMPLETED_MEETING_IDS_LOG.
COMPLETED_MEETING_IDS = set()
def get_credentials(host_id, page_number):
    """Build the form payload sent with a Zoom API request.

    Args:
        host_id: Value placed in the ``host_id`` field.
        page_number: Value placed in the ``page_number`` field.

    Returns:
        A new dict holding the API key and secret, the host id, a fixed
        page size of 300 and the requested page number.
    """
    payload = dict(
        api_key=API_KEY,
        api_secret=API_SECRET,
        host_id=host_id,
        page_size=300,
        page_number=page_number,
    )
    return payload
def get_user_ids():
    """Return ``(id, email)`` tuples for every user the API lists.

    Pages are requested one after another until the reported ``page_count``
    is reached, so accounts with more users than one page holds are no longer
    truncated.

    NOTE(review): this assumes the user-list response reports ``page_count``
    the same way the recording-list response does; if the field is absent a
    single page is fetched, as before. Confirm against the API reference.

    Returns:
        A list of ``(user_id, email)`` tuples.

    Raises:
        requests.HTTPError: If the API answers with an HTTP error status.
        KeyError: If a response body has no ``users`` entry.
    """
    user_ids = []
    page_number = 1
    while True:
        # The host_id field is sent as 1 for user listing, as it always was.
        response = requests.post(
            API_ENDPOINT_USER_LIST,
            data=get_credentials(1, page_number),
            timeout=30,  # never hang forever on an unresponsive server
        )
        response.raise_for_status()
        user_data = response.json()
        user_ids.extend(
            (user['id'], user['email']) for user in user_data['users'])
        if page_number >= user_data.get('page_count', 1):
            break
        page_number += 1
    return user_ids
def format_filename(recording, file_type):
    """Build the local file name for one file of a recording.

    The name has the form ``<YYYY.MM.DD> - <HH.MM AM/PM> UTC - <topic>.<ext>``.
    The date and time come from ``recording['start_time']`` as parsed by
    ``parse``; no timezone conversion is performed here, the ``UTC`` label is
    fixed text.

    Args:
        recording: Mapping with at least ``topic`` and ``start_time`` keys.
        file_type: File type string; lower-cased and used as the extension.

    Returns:
        The file name (no directory component).
    """
    topic = recording['topic']
    # Path separators inside the topic would be read as directories. '/' is
    # always replaced; the platform's own separators are covered as well so
    # that e.g. a backslash cannot break the path on Windows.
    for separator in ('/', os.sep, os.altsep):
        if separator:
            topic = topic.replace(separator, '&')
    meeting_time = parse(recording['start_time'])
    return '{} - {} UTC - {}.{}'.format(
        meeting_time.strftime('%Y.%m.%d'),
        meeting_time.strftime('%I.%M %p'),
        topic,
        file_type.lower(),
    )
def get_downloads(recording):
    """List the downloadable files attached to a recording.

    Args:
        recording: Mapping whose ``recording_files`` entry is an iterable of
            mappings, each with ``file_type`` and ``download_url`` keys.

    Returns:
        A list of ``(file_type, download_url)`` tuples, in source order.
    """
    return [
        (entry['file_type'], entry['download_url'])
        for entry in recording['recording_files']
    ]
def _fetch_recording_page(user_id, page_number):
    """POST one recording-list request and return the decoded JSON body."""
    response = requests.post(
        API_ENDPOINT_RECORDING_LIST,
        data=get_credentials(user_id, page_number),
        timeout=30,  # never hang forever on an unresponsive server
    )
    # Surface HTTP failures here instead of as an obscure KeyError later.
    response.raise_for_status()
    return response.json()


def list_recordings(user_id):
    """Return every recording ("meeting") listed for a user.

    The first page is fetched to learn ``total_records`` and ``page_count``;
    any remaining pages are then fetched in order and their ``meetings``
    entries appended.

    Args:
        user_id: Identifier sent as ``host_id`` in each request.

    Returns:
        A list of meeting mappings; empty when the API reports zero records.

    Raises:
        requests.HTTPError: If the API answers with an HTTP error status.
        KeyError: If the first response lacks ``total_records``,
            ``page_count`` or ``meetings``.
    """
    first_page = _fetch_recording_page(user_id, 1)
    total_records = first_page['total_records']
    page_count = first_page['page_count']
    if total_records == 0:
        return []

    recordings = first_page['meetings']
    # Page 1 is already in hand; continue with page 2 up to page_count.
    for current_page in range(2, page_count + 1):
        print('Getting page {} of {}'.format(current_page, page_count))
        page = _fetch_recording_page(user_id, current_page)
        if page:
            # Tolerate a page with no 'meetings' entry rather than crashing.
            recordings.extend(page.get('meetings', []))
    return recordings
def download_recording(download_url, email, filename):
    """Download one file into ``DOWNLOAD_DIRECTORY/<email>/<filename>``.

    The target directory is created if needed. Any failure while requesting
    or writing the file is printed and reported through the return value, so
    a single bad download does not abort the whole run.

    Args:
        download_url: URL to fetch.
        email: Name of the per-user sub-directory.
        filename: Name of the file to write inside that sub-directory.

    Returns:
        True if the file was fetched and written completely, False otherwise.
    """
    dl_dir = os.sep.join([DOWNLOAD_DIRECTORY, email])
    full_filename = os.sep.join([dl_dir, filename])
    os.makedirs(dl_dir, exist_ok=True)
    try:
        # The context manager releases the streamed connection in all cases.
        with requests.get(download_url, stream=True, timeout=30) as response:
            # Without this check an HTTP error body would be saved to disk
            # and counted as a successful download.
            response.raise_for_status()
            with open(full_filename, 'wb') as fd:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    fd.write(chunk)
        return True
    except Exception as e:
        # Best effort: report the problem and let the caller carry on.
        print(e)
        return False
def load_completed_meeting_ids():
    """Load previously completed meeting ids into ``COMPLETED_MEETING_IDS``.

    Reads ``COMPLETED_MEETING_IDS_LOG`` line by line, strips surrounding
    whitespace and adds each non-empty line to the module-level set. A
    missing log file simply means nothing has been completed yet (for
    example on the very first run) and leaves the set unchanged.
    """
    try:
        with open(COMPLETED_MEETING_IDS_LOG, 'r') as fd:
            for line in fd:
                meeting_id = line.strip()
                if meeting_id:  # ignore blank lines
                    COMPLETED_MEETING_IDS.add(meeting_id)
    except FileNotFoundError:
        # No log yet: nothing to skip.
        pass
def main():
    """Download every not-yet-completed recording for every listed user.

    For each user the recording list is fetched, meetings whose uuid is
    already in ``COMPLETED_MEETING_IDS`` are skipped, and every file of the
    remaining meetings is downloaded. A meeting is recorded as completed
    only when it has at least one file and all of its files downloaded
    successfully, so a partially failed meeting is retried on the next run.
    """
    load_completed_meeting_ids()
    users = get_user_ids()
    for user_id, email in users:
        print('Getting recording list for {}'.format(email))
        recordings = list_recordings(user_id)
        total_count = len(recordings)
        print('Found {} recordings'.format(total_count))
        for index, recording in enumerate(recordings):
            meeting_id = recording['uuid']
            if meeting_id in COMPLETED_MEETING_IDS:
                print('Skipping already downloaded meeting: {}'.format(meeting_id))
                continue

            downloads = get_downloads(recording)
            # A meeting without files is never marked as completed.
            success = bool(downloads)
            for file_type, download_url in downloads:
                filename = format_filename(recording, file_type)
                print('Downloading ({} of {}): {}: {}'.format(
                    index+1, total_count, meeting_id, download_url))
                # Keep going after a failure so the other files are still
                # fetched, but remember that the meeting is incomplete.
                if not download_recording(download_url, email, filename):
                    success = False

            if success:
                # Persist the id right away so an interrupted run can resume.
                with open(COMPLETED_MEETING_IDS_LOG, 'a') as log:
                    COMPLETED_MEETING_IDS.add(meeting_id)
                    log.write(meeting_id)
                    log.write('\n')
# Run the downloader only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Does this download for all users or just your login?