Skip to content

Instantly share code, notes, and snippets.

@mtanco
Created January 22, 2025 01:08
Show Gist options
  • Save mtanco/7c5be1469b3f45a94a77ee647ad88636 to your computer and use it in GitHub Desktop.
Save mtanco/7c5be1469b3f45a94a77ee647ad88636 to your computer and use it in GitHub Desktop.
Basic throughput testing of scoring calls against an H2O MLOps deployment, across varying batch sizes and replica counts.
import os
import requests
import json
import time
import h2o_authn
import h2o_discovery
import h2o_mlops
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
# Run in H2O AI Cloud Notebooks for easy authentication.
# Discover the cloud environment's service endpoints and OAuth settings.
discovery = h2o_discovery.discover()
# Token provider that mints access tokens from the platform refresh token
# exposed to the notebook via an environment variable.
token_provider = h2o_authn.TokenProvider(
    refresh_token=os.getenv("H2O_CLOUD_CLIENT_PLATFORM_TOKEN"),
    issuer_url=discovery.environment.issuer_url,
    client_id=discovery.clients["platform"].oauth2_client_id,
)
# Authenticated MLOps client pointed at the discovered MLOps API gateway.
mlops = h2o_mlops.Client(
    gateway_url=discovery.services['mlops-api'].uri,
    token_provider=token_provider
)
# Access an existing Model
project_name = "Telco Churn Predictions"
environment_name = "DEV" # one of "DEV" or "PROD"
deployment_runtime_name = "dai_mojo_runtime"
# NOTE(review): every [0] lookup below assumes at least one match exists;
# an empty listing raises IndexError. Verify the project/model names above.
project = mlops.projects.list(name=project_name)[0]
experiment = project.experiments.get(uid=project.experiments.list()[0].uid)
model = project.models.get(uid=project.models.list()[0].uid)
environment = project.environments.list(name=environment_name)[0]
# Select the scoring runtime for the experiment's second artifact type —
# presumably the DAI MOJO artifact; TODO confirm index [1] is correct.
runtime = mlops.runtimes.scoring.list(
    artifact_type=model.get_experiment().scoring_artifact_types[1],
    uid=deployment_runtime_name,
)[0]
# Create a deployment and wait for it to be ready.
deployment = environment.deployments.create_single(
    name=model.name,
    model=model,
    scoring_runtime=runtime,
    # Security disabled so the load test can POST without auth headers.
    security_options=h2o_mlops.options.SecurityOptions(disabled_security=True),
)
# Poll once per second until the scoring endpoint reports healthy.
while not deployment.is_healthy():
    time.sleep(1)
def mlops_throughput_testing(running_deployment, n, max_rows_per_request=300):
    """Load-test a running MLOps deployment and report its throughput.

    Sends ``n`` rows (copies of the deployment's sample row) to the scoring
    endpoint, split into concurrent batches of at most
    ``max_rows_per_request`` rows each.

    Args:
        running_deployment: A healthy deployment exposing
            ``get_sample_request()``, ``url_for_scoring``, and
            ``kubernetes_options.replicas``.
        n: Total number of rows to score.
        max_rows_per_request: Maximum rows per HTTP request. Defaults to 300
            (bug fix: the original had no default, so callers that omitted
            this argument raised ``TypeError``).

    Returns:
        dict summarizing replica count, row and batch sizes, elapsed time,
        records per second, and the number of requests made.
    """
    sample_request = running_deployment.get_sample_request()
    # Ceiling division: number of batches needed to cover n rows.
    num_requests = (n + max_rows_per_request - 1) // max_rows_per_request
    # Cap concurrency at the batch count; fall back to 4 workers when
    # os.cpu_count() is unavailable (returns None).
    num_workers = min(num_requests, os.cpu_count() or 4)
    # One payload per batch; the final batch may hold fewer rows.
    requests_data = [
        {
            'fields': sample_request['fields'],
            'rows': [sample_request['rows'][0]]
            * min(max_rows_per_request, n - i * max_rows_per_request),
        }
        for i in range(num_requests)
    ]

    def post_data(data):
        # NOTE(review): no timeout or status-code check — a hung or failing
        # endpoint would stall or silently skew the benchmark.
        return requests.post(url=running_deployment.url_for_scoring, json=data)

    start_time = time.time()
    # Fan the batches out concurrently and wait for every response.
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(post_data, data) for data in requests_data]
        results = [future.result() for future in futures]
    elapsed_time = time.time() - start_time
    return {
        'Number of Replicas': running_deployment.kubernetes_options.replicas,
        'Number of Sample Rows': n,
        'Batched Row Size': max_rows_per_request,
        'Elapsed Time (seconds)': elapsed_time,
        'Records per Second': n / elapsed_time,
        'Requests Made': num_requests
        # 'Responses': [result.status_code for result in results]  # Optionally collect response statuses
    }
# Benchmark matrix: every sample size against every replica count.
sample_sizes = [1, 10, 100, 500, 1000, 5000, 10000, 100000]
replicas = [1, 2, 3]
results = []
for r in replicas:
    # Scale the deployment only when the replica count actually changes,
    # then poll until it is healthy again before sending traffic.
    if deployment.kubernetes_options.replicas != r:
        print(f"Updating the replicas from {deployment.kubernetes_options.replicas} to {r}")
        deployment.update_kubernetes_options(replicas=r)
        while not deployment.is_healthy():
            time.sleep(1)
    for s in sample_sizes:
        # Fix: "Begining" typo in the progress message.
        print(f"Beginning load test of {s} rows on {r} replicas")
        result = mlops_throughput_testing(deployment, s, 300)
        results.append(result)
# Fix: a bare `pd.DataFrame(results)` expression only displays in a notebook
# cell; print it so the summary is visible when run as a script.
print(pd.DataFrame(results))
# Second benchmark pass over the same replica counts and sample sizes.
# Bug fix: the original called mlops_throughput_testing(deployment, s) with
# no batch size, which raises TypeError because max_rows_per_request is a
# required parameter — pass it explicitly.
results = []
for r in replicas:
    # Scale only when needed, then wait for the deployment to be healthy.
    if deployment.kubernetes_options.replicas != r:
        print(f"Updating the replicas from {deployment.kubernetes_options.replicas} to {r}")
        deployment.update_kubernetes_options(replicas=r)
        while not deployment.is_healthy():
            time.sleep(1)
    for s in sample_sizes:
        # Fix: "Begining" typo in the progress message.
        print(f"Beginning load test of {s} rows on {r} replicas")
        result = mlops_throughput_testing(deployment, s, 300)
        results.append(result)
# Fix: print the DataFrame so the summary appears outside a notebook too.
print(pd.DataFrame(results))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment