Last active
April 23, 2024 21:01
-
-
Save leoherzog/f6ecb6d470ac24741ca9072a069f1b93 to your computer and use it in GitHub Desktop.
Bulk Download or Delete Zoom Recordings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
#
# Usage: python3 main.py --user <user email> [--download] [--delete]
#
import os | |
import argparse | |
import json | |
from concurrent.futures import ThreadPoolExecutor | |
from datetime import datetime, timedelta | |
import time | |
import requests | |
from requests_oauthlib import OAuth2Session | |
import oauthlib.oauth2.rfc6749.errors | |
# OAuth app credentials. They can be supplied through the ZOOM_CLIENT_ID and
# ZOOM_CLIENT_SECRET environment variables so real secrets need not be written
# into this file; the placeholder values are used when the variables are unset.
CLIENT_ID = os.environ.get('ZOOM_CLIENT_ID', 'clientidclientid')
CLIENT_SECRET = os.environ.get('ZOOM_CLIENT_SECRET', 'clientsecretclientsecret')
# Redirect URI sent with the authorization request and token exchange.
REDIRECT_URI = 'https://localhost:8000/callback'
# File in the current working directory where the OAuth token is cached.
TOKEN_FILE = 'token.json'
AUTHORIZATION_URL = 'https://zoom.us/oauth/authorize'
TOKEN_URL = 'https://zoom.us/oauth/token'
# Default search range for recordings: the 5 * 365 days ending at start-up.
END = datetime.now()
START = END - timedelta(days=5*365)
def get_authorization_url():
    """Build the URL the user must visit to authorize this application.

    Returns:
        The authorization URL produced by ``OAuth2Session.authorization_url``
        for AUTHORIZATION_URL; the second element of that call's result is
        discarded.
    """
    session = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI)
    url, _unused = session.authorization_url(AUTHORIZATION_URL)
    return url
def fetch_zoom_token(authorization_response):
    """Exchange the OAuth callback URL for a token and cache it on disk.

    Args:
        authorization_response: The full callback URL the user was redirected
            to after authorizing the application.

    Returns:
        The token returned by ``OAuth2Session.fetch_token``; the same value is
        written to TOKEN_FILE as JSON.
    """
    session = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI)
    fetched = session.fetch_token(
        TOKEN_URL,
        authorization_response=authorization_response,
        client_secret=CLIENT_SECRET,
    )
    with open(TOKEN_FILE, 'w') as token_file:
        json.dump(fetched, token_file)
    return fetched
def load_token():
    """Load the cached OAuth token from TOKEN_FILE.

    Returns:
        The parsed JSON content of TOKEN_FILE, or None when the file does not
        exist or does not contain valid JSON, so that the caller can fall back
        to a fresh authorization instead of crashing.
    """
    try:
        with open(TOKEN_FILE, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError:
        # An empty or truncated token file (e.g. from an interrupted write)
        # is unusable; treat it the same as a missing file.
        return None
def refresh_zoom_token(token):
    """Refresh an OAuth token and overwrite the cached copy in TOKEN_FILE.

    Args:
        token: The current token, passed to ``OAuth2Session`` as ``token``.

    Returns:
        The new token returned by ``OAuth2Session.refresh_token``.
    """
    session = OAuth2Session(CLIENT_ID, token=token)
    # The client credentials are forwarded as extra keyword arguments of the
    # refresh request.
    refreshed = session.refresh_token(
        TOKEN_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
    )
    with open(TOKEN_FILE, 'w') as token_file:
        json.dump(refreshed, token_file)
    return refreshed
def is_token_about_to_expire(token):
    """Tell whether the token expires within the next 60 seconds.

    Args:
        token: Mapping that may carry an ``expires_at`` value, compared with
            ``time.time()``; a missing key is read as 0.

    Returns:
        True when the current time is at or past ``expires_at - 60``.
    """
    refresh_deadline = token.get("expires_at", 0) - 60  # within 60 seconds
    return refresh_deadline <= time.time()
def list_meetings_with_recordings(access_token, user_id, start_date=START, end_date=END, page_size=300):
    """Collect every meeting with cloud recordings for a user in a date range.

    The range is queried in consecutive windows of at most 30 days, and each
    window is paged through with ``next_page_token`` until no token is
    returned.

    Args:
        access_token: OAuth bearer token sent in the Authorization header.
        user_id: User identifier placed in the request path (the caller
            passes the user's email).
        start_date: First day to query (datetime).
        end_date: Last day to query (datetime).
        page_size: Page size requested from the API.

    Returns:
        A list with the entries of every page's ``meetings`` array.

    Raises:
        requests.HTTPError: If the API answers with an error status other
            than 429. Previously such a response (e.g. an authorization
            failure) was silently read as "no meetings".
    """
    url = f"https://api.zoom.us/v2/users/{user_id}/recordings"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    all_meetings = []
    current_date = start_date
    while current_date <= end_date:
        # Clamp the last window so it never extends past end_date.
        window_end = min(current_date + timedelta(days=30), end_date)
        params = {
            "page_size": page_size,
            "from": current_date.strftime('%Y-%m-%d'),
            "to": window_end.strftime('%Y-%m-%d'),
            "next_page_token": '',
        }
        while True:
            response = requests.get(url, headers=headers, params=params)
            if response.status_code == 429:
                try:
                    retry_after = int(response.headers.get('Retry-After', 10))
                except ValueError:
                    # Retry-After may legally be a date rather than a number
                    # of seconds; fall back to the default wait.
                    retry_after = 10
                print(f"Rate limit exceeded. Retrying in {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            # Fail loudly instead of treating an error body as an empty page.
            response.raise_for_status()
            json_response = response.json()
            all_meetings.extend(json_response.get('meetings', []))
            next_page_token = json_response.get('next_page_token', None)
            if not next_page_token:
                break
            params["next_page_token"] = next_page_token
        # The window is inclusive on both ends, so resume on the following day.
        current_date = window_end + timedelta(days=1)
    return all_meetings
def download_single_recording(access_token, meeting_path, meeting_topic, file_info):
    """Download one recording file into the meeting's folder.

    The target name is ``"<topic> (<recording_start> <recording_type>).<ext>"``
    with colons in the start time replaced by dashes. If that file already
    exists, its size is compared with the Content-Length reported by a HEAD
    request, and the download is skipped when both match.

    Args:
        access_token: OAuth bearer token sent in the Authorization header.
        meeting_path: Directory the file is written to.
        meeting_topic: Meeting topic used as the file name prefix.
        file_info: Mapping describing the recording file; the keys read are
            ``download_url``, ``recording_start``, ``recording_type`` and
            ``file_extension``.

    Returns:
        None. Nothing is done when ``file_info`` has no ``download_url``.
    """
    url = file_info.get('download_url')
    started = file_info.get('recording_start', '').replace(':', '-')
    kind = file_info.get('recording_type')
    extension = file_info.get('file_extension')
    if not url:
        return

    file_name = f"{meeting_topic} ({started} {kind}).{extension}"
    target = os.path.join(meeting_path, file_name)
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    if os.path.exists(target):
        head_response = requests.head(url, headers=auth_headers, allow_redirects=True)
        remote_size = int(head_response.headers.get('Content-Length', 0))
        local_size = os.path.getsize(target)
        if local_size == remote_size:
            print(f"File {file_name} already exists. Verified.")
            return
        print(f"Existing file {file_name} mismatch ({local_size}/{remote_size}). Redownloading...")

    with requests.get(url, headers=auth_headers, stream=True) as response:
        if response.status_code != 200:
            print(f"Failed to download {file_name}")
            return
        # Stream to disk in 8 KiB chunks.
        with open(target, 'wb') as out_file:
            for chunk in response.iter_content(chunk_size=8192):
                out_file.write(chunk)
        print(f"Download complete: {file_name}")
def trash_all_recordings(access_token, recordings):
    """Move the cloud recordings of every given meeting to the trash.

    Args:
        access_token: OAuth bearer token sent in the Authorization header.
        recordings: Iterable of meeting mappings; ``uuid`` identifies the
            meeting and ``topic`` is used only in progress messages. Meetings
            without a ``uuid`` are skipped.

    Returns:
        None. The outcome of each request is printed.
    """
    from urllib.parse import quote

    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    for meeting in recordings:
        meeting_name = meeting.get('topic', 'Unknown Topic')
        meeting_id = meeting.get('uuid')
        if not meeting_id:
            continue
        # Zoom requires UUIDs that start with "/" or contain "//" to be
        # double URL-encoded; sent raw, the slashes would change the path.
        if meeting_id.startswith('/') or '//' in meeting_id:
            meeting_id = quote(quote(meeting_id, safe=''), safe='')
        trash_recording_url = f"https://api.zoom.us/v2/meetings/{meeting_id}/recordings?action=trash"
        while True:
            response = requests.delete(trash_recording_url, headers=headers)
            if response.status_code == 429:
                try:
                    retry_after = int(response.headers.get('Retry-After', 10))
                except ValueError:
                    # Retry-After may legally be a date rather than a number
                    # of seconds; fall back to the default wait.
                    retry_after = 10
                print(f"Rate limit exceeded while trying to trash '{meeting_name}'. Retrying in {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            if response.status_code in (200, 204):
                print(f"Successfully trashed recordings for '{meeting_name}'")
            else:
                print(f"Failed to trash recordings for '{meeting_name}'. Status Code: {response.status_code}")
            break
def parse_args():
    """Parse the command-line options, prompting for the user email if needed.

    Returns:
        The argparse namespace with ``user`` (str), ``download`` (bool) and
        ``delete`` (bool).
    """
    parser = argparse.ArgumentParser(description='Zoom Cloud Recordings management.')
    parser.add_argument('--user', type=str, help='The email ID of the user.')
    parser.add_argument('--download', action='store_true', help='Flag to download recordings.')
    parser.add_argument('--delete', action='store_true', help='Flag to delete recordings.')
    args = parser.parse_args()

    if args.user is None:
        args.user = input("Please enter the user email: ")
    # Validate regardless of where the value came from, so that an empty
    # `--user ""` on the command line is re-prompted as well.
    while not args.user.strip():
        args.user = input("Email cannot be empty. Please enter the user email: ")
    return args
def main():
    """Run the tool: authorize, list recordings, then download and/or trash.

    Steps:
        1. Load the cached token and refresh it, or run the interactive
           authorization when there is no token or the refresh is rejected.
        2. List all meetings with recordings for the requested user.
        3. With ``--download``, download every recording file on a pool of
           4 worker threads into ``<user>/<topic> (<date>)/``.
        4. With ``--delete``, trash all listed recordings after confirmation.
    """
    args = parse_args()

    print("Refreshing token...")
    token = load_token()
    if token is None:
        print(f"Visit this URL to authorize the application: {get_authorization_url()}")
        authorization_response = input("Enter the full callback URL: ")
        token = fetch_zoom_token(authorization_response)
    else:
        try:
            token = refresh_zoom_token(token)
        except oauthlib.oauth2.rfc6749.errors.InvalidGrantError:
            print(f"Visit this URL to authorize the application: {get_authorization_url()}")
            authorization_response = input("Enter the full callback URL: ")
            # Keep the newly issued token; the old one was just rejected and
            # must not be used for the API calls below.
            token = fetch_zoom_token(authorization_response)

    print("Fetching recordings...")
    meetings = list_meetings_with_recordings(token['access_token'], args.user)
    total_meetings = len(meetings)
    total_files = sum(len(meeting.get('recording_files', [])) for meeting in meetings)
    print(f"Found {total_meetings} meetings with {total_files} recordings.")

    failed_downloads = 0
    if args.download:
        print('Starting download of recordings...')
        pending = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            for meeting in meetings:
                if is_token_about_to_expire(token):
                    print("Token is about to expire. Refreshing...")
                    token = refresh_zoom_token(token)
                recording_files = meeting.get('recording_files', [])
                meeting_topic = meeting.get('topic', 'Unknown Topic').replace('/', '-')
                meeting_date = meeting.get('start_time', '').split('T')[0]
                meeting_folder = f"{meeting_topic} ({meeting_date})"
                meeting_path = os.path.join(args.user, meeting_folder)
                os.makedirs(meeting_path, exist_ok=True)
                for recording_info in recording_files:
                    # Path, topic and file info are bound per task; the token
                    # is looked up when the task runs so that a token
                    # refreshed in the meantime is picked up.
                    pending.append(executor.submit(
                        lambda info=recording_info, mp=meeting_path, mt=meeting_topic:
                            download_single_recording(token['access_token'], mp, mt, info)
                    ))
        # The pool has finished here; surface errors raised inside workers,
        # which would otherwise be silently dropped.
        for task in pending:
            error = task.exception()
            if error is not None:
                failed_downloads += 1
                print(f"Download raised an error: {error!r}")

    if args.delete:
        if failed_downloads:
            print(f"Warning: {failed_downloads} download(s) raised an error and may be incomplete.")
        confirm = input('Are you sure you want to trash all recordings? This action is irreversible. (y/n): ')
        if confirm.lower() == 'y':
            # Downloads can take long enough for the token to expire.
            if is_token_about_to_expire(token):
                print("Token is about to expire. Refreshing...")
                token = refresh_zoom_token(token)
            print('Starting trashing of recordings...')
            trash_all_recordings(token['access_token'], meetings)
        else:
            print('Trashing of recordings cancelled.')


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment