@metadaddy
Last active August 16, 2025 04:11
Query the Backblaze Drive Stats data set from Daft (www.getdaft.io)
# Use these values for read-only access to the Drive Stats data set:
AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
AWS_ACCESS_KEY_ID=0045f0571db506a0000000017
AWS_SECRET_ACCESS_KEY=K004Fs/bgmTk5dgo6GAVm2Waj3Ka+TE
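
Before running the full script below, a quick way to confirm that the credentials work is to list the public bucket with pyarrow's S3FileSystem, which is already one of the script's dependencies. This is a minimal sketch, assuming the .env values above have been loaded into the environment; the bucket name and region are taken from the script below.

import os
from dotenv import load_dotenv
from pyarrow import fs

# Load AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from .env.
# The AWS SDK for C++ picks up the key pair from the environment, but the
# endpoint and region must be passed explicitly.
load_dotenv(override=True)

s3 = fs.S3FileSystem(
    endpoint_override=os.environ['AWS_ENDPOINT_URL'],
    region='us-west-004',
)

# List the top-level entries in the public Drive Stats bucket
for info in s3.get_file_info(fs.FileSelector('drivestats-iceberg')):
    print(info.path)
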
import datetime
import time
from typing import Tuple
import daft
import logging
import os
import re
from daft import col
from dotenv import load_dotenv
from pyarrow import fs
from pyiceberg.catalog import LOCATION, load_catalog
from pyiceberg.exceptions import NoSuchNamespaceError
# Never put credentials in source code!
# The .env file should contain AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# The .env file in this gist (https://gist.github.com/metadaddy/ec9e645fa0929321b626d8be6e11162e)
# contains credentials with read-only access to the Drive Stats data set
load_dotenv(override=True)
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_REQUEST_TIMEOUT = 20
BYTES_IN_EXABYTE = 1_000_000_000_000_000_000.0  # Decimal exabyte: 10**18 bytes
ENDPOINT_URL = os.environ['AWS_ENDPOINT_URL']
BUCKET = 'drivestats-iceberg'
NAMESPACE = 'default'
TABLE_NAME = 'drivestats'

def get_region(endpoint: str):
    """
    Extract region from endpoint - you could just configure it separately
    """
    endpoint_pattern = re.compile(r'^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$')
    region_match = endpoint_pattern.match(endpoint)
    region_name = region_match.group(1)
    return region_name

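# For example, get_region('https://s3.us-west-004.backblazeb2.com') returns 'us-west-004'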
REGION = get_region(ENDPOINT_URL)

def get_metadata_location(bucket, table_name) -> str | None:
    """
    Given a bucket and table name, return a URI for the most recent Iceberg metadata JSON file
    """
    s3fs = fs.S3FileSystem(
        # We need to explicitly set endpoint and region since the underlying
        # AWS SDK for C++ does not support the environment variables. See
        # https://github.com/aws/aws-sdk-cpp/issues/2587
        endpoint_override=ENDPOINT_URL,
        region=REGION,
        request_timeout=DEFAULT_REQUEST_TIMEOUT,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    )

    # List files at the metadata location
    prefix = f"{bucket}/{table_name}/metadata/"
    files = s3fs.get_file_info(fs.FileSelector(prefix, True, True))

    # Metadata files have the suffix '.metadata.json' and are named sequentially with numeric prefixes,
    # so we can simply filter the listing, sort it, and take the last element
    if len(metadata_locations := sorted([file.path for file in files if file.path.endswith('.metadata.json')])) > 0:
        return f's3://{metadata_locations[-1]}'
    return None

def time_collect(df: daft.DataFrame) -> Tuple[daft.DataFrame, float]:
    """
    Helper function to time the collect() call on a dataframe
    """
    start = time.perf_counter()
    result = df.collect()
    return result, time.perf_counter() - start

def main():
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    logger.info('Setting log level for daft.iceberg.iceberg_scan to ERROR to suppress warning about unspecified partition filter')
    logging.getLogger('daft.iceberg.iceberg_scan').setLevel(logging.ERROR)

    # Create an in-memory catalog for the purposes of accessing the data. We could
    # equally use an online catalog.
    catalog = load_catalog(
        'iceberg',
        **{
            'uri': 'sqlite:///:memory:',
            # The underlying AWS SDK for C++ does not support the AWS_ENDPOINT_URL
            # and AWS_REGION environment variables (see
            # https://github.com/aws/aws-sdk-cpp/issues/2587), so we have to pass
            # them explicitly
            's3.endpoint': ENDPOINT_URL,
            's3.region': REGION,
            's3.request-timeout': DEFAULT_REQUEST_TIMEOUT,
            's3.connect-timeout': DEFAULT_CONNECT_TIMEOUT,
        }
    )
    catalog.create_namespace(NAMESPACE, {LOCATION: f's3://{BUCKET}/'})

    metadata_location = get_metadata_location(BUCKET, TABLE_NAME)
    logger.info(f'Metadata located at {metadata_location}')

    table = catalog.register_table(f'{NAMESPACE}.{TABLE_NAME}', metadata_location)
    drivestats = daft.read_iceberg(table)

    # How many records are in the current Drive Stats dataset?
    count, elapsed_time = time_collect(drivestats.count())
    print(f"Total record count: {count.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")

    # How many hard drives was Backblaze spinning on a given date?
    most_recent_day = drivestats.where(col("date") == datetime.date(2025, 3, 31))
    count, elapsed_time = time_collect(most_recent_day.count())
    print(f"Record count for 2025-03-31: {count.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")

    # How many exabytes of raw storage was Backblaze managing on a given date?
    total_capacity, elapsed_time = time_collect(
        most_recent_day.agg(
            col('capacity_bytes').sum()
        )
    )
    print(f"Capacity: {total_capacity.to_pydict()['capacity_bytes'][0] / BYTES_IN_EXABYTE:.3f} EB ({elapsed_time:.2f} seconds)")

    # What are the top 10 most common drive models in the dataset?
    result, elapsed_time = time_collect(
        drivestats.groupby('model').agg(
            col('serial_number').count_distinct().alias('count')
        ).sort(col('count'), desc=True).limit(10)
    )
    print('Top 10 most common drive models:')
    result.show(10)
    print(f'({elapsed_time:.2f} seconds)')


if __name__ == "__main__":
    main()
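
As a further example of the same query pattern (not part of the original script), the following snippet could be added to main() after the existing queries to count the drive failures reported on 2025-03-31. It assumes the Drive Stats failure column, which holds 1 on the day a drive is reported failed and 0 otherwise.

    # How many drive failures were reported on 2025-03-31?
    # Assumes the Drive Stats 'failure' column: 1 on the day a drive is reported failed, 0 otherwise
    failures, elapsed_time = time_collect(
        most_recent_day.agg(
            col('failure').sum().alias('failures')
        )
    )
    print(f"Failures reported on 2025-03-31: {failures.to_pydict()['failures'][0]} ({elapsed_time:.2f} seconds)")
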
@kevinMEH

Hello @metadaddy,
This is an automated message generated to inform you that we have detected potentially active AWS S3 credentials exposed in your GitHub Gist: https://gist.github.com/owner/ec9e645fa0929321b626d8be6e11162e.
Please revoke the key immediately and delete the gist.
This action was performed by kevinMEH/keyscan. For more information, please contact @kevinMEH. If you find our service helpful, please consider following and shouting out @kevinMEH.

@metadaddy
Author

@kevinMEH These are read-only credentials, deliberately shared so that the community can access the Drive Stats data set.
