|
import os |
|
import json |
|
import logging |
|
import requests |
|
from contextlib import contextmanager |
|
from collections import defaultdict |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from KalturaClient import * |
|
from KalturaClient.Plugins.Core import * |
|
from KalturaClient.Plugins.Document import KalturaDocumentEntry |
|
from queue import Queue |
|
|
|
# Thread-safe pool of authenticated KalturaClient instances shared by the
# worker threads; borrow one via get_client_from_pool() so it is always returned.
client_pool: Queue = Queue()
|
|
|
@contextmanager
def get_client_from_pool():
    """Borrow a Kaltura client from the shared pool for the duration of a with-block.

    The client is returned to the pool even if the body raises.
    """
    borrowed = client_pool.get()
    try:
        yield borrowed
    finally:
        client_pool.put(borrowed)
|
|
|
# Function to create a new Kaltura client |
|
def create_kaltura_client(config, admin_secret, user_id, session_type, partner_id, expiry, session_privileges):
    """Build a KalturaClient authenticated with a freshly generated V2 session (KS)."""
    client = KalturaClient(config)
    session_token = client.generateSessionV2(
        admin_secret, user_id, session_type, partner_id, expiry, session_privileges
    )
    client.setKs(session_token)
    return client
|
|
|
# Function to list entries |
|
def list_entries(client, pager):
    """Fetch one page of base entries.

    Returns the list of entry objects, or None when the API call fails
    (the error is logged rather than raised).
    """
    try:
        entries = client.baseEntry.list(KalturaBaseEntryFilter(), pager).objects
        logging.info(f"{len(entries)} entries listed successfully.")
        return entries
    except Exception as e:
        logging.error(f"Error listing entries: {e}")
        return None
|
|
|
# Function to convert media type enum to string |
|
def get_media_type_string(media_type_enum):
    """Map a KalturaMediaType enum value back to its constant name (e.g. IMAGE)."""
    target_value = media_type_enum.getValue()
    # Reverse lookup: scan the enum class attributes for a matching int value.
    for attr_name, attr_value in vars(KalturaMediaType).items():
        if isinstance(attr_value, int) and attr_value == target_value:
            return attr_name
    return f"UNKNOWN-{media_type_enum.getValue()}"
|
|
|
def get_asset_size_sum(local_client, entry_id):
    """Sum the stored size in bytes of every asset attached to an entry.

    Fires one Kaltura multi-request that lists the entry's flavor, thumb,
    attachment, caption and file assets, then totals their sizes.

    Args:
        local_client: an authenticated KalturaClient.
        entry_id: id of the entry whose assets are measured.

    Returns:
        Tuple of (total_size_in_bytes,
                  sizes_by_asset_type dict keyed by
                  'flavor'/'thumb'/'attachment'/'caption'/'file',
                  missing_sizeInBytes_count — assets whose sizeInBytes was 0
                  and whose KB-based 'size' was used instead).
    """
    missing_sizeInBytes_count = 0  # Assets missing a usable sizeInBytes

    # NOTE(review): only the first page of up to 500 assets per type is
    # fetched; an entry with more assets than that would be under-counted.
    pager = KalturaFilterPager(pageSize=500, pageIndex=1)

    # Filter common to all asset types
    common_filter = KalturaAssetFilter()
    common_filter.entryIdEqual = entry_id

    # File assets are matched by object id/type rather than entryIdEqual.
    # (Renamed from 'filter' to avoid shadowing the builtin.)
    file_asset_filter = KalturaFileAssetFilter()
    file_asset_filter.fileAssetObjectTypeEqual = KalturaFileAssetObjectType.ENTRY
    file_asset_filter.objectIdEqual = entry_id

    # Batch all five list calls into a single round-trip
    local_client.startMultiRequest()
    local_client.flavorAsset.list(common_filter, pager)
    local_client.thumbAsset.list(common_filter, pager)
    local_client.attachment.attachmentAsset.list(common_filter, pager)
    local_client.caption.captionAsset.list(common_filter, pager)
    local_client.fileAsset.list(file_asset_filter, pager)
    multiResult = local_client.doMultiRequest()

    # Responses come back in submission order
    flavor_assets = multiResult[0].objects
    thumb_assets = multiResult[1].objects
    attachment_assets = multiResult[2].objects
    caption_assets = multiResult[3].objects
    file_assets = multiResult[4].objects

    def get_size_in_bytes(asset):
        """Best-effort size of one asset in bytes (0 when unknown)."""
        nonlocal missing_sizeInBytes_count
        # Prefer the exact byte count when present and positive.
        size_in_bytes = getattr(asset, 'sizeInBytes', None)
        if size_in_bytes is not None and size_in_bytes > 0:
            return size_in_bytes
        # BUG FIX: the original read asset.sizeInBytes unconditionally in
        # this fallback branch, raising AttributeError for asset types that
        # only expose a KB-based 'size' attribute.
        if hasattr(asset, 'size'):
            if size_in_bytes == 0:
                # sizeInBytes exists but is unset; count the fallback use
                missing_sizeInBytes_count += 1
            return asset.size * 1024  # Convert KB to bytes
        return 0

    # Calculate storage totals per asset family
    sizes_by_asset_type = {
        'flavor': sum(get_size_in_bytes(asset) for asset in flavor_assets),
        'thumb': sum(get_size_in_bytes(asset) for asset in thumb_assets),
        'attachment': sum(get_size_in_bytes(asset) for asset in attachment_assets),
        'caption': sum(get_size_in_bytes(asset) for asset in caption_assets),
        'file': sum(get_size_in_bytes(asset) for asset in file_assets),
    }
    total_size_in_bytes = sum(sizes_by_asset_type.values())

    return total_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
|
|
|
# get the file size of file download url |
|
def get_file_size(file_url):
    """Return the size in bytes of the file at *file_url*, or 0 on failure.

    Tries a HEAD request first and reads Content-Length; falls back to a
    streaming GET that counts the body bytes when the header is absent.
    """
    try:
        # BUG FIX: requests' head() does not follow redirects by default, so
        # a CDN/redirected URL would yield the 3xx response's headers instead
        # of the file's. Also add a timeout so a stuck server cannot hang a
        # worker thread forever.
        response = requests.head(file_url, allow_redirects=True, timeout=30)
        response.raise_for_status()

        content_length = response.headers.get('Content-Length')
        if content_length is not None:
            return int(content_length)

        # Fallback to a GET request if Content-Length is not present
        response = requests.get(file_url, stream=True, timeout=30)
        response.raise_for_status()

        # Return the total size of the file by counting streamed chunks
        return sum(len(chunk) for chunk in response.iter_content(chunk_size=8192))
    except Exception as e:
        logging.error(f"Error getting file size: {e}")
        return 0
|
|
|
# Define handler functions for each type of entry |
|
def handle_media_entry(entry):
    """Compute the storage footprint of a media (or live-stream) entry.

    Returns (media_type_string, size_in_bytes, sizes_by_asset_type,
    missing_sizeInBytes_count).
    """
    with get_client_from_pool() as client:
        total_bytes, sizes_by_asset_type, missing_count = get_asset_size_sum(client, entry.id)
        media_type_string = get_media_type_string(entry.mediaType)

        # IMAGE entries serve their source file via dataUrl, so add its size
        if media_type_string == "IMAGE" and getattr(entry, 'dataUrl', None):
            total_bytes += get_file_size(entry.dataUrl)

        return media_type_string, total_bytes, sizes_by_asset_type, missing_count
|
|
|
def handle_playlist(entry):
    """Compute the storage footprint of a playlist entry."""
    with get_client_from_pool() as pooled_client:
        size_bytes, by_asset_type, missing_count = get_asset_size_sum(pooled_client, entry.id)
        return "PLAYLIST", size_bytes, by_asset_type, missing_count
|
|
|
def handle_data_entry(entry):
    """Compute the storage footprint of a data entry (assets + served data file).

    Returns (media_type_string, size_in_bytes, sizes_by_asset_type,
    missing_sizeInBytes_count); on failure returns ("ERROR", <message>, {}, 0)
    so the tuple shape matches the other handlers.
    """
    with get_client_from_pool() as local_client:
        try:
            file_url = local_client.data.serve(entry.id)
            # Use the get_file_size function to get the size
            file_size_in_bytes = get_file_size(file_url)
            # If get_file_size returns 0, fall back to the raw dataContent length
            if file_size_in_bytes == 0 and entry.dataContent:
                file_size_in_bytes = len(entry.dataContent.encode('utf-8', errors='ignore'))  # Fallback to UTF-8 and ignore errors
            # Get the size of all other assets associated with the entry
            entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = get_asset_size_sum(local_client, entry.id)
            # Add the size of the data content file to the total
            entry_size_in_bytes += file_size_in_bytes
            return "DATA", entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count

        except Exception as e:
            logging.error(f"Error processing data entry: {e}")
            # BUG FIX: the original returned a 5-tuple here (with entry.id
            # prepended) while process_entry unpacks handler results into
            # four names, so the error path always raised ValueError.
            return "ERROR", str(e), {}, 0
|
|
|
def handle_document_entry(entry):
    """Compute the storage footprint of a document entry."""
    with get_client_from_pool() as pooled_client:
        size_bytes, by_asset_type, missing_count = get_asset_size_sum(pooled_client, entry.id)
        return "DOCUMENT", size_bytes, by_asset_type, missing_count
|
|
|
def process_entry(entry):
    """Dispatch an entry to the handler matching its concrete type.

    Returns (entry_id, media_type_string, size_in_bytes, sizes_by_asset_type,
    missing_sizeInBytes_count); on failure media_type_string is "ERROR" and
    the size slot carries the error message.
    """
    # BUG FIX: the original wrapped this whole body in
    # `with get_client_from_pool() as client:` but never used that client —
    # every handler borrows its own. Each in-flight entry therefore held TWO
    # pool clients while the pool size equals the worker count, risking pool
    # starvation/deadlock. The unused acquisition is removed.
    try:
        if isinstance(entry, (KalturaMediaEntry, KalturaLiveStreamAdminEntry)):
            handler = handle_media_entry
        elif isinstance(entry, KalturaPlaylist):
            handler = handle_playlist
        elif isinstance(entry, KalturaDataEntry):
            handler = handle_data_entry
        elif isinstance(entry, KalturaDocumentEntry):
            handler = handle_document_entry
        else:
            # Unknown entry classes are reported with zero size
            handler = lambda e: (f"UNKNOWN-{e.__class__}", 0, {}, 0)

        media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = handler(entry)

        return entry.id, media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
    except Exception as e:
        logging.error(f"Error processing entry: {e}")
        return entry.id, "ERROR", str(e), {}, 0
