Some basic testing of throughput for scoring calls in H2O MLOps. The script below deploys a model from an existing project, then measures records per second across different replica counts and batch sizes by sending concurrent scoring requests.
import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor

import h2o_authn
import h2o_discovery
import h2o_mlops
import pandas as pd
# Run in H2O AI Cloud Notebooks for easy authentication
discovery = h2o_discovery.discover()
token_provider = h2o_authn.TokenProvider(
    refresh_token=os.getenv("H2O_CLOUD_CLIENT_PLATFORM_TOKEN"),
    issuer_url=discovery.environment.issuer_url,
    client_id=discovery.clients["platform"].oauth2_client_id,
)
mlops = h2o_mlops.Client(
    gateway_url=discovery.services["mlops-api"].uri,
    token_provider=token_provider,
)
# Access an existing model and a compatible scoring runtime
project_name = "Telco Churn Predictions"
environment_name = "DEV"  # one of "DEV" or "PROD"
deployment_runtime_name = "dai_mojo_runtime"

project = mlops.projects.list(name=project_name)[0]
experiment = project.experiments.get(uid=project.experiments.list()[0].uid)
model = project.models.get(uid=project.models.list()[0].uid)
environment = project.environments.list(name=environment_name)[0]
runtime = mlops.runtimes.scoring.list(
    artifact_type=model.get_experiment().scoring_artifact_types[1],
    uid=deployment_runtime_name,
)[0]
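# Note: scoring_artifact_types[1] is assumed here to be the DAI MOJO pipeline
# artifact, matching the "dai_mojo_runtime" requested above; adjust the index
# if your experiment exposes its artifact types in a different order.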
# Create a deployment and wait for it to be ready
deployment = environment.deployments.create_single(
    name=model.name,
    model=model,
    scoring_runtime=runtime,
    security_options=h2o_mlops.options.SecurityOptions(disabled_security=True),
)
while not deployment.is_healthy():
    time.sleep(1)
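# The deployment is now live; the load tests below POST to its REST scoring endpoint
print(f"Scoring endpoint: {deployment.url_for_scoring}")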
def mlops_throughput_testing(running_deployment, n, max_rows_per_request):
    """Score n copies of the deployment's sample row and report throughput."""
    sample_request = running_deployment.get_sample_request()

    # Calculate the number of requests (chunks) needed to send n rows
    num_requests = (n + max_rows_per_request - 1) // max_rows_per_request

    # Determine the number of workers: one per request, capped at the CPU count (or 4)
    num_workers = min(num_requests, os.cpu_count() or 4)

    # Build one payload per chunk by repeating the sample row;
    # the final chunk may hold fewer rows so the total is exactly n
    requests_data = [{
        'fields': sample_request['fields'],
        'rows': [sample_request['rows'][0]] * min(max_rows_per_request, n - i * max_rows_per_request)
    } for i in range(num_requests)]

    start_time = time.time()

    # Function to post one chunk of rows to the scoring endpoint
    def post_data(data):
        response = requests.post(url=running_deployment.url_for_scoring, json=data)
        return response

    # Use ThreadPoolExecutor to send requests concurrently
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(post_data, data) for data in requests_data]
        results = [future.result() for future in futures]

    elapsed_time = time.time() - start_time
    records_per_second = n / elapsed_time

    return {
        'Number of Replicas': running_deployment.kubernetes_options.replicas,
        'Number of Sample Rows': n,
        'Batched Row Size': max_rows_per_request,
        'Elapsed Time (seconds)': elapsed_time,
        'Records per Second': records_per_second,
        'Requests Made': num_requests,
        # 'Responses': [result.status_code for result in results]  # Optionally collect response statuses
    }
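# A quick smoke test before the full sweep (hypothetical values; any small n works):
# print(mlops_throughput_testing(deployment, 100, 50))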
sample_sizes = [1, 10, 100, 500, 1000, 5000, 10000, 100000]
replicas = [1, 2, 3]

# Sweep replica counts and sample sizes, batching rows into requests of at most 300
results = []
for r in replicas:
    if deployment.kubernetes_options.replicas != r:
        print(f"Updating the replicas from {deployment.kubernetes_options.replicas} to {r}")
        deployment.update_kubernetes_options(replicas=r)
        while not deployment.is_healthy():
            time.sleep(1)
    for s in sample_sizes:
        print(f"Beginning load test of {s} rows on {r} replicas")
        result = mlops_throughput_testing(deployment, s, 300)
        results.append(result)

pd.DataFrame(results)
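# In a notebook, the bare DataFrame expression above renders the batched-run results table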
# Repeat the sweep without batching: send all rows for each test in a single request
results = []
for r in replicas:
    if deployment.kubernetes_options.replicas != r:
        print(f"Updating the replicas from {deployment.kubernetes_options.replicas} to {r}")
        deployment.update_kubernetes_options(replicas=r)
        while not deployment.is_healthy():
            time.sleep(1)
    for s in sample_sizes:
        print(f"Beginning load test of {s} rows on {r} replicas")
        result = mlops_throughput_testing(deployment, s, s)  # max_rows_per_request=s -> one request
        results.append(result)

pd.DataFrame(results)
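Throughput numbers are only meaningful if every request actually succeeded. A minimal sanity check, assuming the commented-out 'Responses' line in mlops_throughput_testing is enabled so each result carries its HTTP status codes:

# Verify every scoring request returned HTTP 200 before trusting the throughput figures
df = pd.DataFrame(results)
bad = df['Responses'].apply(lambda codes: [c for c in codes if c != 200])
if bad.map(len).sum():
    print("Some requests failed:")
    print(df[bad.map(len) > 0][['Number of Replicas', 'Number of Sample Rows']])
else:
    print("All requests returned HTTP 200")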