@zoharbabin · Last active November 7, 2023 18:29

Kaltura Per-Entry Storage Usage Analysis Utility/Helper

This script analyzes storage usage for each entry in a given Kaltura account. It lists all entries, calculates storage usage per entry and per media type, and prints the totals.

Please note that this tool is intended to serve as a helper utility for analyzing storage usage associated with Kaltura entries. It is not designed to measure the total storage consumption of an account comprehensively.
Accounts utilize storage in various ways not accounted for by this tool, including but not limited to metadata files, uiConfs, system-generated files, account settings, and other miscellaneous storage. Therefore, the results provided by this script should not be considered an exhaustive or precise reflection of the total storage used by an account. Always refer to your Kaltura account's official storage metrics for accurate and complete storage utilization data.

Configuration

Before running the script, you must provide the necessary configuration in a config.json file. Here is a template for the configuration:

{
    "PARTNER_ID": "REPLACE_FROM_KMC_INTEGRATION_SETTINGS",
    "SERVICE_URL": "https://cdnapi-ev.kaltura.com/",
    "ADMIN_SECRET": "REPLACE_FROM_KMC_INTEGRATION_SETTINGS",
    "USER_ID": "check_storage",
    "SESSION_PRIVILEGES": "*,disableentitlement",
    "EXPIRY": 86400,
    "POOL_SIZE": -1
}

Replace REPLACE_FROM_KMC_INTEGRATION_SETTINGS with the values from your Kaltura Management Console (KMC) Integration Settings.
When POOL_SIZE is -1, the script sizes the client pool automatically to the number of available CPU cores minus 2; set a positive value to override this behavior.
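
A minimal sketch of that resolution logic (mirroring main() in the script below; treating a missing os.cpu_count() result as a single-worker fallback is an assumption of this sketch):

import json
import os

with open('config.json', 'r') as f:
    config_data = json.load(f)

pool_size = config_data["POOL_SIZE"]
if pool_size == -1:
    # Auto-size: available CPU cores minus 2, floored at 1
    # (os.cpu_count() may return None on some platforms)
    pool_size = max((os.cpu_count() or 3) - 2, 1)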

Features

  • Thread-safe client pooling for efficient API interaction (see the sketch after this list).
  • Multi-threaded entry processing to handle large datasets.
  • Support for various Kaltura entry types including media, playlist, data, and document entries.
  • Detailed logging of the process and results.
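
The client pooling is a simple borrow/return pattern built on queue.Queue, as the script below implements; a condensed sketch:

from contextlib import contextmanager
from queue import Queue

client_pool = Queue()  # pre-filled with KalturaClient instances at startup

@contextmanager
def get_client_from_pool():
    client = client_pool.get()   # blocks until a client is free
    try:
        yield client             # caller uses the client exclusively
    finally:
        client_pool.put(client)  # always returned, even on error

Because queue.Queue is itself thread-safe, no extra locking is needed around the borrow and return.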

Usage

  1. Ensure Python 3 is installed on your system.
  2. Install the Kaltura Python API client (e.g., pip install KalturaApiClient) and the requests package.
  3. Place the config.json file in the same directory as the script.
  4. Run the script using the command:
python kaltura_storage_usage.py
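
On success, each entry produces one log line in the format used by the script; an illustrative example (the counter, entry ID, and size shown here are hypothetical):

2023-11-07 18:29:00,123 - INFO - 42) Processed entry 1_abc123: type: VIDEO, size: 734003200 bytes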

License

This is provided "as-is"; no guarantees or support are offered.

config.json

{
    "PARTNER_ID": "REPLACE_FROM_KMC_INTEGRATION_SETTINGS",
    "SERVICE_URL": "https://cdnapi-ev.kaltura.com/",
    "ADMIN_SECRET": "REPLACE_FROM_KMC_INTEGRATION_SETTINGS",
    "USER_ID": "check_storage",
    "SESSION_PRIVILEGES": "*,disableentitlement",
    "EXPIRY": 86400,
    "POOL_SIZE": -1
}
kaltura_storage_usage.py

import os
import json
import logging
import requests
from contextlib import contextmanager
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue

from KalturaClient import *
from KalturaClient.Plugins.Core import *
from KalturaClient.Plugins.Document import KalturaDocumentEntry

# Thread-safe queue holding the reusable Kaltura client instances
client_pool = Queue()


@contextmanager
def get_client_from_pool():
    # Borrow a client from the pool and always return it, even on error
    client = client_pool.get()
    try:
        yield client
    finally:
        client_pool.put(client)
# Create a new Kaltura client with an admin session
def create_kaltura_client(config, admin_secret, user_id, session_type, partner_id, expiry, session_privileges):
    local_client = KalturaClient(config)
    ks = local_client.generateSessionV2(admin_secret, user_id, session_type, partner_id, expiry, session_privileges)
    local_client.setKs(ks)
    return local_client


# List a single page of entries
def list_entries(client, pager):
    try:
        list_filter = KalturaBaseEntryFilter()
        response = client.baseEntry.list(list_filter, pager).objects
        logging.info(f"{len(response)} entries listed successfully.")
        return response
    except Exception as e:
        logging.error(f"Error listing entries: {e}")
        return None
# Convert a media type enum to its name (reverse lookup on KalturaMediaType)
def get_media_type_string(media_type_enum):
    for name, value in KalturaMediaType.__dict__.items():
        if isinstance(value, int) and value == media_type_enum.getValue():
            return name
    return f"UNKNOWN-{media_type_enum.getValue()}"
# Sum the sizes of all assets (flavors, thumbnails, attachments, captions,
# file assets) attached to a single entry
def get_asset_size_sum(local_client, entry_id):
    missing_sizeInBytes_count = 0  # Assets lacking a usable sizeInBytes
    pager = KalturaFilterPager(pageSize=500, pageIndex=1)

    # Filter common to all asset types
    common_filter = KalturaAssetFilter()
    common_filter.entryIdEqual = entry_id

    # fileAsset uses its own filter type
    file_filter = KalturaFileAssetFilter()
    file_filter.fileAssetObjectTypeEqual = KalturaFileAssetObjectType.ENTRY
    file_filter.objectIdEqual = entry_id

    # Fetch all of the entry's assets in a single multi-request
    local_client.startMultiRequest()
    local_client.flavorAsset.list(common_filter, pager)
    local_client.thumbAsset.list(common_filter, pager)
    local_client.attachment.attachmentAsset.list(common_filter, pager)
    local_client.caption.captionAsset.list(common_filter, pager)
    local_client.fileAsset.list(file_filter, pager)
    multi_result = local_client.doMultiRequest()

    # Extract the .objects from each response in the multi-request result
    flavor_assets = multi_result[0].objects
    thumb_assets = multi_result[1].objects
    attachment_assets = multi_result[2].objects
    caption_assets = multi_result[3].objects
    file_assets = multi_result[4].objects

    # Helper to read an asset's size in bytes, falling back to the KB-based
    # 'size' field when 'sizeInBytes' is missing or zero
    def get_size_in_bytes(asset):
        nonlocal missing_sizeInBytes_count
        size_in_bytes = getattr(asset, 'sizeInBytes', 0)
        if isinstance(size_in_bytes, int) and size_in_bytes > 0:
            return size_in_bytes
        if hasattr(asset, 'size'):
            missing_sizeInBytes_count += 1
            return asset.size * 1024  # 'size' is in KB; convert to bytes
        return 0

    # Calculate storage totals per asset type
    sizes_by_asset_type = {
        'flavor': sum(get_size_in_bytes(asset) for asset in flavor_assets),
        'thumb': sum(get_size_in_bytes(asset) for asset in thumb_assets),
        'attachment': sum(get_size_in_bytes(asset) for asset in attachment_assets),
        'caption': sum(get_size_in_bytes(asset) for asset in caption_assets),
        'file': sum(get_size_in_bytes(asset) for asset in file_assets),
    }
    total_size_in_bytes = sum(sizes_by_asset_type.values())
    return total_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
# Get the size of a file from its download URL
def get_file_size(file_url):
    try:
        # Try a HEAD request first; follow redirects so Content-Length
        # reflects the final URL, and set a timeout to avoid hanging
        response = requests.head(file_url, allow_redirects=True, timeout=30)
        response.raise_for_status()
        if 'Content-Length' in response.headers:
            return int(response.headers['Content-Length'])
        # Fall back to a streamed GET when Content-Length is absent
        response = requests.get(file_url, stream=True, timeout=30)
        response.raise_for_status()
        return sum(len(chunk) for chunk in response.iter_content(chunk_size=8192))
    except Exception as e:
        logging.error(f"Error getting file size: {e}")
        return 0
# Handler functions for each type of entry
def handle_media_entry(entry):
    with get_client_from_pool() as client:
        entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = get_asset_size_sum(client, entry.id)
    media_type_string = get_media_type_string(entry.mediaType)
    # For IMAGE entries the source image is not a flavor, so add its size from dataUrl
    if media_type_string == "IMAGE" and hasattr(entry, 'dataUrl') and entry.dataUrl:
        entry_size_in_bytes += get_file_size(entry.dataUrl)
    return media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count


def handle_playlist(entry):
    with get_client_from_pool() as client:
        entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = get_asset_size_sum(client, entry.id)
    return "PLAYLIST", entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count


def handle_data_entry(entry):
    with get_client_from_pool() as local_client:
        try:
            file_url = local_client.data.serve(entry.id)
            file_size_in_bytes = get_file_size(file_url)
            # If get_file_size returns 0, fall back to the length of entry.dataContent
            if file_size_in_bytes == 0 and entry.dataContent:
                file_size_in_bytes = len(entry.dataContent.encode('utf-8', errors='ignore'))
            # Add the size of all other assets associated with the entry
            entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = get_asset_size_sum(local_client, entry.id)
            entry_size_in_bytes += file_size_in_bytes
            return "DATA", entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
        except Exception as e:
            logging.error(f"Error processing data entry: {e}")
            # Return a 4-tuple like the other handlers so process_entry can unpack it
            return "ERROR", str(e), {}, 0


def handle_document_entry(entry):
    with get_client_from_pool() as client:
        entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = get_asset_size_sum(client, entry.id)
    return "DOCUMENT", entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
# Dispatch an entry to the matching handler
def process_entry(entry):
    try:
        # Each handler borrows its own client from the pool; acquiring another
        # one here as well could exhaust the pool and deadlock the workers
        if isinstance(entry, (KalturaMediaEntry, KalturaLiveStreamAdminEntry)):
            handler = handle_media_entry
        elif isinstance(entry, KalturaPlaylist):
            handler = handle_playlist
        elif isinstance(entry, KalturaDataEntry):
            handler = handle_data_entry
        elif isinstance(entry, KalturaDocumentEntry):
            handler = handle_document_entry
        else:
            handler = lambda e: (f"UNKNOWN-{e.__class__.__name__}", 0, {}, 0)
        media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = handler(entry)
        return entry.id, media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count
    except Exception as e:
        logging.error(f"Error processing entry: {e}")
        return entry.id, "ERROR", str(e), {}, 0
# Main script logic
def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Load the configuration file
    with open('config.json', 'r') as config_file:
        config_data = json.load(config_file)

    # Assign configuration to variables
    PARTNER_ID = config_data["PARTNER_ID"]
    SERVICE_URL = config_data["SERVICE_URL"]
    ADMIN_SECRET = config_data["ADMIN_SECRET"]
    USER_ID = config_data["USER_ID"]
    SESSION_PRIVILEGES = config_data["SESSION_PRIVILEGES"]
    EXPIRY = config_data["EXPIRY"]
    POOL_SIZE = config_data["POOL_SIZE"]
    SESSION_TYPE = KalturaSessionType.ADMIN

    # When POOL_SIZE is -1, derive it from the CPU count minus 2
    # (os.cpu_count() can return None; fall back to a pool of 1)
    if POOL_SIZE == -1:
        POOL_SIZE = max((os.cpu_count() or 3) - 2, 1)

    # Initialize the Kaltura configuration with SERVICE_URL
    config = KalturaConfiguration(PARTNER_ID)
    config.serviceUrl = SERVICE_URL

    # Pre-fill the pool with one client per worker thread
    logging.info(f"Creating {POOL_SIZE} clients...")
    for _ in range(POOL_SIZE):
        client_pool.put(create_kaltura_client(config, ADMIN_SECRET, USER_ID, SESSION_TYPE, PARTNER_ID, EXPIRY, SESSION_PRIVILEGES))

    total_size_in_bytes = 0
    pager = KalturaFilterPager(pageSize=500, pageIndex=1)
    entry_counter = 0
    total_size_by_asset_type = defaultdict(int)
    total_size_by_media_type = defaultdict(int)
    total_missing_sizeInBytes_count = 0  # Assets missing sizeInBytes but with a valid size

    with ThreadPoolExecutor(max_workers=POOL_SIZE) as executor:
        # Loop through all entries, page by page
        while True:
            with get_client_from_pool() as client:
                entries = list_entries(client, pager)
            if not entries:
                break  # No more entries to process
            future_to_entry = {executor.submit(process_entry, entry): entry for entry in entries}
            for future in as_completed(future_to_entry):
                entry = future_to_entry[future]
                entry_counter += 1
                try:
                    entry_id, media_type_string, entry_size_in_bytes, sizes_by_asset_type, missing_sizeInBytes_count = future.result()
                    total_missing_sizeInBytes_count += missing_sizeInBytes_count
                    if media_type_string == "ERROR":
                        logging.error(f"{entry_counter}) Entry {entry_id} generated an exception: {entry_size_in_bytes}")
                    else:
                        total_size_in_bytes += entry_size_in_bytes
                        for asset_type, size in sizes_by_asset_type.items():
                            total_size_by_asset_type[asset_type] += size
                        total_size_by_media_type[media_type_string] += entry_size_in_bytes
                        logging.info(f"{entry_counter}) Processed entry {entry_id}: type: {media_type_string}, size: {entry_size_in_bytes} bytes")
                except Exception as exc:
                    logging.error(f"Entry {entry.id} generated an exception: {exc}")
            pager.pageIndex += 1  # Move to the next page

    # Convert the grand total to GB and format with commas
    total_size_in_gb = total_size_in_bytes / (1024 ** 3)
    formatted_total_size_in_gb = "{:,.2f}".format(total_size_in_gb)
    logging.info(f"Storage totals of Partner ID: {PARTNER_ID}")
    logging.info(f"Total size of all assets: {formatted_total_size_in_gb} GB")
    logging.info(f"Total number of entries tallied: {entry_counter}")
    for asset_type, total_size in total_size_by_asset_type.items():
        logging.info(f"Total size for {asset_type} assets: {total_size / (1024 ** 3):.2f} GB")
    for media_type, total_size in total_size_by_media_type.items():
        logging.info(f"Total size for {media_type} entries: {total_size / (1024 ** 3):.2f} GB")
    logging.info(f"Total number of assets with missing 'sizeInBytes' but a valid 'size': {total_missing_sizeInBytes_count}")


# Execute the main function
if __name__ == "__main__":
    main()