download all videos from a Matrix room
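Example usage (a sketch: the script filename and room alias below are illustrative; the flags and the MATRIX_ACCESS_TOKEN variable are the ones the script defines):

    export MATRIX_ACCESS_TOKEN='your_token_here'
    python3 matrix-video-fetch.py --room '#example:matrix.org' --playlist videos.m3u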
#!/usr/bin/env python3
import os
import sys
import argparse
import requests
from urllib.parse import urljoin, quote
import base64
import hashlib
def get_access_token():
    """Get access token for Matrix API access"""
    token = os.environ.get('MATRIX_ACCESS_TOKEN')
    if not token:
        print("\nError: No Matrix access token found")
        print_token_instructions()
        sys.exit(1)
    # Clean the token
    token = token.strip()
    # Debug token format (showing only first/last few chars)
    if len(token) > 10:
        if args.debug:
            print(f"DEBUG: Token format check - starts with: {token[:5]}, ends with: {token[-5:]}")
            print(f"DEBUG: Token length: {len(token)}")
    else:
        if args.debug:
            print("WARNING: Token seems too short")
    return token
def print_token_instructions():
    """Print instructions for getting a Matrix access token"""
    print("\nTo get an access token:")
    print("1. Open Element in your web browser")
    print("2. Click your avatar/name in the top-left corner")
    print("3. Click 'All settings'")
    print("4. Click 'Help & About' in the left sidebar")
    print("5. Scroll down to the bottom")
    print("6. Click 'Access Token'")
    print("7. Enter your password when prompted")
    print("8. Click 'Copy' to copy your access token")
    print("\nThen set it in your environment:")
    print("export MATRIX_ACCESS_TOKEN='your_token_here'")
    print("\nNote: Access tokens are sensitive! They provide full access to your account.")
    print("- Never share your access token")
    print("- Don't commit it to version control")
    print("- Generate a new one if you suspect it's been compromised")
def get_user_homeserver(token):
    """Get the user's home server from their token"""
    try:
        response = requests.get(
            "https://matrix.org/_matrix/client/v3/account/whoami",
            headers={"Authorization": f"Bearer {token}"}
        )
        if response.status_code == 200:
            user_id = response.json().get('user_id')
            if user_id:
                # Split only on the first ':' so a server name with a port is preserved
                return user_id.split(':', 1)[1]
    except Exception as e:
        print(f"Error getting home server: {e}")
    return None
def validate_token(homeserver, token):
    """Validate the access token before making other requests"""
    # First get the user's home server
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        return False
    print(f"Using home server: https://{home_server}")
    # Always validate against the user's home server
    url = f"https://{home_server}/_matrix/client/v3/account/whoami"
    try:
        response = requests.get(url, headers={"Authorization": f"Bearer {token}"})
        if response.status_code == 200:
            user_info = response.json()
            print(f"Successfully authenticated as: {user_info.get('user_id')}")
            return True
        else:
            print(f"Authentication failed: {response.content.decode()}")
    except Exception as e:
        print(f"Error validating token: {e}")
    return False
def resolve_room_alias(homeserver, room_alias):
    """Resolve a room alias to a room ID through client API or federation"""
    token = get_access_token()
    # First try using the client API with the user's token
    user_homeserver = get_user_homeserver(token)
    if user_homeserver:
        if args.debug:
            print(f"Trying client API with homeserver: {user_homeserver}")
        # URL encode the room alias
        encoded_alias = quote(room_alias)
        client_url = f"https://{user_homeserver}/_matrix/client/v3/directory/room/{encoded_alias}"
        try:
            response = requests.get(client_url, headers={"Authorization": f"Bearer {token}"})
            if response.status_code == 200:
                data = response.json()
                if args.debug:
                    print("Successfully resolved room alias through client API")
                return data.get("room_id")
            elif args.debug:
                print(f"Client API failed: {response.status_code}")
                try:
                    print(f"Error: {response.json()}")
                except Exception:
                    print(f"Response: {response.text}")
        except Exception as e:
            if args.debug:
                print(f"Client API error: {e}")
    # If client API fails, try federation API with various servers
    servers_to_try = []
    if homeserver:
        servers_to_try.append(homeserver)
    # Add common servers as fallbacks
    servers_to_try.extend([
        "https://matrix.org",
        "https://minnix.dev",
        "https://tchncs.de",
        # Extract server from room alias and try it
        f"https://{room_alias.split(':', 1)[1]}" if ':' in room_alias else None
    ])
    # Remove None and duplicates while preserving order
    servers_to_try = list(dict.fromkeys(filter(None, servers_to_try)))
    if args.debug:
        print("\nTrying federation API with servers:", servers_to_try)
    for server in servers_to_try:
        if args.debug:
            print(f"Trying federation API with: {server}")
        # URL encode the room alias
        encoded_alias = quote(room_alias)
        fed_url = f"{server}/_matrix/federation/v1/query/directory?room_alias={encoded_alias}"
        try:
            response = requests.get(fed_url, timeout=10)
            if response.status_code == 200:
                data = response.json()
                if args.debug:
                    print("Successfully resolved room alias through federation")
                return data.get("room_id")
            elif args.debug:
                print(f"Federation API failed for {server}: {response.status_code}")
                try:
                    print(f"Error: {response.json()}")
                except Exception:
                    print(f"Response: {response.text}")
        except Exception as e:
            if args.debug:
                print(f"Federation API error with {server}: {e}")
    print("Error: Could not resolve room alias. Please verify:")
    print("1. The room alias is correct")
    print("2. The room exists and is accessible")
    print("3. You have permission to access the room")
    print("4. The room's server is online and federating")
    return None
def fetch_messages(homeserver, token, room_id, batch=None):
    """Fetch messages from a room via the client API on the user's home server"""
    # Get the user's home server
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        sys.exit(1)
    # Always fetch through the user's home server
    url = f"https://{home_server}/_matrix/client/v3/rooms/{room_id}/messages"
    params = {
        "dir": "b",
        "limit": 100
    }
    if batch:
        params["from"] = batch
    try:
        if args.debug:
            print(f"\nFetching messages from {room_id}")
        response = requests.get(
            url,
            params=params,
            headers={"Authorization": f"Bearer {token}"}
        )
        if response.status_code == 200:
            data = response.json()
            if args.debug:
                print(f"Found {len(data.get('chunk', []))} messages")
            return data
        else:
            print(f"Error fetching messages: {response.content.decode()}")
            sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
def scrub_token(text, token):
    """Remove access token from debug output"""
    if not token or not text:
        return text
    return text.replace(token, "ACCESS_TOKEN_REDACTED")
def check_server_login(homeserver):
    """Check if the server's login endpoint is working"""
    url = urljoin(homeserver, "/_matrix/client/r0/login")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            flows = response.json().get('flows', [])
            print("\nServer login flows:")
            for flow in flows:
                print(f"- {flow.get('type')}")
            return True
    except Exception as e:
        print(f"\nError checking login endpoint: {e}")
    return False
def discover_homeserver(domain):
    """Discover the actual Matrix homeserver URL using .well-known"""
    print(f"\nDiscovering homeserver for domain: {domain}")
    # First try .well-known
    try:
        well_known_url = f"https://{domain}/.well-known/matrix/client"
        print(f"Checking well-known at: {well_known_url}")
        response = requests.get(well_known_url)
        if response.status_code == 200:
            data = response.json()
            homeserver = data.get("m.homeserver", {}).get("base_url")
            if homeserver:
                print(f"Found homeserver via well-known: {homeserver}")
                return homeserver
            else:
                print("No homeserver URL in well-known response")
    except Exception as e:
        print(f"Error checking well-known: {e}")
    # Try direct connection to possible homeserver URLs
    possible_urls = [
        f"https://matrix.{domain}",
        f"https://{domain}",
        f"https://synapse.{domain}"
    ]
    for url in possible_urls:
        try:
            versions_url = urljoin(url, "/_matrix/client/versions")
            print(f"\nTrying homeserver at: {url}")
            print(f"Checking versions endpoint: {versions_url}")
            response = requests.get(versions_url)
            if response.status_code == 200:
                print(f"Found working homeserver at: {url}")
                print(f"Supported versions: {response.json()}")
                return url
        except Exception as e:
            print(f"Failed to connect to {url}: {e}")
    # Default fallback
    default_url = f"https://{domain}"
    print(f"\nNo working homeserver found, falling back to: {default_url}")
    return default_url
def verify_token(homeserver, token):
    """Try to verify a token against a homeserver and get user info"""
    paths = [
        "/_matrix/client/v3/account/whoami",
        "/_matrix/client/r0/account/whoami",
        "/_synapse/client/v1/account/whoami"
    ]
    print(f"\nVerifying token against {homeserver}")
    print(f"Token starts with: {token[:5]}")
    response = None
    for path in paths:
        url = urljoin(homeserver, path)
        try:
            # Try both header and query param
            methods = [
                ("header", {"Authorization": f"Bearer {token}"}),
                ("query", {"access_token": token})
            ]
            for method_name, auth in methods:
                print(f"Trying {method_name} authentication...")
                if method_name == "header":
                    response = requests.get(url, headers=auth)
                else:
                    response = requests.get(url, params=auth)
                if response.status_code == 200:
                    user_info = response.json()
                    print("\nSuccess! Token is valid for:")
                    print(f"Server: {homeserver}")
                    print(f"User ID: {user_info.get('user_id')}")
                    print(f"Device ID: {user_info.get('device_id')}")
                    return True
        except Exception:
            continue
    print("\nToken verification failed.")
    # Only report details if at least one request completed
    if response is not None:
        try:
            error = response.json()
            print(f"Error: {error.get('error')}")
            print(f"Error code: {error.get('errcode')}")
        except Exception:
            print(f"Response: {response.content.decode()}")
    return False
def decode_synapse_token(token):
    """Attempt to decode information from a Synapse token"""
    print("\nAttempting to decode Synapse token:")
    try:
        # Remove syt_ prefix
        token_part = token[4:]
        # Split into parts
        parts = token_part.split("_")
        print(f"Token parts: {len(parts)}")
        # Try the valid base64 padding lengths
        for part in parts:
            print(f"\nTrying to decode part: {part}")
            for padding in ['', '=', '==']:
                try:
                    padded = part + padding
                    decoded = base64.b64decode(padded).decode('utf-8')
                    print(f"Successfully decoded: {decoded}")
                except Exception:
                    continue
    except Exception as e:
        print(f"Error decoding token: {e}")
def inspect_token(token):
    """Try to determine information about a token"""
    print("\nToken Information:")
    print(f"Length: {len(token)}")
    print(f"Prefix: {token[:10]}...")
    if token.startswith('syt_'):
        print("Type: Synapse token")
        decode_synapse_token(token)
    else:
        print("Type: Standard Matrix token")
    # Try some common homeservers
    common_servers = [
        "https://matrix.org",
        "https://matrix.minnix.dev",
        "https://minnix.dev",
        "https://synapse.matrix.org",
        "https://matrix.example.com"  # Add any other servers you use
    ]
    print("\nTrying to validate against common servers...")
    for server in common_servers:
        print(f"\nChecking {server}...")
        try:
            # Try both v3 and r0 endpoints
            for version in ['v3', 'r0']:
                url = urljoin(server, f"/_matrix/client/{version}/account/whoami")
                # Try both auth methods
                for method in ['header', 'query']:
                    try:
                        if method == 'header':
                            response = requests.get(url, headers={"Authorization": f"Bearer {token}"})
                        else:
                            response = requests.get(url, params={"access_token": token})
                        if response.status_code == 200:
                            user_info = response.json()
                            print("\nSUCCESS! Token is valid for:")
                            print(f"Server: {server}")
                            print(f"User ID: {user_info.get('user_id')}")
                            print(f"Device ID: {user_info.get('device_id', 'unknown')}")
                            return
                        elif response.status_code == 401:
                            error = response.json()
                            if 'error' in error:
                                print(f"{method.title()} auth failed: {error['error']}")
                    except Exception as e:
                        print(f"Error with {method} auth: {e}")
                        continue
        except Exception as e:
            print(f"Error checking {server}: {e}")
            continue
    print("\nCould not determine token's origin server.")
    print("\nPossible issues:")
    print("1. Token might be from a different server")
    print("2. Token might have expired")
    print("3. Account might have been logged out")
    print("\nTry generating a new token while logged into the correct server.")
def sanitize_filename(filename):
    """Sanitize filename by replacing spaces and special characters"""
    # Replace spaces with hyphens
    filename = filename.replace(' ', '-')
    # Remove or replace special characters
    filename = ''.join(c if c.isalnum() or c in '.-_' else '-' for c in filename)
    # Collapse multiple hyphens
    while '--' in filename:
        filename = filename.replace('--', '-')
    return filename
def get_url_hash(url):
    """Get a short hash of the URL for filename uniqueness"""
    return hashlib.sha256(url.encode()).hexdigest()[:8]
def get_filename_from_url(url, content_type=''):
    """Generate a base filename from URL and content type"""
    # Get the last part of the URL, dropping any query string
    filename = url.split('/')[-1].split('?')[0]
    # Add extension based on content type if needed
    if content_type == 'video/mp4' and not filename.endswith('.mp4'):
        filename += '.mp4'
    elif content_type == 'video/webm' and not filename.endswith('.webm'):
        filename += '.webm'
    elif not os.path.splitext(filename)[1]:
        filename += '.mp4'  # Default to .mp4 if no extension
    return filename
def get_unique_filename(url, base_filename, output_dir):
    """Get a unique filename using URL hash for deterministic naming"""
    # Sanitize the base filename
    base_filename = sanitize_filename(base_filename)
    # Split filename into base and extension
    base, ext = os.path.splitext(base_filename)
    # Get URL hash
    url_hash = get_url_hash(url)
    # Create filename with hash
    filename = f"{base}-{url_hash}{ext}"
    return os.path.join(output_dir, filename)
def check_existing_file(filepath, url, expected_size):
    """Check if existing file matches the expected download"""
    if not os.path.exists(filepath):
        return False
    # Get existing file size
    existing_size = os.path.getsize(filepath)
    # If sizes match, consider it the same file
    return existing_size == expected_size
def download_video(url, token, output_dir, debug=False):
    """Download a video file"""
    headers = {"Authorization": f"Bearer {token}"}
    if debug:
        print(f"\nDownloading video from: {url}")
        print(f"Output directory: {output_dir}")
    try:
        # First make a HEAD request to get content info without downloading
        head_response = requests.head(url, headers=headers, allow_redirects=True, timeout=10)
        if head_response.status_code != 200:
            if debug:
                print(f"HEAD request failed with status {head_response.status_code}")
            return None
        # Get content info
        content_type = head_response.headers.get('content-type', '')
        expected_size = int(head_response.headers.get('content-length', 0))
        # Get filename from Content-Disposition or URL
        filename = None
        if 'Content-Disposition' in head_response.headers:
            cd = head_response.headers['Content-Disposition']
            if 'filename=' in cd:
                # Take only the filename value, dropping any trailing parameters
                filename = cd.split('filename=')[1].split(';')[0].strip('"')
        if not filename:
            filename = get_filename_from_url(url, content_type)
        # Get deterministic filepath based on URL
        filepath = get_unique_filename(url, filename, output_dir)
        if debug:
            print(f"Target filepath: {filepath}")
        # Check if file already exists with matching size
        if check_existing_file(filepath, url, expected_size):
            if debug:
                print(f"File already exists with matching size, skipping download: {filepath}")
            return filepath
        # File doesn't exist or size doesn't match, proceed with download
        if debug:
            print(f"Downloading to: {filepath}")
            print(f"Content type: {content_type}")
            print(f"Expected size: {expected_size} bytes")
        # Stream download with timeout
        response = requests.get(url, headers=headers, stream=True, timeout=10)
        if response.status_code == 200:
            with open(filepath, 'wb') as f:
                if expected_size > 0:
                    downloaded = 0
                    for chunk in response.iter_content(chunk_size=8192, decode_unicode=False):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                        if debug and expected_size > 0:
                            percent = (downloaded / expected_size) * 100
                            print(f"\rProgress: {percent:.1f}%", end='', flush=True)
                    if debug:
                        print()  # New line after progress
                else:
                    # If no content length, just write the data
                    for chunk in response.iter_content(chunk_size=8192, decode_unicode=False):
                        if chunk:
                            f.write(chunk)
            if debug:
                print(f"Download complete: {filepath}")
            return filepath
        else:
            if debug:
                print(f"Download failed with status {response.status_code}")
                print(f"Response headers: {response.headers}")
            return None
    except requests.exceptions.Timeout:
        if debug:
            print("Request timed out")
        return None
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Request error: {e}")
        return None
    except Exception as e:
        if debug:
            print(f"Download error: {e}")
        return None
def get_media_url(mxc_url, home_server, token, debug=False):
    """Convert an MXC URL to an HTTP URL for media access"""
    # Remove the mxc:// prefix
    if not mxc_url.startswith("mxc://"):
        return None
    # Extract server and media ID
    parts = mxc_url[6:].split("/", 1)  # Skip mxc:// and split into server/media_id
    if len(parts) != 2:
        return None
    server_name, media_id = parts
    # Try different endpoints and auth methods
    endpoints = [
        ("v3", "download"),
        ("r0", "download"),
    ]
    if debug:
        print(f"\nOriginal MXC URL: {mxc_url}")
        print(f"Server: {server_name}")
        print(f"Media ID: {media_id}")
    # Try the media server from the MXC URL first
    media_servers = [
        server_name,  # The server hosting the media
        home_server,  # User's home server as fallback
    ]
    for server in media_servers:
        if debug:
            print(f"\nTrying media server: {server}")
        for version, endpoint in endpoints:
            url = f"https://{server}/_matrix/media/{version}/{endpoint}/{server_name}/{media_id}"
            if debug:
                print(f"\nTrying URL: {url}")
            # First try with Authorization header
            success, error = test_media_url(url, token, debug)
            if success:
                if debug:
                    print("Success with Authorization header!")
                return url
            elif debug:
                print(f"Failed with Authorization header: {error}")
            # Try with access_token parameter
            url_with_token = f"{url}?access_token={token}"
            if debug:
                print(f"\nTrying URL with token: {url_with_token}")
            try:
                response = requests.head(url_with_token, allow_redirects=True, timeout=10)
                if response.status_code == 200:
                    if debug:
                        print("Success with access_token parameter!")
                    return url_with_token
                elif debug:
                    print(f"Failed with status {response.status_code}")
                    print(f"Response headers: {response.headers}")
                    try:
                        print(f"Response body: {response.text}")
                    except Exception:
                        pass
            except Exception as e:
                if debug:
                    print(f"Error: {e}")
    # Fall back to v3 with token parameter on media server
    url = f"https://{server_name}/_matrix/media/v3/download/{server_name}/{media_id}?access_token={token}"
    if debug:
        print(f"\nFalling back to: {url}")
    return url
def test_media_url(url, token, debug=False):
    """Test if a media URL is accessible"""
    headers = {"Authorization": f"Bearer {token}"}
    if debug:
        print(f"\nTesting URL: {url}")
        print(f"Headers: {headers}")
    try:
        # First try HEAD request with timeout
        head_response = requests.head(url, headers=headers, allow_redirects=True, timeout=10)
        if debug:
            print(f"HEAD Status code: {head_response.status_code}")
            print(f"HEAD Response headers: {head_response.headers}")
        if head_response.status_code == 200:
            return True, None
        # If HEAD fails, try GET with timeout
        response = requests.get(url, headers=headers, stream=True, timeout=10)
        if debug:
            print(f"GET Status code: {response.status_code}")
            print(f"GET Response headers: {response.headers}")
            print(f"Content type: {response.headers.get('content-type')}")
            try:
                print(f"Response body: {response.text[:200]}...")  # First 200 chars
            except Exception:
                pass
        if response.status_code == 200:
            # Read first few bytes to verify content with timeout
            try:
                chunk = next(response.iter_content(chunk_size=8192, decode_unicode=False))
                if chunk:  # Filter out keep-alive chunks
                    if debug:
                        print(f"Successfully read {len(chunk)} bytes")
                        print(f"First few bytes: {chunk[:20]}")
                    return True, None
            except StopIteration:
                return False, "Empty response"
            except requests.exceptions.ReadTimeout:
                return False, "Read timeout"
        # Try to get error details
        try:
            error_details = response.json()
            return False, f"HTTP {response.status_code}: {error_details}"
        except Exception as e:
            return False, f"HTTP {response.status_code}: {str(e)}"
    except requests.exceptions.Timeout:
        if debug:
            print("Request timed out")
        return False, "Request timed out"
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Request error: {e}")
        return False, str(e)
    except Exception as e:
        if debug:
            print(f"Exception during test: {str(e)}")
        return False, str(e)
def create_mpv_script(playlist_file, files):
    """Create a shell script to play videos with MPV"""
    script_file = os.path.splitext(playlist_file)[0] + '.sh'
    with open(script_file, 'w') as f:
        f.write('#!/bin/bash\n\n')
        f.write('# MPV playback script\n\n')
        for file in files:
            f.write(f'flatpak run io.mpv.Mpv "{file}"\n')
    # Make script executable
    os.chmod(script_file, 0o755)
    return script_file
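# Note: main() calls create_mpv_script_stream() when --stream and --mpv are
# combined, but the gist never defines it, so that path raises NameError.
# Below is a minimal sketch that mirrors create_mpv_script() and the VLC
# streaming branch: one mpv invocation per URL, with the access token appended
# as a query parameter so the media endpoints accept the request.
def create_mpv_script_stream(playlist_file, urls, token):
    """Create a shell script to stream remote video URLs with MPV (sketch)"""
    script_file = os.path.splitext(playlist_file)[0] + '.sh'
    with open(script_file, 'w') as f:
        f.write('#!/bin/bash\n\n')
        f.write('# MPV streaming script\n\n')
        for url in urls:
            # Reuse the same access_token query-parameter auth as the VLC playlist path
            if "access_token=" not in url:
                url = f"{url}?access_token={token}"
            f.write(f'flatpak run io.mpv.Mpv "{url}"\n')
    # Make script executable
    os.chmod(script_file, 0o755)
    return script_file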
def update_status(message, final=False):
    """Update status line, with line feed only on final update"""
    print(message, end='\n' if final else '\r', flush=True)
def clear_status():
    """Clear the status line"""
    print(' ' * 80, end='\r', flush=True)  # Clear line
def main():
    parser = argparse.ArgumentParser(description="Fetch video attachments from Matrix room")
    parser.add_argument("--homeserver", help="Matrix homeserver URL (optional, will be discovered from room alias)")
    parser.add_argument("--room", help="Matrix room ID or alias (overrides MATRIX_ROOM environment variable)")
    parser.add_argument("--debug", action="store_true", help="Show debug information about messages")
    parser.add_argument("--limit", type=int, help="Limit the number of videos to fetch")
    parser.add_argument("--offset", type=int, default=0, help="Number of videos to skip before counting toward limit")
    parser.add_argument("--test", action="store_true", help="Test URLs before adding to playlist (only applies with --playlist)")
    parser.add_argument("--mpv", action="store_true", help="Generate MPV script instead of VLC playlist (requires --playlist)")
    parser.add_argument("--output-dir", default="/tmp/matrix-videos", help="Directory to save downloaded videos (default: /tmp/matrix-videos)")
    parser.add_argument("--stream", action="store_true", help="Stream videos instead of downloading (requires --playlist)")
    parser.add_argument("--playlist", help="Create playlist at specified path (e.g., videos.m3u)")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--whoami", action="store_true", help="Verify token and show user information")
    group.add_argument("--inspect-token", action="store_true", help="Inspect the current token and try to determine its server")
    global args
    args = parser.parse_args()
    if args.whoami:
        # If no homeserver specified, try some common ones
        servers_to_try = []
        if args.homeserver:
            servers_to_try.append(args.homeserver)
        else:
            print("No homeserver specified, trying to detect...")
            servers_to_try.extend([
                "https://matrix.minnix.dev",
                "https://matrix.org",
                "https://minnix.dev"
            ])
        token = os.environ.get('MATRIX_ACCESS_TOKEN')
        if not token:
            print("No access token found in MATRIX_ACCESS_TOKEN")
            sys.exit(1)
        for server in servers_to_try:
            if verify_token(server, token):
                sys.exit(0)
        print("\nCould not verify token against any known servers.")
        print("Try specifying the correct homeserver with --homeserver")
        sys.exit(1)
    if args.inspect_token:
        token = os.environ.get('MATRIX_ACCESS_TOKEN')
        if not token:
            print("No access token found in MATRIX_ACCESS_TOKEN")
            sys.exit(1)
        inspect_token(token)
        sys.exit(0)
    room = args.room if args.room else os.environ.get('MATRIX_ROOM')
    if not room:
        print("Error: No room specified. Either:")
        print("1. Use --room option")
        print("2. Set MATRIX_ROOM environment variable")
        sys.exit(1)
    token = get_access_token()
    # Get the user's home server for federation
    home_server = get_user_homeserver(token)
    if not home_server:
        print("Could not determine home server from token")
        sys.exit(1)
    # Resolve room alias if needed
    if room.startswith('#'):
        if args.debug:
            print(f"Resolving room alias {room}...")
        try:
            room_id = resolve_room_alias(None, room)
            if not room_id:
                sys.exit(1)
        except Exception as e:
            print(f"Error resolving room: {e}")
            sys.exit(1)
    else:
        room_id = room
    video_urls = []
    batch = None
    total_messages = 0
    video_messages = 0
    total_found = 0
    videos_to_process = 0  # Count after offset/limit
    try:
        # Keep fetching until we have enough videos for offset + limit
        target_count = args.offset
        if args.limit:
            target_count += args.limit
        while True:
            data = fetch_messages(None, token, room_id, batch)
            if not data["chunk"]:
                break
            total_messages += len(data["chunk"])
            for event in data["chunk"]:
                if args.debug:
                    print(f"\nMessage type: {event.get('type')}")
                    if event["type"] == "m.room.message":
                        print(f"Content type: {event['content'].get('msgtype')}")
                        print(f"MIME type: {event['content'].get('info', {}).get('mimetype', 'none')}")
                if event["type"] == "m.room.message":
                    content = event["content"]
                    msgtype = content.get("msgtype")
                    mimetype = content.get("info", {}).get("mimetype", "")
                    if args.debug:
                        print(f"Checking message: {msgtype} / {mimetype}")
                        if "url" in content:
                            print(f"Has URL: {content['url']}")
                    if msgtype == "m.video" and mimetype.startswith("video/"):
                        video_messages += 1
                        if "url" in content:
                            media_url = get_media_url(content["url"], home_server, token, args.debug)
                            if media_url:
                                total_found += 1
                                if not args.debug and not args.playlist:
                                    update_status(f"Found {total_found} videos...")
                                video_urls.append(media_url)
            batch = data.get("end")
            if not batch:
                break
            # Stop fetching once we have enough videos
            if target_count and len(video_urls) >= target_count:
                break
        if not args.debug:
            update_status(f"Found {total_found} videos", final=True)
        # Reverse the list to get oldest first
        video_urls.reverse()
        # Apply offset and limit
        if args.offset:
            if not args.debug:
                update_status(f"Skipping {args.offset} videos...", final=True)
            video_urls = video_urls[args.offset:]
        if args.limit and args.limit < len(video_urls):
            if not args.debug:
                update_status(f"Limiting to {args.limit} videos...", final=True)
            video_urls = video_urls[:args.limit]
        videos_to_process = len(video_urls)
        if not args.debug:
            if args.offset or args.limit:
                update_status(f"Processing {videos_to_process} videos...", final=True)
        if args.debug:
            print(f"\nAfter offset ({args.offset}) and limit ({args.limit}): {videos_to_process} videos")
        if args.stream and not args.playlist:
            print("Error: --stream requires --playlist")
            sys.exit(1)
        if args.mpv and not args.playlist:
            print("Error: --mpv requires --playlist")
            sys.exit(1)
        if args.test and not args.playlist:
            print("Error: --test requires --playlist")
            sys.exit(1)
        if args.playlist:
            if args.stream:
                if args.mpv:
                    # Create MPV script for streaming (not recommended)
                    script_file = create_mpv_script_stream(args.playlist, video_urls, token)
                    print(f"\nCreated MPV script: {script_file}")
                    print("\nWARNING: Streaming may not work reliably.")
                    print("Consider using download mode instead (remove --stream flag).")
                    print("\nTo play videos, run:")
                    print(f"./{os.path.basename(script_file)}")
                else:
                    # VLC playlist format for streaming
                    with open(args.playlist, 'w') as f:
                        f.write("#EXTM3U\n")
                        for i, url in enumerate(video_urls, 1):
                            if not args.debug:
                                update_status(f"Building playlist {i}/{videos_to_process} ({i*100//videos_to_process}%)")
                            if args.test:
                                success, error = test_media_url(url, token, args.debug)
                                if not success:
                                    if args.debug:
                                        print(f"Warning: URL not accessible: {error}")
                                    continue
                            f.write(f"#EXTVLCOPT:http-header=Authorization: Bearer {token}\n")
                            if "access_token=" not in url:
                                url = f"{url}?access_token={token}"
                            f.write(f"{url}\n")
                    if not args.debug:
                        update_status(f"Built streaming playlist with {videos_to_process} videos", final=True)
                    print(f"\nCreated VLC playlist: {args.playlist}")
                    print("\nWARNING: Streaming may not work reliably.")
                    print("Consider using download mode instead (remove --stream flag).")
                    print("\nTo play with VLC, run:")
                    print(f"vlc {args.playlist}")
            else:
                # Default behavior: download and create playlist
                output_dir = os.path.expanduser(args.output_dir)
                os.makedirs(output_dir, exist_ok=True)
                # Download each video
                downloaded_files = []
                for i, url in enumerate(video_urls, 1):
                    if not args.debug:
                        update_status(f"Downloading {i}/{videos_to_process} ({i*100//videos_to_process}%)")
                    filepath = download_video(url, token, output_dir, args.debug)
                    if filepath:
                        downloaded_files.append(filepath)
                        if args.debug:
                            print(f"Downloaded: {filepath}")
                if not args.debug:
                    update_status(f"Downloaded {len(downloaded_files)} videos", final=True)
                if downloaded_files:
                    print(f"Files saved to: {output_dir}")
                    # Create playlist with local files
                    if args.mpv:
                        # Create MPV script
                        script_file = create_mpv_script(args.playlist, downloaded_files)
                        print(f"\nCreated MPV script: {script_file}")
                        print("\nTo play videos, run:")
                        print(f"./{os.path.basename(script_file)}")
                    else:
                        # Create VLC playlist
                        with open(args.playlist, 'w') as f:
                            f.write("#EXTM3U\n")
                            for i, filepath in enumerate(downloaded_files, 1):
                                if not args.debug:
                                    update_status(f"Building playlist {i}/{len(downloaded_files)} ({i*100//len(downloaded_files)}%)")
                                f.write(f"{filepath}\n")
                        if not args.debug:
                            update_status(f"Built local playlist with {len(downloaded_files)} videos", final=True)
                        print(f"\nCreated playlist with downloaded files: {args.playlist}")
                        print("\nTo play with VLC, run:")
                        print(f"vlc {args.playlist}")
        else:
            # Just download without playlist
            output_dir = os.path.expanduser(args.output_dir)
            os.makedirs(output_dir, exist_ok=True)
            # Download each video
            downloaded_files = []
            for i, url in enumerate(video_urls, 1):
                if not args.debug:
                    update_status(f"Downloading {i}/{videos_to_process} ({i*100//videos_to_process}%)")
                filepath = download_video(url, token, output_dir, args.debug)
                if filepath:
                    downloaded_files.append(filepath)
                    if args.debug:
                        print(f"Downloaded: {filepath}")
            if not args.debug:
                update_status(f"Downloaded {len(downloaded_files)} videos", final=True)
            if downloaded_files:
                print(f"Files saved to: {output_dir}")
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        sys.exit(1)
if __name__ == "__main__":
    main()