|
|
|
# Main script logic |
|
def main():
    """Tally total storage used by all entries of a Kaltura partner.

    Reads connection settings from ./config.json, fills a pool of
    authenticated clients, pages through every entry, sums asset sizes in
    parallel, and logs totals broken down by asset type and media type.
    """
    # Configure logging up front so worker threads inherit the setup
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Load the configuration file
    with open('config.json', 'r') as config_file:
        config_data = json.load(config_file)

    # Assign configuration to variables
    PARTNER_ID = config_data["PARTNER_ID"]
    SERVICE_URL = config_data["SERVICE_URL"]
    ADMIN_SECRET = config_data["ADMIN_SECRET"]
    USER_ID = config_data["USER_ID"]
    SESSION_PRIVILEGES = config_data["SESSION_PRIVILEGES"]
    EXPIRY = config_data["EXPIRY"]
    POOL_SIZE = config_data["POOL_SIZE"]
    SESSION_TYPE = KalturaSessionType.ADMIN

    # POOL_SIZE == -1 in the config means "derive from the CPU count"
    if POOL_SIZE == -1:
        # BUG FIX: os.cpu_count() may return None, which made the original
        # `os.cpu_count() - 2` raise TypeError; fall back to 1 core.
        # The pool never shrinks below 10 workers.
        POOL_SIZE = max((os.cpu_count() or 1) - 2, 10)

    # Initialize Kaltura configuration with SERVICE_URL
    config = KalturaConfiguration(PARTNER_ID)
    config.serviceUrl = SERVICE_URL

    logging.info(f"Creating {POOL_SIZE} threads...")
    for _ in range(POOL_SIZE):
        client_pool.put(create_kaltura_client(config, ADMIN_SECRET, USER_ID, SESSION_TYPE, PARTNER_ID, EXPIRY, SESSION_PRIVILEGES))

    total_size_in_bytes = 0
    pager = KalturaFilterPager(pageSize=500, pageIndex=1)
    entry_counter = 0  # Entries processed across all pages

    total_size_by_asset_type = defaultdict(int)
    total_size_by_media_type = defaultdict(int)

    # Assets that fell back from 'sizeInBytes' to the KB-based 'size'
    total_missing_sizeInBytes_count = 0

    with ThreadPoolExecutor(max_workers=POOL_SIZE) as executor:
        # Loop through all entries, one page at a time
        while True:
            with get_client_from_pool() as client:
                entries = list_entries(client, pager)

            if not entries:
                break  # No more entries to process

            # Fan the page out to the workers and fold results as they finish
            future_to_entry = {executor.submit(process_entry, entry): entry for entry in entries}
            for future in as_completed(future_to_entry):
                entry = future_to_entry[future]
                entry_counter += 1  # Increment the entry counter
                try:
                    entry_id, media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = future.result()
                    total_missing_sizeInBytes_count += missing_sizeInBytes_count
                    if media_type_string == "ERROR":
                        # On error the size slot carries the error message
                        logging.error(f"{entry_counter}) Entry {entry_id} generated an exception: {entry_size_in_bytes}")
                    else:
                        total_size_in_bytes += entry_size_in_bytes
                        for asset_type, size in sizes_by_asset_type.items():
                            total_size_by_asset_type[asset_type] += size
                        total_size_by_media_type[media_type_string] += entry_size_in_bytes
                        logging.info(f"{entry_counter}) Processed entry {entry_id}: type: {media_type_string}, size: {entry_size_in_bytes} bytes")
                except Exception as exc:
                    logging.error(f"Entry {entry.id} generated an exception: {exc}")

            pager.pageIndex += 1  # Move to the next page

    # Convert total size to GB and format with commas
    total_size_in_gb = total_size_in_bytes / (1024 ** 3)
    formatted_total_size_in_gb = "{:,.2f}".format(total_size_in_gb)
    logging.info(f"Storage totals of Partner ID: {PARTNER_ID}")
    # BUG FIX: this summary previously said "all flavors" although the total
    # covers every asset type (flavor, thumb, attachment, caption, file).
    logging.info(f"Total size of all assets: {formatted_total_size_in_gb} GB")
    logging.info(f"Total number of entries tallied: {entry_counter}")
    for asset_type, total_size in total_size_by_asset_type.items():
        logging.info(f"Total size for {asset_type} assets: {total_size / (1024 ** 3):.2f} GB")
    for media_type, total_size in total_size_by_media_type.items():
        logging.info(f"Total size for {media_type} entries: {total_size / (1024 ** 3):.2f} GB")
    logging.info(f"Total number of assets with missing 'sizeInBytes' but a valid 'size': {total_missing_sizeInBytes_count}")
|
|
|
# Execute the main function only when run as a script (not when imported)

if __name__ == "__main__":

    main()