Last active
November 4, 2019 07:51
-
-
Save ryanjdillon/a5ac4f1fb9fe66ed5f0e7cb7b705f13c to your computer and use it in GitHub Desktop.
Load test AzureML Workspace handling of MLflow logging calls -- results in rate limit errors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Load test MLFlow logging to AzureML Workspace (as a tracking backend) | |
Warning: Azure charges apply! | |
The following environment variables must be set: | |
* SUBSCRIPTION_ID - The Azure Subscription ID the Workspace is created under | |
* RESOURCE_GROUP - The Azure ResourceGroup ID the Workspace is created under | |
* WORKSPACE_NAME - The name of the AzureML Workspace to log to
""" | |
from typing import Callable, List | |
from azureml.core.authentication import InteractiveLoginAuthentication | |
from azureml.core import Workspace | |
import concurrent.futures | |
import logging | |
import mlflow | |
from mlflow.entities import Metric, Param | |
import os | |
import pendulum | |
import random | |
import time | |
import uuid | |
# Attach a default handler to the root logger, then make this module's
# logger verbose so the load-test progress messages below are emitted.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")
def get_tracking_client(
    auth: InteractiveLoginAuthentication
) -> mlflow.tracking.MlflowClient:
    """
    Set remote tracking URI for mlflow to AzureML workspace

    Reads SUBSCRIPTION_ID, RESOURCE_GROUP and WORKSPACE_NAME from the
    environment to locate the Workspace.

    Parameters
    ----------
    auth: azureml.core.authentication.InteractiveLoginAuthentication
        Auth object used to connect to the Workspace.

    Returns
    -------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML
    """
    # Fix: use the module logger (set to DEBUG above) rather than the root
    # logger via logging.info(), whose default WARNING level dropped these
    # messages silently.
    logger.info("Create AzureML Workspace object")
    subscription_id = os.getenv("SUBSCRIPTION_ID")
    resource_group = os.getenv("RESOURCE_GROUP")
    workspace_name = os.getenv("WORKSPACE_NAME")
    ws = Workspace(subscription_id, resource_group, workspace_name, auth=auth)
    logger.info("Creating MLflow tracking client with AzureML Workspace tracking URI")
    return mlflow.tracking.MlflowClient(ws.get_mlflow_tracking_uri())
def new_experiment(client: mlflow.tracking.MlflowClient) -> str:
    """
    Create a new experiment with a random name on the remote backend

    The name is prefixed "test_" so that delete_experiments can later
    clean it up by prefix.

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.

    Returns
    -------
    experiment_id: str
        Experiment ID of new experiment.
    """
    experiment_name = f"test_{uuid.uuid4()}"
    return client.create_experiment(experiment_name)
def delete_experiments(client: mlflow.tracking.MlflowClient, prefix: str = "test_"):
    """
    Delete experiments beginning with provided prefix

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.
    prefix: str
        Prefix of experiment names to delete.
    """
    for experiment in client.list_experiments():
        if not experiment.name.startswith(prefix):
            continue
        client.delete_experiment(experiment.experiment_id)
def log_single_build(client: mlflow.tracking.MlflowClient, run_id: str):
    """
    Make a series of log calls representing a single model build

    Generates a randomized set of params and metrics mimicking a real model
    build, then submits them to the backend in a single log_batch call.

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.
    run_id: str
        The unique ID of the run to log to.
    """

    def random_float():
        return random.random()

    def random_int():
        return round(random.random() * 10)

    # Random wait between 0-5s, staggering concurrently launched builds
    time.sleep(random.random() * 5)

    name = f"rowfilter-drop-column-ff{random_int()}"
    model_id = "".join(str(random_int()) for _ in range(15))
    # Fix: the original passed the function object `random_int` to
    # str.format instead of calling it, producing dates like
    # "2017-<function random_int at 0x...>-...".
    start_date = "2017-{}-{}T00:00:00+00:00:00".format(random_int(), random_int())
    end_date = "2018-{}-{}T00:00:00+00:00:00".format(random_int(), random_int())
    # Plain string (no placeholders), so no f-prefix needed.
    model_config = """
    gordo_components.model.anomaly.diff.DiffBasedAnomalyDetector:
        base_estimator:
            sklearn.compose.TransformedTargetRegressor:
                transformer: sklearn.preprocessing.data.MinMaxScaler
                regressor:
                    sklearn.pipeline.Pipeline:
                        steps:
                          - sklearn.compose.ColumnTransformer:
                              transformers:
                                - - dropper  # Name of this transformer
                                  - drop  # Action to perform
                                  - TRA-35TT8567.PV  # Column to apply this action to.
                              remainder: passthrough  # What do do with the rest
                          - sklearn.preprocessing.data.MinMaxScaler
                          - gordo_components.model.models.KerasAutoEncoder:
                              kind: feedforward_hourglass
    """
    # Fix: start/end values were swapped (train_end_date got start_date).
    dataset_config = f"""
    tags:
      - TRA-35TT856{random_int()}.PV
      - TRA-35TT856{random_int()}.PV
      - TRA-35TT856{random_int()}.PV
      - TRA-35TT856{random_int()}.PV
    target_tag_list:
      - TRA-35TT8566.PV
      - TRA-35TT8568.PV
      - TRA-35TT8569.PV
    train_end_date: '{end_date}'
    train_start_date: '{start_date}'
    row_filter: "(`TRA-35TT8567.PV` > 30) & (`TRA-35TT8567.PV` < 40)"
    type: TimeSeriesDataset
    """
    metadata = """
    information: 'Use row filtering and dropping the column inside the pipeline'
    """
    evaluation_config = """
    cv_mode: full_build
    """
    params = [
        ("name", name),
        ("model_id", model_id),
        ("start_date", start_date),
        ("end_date", end_date),
        ("model_config", model_config),
        ("dataset_config", dataset_config),
        ("metadata", metadata),
        ("evaluation_config", evaluation_config),
    ]
    metrics = [
        ("dataset_duration", random_float()),
        ("training_duration", random_float()),
        ("mean_squared_errors", random_float()),
        ("explained_variance", random_float()),
        ("r_2", random_float()),
        ("variable_threshold", random_float()),
    ]
    # Create MLflow logging instances for batch log
    # Metric timestamp must be in milliseconds since Unix epoch
    now = round(pendulum.now("utc").float_timestamp) * 1000
    metrics = [Metric(k, v, timestamp=now, step=0) for k, v in metrics]
    params = [Param(k, v) for k, v in params]
    client.log_batch(run_id, metrics=metrics, params=params)
def log_project_builds(
    auth: InteractiveLoginAuthentication, n_deployments: int = 1
) -> float:
    """
    Perform a run(s) to AzureML Workspace for a given machine/project

    A new client is created before each series of runs for handling the mlflow
    client in a thread safe manner. Multiple runs would correspond to runs from
    subsequent deployments for the same machine/project.

    If a rate-limit warning is received this threads logging pauses for 60s, as
    per the Azure request response's suggestions.

    Parameters
    ----------
    auth: azureml.core.authentication.InteractiveLoginAuthentication
        Auth object for generating new clients.
    n_deployments: int
        Number of runs with associated logs to generated.

    Returns
    -------
    duration: float
        Duration of mlflow "run".
    """
    started = pendulum.now("utc")

    client = get_tracking_client(auth)
    experiment_id = new_experiment(client)

    for _deployment in range(n_deployments):
        run = client.create_run(
            experiment_id, tags={"model_key": str(uuid.uuid4())}
        )
        run_id = run.info.run_id

        # Retry until the batch of logs is accepted; a rate-limit Warning
        # triggers a 60s back-off before trying again.
        while True:
            try:
                log_single_build(client, run_id)
            except Warning:
                time.sleep(60)
            else:
                break

        # Manually set run as finished
        client.set_terminated(run_id)

    del client
    return (pendulum.now("utc") - started).total_seconds()
def marauder(
    n_threads: int, n_calls: int, func: Callable, *args, **kwargs: dict
) -> List[int]:
    """
    Perform threaded execution of a function

    Parameters
    ----------
    n_threads: int
        Number of threads in to launch all of the calls.
    n_calls: int
        Number of calls to 'func'.
    func: Callable
        The function called by each marauder.
    *args: dict
        Arguments to func.
    **kwargs: dict
        Keyword arguments to func.

    Returns
    -------
    durations_sec: List[int]
        Return values of each successful call; failed calls are logged
        and omitted.
    """
    logger.info(f"{pendulum.now('utc')} ; Maraud initiated")
    # Fix: initialize the accumulator ONCE. The original re-created this
    # list inside the as_completed loop, discarding all but the last
    # result, and raised UnboundLocalError at the return when n_calls == 0.
    durations_sec: List[int] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        future_to_call = {
            executor.submit(func, *args, **kwargs): call for call in range(n_calls)
        }
        for future in concurrent.futures.as_completed(future_to_call):
            call = future_to_call[future]
            try:
                durations_sec.append(future.result())
                logger.info(f"{pendulum.now('utc')} ; Marauder {call} succeeded!!")
            except Exception as exc:
                logger.exception(exc)
    return durations_sec
if __name__ == "__main__":
    # Authentication is handled here and passed to all threads, as to only login once for the session.
    auth = InteractiveLoginAuthentication(force=True)

    start = pendulum.now("utc")

    # Load-test knobs: pool size, total builds, and deployments per build.
    n_threads = 100
    n_builds = 1000
    n_deployments = 2

    durations = marauder(
        n_threads, n_builds, log_project_builds, auth, n_deployments=n_deployments
    )

    total_sec = (pendulum.now("utc") - start).total_seconds()
    logger.info(
        f"Total duration of {total_sec}s for "
        f"{n_deployments} deployments of {n_builds} builds, processed on {n_threads} "
        "threads"
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment