Skip to content

Instantly share code, notes, and snippets.

@qrkourier
Last active November 16, 2024 07:53
Show Gist options
  • Save qrkourier/afa835b0fe111ab4eb137862afc95e29 to your computer and use it in GitHub Desktop.
Save qrkourier/afa835b0fe111ab4eb137862afc95e29 to your computer and use it in GitHub Desktop.
download all videos from a Matrix room
#!/usr/bin/env python3
import os
import sys
import argparse
import requests
from urllib.parse import urljoin, quote
import base64
import hashlib
def get_access_token():
    """Return the Matrix access token from the environment.

    The token is read from the ``MATRIX_ACCESS_TOKEN`` environment variable
    and stripped of surrounding whitespace. If the variable is unset, empty,
    or whitespace-only, instructions are printed and the process exits with
    status 1.

    Returns:
        str: the cleaned, non-empty access token.
    """
    # Strip before testing so a whitespace-only value counts as "missing"
    # instead of being returned as an empty token.
    token = (os.environ.get('MATRIX_ACCESS_TOKEN') or '').strip()
    if not token:
        print("\nError: No Matrix access token found")
        print_token_instructions()
        sys.exit(1)

    # main() publishes the parsed CLI options as the module-level ``args``.
    # Look it up defensively so this helper also works when main() has not
    # run (e.g. when the module is imported).
    debug = getattr(globals().get('args'), 'debug', False)
    if debug:
        if len(token) > 10:
            # Debug token format (showing only first/last few chars)
            print(f"DEBUG: Token format check - starts with: {token[:5]}, ends with: {token[-5:]}")
            print(f"DEBUG: Token length: {len(token)}")
        else:
            print("WARNING: Token seems too short")
    return token
def print_token_instructions():
    """Print step-by-step help for obtaining and exporting an access token."""
    instructions = (
        "\nTo get an access token:",
        "1. Open Element in your web browser",
        "2. Click your avatar/name in the top-left corner",
        "3. Click 'All settings'",
        "4. Click 'Help & About' in the left sidebar",
        "5. Scroll down to the bottom",
        "6. Click 'Access Token'",
        "7. Enter your password when prompted",
        "8. Click 'Copy' to copy your access token",
        "\nThen set it in your environment:",
        "export MATRIX_ACCESS_TOKEN='your_token_here'",
        "\nNote: Access tokens are sensitive! They provide full access to your account.",
        "- Never share your access token",
        "- Don't commit it to version control",
        "- Generate a new one if you suspect it's been compromised",
    )
    # One print per entry keeps the output identical to separate print calls.
    for line in instructions:
        print(line)
def get_user_homeserver(token):
    """Return the server-name part of the user ID that owns ``token``.

    The token is checked against the ``whoami`` endpoint on
    ``https://matrix.org`` (hard-coded), and the part of the returned
    ``user_id`` after the first colon is used as the home server name.

    Args:
        token: Matrix access token sent as a Bearer credential.

    Returns:
        str | None: the server name (including a port, if the user ID carries
        one), or None when the request fails, is rejected, or yields no
        usable user ID.
    """
    try:
        response = requests.get(
            "https://matrix.org/_matrix/client/v3/account/whoami",
            headers={"Authorization": f"Bearer {token}"},
            # Without a timeout an unresponsive server would hang the script.
            timeout=10,
        )
        if response.status_code == 200:
            user_id = response.json().get('user_id')
            if user_id and ':' in user_id:
                # Split only once: a server name may itself contain ":port".
                return user_id.split(':', 1)[1]
    except Exception as e:
        print(f"Error getting home server: {e}")
    return None
def validate_token(homeserver, token):
    """Validate the access token before making other requests.

    Args:
        homeserver: accepted for interface compatibility; not used. The
            token is always checked against the home server derived from it.
        token: Matrix access token to validate.

    Returns:
        bool: True when the ``whoami`` endpoint answers with HTTP 200,
        False otherwise (details are printed).
    """
    # First get the user's home server
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        return False
    print(f"Using home server: https://{home_server}")

    # Always validate against the user's home server
    url = f"https://{home_server}/_matrix/client/v3/account/whoami"
    try:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            # Bound the wait so a dead server cannot hang validation.
            timeout=10,
        )
        if response.status_code == 200:
            user_info = response.json()
            print(f"Successfully authenticated as: {user_info.get('user_id')}")
            return True
        print(f"Authentication failed: {response.content.decode()}")
    except Exception as e:
        print(f"Error validating token: {e}")
    return False
def resolve_room_alias(homeserver, room_alias):
    """Resolve a room alias to a room ID through the client API or federation.

    The client-server directory endpoint on the user's home server is tried
    first, using the access token from the environment. If that does not
    succeed, the federation directory query is attempted against a list of
    candidate servers.

    Args:
        homeserver: optional base URL tried first in the federation fallback.
        room_alias: alias such as ``#room:example.org``.

    Returns:
        str | None: the ``room_id`` from the first successful response, or
        None (after printing troubleshooting hints) when nothing resolves it.
    """
    # main() publishes CLI options as the module-level ``args``; tolerate
    # its absence so the function is usable without going through main().
    debug = getattr(globals().get('args'), 'debug', False)
    token = get_access_token()
    # URL encode the room alias once; it is the same for every request.
    encoded_alias = quote(room_alias)

    # First try using the client API with the user's token
    user_homeserver = get_user_homeserver(token)
    if user_homeserver:
        if debug:
            print(f"Trying client API with homeserver: {user_homeserver}")
        client_url = f"https://{user_homeserver}/_matrix/client/v3/directory/room/{encoded_alias}"
        try:
            response = requests.get(
                client_url,
                headers={"Authorization": f"Bearer {token}"},
                timeout=10,
            )
            if response.status_code == 200:
                data = response.json()
                if debug:
                    print("Successfully resolved room alias through client API")
                return data.get("room_id")
            elif debug:
                print(f"Client API failed: {response.status_code}")
                try:
                    print(f"Error: {response.json()}")
                except ValueError:
                    # Body was not JSON; show it raw instead.
                    print(f"Response: {response.text}")
        except Exception as e:
            if debug:
                print(f"Client API error: {e}")

    # If client API fails, try federation API with various servers
    servers_to_try = []
    if homeserver:
        servers_to_try.append(homeserver)
    # Add common servers as fallbacks
    servers_to_try.extend([
        "https://matrix.org",
        "https://minnix.dev",
        "https://tchncs.de",
    ])
    if ':' in room_alias:
        # Extract server from room alias and try it; split once so an
        # explicit port in the server name is preserved.
        servers_to_try.append(f"https://{room_alias.split(':', 1)[1]}")
    # Remove duplicates while preserving order
    servers_to_try = list(dict.fromkeys(servers_to_try))
    if debug:
        print("\nTrying federation API with servers:", servers_to_try)

    for server in servers_to_try:
        if debug:
            print(f"Trying federation API with: {server}")
        fed_url = f"{server}/_matrix/federation/v1/query/directory?room_alias={encoded_alias}"
        try:
            response = requests.get(fed_url, timeout=10)
            if response.status_code == 200:
                data = response.json()
                if debug:
                    print("Successfully resolved room alias through federation")
                return data.get("room_id")
            elif debug:
                print(f"Federation API failed for {server}: {response.status_code}")
                try:
                    print(f"Error: {response.json()}")
                except ValueError:
                    print(f"Response: {response.text}")
        except Exception as e:
            if debug:
                print(f"Federation API error with {server}: {e}")

    print("Error: Could not resolve room alias. Please verify:")
    print("1. The room alias is correct")
    print("2. The room exists and is accessible")
    print("3. You have permission to access the room")
    print("4. The room's server is online and federating")
    return None
def fetch_messages(homeserver, token, room_id, batch=None):
    """Fetch one page of room messages, newest first.

    Args:
        homeserver: accepted for interface compatibility; not used. Requests
            always go to the home server derived from ``token``.
        token: Matrix access token.
        room_id: ID of the room to read.
        batch: pagination token sent as ``from``; omit for the first page.

    Returns:
        dict: the decoded JSON response body.

    Exits the process with status 1 when the home server cannot be
    determined, the server answers with a non-200 status, or the request
    raises.
    """
    # main() publishes CLI options as the module-level ``args``; tolerate
    # its absence so the function is usable without going through main().
    debug = getattr(globals().get('args'), 'debug', False)

    # Get the user's home server
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        sys.exit(1)

    # Always fetch through the user's home server. The room ID is a path
    # segment, so percent-encode it rather than interpolating it raw.
    url = f"https://{home_server}/_matrix/client/v3/rooms/{quote(room_id, safe='')}/messages"
    params = {
        "dir": "b",  # paginate backwards
        "limit": 100,
    }
    if batch:
        params["from"] = batch

    try:
        if debug:
            print(f"\nFetching messages from {room_id}")
        response = requests.get(
            url,
            params=params,
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if response.status_code == 200:
            data = response.json()
            if debug:
                print(f"Found {len(data.get('chunk', []))} messages")
            return data
        print(f"Error fetching messages: {response.content.decode()}")
        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept it.
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
def scrub_token(text, token):
"""Remove access token from debug output"""
if not token or not text:
return text
return text.replace(token, "ACCESS_TOKEN_REDACTED")
def check_server_login(homeserver):
    """Check whether the server's login endpoint responds, listing its flows.

    Args:
        homeserver: base URL of the homeserver.

    Returns:
        bool: True when the login endpoint answers with HTTP 200 (its flow
        types are printed), False on any other status or on error.
    """
    url = urljoin(homeserver, "/_matrix/client/r0/login")
    try:
        # Bound the wait so an unresponsive server cannot hang the check.
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            flows = response.json().get('flows', [])
            print("\nServer login flows:")
            for flow in flows:
                print(f"- {flow.get('type')}")
            return True
    except Exception as e:
        print(f"\nError checking login endpoint: {e}")
    return False
def discover_homeserver(domain):
    """Discover the Matrix homeserver base URL for ``domain``.

    Tries, in order: the ``.well-known/matrix/client`` document, then the
    ``/_matrix/client/versions`` endpoint on a few conventional host names.

    Args:
        domain: bare domain name, without scheme.

    Returns:
        str: the discovered base URL, or ``https://{domain}`` when nothing
        responded.
    """
    print(f"\nDiscovering homeserver for domain: {domain}")

    # First try .well-known
    try:
        well_known_url = f"https://{domain}/.well-known/matrix/client"
        print(f"Checking well-known at: {well_known_url}")
        response = requests.get(well_known_url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            homeserver = data.get("m.homeserver", {}).get("base_url")
            if homeserver:
                print(f"Found homeserver via well-known: {homeserver}")
                return homeserver
            print("No homeserver URL in well-known response")
    except Exception as e:
        print(f"Error checking well-known: {e}")

    # Try direct connection to possible homeserver URLs
    possible_urls = [
        f"https://matrix.{domain}",
        f"https://{domain}",
        f"https://synapse.{domain}",
    ]
    for url in possible_urls:
        try:
            versions_url = urljoin(url, "/_matrix/client/versions")
            print(f"\nTrying homeserver at: {url}")
            print(f"Checking versions endpoint: {versions_url}")
            # Each probe is bounded; several candidates are expected to be
            # dead and must not stall discovery.
            response = requests.get(versions_url, timeout=10)
            if response.status_code == 200:
                print(f"Found working homeserver at: {url}")
                print(f"Supported versions: {response.json()}")
                return url
        except Exception as e:
            print(f"Failed to connect to {url}: {e}")

    # Default fallback
    default_url = f"https://{domain}"
    print(f"\nNo working homeserver found, falling back to: {default_url}")
    return default_url
def verify_token(homeserver, token):
    """Try to verify a token against a homeserver and print the user info.

    Each known ``whoami`` path is tried with both header and query-parameter
    authentication until one answers with HTTP 200.

    Args:
        homeserver: base URL of the server to check.
        token: Matrix access token.

    Returns:
        bool: True on the first successful verification, False when every
        attempt fails (the last server response, if any, is printed).
    """
    paths = [
        "/_matrix/client/v3/account/whoami",
        "/_matrix/client/r0/account/whoami",
        "/_synapse/client/v1/account/whoami",
    ]
    print(f"\nVerifying token against {homeserver}")
    print(f"Token starts with: {token[:5]}")

    # Remember the last response actually received so the failure report
    # below never touches an unbound variable when every request raised.
    last_response = None
    for path in paths:
        url = urljoin(homeserver, path)
        # Try both header and query param
        methods = [
            ("header", {"Authorization": f"Bearer {token}"}),
            ("query", {"access_token": token}),
        ]
        for method_name, auth in methods:
            print(f"Trying {method_name} authentication...")
            # The try is per attempt so that one failing request does not
            # skip the remaining authentication method for this path.
            try:
                if method_name == "header":
                    response = requests.get(url, headers=auth, timeout=10)
                else:
                    response = requests.get(url, params=auth, timeout=10)
                last_response = response
                if response.status_code == 200:
                    user_info = response.json()
                    print("\nSuccess! Token is valid for:")
                    print(f"Server: {homeserver}")
                    print(f"User ID: {user_info.get('user_id')}")
                    print(f"Device ID: {user_info.get('device_id')}")
                    return True
            except Exception:
                continue

    print("\nToken verification failed.")
    if last_response is not None:
        try:
            error = last_response.json()
            print(f"Error: {error.get('error')}")
            print(f"Error code: {error.get('errcode')}")
        except Exception:
            print(f"Response: {last_response.content.decode(errors='replace')}")
    return False
def decode_synapse_token(token):
    """Print whatever base64 text can be decoded from a ``syt_`` token.

    The first four characters (the ``syt_`` prefix) are dropped, the rest is
    split on underscores, and each part is base64-decoded with increasing
    amounts of padding. Each part is reported at most once.

    Args:
        token: token string; callers are expected to pass one that starts
            with ``syt_``.

    Returns:
        None. Results are printed.
    """
    print("\nAttempting to decode Synapse token:")
    try:
        # Remove syt_ prefix
        token_part = token[4:]
        # Split into parts
        parts = token_part.split("_")
        print(f"Token parts: {len(parts)}")
        for part in parts:
            print(f"\nTrying to decode part: {part}")
            # Try different base64 padding lengths
            for padding in ['', '=', '==', '===']:
                try:
                    decoded = base64.b64decode(part + padding).decode('utf-8')
                except Exception:
                    continue
                print(f"Successfully decoded: {decoded}")
                # Stop at the first padding that works: the decoder tolerates
                # surplus '=' characters, so continuing would print the same
                # value again for every longer padding.
                break
    except Exception as e:
        print(f"Error decoding token: {e}")
def inspect_token(token):
    """Print basic facts about a token and probe servers that may accept it.

    Prints the token length and its first ten characters, attempts to decode
    ``syt_`` tokens, then calls ``whoami`` on a fixed list of servers with
    both header and query-parameter authentication.

    Args:
        token: Matrix access token.

    Returns:
        None. Returns as soon as one server accepts the token; otherwise
        prints troubleshooting hints.
    """
    print("\nToken Information:")
    print(f"Length: {len(token)}")
    print(f"Prefix: {token[:10]}...")
    if token.startswith('syt_'):
        print("Type: Synapse token")
        decode_synapse_token(token)
    else:
        print("Type: Standard Matrix token")

    # Try some common homeservers
    common_servers = [
        "https://matrix.org",
        "https://matrix.minnix.dev",
        "https://minnix.dev",
        "https://synapse.matrix.org",
        "https://matrix.example.com",  # Add any other servers you use
    ]
    print("\nTrying to validate against common servers...")
    for server in common_servers:
        print(f"\nChecking {server}...")
        # Try both v3 and r0 endpoints
        for version in ['v3', 'r0']:
            url = urljoin(server, f"/_matrix/client/{version}/account/whoami")
            # Try both auth methods
            for method in ['header', 'query']:
                try:
                    # Every probe is bounded: most of these servers are not
                    # expected to know the token, and some may not answer.
                    if method == 'header':
                        response = requests.get(
                            url,
                            headers={"Authorization": f"Bearer {token}"},
                            timeout=10,
                        )
                    else:
                        response = requests.get(
                            url,
                            params={"access_token": token},
                            timeout=10,
                        )
                    if response.status_code == 200:
                        user_info = response.json()
                        print("\nSUCCESS! Token is valid for:")
                        print(f"Server: {server}")
                        print(f"User ID: {user_info.get('user_id')}")
                        print(f"Device ID: {user_info.get('device_id', 'unknown')}")
                        return
                    elif response.status_code == 401:
                        error = response.json()
                        if 'error' in error:
                            print(f"{method.title()} auth failed: {error['error']}")
                except Exception as e:
                    print(f"Error with {method} auth: {e}")
                    continue

    print("\nCould not determine token's origin server.")
    print("\nPossible issues:")
    print("1. Token might be from a different server")
    print("2. Token might have expired")
    print("3. Account might have been logged out")
    print("\nTry generating a new token while logged into the correct server.")
def get_unique_filename(filepath):
    """Return ``filepath``, or a ``-N`` suffixed variant that does not exist yet.

    The serial number is inserted between the base name and the extension
    and starts at 1.
    """
    # NOTE: a later definition in this file reuses the name
    # ``get_unique_filename`` with a different signature, so at import time
    # that later definition replaces this one.
    stem, suffix = os.path.splitext(filepath)
    candidate = filepath
    serial = 0
    while os.path.exists(candidate):
        serial += 1
        candidate = f"{stem}-{serial}{suffix}"
    return candidate
def sanitize_filename(filename):
    """Return ``filename`` restricted to alphanumerics and ``.``, ``-``, ``_``.

    Every other character (spaces included) becomes a hyphen, and runs of
    hyphens are collapsed to a single one.
    """
    cleaned = []
    for ch in filename:
        safe = ch if (ch.isalnum() or ch in '.-_') else '-'
        # Skip a hyphen that would directly follow another hyphen.
        if safe == '-' and cleaned and cleaned[-1] == '-':
            continue
        cleaned.append(safe)
    return ''.join(cleaned)
def get_url_hash(url):
    """Return the first eight hex digits of the SHA-256 digest of ``url``."""
    digest = hashlib.sha256()
    digest.update(url.encode())
    return digest.hexdigest()[:8]
def get_filename_from_url(url, content_type=''):
    """Generate a base filename from a URL and content type.

    Args:
        url: media URL; only its path is used, so a query string such as
            ``?access_token=...`` never ends up in the filename.
        content_type: value of the Content-Type header, if known. Parameters
            after ``;`` and letter case are ignored.

    Returns:
        str: the last path segment, with ``.mp4`` or ``.webm`` appended to
        match the content type, or ``.mp4`` when the segment has no
        extension at all.
    """
    # Drop query string and fragment before taking the last path segment;
    # otherwise a token passed as a query parameter leaks into file names.
    path = url.split('?', 1)[0].split('#', 1)[0]
    filename = path.split('/')[-1]

    # "video/mp4; codecs=..." should still be recognised as video/mp4.
    mime = content_type.split(';', 1)[0].strip().lower()

    # Add extension based on content type if needed
    if mime == 'video/mp4' and not filename.endswith('.mp4'):
        filename += '.mp4'
    elif mime == 'video/webm' and not filename.endswith('.webm'):
        filename += '.webm'
    elif not os.path.splitext(filename)[1]:
        filename += '.mp4'  # Default to .mp4 if no extension
    return filename
def get_unique_filename(url, base_filename, output_dir):
    """Build a deterministic output path for ``url`` inside ``output_dir``.

    The sanitized base name gets a short hash of the URL inserted before its
    extension, so different URLs with the same base name do not collide.

    Args:
        url: source URL of the media. Its query string is ignored when
            hashing.
        base_filename: preferred file name (sanitized before use).
        output_dir: directory the file will be stored in.

    Returns:
        str: ``output_dir`` joined with ``{base}-{hash}{ext}``.
    """
    # Sanitize the base filename
    base_filename = sanitize_filename(base_filename)
    # Split filename into base and extension
    base, ext = os.path.splitext(base_filename)
    # Hash the URL without its query string: the query may carry the access
    # token, and the name must stay the same when the token changes.
    url_hash = get_url_hash(url.split('?', 1)[0])
    # Create filename with hash
    filename = f"{base}-{url_hash}{ext}"
    return os.path.join(output_dir, filename)
def check_existing_file(filepath, url, expected_size):
    """Tell whether ``filepath`` already holds the expected download.

    Args:
        filepath: path of the candidate file.
        url: source URL; accepted for interface compatibility, not used.
        expected_size: size in bytes reported by the server, or 0 when the
            size is unknown.

    Returns:
        bool: True only when the file exists, the expected size is known
        (greater than zero), and both sizes are equal.
    """
    if not os.path.isfile(filepath):
        return False
    # An unknown size (0) must never match: otherwise a zero-byte leftover
    # from a failed download would be accepted as complete.
    if expected_size <= 0:
        return False
    # If sizes match, consider it the same file
    return os.path.getsize(filepath) == expected_size
def _filename_from_content_disposition(header_value):
    """Return the plain ``filename`` parameter of a Content-Disposition header, or None."""
    for param in header_value.split(';'):
        key, sep, value = param.strip().partition('=')
        if sep and key.strip().lower() == 'filename':
            return value.strip().strip('"') or None
    return None


def download_video(url, token, output_dir, debug=False):
    """Download a video file into ``output_dir``.

    A HEAD request determines content type, size, and file name. If the
    target file already exists with the expected size the download is
    skipped. Data is streamed to a ``.part`` file that is renamed into place
    only after the transfer completed, so an interrupted download never
    leaves a truncated file under the final name.

    Args:
        url: HTTP(S) URL of the media.
        token: access token sent as a Bearer credential.
        output_dir: existing directory to store the file in.
        debug: print progress and diagnostics when True.

    Returns:
        str | None: path of the downloaded (or already present) file, or
        None on any failure.
    """
    headers = {"Authorization": f"Bearer {token}"}
    if debug:
        print(f"\nDownloading video from: {url}")
        print(f"Output directory: {output_dir}")
    try:
        # First make a HEAD request to get content info without downloading
        head_response = requests.head(url, headers=headers, allow_redirects=True, timeout=10)
        if head_response.status_code != 200:
            if debug:
                print(f"HEAD request failed with status {head_response.status_code}")
            return None

        # Get content info
        content_type = head_response.headers.get('content-type', '')
        expected_size = int(head_response.headers.get('content-length', 0))

        # Get filename from Content-Disposition or URL
        filename = None
        if 'Content-Disposition' in head_response.headers:
            filename = _filename_from_content_disposition(
                head_response.headers['Content-Disposition']
            )
        if not filename:
            filename = get_filename_from_url(url, content_type)

        # Get deterministic filepath based on URL
        filepath = get_unique_filename(url, filename, output_dir)
        if debug:
            print(f"Target filepath: {filepath}")

        # Check if file already exists with matching size
        if check_existing_file(filepath, url, expected_size):
            if debug:
                print(f"File already exists with matching size, skipping download: {filepath}")
            return filepath

        # File doesn't exist or size doesn't match, proceed with download
        if debug:
            print(f"Downloading to: {filepath}")
            print(f"Content type: {content_type}")
            print(f"Expected size: {expected_size} bytes")

        part_path = filepath + '.part'
        try:
            # Stream download with timeout; the context manager releases the
            # connection even when the transfer fails midway.
            with requests.get(url, headers=headers, stream=True, timeout=10) as response:
                if response.status_code != 200:
                    if debug:
                        print(f"Download failed with status {response.status_code}")
                        print(f"Response headers: {response.headers}")
                    return None
                show_progress = debug and expected_size > 0
                downloaded = 0
                with open(part_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if show_progress:
                            percent = (downloaded / expected_size) * 100
                            print(f"\rProgress: {percent:.1f}%", end='', flush=True)
                if show_progress:
                    print()  # New line after progress
            # Publish the file only once it is complete.
            os.replace(part_path, filepath)
        finally:
            # After a successful rename this is a no-op; after a failure it
            # removes the partial data.
            if os.path.exists(part_path):
                os.remove(part_path)

        if debug:
            print(f"Download complete: {filepath}")
        return filepath
    except requests.exceptions.Timeout:
        if debug:
            print("Request timed out")
        return None
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Request error: {e}")
        return None
    except Exception as e:
        if debug:
            print(f"Download error: {e}")
        return None
def get_media_url(mxc_url, home_server, token, debug=False):
    """Convert an MXC URL to an HTTP URL for media access.

    Candidate download endpoints are probed on the user's home server only.
    The server named inside the MXC URI comes from room content, so the
    access token is never sent to it.

    Args:
        mxc_url: URI of the form ``mxc://<server>/<media_id>``.
        home_server: host name of the user's home server.
        token: Matrix access token.
        debug: print diagnostics when True (the token is redacted).

    Returns:
        str | None: the first URL that answered successfully (possibly with
        an ``access_token`` query parameter), a best-guess URL on the home
        server when no probe succeeded, or None when ``mxc_url`` is
        malformed or no home server is given.
    """
    # Remove the mxc:// prefix
    if not mxc_url.startswith("mxc://"):
        return None
    # Extract server and media ID
    parts = mxc_url[6:].split("/", 1)  # Skip mxc:// and split into server/media_id
    if len(parts) != 2:
        return None
    server_name, media_id = parts
    if not server_name or not media_id:
        return None

    if debug:
        print(f"\nOriginal MXC URL: {mxc_url}")
        print(f"Server: {server_name}")
        print(f"Media ID: {media_id}")

    if not home_server:
        # Without a trusted server there is nowhere safe to send the token.
        if debug:
            print("No home server given; cannot build a media URL")
        return None

    # Authenticated media endpoint first, then the legacy unauthenticated
    # download paths.
    paths = [
        f"/_matrix/client/v1/media/download/{server_name}/{media_id}",
        f"/_matrix/media/v3/download/{server_name}/{media_id}",
        f"/_matrix/media/r0/download/{server_name}/{media_id}",
    ]

    if debug:
        print(f"\nTrying media server: {home_server}")
    for path in paths:
        url = f"https://{home_server}{path}"
        if debug:
            print(f"\nTrying URL: {url}")

        # First try with Authorization header
        success, error = test_media_url(url, token, debug)
        if success:
            if debug:
                print("Success with Authorization header!")
            return url
        elif debug:
            print(f"Failed with Authorization header: {scrub_token(str(error), token)}")

        # Try with access_token parameter
        url_with_token = f"{url}?access_token={token}"
        if debug:
            print(f"\nTrying URL with token: {scrub_token(url_with_token, token)}")
        try:
            response = requests.head(url_with_token, allow_redirects=True, timeout=10)
            if response.status_code == 200:
                if debug:
                    print("Success with access_token parameter!")
                return url_with_token
            elif debug:
                print(f"Failed with status {response.status_code}")
                print(f"Response headers: {response.headers}")
                try:
                    print(f"Response body: {response.text}")
                except Exception:
                    pass
        except Exception as e:
            if debug:
                print(f"Error: {scrub_token(str(e), token)}")

    # Fall back to v3 on the home server. No token is embedded here.
    url = f"https://{home_server}/_matrix/media/v3/download/{server_name}/{media_id}"
    if debug:
        print(f"\nFalling back to: {url}")
    return url
def test_media_url(url, token, debug=False):
    """Test if a media URL is accessible.

    A HEAD request is tried first. If it does not return 200, a streamed
    GET is issued and only its first chunk is read.

    Args:
        url: media URL to probe.
        token: access token sent as a Bearer credential.
        debug: print diagnostics when True (the token is redacted).

    Returns:
        tuple[bool, str | None]: ``(True, None)`` when the URL is readable,
        otherwise ``(False, reason)``.
    """
    headers = {"Authorization": f"Bearer {token}"}
    if debug:
        # Never echo the credential itself, whether in the URL or the header.
        shown_url = url.replace(token, "ACCESS_TOKEN_REDACTED") if token else url
        print(f"\nTesting URL: {shown_url}")
        print("Headers: {'Authorization': 'Bearer ACCESS_TOKEN_REDACTED'}")
    try:
        # First try HEAD request with timeout
        head_response = requests.head(url, headers=headers, allow_redirects=True, timeout=10)
        if debug:
            print(f"HEAD Status code: {head_response.status_code}")
            print(f"HEAD Response headers: {head_response.headers}")
        if head_response.status_code == 200:
            return True, None

        # If HEAD fails, try GET with timeout. The context manager closes
        # the streamed connection on every exit path.
        with requests.get(url, headers=headers, stream=True, timeout=10) as response:
            if debug:
                print(f"GET Status code: {response.status_code}")
                print(f"GET Response headers: {response.headers}")
                print(f"Content type: {response.headers.get('content-type')}")

            if response.status_code == 200:
                # Read only the first chunk to verify content. Touching
                # response.text here would pull the entire video into memory.
                try:
                    chunk = next(response.iter_content(chunk_size=8192))
                except StopIteration:
                    return False, "Empty response"
                except requests.exceptions.ReadTimeout:
                    return False, "Read timeout"
                if not chunk:
                    return False, "Empty response"
                if debug:
                    print(f"Successfully read {len(chunk)} bytes")
                    print(f"First few bytes: {chunk[:20]}")
                return True, None

            # Error responses are small, so reading the body is fine here.
            if debug:
                try:
                    print(f"Response body: {response.text[:200]}...")  # First 200 chars
                except Exception:
                    pass
            # Try to get error details
            try:
                error_details = response.json()
                return False, f"HTTP {response.status_code}: {error_details}"
            except Exception as e:
                return False, f"HTTP {response.status_code}: {str(e)}"
    except requests.exceptions.Timeout:
        if debug:
            print("Request timed out")
        return False, "Request timed out"
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Request error: {e}")
        return False, str(e)
    except Exception as e:
        if debug:
            print(f"Exception during test: {str(e)}")
        return False, str(e)
def create_mpv_script(playlist_file, files, player="flatpak run io.mpv.Mpv"):
    """Create an executable shell script that plays each file with MPV.

    Args:
        playlist_file: playlist path; the script is written next to it with
            the extension replaced by ``.sh``.
        files: paths of the videos, one player invocation per entry.
        player: command line prefix used to launch the player. It is
            written verbatim, so it may contain several words.

    Returns:
        str: path of the generated script (mode 0o755).
    """
    # Function-scope import: shlex is not among the file's top-level imports.
    import shlex

    script_file = os.path.splitext(playlist_file)[0] + '.sh'
    with open(script_file, 'w', encoding='utf-8') as f:
        f.write('#!/bin/bash\n\n')
        f.write('# MPV playback script\n\n')
        # shlex.quote keeps names containing quotes, "$", backticks or
        # spaces from being interpreted by the shell.
        f.writelines(f'{player} {shlex.quote(str(file))}\n' for file in files)
    # Make script executable
    os.chmod(script_file, 0o755)
    return script_file
def update_status(message, final=False):
    """Print ``message`` as a status line.

    Intermediate updates end with a carriage return so the next update
    overwrites them; the final update ends with a line feed.
    """
    terminator = '\r'
    if final:
        terminator = '\n'
    print(message, end=terminator, flush=True)
def clear_status():
    """Blank the status line by overwriting it with 80 spaces."""
    blank_line = ''.ljust(80)
    print(blank_line, end='\r', flush=True)
def _download_all(video_urls, token, output_dir, debug):
    """Download every URL into ``output_dir`` and return the saved file paths."""
    os.makedirs(output_dir, exist_ok=True)
    total = len(video_urls)
    downloaded_files = []
    for i, url in enumerate(video_urls, 1):
        if not debug:
            update_status(f"Downloading {i}/{total} ({i*100//total}%)")
        filepath = download_video(url, token, output_dir, debug)
        if filepath:
            downloaded_files.append(filepath)
            if debug:
                print(f"Downloaded: {filepath}")
    if not debug:
        update_status(f"Downloaded {len(downloaded_files)} videos", final=True)
    if downloaded_files:
        print(f"Files saved to: {output_dir}")
    return downloaded_files


def _create_mpv_script_stream(playlist_file, urls, token):
    """Write a shell script that streams each URL with MPV, sending the token as an HTTP header."""
    # Function-scope import: shlex is not among the file's top-level imports.
    import shlex

    script_file = os.path.splitext(playlist_file)[0] + '.sh'
    header_option = shlex.quote(f"--http-header-fields=Authorization: Bearer {token}")
    with open(script_file, 'w') as f:
        f.write('#!/bin/bash\n\n')
        f.write('# MPV streaming script\n\n')
        for url in urls:
            f.write(f'flatpak run io.mpv.Mpv {header_option} {shlex.quote(url)}\n')
    # Owner-only: the script embeds the access token.
    os.chmod(script_file, 0o700)
    return script_file


def main():
    """Command-line entry point: find video attachments in a room and fetch them.

    Modes:
      * ``--whoami`` / ``--inspect-token``: token diagnostics, then exit.
      * default: download videos into ``--output-dir``.
      * ``--playlist``: additionally write a VLC playlist or (``--mpv``) a
        shell script; with ``--stream`` the playlist references remote URLs
        instead of downloaded files.

    Messages are read newest-first. ``--offset`` skips that many of the
    newest videos and ``--limit`` caps how many are taken after that; the
    selection is then processed oldest-first.

    The parsed options are stored in the module-level ``args`` because other
    functions in this file read ``args.debug``.
    """
    parser = argparse.ArgumentParser(description="Fetch video attachments from Matrix room")
    parser.add_argument("--homeserver", help="Matrix homeserver URL (optional, will be discovered from room alias)")
    parser.add_argument("--room", help="Matrix room ID or alias (overrides MATRIX_ROOM environment variable)")
    parser.add_argument("--debug", action="store_true", help="Show debug information about messages")
    parser.add_argument("--limit", type=int, help="Limit the number of videos to fetch")
    parser.add_argument("--offset", type=int, default=0, help="Number of videos to skip before counting toward limit")
    parser.add_argument("--test", action="store_true", help="Test URLs before adding to playlist (only applies with --playlist)")
    parser.add_argument("--mpv", action="store_true", help="Generate MPV script instead of VLC playlist (requires --playlist)")
    parser.add_argument("--output-dir", default="/tmp/matrix-videos", help="Directory to save downloaded videos (default: /tmp/matrix-videos)")
    parser.add_argument("--stream", action="store_true", help="Stream videos instead of downloading (requires --playlist)")
    parser.add_argument("--playlist", help="Create playlist at specified path (e.g., videos.m3u)")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--whoami", action="store_true", help="Verify token and show user information")
    group.add_argument("--inspect-token", action="store_true", help="Inspect the current token and try to determine its server")
    global args
    args = parser.parse_args()

    if args.whoami:
        # If no homeserver specified, try some common ones
        servers_to_try = []
        if args.homeserver:
            servers_to_try.append(args.homeserver)
        else:
            print("No homeserver specified, trying to detect...")
            servers_to_try.extend([
                "https://matrix.minnix.dev",
                "https://matrix.org",
                "https://minnix.dev",
            ])
        token = os.environ.get('MATRIX_ACCESS_TOKEN')
        if not token:
            print("No access token found in MATRIX_ACCESS_TOKEN")
            sys.exit(1)
        for server in servers_to_try:
            if verify_token(server, token):
                sys.exit(0)
        print("\nCould not verify token against any known servers.")
        print("Try specifying the correct homeserver with --homeserver")
        sys.exit(1)

    if args.inspect_token:
        token = os.environ.get('MATRIX_ACCESS_TOKEN')
        if not token:
            print("No access token found in MATRIX_ACCESS_TOKEN")
            sys.exit(1)
        inspect_token(token)
        sys.exit(0)

    # Reject invalid option combinations before any network traffic, rather
    # than after the whole room history has been fetched.
    if args.stream and not args.playlist:
        print("Error: --stream requires --playlist")
        sys.exit(1)
    if args.mpv and not args.playlist:
        print("Error: --mpv requires --playlist")
        sys.exit(1)
    if args.test and not args.playlist:
        print("Error: --test requires --playlist")
        sys.exit(1)

    room = args.room if args.room else os.environ.get('MATRIX_ROOM')
    if not room:
        print("Error: No room specified. Either:")
        print("1. Use --room option")
        print("2. Set MATRIX_ROOM environment variable")
        sys.exit(1)

    token = get_access_token()
    # Get the user's home server for federation
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        sys.exit(1)

    # Resolve room alias if needed
    if room.startswith('#'):
        if args.debug:
            print(f"Resolving room alias {room}...")
        try:
            room_id = resolve_room_alias(None, room)
            if not room_id:
                sys.exit(1)
        except Exception as e:
            print(f"Error resolving room: {e}")
            sys.exit(1)
    else:
        room_id = room

    video_urls = []  # newest first while collecting
    batch = None
    total_found = 0
    try:
        # Stop paging once offset + limit videos are known. Without a limit
        # every page is needed, even when an offset is given.
        target_count = args.offset + args.limit if args.limit else None
        while True:
            data = fetch_messages(None, token, room_id, batch)
            chunk = data.get("chunk")
            if not chunk:
                break
            for event in chunk:
                event_type = event.get("type")
                content = event.get("content", {})
                if args.debug:
                    print(f"\nMessage type: {event_type}")
                    if event_type == "m.room.message":
                        print(f"Content type: {content.get('msgtype')}")
                        print(f"MIME type: {content.get('info', {}).get('mimetype', 'none')}")
                if event_type != "m.room.message":
                    continue
                msgtype = content.get("msgtype")
                mimetype = content.get("info", {}).get("mimetype", "")
                if args.debug:
                    print(f"Checking message: {msgtype} / {mimetype}")
                    if "url" in content:
                        print(f"Has URL: {content['url']}")
                if msgtype == "m.video" and mimetype.startswith("video/") and "url" in content:
                    media_url = get_media_url(content["url"], home_server, token, args.debug)
                    if media_url:
                        total_found += 1
                        if not args.debug and not args.playlist:
                            update_status(f"Found {total_found} videos...")
                        video_urls.append(media_url)
            batch = data.get("end")
            if not batch:
                break
            # Keep fetching if we need more videos
            if target_count is not None and len(video_urls) >= target_count:
                break

        if not args.debug:
            update_status(f"Found {total_found} videos", final=True)

        # Apply offset and limit while the list is still newest-first, so
        # the result does not depend on how many extra videos the last page
        # happened to contain.
        if args.offset:
            if not args.debug:
                update_status(f"Skipping {args.offset} videos...", final=True)
            video_urls = video_urls[args.offset:]
        if args.limit and args.limit < len(video_urls):
            if not args.debug:
                update_status(f"Limiting to {args.limit} videos...", final=True)
            video_urls = video_urls[:args.limit]
        # Reverse the list to get oldest first
        video_urls.reverse()

        videos_to_process = len(video_urls)  # Count after offset/limit
        if not args.debug:
            if args.offset or args.limit:
                update_status(f"Processing {videos_to_process} videos...", final=True)
        if args.debug:
            print(f"\nAfter offset ({args.offset}) and limit ({args.limit}): {videos_to_process} videos")

        if args.playlist:
            if args.stream:
                if args.mpv:
                    # Create MPV script for streaming (not recommended)
                    script_file = _create_mpv_script_stream(args.playlist, video_urls, token)
                    print(f"\nCreated MPV script: {script_file}")
                    print("\nWARNING: Streaming may not work reliably.")
                    print("Consider using download mode instead (remove --stream flag).")
                    print("\nTo play videos, run:")
                    print(f"./{os.path.basename(script_file)}")
                else:
                    # VLC playlist format for streaming
                    written = 0
                    with open(args.playlist, 'w') as f:
                        f.write("#EXTM3U\n")
                        for i, url in enumerate(video_urls, 1):
                            if not args.debug:
                                update_status(f"Building playlist {i}/{videos_to_process} ({i*100//videos_to_process}%)")
                            if args.test:
                                success, error = test_media_url(url, token, args.debug)
                                if not success:
                                    if args.debug:
                                        print(f"Warning: URL not accessible: {error}")
                                    continue
                            f.write(f"#EXTVLCOPT:http-header=Authorization: Bearer {token}\n")
                            if "access_token=" not in url:
                                url = f"{url}?access_token={token}"
                            f.write(f"{url}\n")
                            written += 1
                    if not args.debug:
                        # Report what was actually written; --test may have
                        # skipped inaccessible entries.
                        update_status(f"Built streaming playlist with {written} videos", final=True)
                    print(f"\nCreated VLC playlist: {args.playlist}")
                    print("\nWARNING: Streaming may not work reliably.")
                    print("Consider using download mode instead (remove --stream flag).")
                    print("\nTo play with VLC, run:")
                    print(f"vlc {args.playlist}")
            else:
                # Default behavior: download and create playlist
                output_dir = os.path.expanduser(args.output_dir)
                downloaded_files = _download_all(video_urls, token, output_dir, args.debug)
                # Create playlist with local files
                if args.mpv:
                    # Create MPV script
                    script_file = create_mpv_script(args.playlist, downloaded_files)
                    print(f"\nCreated MPV script: {script_file}")
                    print("\nTo play videos, run:")
                    print(f"./{os.path.basename(script_file)}")
                else:
                    # Create VLC playlist
                    with open(args.playlist, 'w') as f:
                        f.write("#EXTM3U\n")
                        for i, filepath in enumerate(downloaded_files, 1):
                            if not args.debug:
                                update_status(f"Building playlist {i}/{len(downloaded_files)} ({i*100//len(downloaded_files)}%)")
                            f.write(f"{filepath}\n")
                    if not args.debug:
                        update_status(f"Built local playlist with {len(downloaded_files)} videos", final=True)
                    print(f"\nCreated playlist with downloaded files: {args.playlist}")
                    print("\nTo play with VLC, run:")
                    print(f"vlc {args.playlist}")
        else:
            # Just download without playlist
            output_dir = os.path.expanduser(args.output_dir)
            _download_all(video_urls, token, output_dir, args.debug)
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        sys.exit(1)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment